JMS Consumer

Use the JMS Consumer step to receive streaming data from an Apache ActiveMQ Java Messaging Service (JMS) server or IBM MQ middleware.

The parent JMS Consumer step runs a child transformation that executes according to the message batch size or duration, letting you process a continuous stream of records in near real time. The child transformation must start with the Get records from stream step.

In the JMS Consumer step, you can define how many messages to accept for processing, as well as the data formats to stream activity data and system metrics. You can also select a step in the child transformation to stream records back to the parent transformation, which passes records downstream to other steps in the parent transformation.

circle-info

Because the JMS Consumer step continuously ingests streaming data, you may want to use the Abort step in either the parent or child transformation to stop consuming records for specific workflows.

Before you begin

Before you use the JMS Consumer step:

  • Be familiar with JMS messaging.

  • Have a message broker available, such as Apache ActiveMQ Artemis or IBM MQ.

  • This step supports JMS 2.0 and requires Apache ActiveMQ Artemisarrow-up-right.

  • If you need JMS 1.1 with ActiveMQ or Artemis, use the previous versions of the JMS Consumer and JMS Producer steps (Pentaho 8.1 and earlier).

IBM MQ client JARs

If you use IBM MQ, place IBM MQ client JARs in:

  • PDI client: data-integration/system/karaf/deploy

  • Pentaho Server: server/pentaho-server/pentaho-solutions/system/karaf/deploy

You can get the WebSphere MQ classes for JMS Java libraries from your IBM WebSphere MQ installation. You can also find these libraries in your IBM WebSphere MQ Client SupportPacarrow-up-right.

The WebSphere MQ Java libraries version the PDI plugin steps were built against is 9.0.0.3. The libraries you must provide are:

  • com.ibm.mq.osgi.allclientprereqs_9.0.0.3.jar

  • com.ibm.mq.osgi.allclient_9.0.0.3.jar

Because IBM licensing prevents Pentaho from distributing these libraries directly, you must add them to your environment.

Additional JMS library JARs

Place JMS library JARs for your ConnectionFactory and other supporting classes in:

  • PDI client: data-integration/system/karaf/deploy

  • Pentaho Server: server/pentaho-server/pentaho-solutions/system/karaf/deploy

General

The JMS Consumer step requires definitions for setup, batch, field, and result fields to stream messages.

JMS Consumer dialog box
  • Step name: Name of the step on the canvas. Default: JMS Consumer.

  • Transformation: Child transformation to execute.

    You can:

    • Enter the transformation path.

    • Select Browse to select an existing child transformation.

    • Select New to create and save a new child transformation.

    The selected child transformation must start with the Get records from stream step.

    If the selected transformation shares the same root path as the current transformation, PDI automatically inserts ${Internal.Entry.Current.Directory} in place of the common root path.

    Example:

    • Current transformation: /home/admin/transformation.ktr

    • Selected transformation: /home/admin/path/sub.ktr

    • Converted path: ${Internal.Entry.Current.Directory}/path/sub.ktr

    If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.

Create and save a new child transformation

If you do not already have a child transformation, you can create one while setting up the JMS Consumer step.

When you select New, PDI generates the required Get records from stream step in a new canvas tab. The step’s fields and types match the fields and types you specify on the Fields tab of the parent JMS Consumer step.

  1. In the JMS Consumer step, select New.

  2. In the Save As dialog box, select a location and enter a file name.

  3. Select Save.

  4. Select the new transformation tab to view and edit the child transformation.

  5. When finished, return to the JMS Consumer step.

Options

The JMS Consumer step includes the following tabs: Setup, Security, Batch, Fields, and Result fields.

Setup tab

Define the connections used for receiving messages, topics, and queues.

  • IBM MQ: Select if you are using IBM MQ as your message broker.

  • ActiveMQ: Select if you are using Apache ActiveMQ Artemis (JMS 2.0) as your message broker.

  • JMS URL: Broker URL for the selected connection type.

  • Destination type: Delivery model to use:

    • Topic: Publish-and-subscribe model. One message can be delivered to multiple consumers.

    • Queue: Point-to-point model. One message is delivered from a single producer to a single consumer.

  • Destination name: Name of the topic or queue.

  • Receive timeout: Time to wait for incoming messages, in milliseconds. A timeout of 0 never expires.

Security tab

Security tab in JMS Consumer

Authentication:

  • Username: User name required to access the ActiveMQ or IBM MQ server.

  • Password: Password associated with the user name.

  • Use secure protocol: Select to secure the message stream with SSL. You can adjust settings in the SSL properties table.

SSL properties:

Name
Value

Ciphersuite

CipherSuite name. Values depend on the provider. For details, see Sun Providersarrow-up-right.

Context Algorithm

Name of the secure protocol you are using.

FIPS required

IBM MQ only. Set to True to enable FIPS support. Set to False if FIPS is not required.

Key Store Password

Password for the key store.

Key Store Path

File path to the key store.

Key Store Type

IBM MQ only. Key store format.

SSL Provider

ActiveMQ only. SSL implementation: JDK or OpenSSL. If you select OpenSSL, you must provide the OpenSSL libraries. For details, see the ActiveMQ documentation: Configuring transportsarrow-up-right.

Trust Store Password

IBM MQ only. Password for the trust store.

Trust All

ActiveMQ only. Set to True to trust all certificates without validation (not recommended). Set to False (recommended).

Trust Store Path

File path to the trust store certificates.

Trust Store Type

IBM MQ only. Trust store format.

Verify Host

ActiveMQ only. Set to True to verify the host server name matches its certificate. Set to False to omit host verification.

Batch tab

Batch tab in JMS Consumer

Specify how many messages to consume before processing. Messages are consumed when either Duration (ms) or Number of records is reached.

If you set either value to 0, PDI ignores that setting. To run the transformation, either Duration (ms) or Number of records must be greater than 0.

  • Duration (ms): Amount of time (in milliseconds) to collect records before executing the child transformation.

circle-info

You must set this field if you use Spark as your processing engine.

  • Number of records: Number of records to collect before executing the child transformation.

  • Maximum concurrent batches: Maximum number of batches to collect records at the same time. Default: 1.

    Use this option only when your consumer step cannot keep pace with the stream rate and your environment has adequate CPU and memory.

  • Message prefetch limit: Limit for the number of incoming messages to queue for processing. Default: 100000.

Fields tab

Fields tab in JMS Consumer

Define the fields in the record format.

  • Input name: Field name received from the JMS stream. Default fields include:

    • message: Individual message in a record.

    • destination: Name of the queue or topic that messages are consumed from.

    • messageId: Value that uniquely identifies each message.

    • jmsTimestamp: Time a message was generated.

    • jmsRedelivered: Whether the message was marked for redelivery.

  • Output name: Output field name.

  • Type: Data format for the field (for example, String).

Result fields tab

Result fields tab in JMS Consumer

Use Return fields from to select the step in the child transformation that streams records back to the parent transformation.

See also

Metadata injection support

All fields in this step support metadata injection. You can use this step with ETL metadata injection to pass metadata to your transformation at runtime.

Last updated

Was this helpful?