# MQTT Consumer

The PDI client can pull [streaming data](https://github.com/pentaho/documentation/blob/main/PDIA/11.0/PDI/Streaming%20Analytics/Streaming%20analytics=GUID-27004CDD-BC78-457A-ABB5-1683D9AB3FBE=3=en=.md) from an MQTT broker or clients through an MQTT transformation. The parent MQTT Consumer step runs a child transformation that executes according to the message batch size or duration, allowing you to process a continuous stream of records in near real-time. The child transformation must start with the [Get records from stream](/pdia-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream.md) step.

Additionally, from the MQTT Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.

![MQTT Consumer dialog box](/files/uK0oYOSSfXQwwxe0tB8o)

### Step name

* **Step name**: Specifies the unique name of the step on the canvas. Default: `MQTT Consumer`.

### Transformation

Use **Transformation** to specify the child transformation to run.

You can specify the child transformation by doing any of the following:

* Enter the transformation path.
* Select **Browse** to select an existing child transformation.
* Select **New** to create and save a new child transformation. For details, see [Create and save a new child transformation](#create-and-save-a-new-child-transformation).

The selected child transformation must start with the [Get records from stream](/pdia-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream.md) step.

If you select a transformation that has the same root path as the current transformation, the variable `${Internal.Entry.Current.Directory}` is inserted automatically in place of the common root path.

Example:

* Current transformation: `/home/admin/transformation.ktr`
* Selected transformation: `/home/admin/path/sub.ktr`
* Converted path: `${Internal.Entry.Current.Directory}/path/sub.ktr`

If you are working with a repository, specify the name of the transformation. If you are not working with a repository, specify the XML file name of the transformation.

Transformations previously specified by reference are converted automatically to use the transformation name in the Pentaho Repository.

#### Create and save a new child transformation

If you do not already have a child transformation, you can create one while setting up the MQTT Consumer step.

When you select **New**, PDI generates the required [Get records from stream](/pdia-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream.md) step in a new canvas tab. The step’s fields and types match the **Fields** tab configuration in the parent MQTT Consumer step.

1. In the MQTT Consumer step, select **New**.

   The **Save As** dialog box appears.
2. Navigate to the location where you want to save your new child transformation, then enter the file name.
3. Select **Save**.

   A notification box informs you that the child transformation has been created and opened in a new tab. If you do not want to see this notification again, select **Don't show me this again**.
4. Select the new transformation tab to view and edit the child transformation.

   It automatically contains the [Get records from stream](/pdia-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream.md) step. Optionally, you can continue to build this transformation and save it.
5. When finished, return to the MQTT Consumer step.

### Options

The MQTT Consumer step includes the following tabs.

* **Setup**
* **Security**
* **Batch**
* **Fields**
* **Result fields**
* **Options**

#### Setup tab

![Setup tab in MQTT Consumer](/files/w9m9pAoWyNDLl7RqFr0b)

In this tab, define the connections used for receiving messages, topics to which you want to subscribe, and the consumer group for the topics.

| Option                       | Description                                                                                                                                                                                                       |
| ---------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Connection**               | Specify the address of the MQTT server to which this step will connect for sending or retrieving messages.                                                                                                        |
| **Client ID**                | Specify a unique ID for the MQTT client. The MQTT server uses this client ID to recognize each distinct client and that client's current state.                                                                   |
| **Topics**                   | Specify the MQTT topic or topics to subscribe to.                                                                                                                                                                 |
| **Quality of Service (QoS)** | <p>Quality of Service (QoS) is a level of guarantee for message delivery. Select one of the following options:</p><ul><li>At most once (0) (default)</li><li>At least once (1)</li><li>Exactly once (2)</li></ul> |

#### Security tab

![Security tab in MQTT Consumer](/files/NZYIwD3PRCC7BldwLwcd)

Use this tab to define authentication credentials for the MQTT server.

| Option                  | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Username**            | Specify the user name required to access the MQTT server.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| **Password**            | Specify the password associated with the user name.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| **Use secure protocol** | Select this option to define SSL properties for the connection.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| **SSL Properties**      | <ul><li><strong>ssl.contextProvider</strong></li></ul><p>Specify the underlying JSSE provider.</p><ul><li><strong>ssl.enabledCipherSuites</strong></li></ul><p>Specify which ciphers are enabled. Values are dependent on the provider.</p><ul><li><strong>ssl.keyManager</strong></li></ul><p>Specify the algorithm that will be used to create a KeyManagerFactory object instead of using the default algorithm available in the platform.</p><ul><li><strong>ssl.keyStore</strong></li></ul><p>Specify the name of the file that contains the KeyStore object that you want the KeyManager to use.</p><ul><li><strong>ssl.keyStorePassword</strong></li></ul><p>Specify the password for the KeyStore object that you want the KeyManager to use.</p><ul><li><strong>ssl.keyStoreProvider</strong></li></ul><p>Specify the identifying name or string for the key store provider.</p><ul><li><strong>ssl.keyStoreType</strong></li></ul><p>Specify the identifying name or string for the type of key store.</p><ul><li><strong>ssl.protocol</strong></li></ul><p>Specify the type of SSL protocol to use.</p><ul><li><strong>ssl.trustManager</strong></li></ul><p>Specify the algorithm that will be used to create a TrustManagerFactory object, instead of using the default algorithm available in the platform.</p><ul><li><strong>ssl.trustStore</strong></li></ul><p>Specify the name of the file that contains the KeyStore object that you want the TrustManager to use.</p><ul><li><strong>ssl.trustStorePassword</strong></li></ul><p>Specify the password for the TrustStore object that you want the TrustManager to use.</p><ul><li><strong>ssl.trustStoreProvider</strong></li></ul><p>Specify the identifier or string for the trust store provider.</p><ul><li><strong>ssl.trustStoreType</strong></li></ul><p>Specify the type of KeyStore object that you want the TrustManager to use.</p> |

#### Batch tab

![Batch tab in MQTT Consumer](/files/5wRJU4Vw5qIxDxoR7e6C)

Use this tab to specify how many messages to consume before processing. You can specify message count and or a specific amount of time.

The number of messages consumed before processing is defined by either **Duration (ms)** or **Number of records**. Messages are consumed when either the specified duration or number of records occurs. If you set either option to `0`, PDI ignores that parameter.

| Option                         | Description                                                                                                                                                                                                                                                                                                                                                     |
| ------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Duration (ms)**              | Specify a time in milliseconds. This value is the amount of time the step will spend collecting records prior to the execution of the transformation. If this option is set to `0`, then **Number of records** triggers consumption. Either **Duration** or **Number of records** must be greater than `0` to run the transformation.                           |
| **Number of records**          | Specify a number. After every X records, the specified transformation is run and those records are passed to the transformation. If this option is set to `0`, then **Duration** triggers consumption. Either **Duration** or **Number of records** must be greater than `0` to run the transformation.                                                         |
| **Maximum concurrent batches** | Specify the maximum number of batches used to collect records at the same time. Default: `1`. Use this option only when your consumer step cannot keep pace with the speed at which the data is streaming and the environment has adequate CPU and memory. An error occurs if the environment cannot handle the maximum number of concurrent batches specified. |
| **Message prefetch limit**     | Specify a limit for how many incoming messages this step will queue for processing, as they are received from the broker. Setting this value forces the broker to manage the backpressure of messages exceeding the specified limit. Default: `100000`.                                                                                                         |

#### Fields tab

![Fields tab in MQTT Consumer](/files/z4q0wP0CPzV7QiuhQ3o3)

Use this tab to define the fields in the record format.

| Option          | Description                                                                                                                                                                                                                                                                              |
| --------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Input name**  | <p>The input name is received from the MQTT streams. The following are received by default:</p><ul><li><strong>message</strong></li></ul><p>The individual message contained in a record.</p><ul><li><strong>topic</strong></li></ul><p>The category to which records are published.</p> |
| **Output name** | The output name can be mapped to subscriber and member requirements.                                                                                                                                                                                                                     |
| **Type**        | This value is always String. This field applies to the message and topic input names.                                                                                                                                                                                                    |

#### Result fields tab

![Result fields tab in MQTT Consumer](/files/dqir0ZIP1TOwgHF9B480)

Use this tab to select the step from the child transformation that will stream records back to the parent transformation.

| Option                 | Description                                                                                                                                                                                                                                    |
| ---------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Return fields from** | Select the name of the step (from the child transformation) that will stream fields back to the parent transformation. The data values in these returned fields are available to any subsequent downstream steps in the parent transformation. |

#### Options tab

![Options tab in MQTT Consumer](/files/c1LaKXyIGrubhaxczrRv)

This tab includes the following MQTT-specific parameters.

| Parameter               | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Keep Alive Interval** | Specify a maximum number of interval seconds that is permitted to elapse between the point at which the PDI client finishes transmitting one control packet and the point it starts sending the next.                                                                                                                                                                                                                                                                                                                           |
| **Max Inflight**        | Specify the maximum number of messages to have in process at any given time.                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| **Connection Timeout**  | Specify the time (in seconds) to disconnect if a message is not received.                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| **Clean Session**       | <p>Specify whether the broker stores or purges messages for a session:</p><ul><li><strong><code>True</code></strong></li></ul><p>The broker does not store any information for the client. All information from a previous persistent session is purged.</p><ul><li><strong><code>False</code></strong></li></ul><p>The broker stores all subscriptions for the client. When QoS is set to 1 or 2, missed messages are stored. For details, see the QoS setting in the <a href="#setup-tab"><strong>Setup tab</strong></a>.</p> |
| **Storage Level**       | <p>Indicates whether messages are stored in memory or on disk:</p><ul><li>Blank (default): memory</li><li>Disk: enter a valid path</li></ul>                                                                                                                                                                                                                                                                                                                                                                                    |
| **Server URIs**         | Specify the MQTT server universal resource identifier (URI).                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| **MQTT Version**        | Specify the MQTT protocol version that this step connects to.                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| **Automatic Reconnect** | <p>Enable the client to attempt an automatic reconnect to the server if it becomes disconnected:</p><ul><li><strong><code>True</code></strong>: reconnect to the server</li><li><strong><code>False</code></strong>: do not reconnect</li></ul>                                                                                                                                                                                                                                                                                 |

### Metadata injection support

This step supports metadata injection. You can use it with [ETL metadata injection](/pdia-data-integration/pdi-transformation-steps-reference-overview/etl-metadata-injection.md) to pass metadata to your transformation at runtime.

### See also

* [MQTT Producer](/pdia-data-integration/pdi-transformation-steps-reference-overview/mqtt-producer.md)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/mqtt-consumer.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
