# JMS Consumer

Use the **JMS Consumer** step to receive [streaming data](https://docs.pentaho.com/pdia-data-integration/extracting-data-into-pdi/streaming-analytics) from an Apache ActiveMQ Java Message Service (JMS) server or IBM MQ middleware.

The parent JMS Consumer step runs a child transformation that executes according to the message batch size or duration, letting you process a continuous stream of records in near real time. The child transformation must start with the [Get records from stream](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream) step.

In the JMS Consumer step, you can define how many messages to accept for processing, as well as the data formats used to stream activity data and system metrics. You can also select a step in the child transformation to stream records back to the parent transformation, which then passes them downstream to its other steps.

{% hint style="info" %}
Because the JMS Consumer step continuously ingests streaming data, you may want to use the [Abort](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/abort) step in either the parent or child transformation to stop consuming records for specific workflows.
{% endhint %}

### Before you begin

Before you use the JMS Consumer step:

* Be familiar with JMS messaging.
* Have a message broker available, such as Apache ActiveMQ Artemis or IBM MQ.
* This step supports JMS 2.0 and requires [Apache ActiveMQ Artemis](https://activemq.apache.org/artemis/).
* If you need JMS 1.1 with ActiveMQ or Artemis, use the previous versions of the JMS Consumer and JMS Producer steps (Pentaho 8.1 and earlier).

#### IBM MQ client JAR

If you use IBM MQ, place the IBM MQ client JAR in:

* PDI client: `data-integration/plugins/pentaho-streaming-jms-plugin/lib`
* Pentaho Server: `server/pentaho-server/pentaho-solutions/system/karaf/deploy`

You can get the WebSphere MQ classes for the JMS Java library from your IBM WebSphere MQ installation. You can also find this library in your [IBM WebSphere MQ Client SupportPac](https://idaas.iam.ibm.com/idaas/mtfim/sps/authsvc?PolicyId=urn:ibm:security:authentication:asf:basicldapuser). The WebSphere MQ Java library version that the PDI plugin steps were built against is 9.4.0.0. The library you must provide is `com.ibm.mq.allclient-9.4.0.0.jar`. Because IBM licensing prevents Pentaho from distributing the library directly, you must add it to your environment.

#### Additional JMS library JAR

Place the JMS library JAR for your `ConnectionFactory` and other supporting classes in:

* PDI client: `data-integration/plugins/pentaho-streaming-jms-plugin/lib`
* Pentaho Server: `server/pentaho-server/pentaho-solutions/system/karaf/deploy`

#### OPT environment variable

Add the following JVM option to the `OPT` variable in the PDI client launch script for your operating system.

**Linux or Unix**

1. In a text editor, open the `.../data-integration/spoon.sh` file.
2. Locate the line that defines the `OPT` variable:

   ```shellscript
   OPT="$OPT $PENTAHO_DI_JAVA_OPTIONS
   ```
3. Append the following JVM option to the end of that line:

   ```shellscript
   -Dcom.ibm.mq.cfg.useIBMCipherMappings=false
   ```
4. Save and close the `.../data-integration/spoon.sh` file.
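
After saving, you may want to confirm that the option actually landed on the `OPT` line. The following is a minimal sketch, not a PDI command: `opt_line_has_option` is a hypothetical helper, and the example path is an assumption that you should replace with your own installation directory.

```shell
#!/usr/bin/env bash
# Illustrative only: checks whether a launcher script's OPT line carries a JVM option.
# opt_line_has_option is a hypothetical helper, not part of PDI.
opt_line_has_option() {
  local script="$1" option="$2"
  # Keep only lines that assign OPT, then look for the literal option text.
  # "--" stops grep from treating the leading "-D" as one of its own flags.
  grep -E '^[[:space:]]*OPT=' "$script" | grep -qF -- "$option"
}

# Example (the path is an assumption; use your own installation directory):
# opt_line_has_option ./data-integration/spoon.sh \
#   '-Dcom.ibm.mq.cfg.useIBMCipherMappings=false' && echo "option present"
```

The function returns a zero exit status only when a line that assigns `OPT` contains the option, so it can be used directly in an `if` or `&&` chain.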

**Windows**

1. In a text editor, open the `...\data-integration\Spoon.bat` file.
2. Locate the line that defines the `OPT` variable:

   ```bat
   set OPT=%OPT% %PENTAHO_DI_JAVA_OPTIONS%
   ```
3. Append the following JVM option to the end of that line:

   ```bat
   "-Dcom.ibm.mq.cfg.useIBMCipherMappings=false"
   ```
4. Save and close the `...\data-integration\Spoon.bat` file.

### General

To stream messages, the JMS Consumer step requires setup, batch, field, and result field definitions.

![JMS Consumer dialog box](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-5b3841497a819e29f6e5e44143fc3df7147f8d6a%2FPDITransStep_JMSConsumer_PropertiesDialogBox.png?alt=media)

* **Step name**: Name of the step on the canvas. Default: `JMS Consumer`.
* **Transformation**: Child transformation to execute.

  You can:

  * Enter the transformation path.
  * Select **Browse** to select an existing child transformation.
  * Select **New** to create and save a new child transformation.

  The selected child transformation must start with the **Get records from stream** step.

  If the selected transformation shares the same root path as the current transformation, PDI automatically inserts `${Internal.Entry.Current.Directory}` in place of the common root path.

  Example:

  * Current transformation: `/home/admin/transformation.ktr`
  * Selected transformation: `/home/admin/path/sub.ktr`
  * Converted path: `${Internal.Entry.Current.Directory}/path/sub.ktr`

  If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.
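
The path conversion in the example above can be sketched in shell. This is only an illustration of the substitution, not PDI's own code: `relativize_child_path` is a hypothetical helper, and it assumes that "common root path" means the child transformation is located under the directory of the current transformation.

```shell
#!/usr/bin/env bash
# Illustrative only: mimics the path substitution described above.
# relativize_child_path is a hypothetical helper, not part of PDI.
relativize_child_path() {
  local parent_ktr="$1" child_ktr="$2"
  local parent_dir="${parent_ktr%/*}"   # directory of the current transformation
  case "$child_ktr" in
    "$parent_dir"/*)
      # Child is under the parent's directory: swap the common root for the variable.
      printf '%s\n' "\${Internal.Entry.Current.Directory}${child_ktr#"$parent_dir"}"
      ;;
    *)
      # No common root: keep the path unchanged.
      printf '%s\n' "$child_ktr"
      ;;
  esac
}

relativize_child_path /home/admin/transformation.ktr /home/admin/path/sub.ktr
```

With the paths from the example, the call prints `${Internal.Entry.Current.Directory}/path/sub.ktr`. A child path outside `/home/admin` is printed unchanged.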

#### Create and save a new child transformation

If you do not already have a child transformation, you can create one while setting up the JMS Consumer step.

When you select **New**, PDI generates the required **Get records from stream** step in a new canvas tab. The step’s fields and types match the fields and types you specify on the **Fields** tab of the parent JMS Consumer step.

1. In the JMS Consumer step, select **New**.
2. In the Save As dialog box, select a location and enter a file name.
3. Select **Save**.
4. Select the new transformation tab to view and edit the child transformation.
5. When finished, return to the JMS Consumer step.

### Options

The JMS Consumer step includes the following tabs: **Setup**, **Security**, **Batch**, **Fields**, and **Result fields**.

#### Setup tab

Define the connections used for receiving messages, topics, and queues.

* **IBM MQ**: Select if you are using IBM MQ as your message broker.
* **ActiveMQ**: Select if you are using Apache ActiveMQ Artemis (JMS 2.0) as your message broker.
* **JMS URL**: Broker URL for the selected connection type.
* **Destination type**: Delivery model to use:
  * **Topic**: Publish-and-subscribe model. One message can be delivered to multiple consumers.
  * **Queue**: Point-to-point model. Each message is delivered to a single consumer.
* **Destination name**: Name of the topic or queue.
* **Receive timeout**: Time to wait for incoming messages, in milliseconds. A timeout of `0` never expires.

#### Security tab

![Security tab in JMS Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-19ac7c9a3ec9a9f722bff8e3780e5186b45193c9%2FPDITransStep_JMSConsumer_SecurityTab.png?alt=media)

Authentication:

* **Username**: User name required to access the ActiveMQ or IBM MQ server.
* **Password**: Password associated with the user name.
* **Use secure protocol**: Select to secure the message stream with SSL. You can adjust settings in the SSL properties table.

SSL properties:

| Name                     | Value                                                                                                                                                                                                                                                                 |
| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Ciphersuite**          | CipherSuite name. Values depend on the provider. For details, see [Sun Providers](https://docs.oracle.com/javase/8/docs/technotes/guides/security/SunProviders.html).                                                                                                 |
| **Context Algorithm**    | Name of the secure protocol you are using.                                                                                                                                                                                                                            |
| **FIPS required**        | IBM MQ only. Set to `True` to enable FIPS support. Set to `False` if FIPS is not required.                                                                                                                                                                            |
| **Key Store Password**   | Password for the key store.                                                                                                                                                                                                                                           |
| **Key Store Path**       | File path to the key store.                                                                                                                                                                                                                                           |
| **Key Store Type**       | IBM MQ only. Key store format.                                                                                                                                                                                                                                        |
| **SSL Provider**         | ActiveMQ only. SSL implementation: JDK or OpenSSL. If you select OpenSSL, you must provide the OpenSSL libraries. For details, see the ActiveMQ documentation: [Configuring transports](https://activemq.apache.org/artemis/docs/latest/configuring-transports.html). |
| **Trust Store Password** | IBM MQ only. Password for the trust store.                                                                                                                                                                                                                            |
| **Trust All**            | ActiveMQ only. Set to `True` to trust all certificates without validation (not recommended). Set to `False` (recommended).                                                                                                                                            |
| **Trust Store Path**     | File path to the trust store certificates.                                                                                                                                                                                                                            |
| **Trust Store Type**     | IBM MQ only. Trust store format.                                                                                                                                                                                                                                      |
| **Verify Host**          | ActiveMQ only. Set to `True` to verify the host server name matches its certificate. Set to `False` to omit host verification.                                                                                                                                        |

#### Batch tab

![Batch tab in JMS Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-d6ceb81f853b7a94be1a8e4d68447105da646e65%2FPDI_JMS_consumer_step_batch_tab_new_prefetch_options.png?alt=media)

Specify how many messages to collect before processing. A batch is processed when either the **Duration (ms)** or the **Number of records** threshold is reached.

If you set either value to `0`, PDI ignores that setting. To run the transformation, at least one of **Duration (ms)** or **Number of records** must be greater than `0`.

* **Duration (ms)**: Amount of time (in milliseconds) to collect records before executing the child transformation.

{% hint style="info" %}
You must set this field if you use Spark as your processing engine.
{% endhint %}

* **Number of records**: Number of records to collect before executing the child transformation.
* **Maximum concurrent batches**: Maximum number of batches that can collect records at the same time. Default: `1`.

  Use this option only when your consumer step cannot keep pace with the stream rate and your environment has adequate CPU and memory.
* **Message prefetch limit**: Limit for the number of incoming messages to queue for processing. Default: `100000`.
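
The interaction between **Duration (ms)** and **Number of records** can be modeled with a short shell sketch. It is only an approximation of the rules stated above, not the step's implementation: `batch_ready` and its parameter names are hypothetical.

```shell
#!/usr/bin/env bash
# Illustrative only: models when a batch would be handed to the child transformation.
# batch_ready is a hypothetical helper; a threshold of 0 means "ignored".
# Exit status: 0 = batch ready, 1 = keep collecting, 2 = invalid settings.
batch_ready() {
  local elapsed_ms="$1" record_count="$2" duration_ms="$3" max_records="$4"
  # At least one of the two thresholds must be greater than 0.
  if [ "$duration_ms" -le 0 ] && [ "$max_records" -le 0 ]; then
    echo "invalid: set Duration (ms) or Number of records above 0" >&2
    return 2
  fi
  # Duration threshold reached (only checked when it is enabled).
  if [ "$duration_ms" -gt 0 ] && [ "$elapsed_ms" -ge "$duration_ms" ]; then
    return 0
  fi
  # Record-count threshold reached (only checked when it is enabled).
  if [ "$max_records" -gt 0 ] && [ "$record_count" -ge "$max_records" ]; then
    return 0
  fi
  return 1
}

# 250 ms elapsed, 1000 records collected, thresholds of 1000 ms and 1000 records.
batch_ready 250 1000 1000 1000 && echo "batch ready"
```

In the sample call, the record threshold is met before the duration elapses, so the batch is considered ready. Setting the record threshold to `0` would leave the duration as the only trigger.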

#### Fields tab

![Fields tab in JMS Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-6e57f239edde7c93090139982e8a91c2e049bb59%2FPDITransStep_JMSConsumer_FieldsTab.png?alt=media)

Define the fields in the record format.

* **Input name**: Field name received from the JMS stream. Default fields include:
  * `message`: Individual message in a record.
  * `destination`: Name of the queue or topic that messages are consumed from.
  * `messageId`: Value that uniquely identifies each message.
  * `jmsTimestamp`: Time a message was generated.
  * `jmsRedelivered`: Whether the message was marked for redelivery.
* **Output name**: Output field name.
* **Type**: Data type of the field (for example, String).

#### Result fields tab

![Result fields tab in JMS Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-12943c8ab187d03265e71b5b14d406801f62e34f%2FPDITransStep_JMSConsumer_ResultFieldsTab.png?alt=media)

Use **Return fields from** to select the step in the child transformation that streams records back to the parent transformation.

### See also

* [JMS Producer](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/jms-producer)

### Metadata injection support

All fields in this step support metadata injection. You can use this step with [ETL metadata injection](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/etl-metadata-injection) to pass metadata to your transformation at runtime.
