# AMQP Consumer

The Advanced Message Queuing Protocol (AMQP) Consumer step receives [streaming data](https://docs.pentaho.com/pdia-data-integration/extracting-data-into-pdi/streaming-analytics) from an AMQP message producer through an AMQP 0-9-1 compatible broker. You can configure this step to use an existing AMQP message queue or create a new one.

You can also set up the AMQP Consumer step to continuously ingest streaming data from either an AMQP message producer or a broker. The parent AMQP Consumer step runs a child transformation that executes according to the message batch size or duration, letting you process a continuous stream of records in near real time.

The child transformation must start with the [Get records from stream](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream) step. You can also select a step in the child transformation to stream records back to the parent transformation.

{% hint style="info" %}
Because the AMQP Consumer step continuously ingests streaming data, you might want to use the [Abort](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/abort) step in either the parent or child transformation to stop consuming records for specific workflows.

For example, you can run the parent transformation on a timed schedule, or abort the child transformation if sensor data exceeds a preset range.
{% endhint %}

### Before you begin

Before you use the AMQP Consumer step, be aware of the following conditions:

* This step uses and requires the AMQP 0-9-1 messaging protocol.
* You must have an AMQP 0-9-1 compatible broker (such as [RabbitMQ](https://www.rabbitmq.com/)) available before you configure this step.
* You can use the AMQP Consumer step alone to ingest messages from any AMQP producer or broker. The [AMQP Producer](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/amqp-producer) step is not required.
* If you use both AMQP Consumer and AMQP Producer (in the same transformation or in separate transformations), some of the AMQP Consumer settings must match settings in AMQP Producer.

### Step name and child transformation

![AMQP Consumer dialog box](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-f75687aec5bbd1fd32d8fc48d472a9ce8b1d6264%2FPDITransStep_AMQPConsumer_PropertiesDialogBox.png?alt=media)

* **Step name**: Specify the unique name of the AMQP Consumer step on the canvas. You can customize the name or leave it as the default.
* **Transformation**: Specify the child transformation to execute by doing one of the following:

  * Enter its path.
  * Select **Browse** to select an existing child transformation.
  * Select **New** to create and save a new child transformation. For details, see [Create a child transformation](#create-a-child-transformation).

  **Note:** The selected child transformation must start with the **Get records from stream** step.

  If you select a transformation that has the same root path as the current transformation, the variable `${Internal.Entry.Current.Directory}` is automatically inserted in place of the common root path.

  * Example: If the current transformation path is `/home/admin/transformation.ktr` and you select a transformation in `/home/admin/path/sub.ktr`, then the path is converted to `${Internal.Entry.Current.Directory}/path/sub.ktr`.

  If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.

  Transformations that were previously specified by reference are automatically converted to be specified by transformation name in the Pentaho Repository.
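
The path substitution described above can be sketched in a few lines of Python. This is only an illustration of the rule, not PDI's implementation: the function name `relativize_child_path` is hypothetical, and the sketch assumes that "same root path" means the child transformation is stored in or below the parent transformation's directory.

```python
import posixpath

VARIABLE = "${Internal.Entry.Current.Directory}"


def relativize_child_path(parent_ktr, child_ktr):
    """Replace the parent's directory with the variable when the child lives under it."""
    parent_dir = posixpath.dirname(parent_ktr)
    prefix = parent_dir.rstrip("/") + "/"
    if child_ktr.startswith(prefix):
        return VARIABLE + "/" + child_ktr[len(prefix):]
    # Assumption: paths outside the parent's directory are left unchanged.
    return child_ktr


print(relativize_child_path("/home/admin/transformation.ktr", "/home/admin/path/sub.ktr"))
# ${Internal.Entry.Current.Directory}/path/sub.ktr
```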

### Create a child transformation

If you do not already have a child transformation, you can create one while setting up the AMQP Consumer step.

When you select **New**, a new child transformation is created and opened in a new canvas tab. It automatically includes the required **Get records from stream** step.

In addition, the **Get records from stream** step is automatically configured to match the fields and data types you specify in the parent step’s **Fields** tab.

{% stepper %}
{% step %}

### Create the transformation

In the AMQP Consumer step, select **New**.

The **Save As** dialog box appears.
{% endstep %}

{% step %}

### Save the transformation

Navigate to the location where you want to save the child transformation, enter a file name, and then select **Save**.
{% endstep %}

{% step %}

### Edit the new transformation

Select the **New Transformation** tab to view and edit the child transformation. (Optional) Continue to build the child transformation and save it.
{% endstep %}

{% step %}

### Return to the AMQP Consumer step

Return to the AMQP Consumer step when finished.
{% endstep %}
{% endstepper %}

### Options

The AMQP Consumer step requires you to specify options and parameters on the **Setup**, **Security**, **Batch**, **Fields**, and **Result fields** tabs.

#### Setup tab

![Setup tab in AMQP Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-c72d87e91d6dcf32c8db0691bd01846cd03b3226%2FPDITransStep_AMQPConsumer_SetupTab.png?alt=media)

In the **Setup** tab, specify the connection details and AMQP entity names:

* Broker connection URI
* Queue name
* Exchange name
* Exchange type
* Routing keys or headers (depending on exchange type)

{% tabs %}
{% tab title="Create a new queue" %}
When the queue does not already exist, the AMQP Consumer step creates the queue the first time you run the transformation.

New queues default to the following properties:

* Durable
* Non-auto-delete
* Non-exclusive
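
These defaults matter because an AMQP 0-9-1 broker rejects an attempt to redeclare an existing queue with different properties (RabbitMQ reports this as a `PRECONDITION_FAILED` channel error). The following Python sketch models that check with an in-memory dictionary standing in for the broker. `declare_queue`, `QueueMismatch`, and the `broker` dictionary are hypothetical names used for illustration only, as are the queue names.

```python
# Default properties the step uses when it declares a queue (from the list above).
STEP_DEFAULTS = {"durable": True, "auto_delete": False, "exclusive": False}


class QueueMismatch(Exception):
    """Stands in for the broker's PRECONDITION_FAILED channel error."""


def declare_queue(broker, name, properties=STEP_DEFAULTS):
    """Create the queue if it is missing; fail if it exists with other properties."""
    existing = broker.get(name)
    if existing is None:
        broker[name] = dict(properties)
        return "created"
    if existing != properties:
        raise QueueMismatch(f"queue '{name}' exists with {existing}")
    return "reused"


# A toy broker that already holds one queue with non-default properties.
broker = {"legacy": {"durable": False, "auto_delete": True, "exclusive": False}}
print(declare_queue(broker, "sensor-readings"))  # queue is missing, so it is created
print(declare_queue(broker, "sensor-readings"))  # same properties, so it is reused
try:
    declare_queue(broker, "legacy")
except QueueMismatch as error:
    print("declaration rejected:", error)
```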

{% hint style="info" %}
As a best practice, run the AMQP Consumer step first (to create the queue and bindings), then start producing messages.
{% endhint %}

Define the following options:

* **Connection**: The URI address of the AMQP broker that this step connects to. For details, see [RabbitMQ URI specification](https://www.rabbitmq.com/uri-spec.html).
* **Queue name**: The name of the new queue to create and consume from.

  If you specify a queue name that already exists on the broker, but its parameter settings differ from the default parameters above (or if the specified queue has a different **Exchange type**), the transformation aborts.
* **Exchange name**: The exchange name to bind the queue to.

  If the exchange does not exist, it is created with the following default properties:

  * Durable
  * Non-auto-delete

  If you leave **Exchange name** blank, set **Exchange type** to **DIRECT** so that the step uses the **DEFAULT** exchange.

  If you use [AMQP Producer](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/amqp-producer), it must also have a blank exchange name.
  {% endtab %}

{% tab title="Use an existing queue" %}
To use an AMQP message queue that already exists on the broker, define the following options:

* **Connection**: The URI address of the AMQP broker that this step connects to. For details, see [RabbitMQ URI specification](https://www.rabbitmq.com/uri-spec.html).
* **Queue name**: The name of the existing queue to consume from.

  The queue must conform to the following parameters:

  * Durable
  * Non-auto-delete
  * Non-exclusive

  If you specify a queue name that does not exist, the step creates a new queue with these parameters.
* **Exchange name**: The exchange name to bind the queue to.

  If you leave **Exchange name** blank, set **Exchange type** to **DIRECT** so that the step uses the **DEFAULT** exchange.

  If you use [AMQP Producer](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/amqp-producer), it must also have a blank exchange name.
  {% endtab %}
  {% endtabs %}
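
Both tabs take the broker address as an AMQP URI. As a rough illustration of how such a URI breaks down, the following Python sketch uses only the standard library. `parse_amqp_uri` is a hypothetical helper, not part of PDI; the default ports (5672 for `amqp`, 5671 for `amqps`) follow the RabbitMQ URI specification linked above; the host and virtual host in the example are placeholders.

```python
from urllib.parse import unquote, urlsplit

DEFAULT_PORTS = {"amqp": 5672, "amqps": 5671}


def parse_amqp_uri(uri):
    """Split an AMQP URI into the parts a client needs to open a connection."""
    parts = urlsplit(uri)
    if parts.scheme not in DEFAULT_PORTS:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        "tls": parts.scheme == "amqps",
        "host": parts.hostname,
        "port": parts.port or DEFAULT_PORTS[parts.scheme],
        # An absent path means "use the broker's default virtual host".
        "vhost": unquote(parts.path[1:]) if parts.path else None,
    }


print(parse_amqp_uri("amqp://broker.example.com/analytics"))
print(parse_amqp_uri("amqps://broker.example.com:5671/%2f"))  # %2f decodes to the "/" vhost
```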

**Exchange type**

Set **Exchange type** to the exchange type pattern that the exchange uses:

* **DIRECT**: Routes messages to queues based on the message routing key.
* **DEFAULT**: To use the default exchange, leave **Exchange name** blank and set **Exchange type** to **DIRECT**.
* **FANOUT**: Routes messages to all queues that are bound to the fanout exchange. The routing key is ignored.
* **TOPIC**: Routes messages to one or more queues based on a match between a message routing key and the pattern used to bind a queue to an exchange.
* **HEADERS**: Routes messages using key/value pairs expressed as message headers.
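
To make the routing rules concrete, the following Python sketch decides whether a message reaches a queue for the DIRECT, FANOUT, and TOPIC types. It is a simplified model rather than broker code: `topic_matches` and `is_routed` are hypothetical names, and the topic wildcards (`*` for exactly one dot-separated word, `#` for zero or more words) come from the AMQP 0-9-1 model rather than from this page.

```python
def topic_matches(pattern, routing_key):
    """Match a dot-separated routing key against a topic binding pattern."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' absorbs zero words, or one word and then tries again.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return p[0] in ("*", k[0]) and match(p[1:], k[1:])

    return match(pattern.split("."), routing_key.split("."))


def is_routed(exchange_type, binding_key, routing_key):
    """Return True if a message with routing_key reaches a queue bound with binding_key."""
    if exchange_type == "FANOUT":
        return True  # the routing key is ignored
    if exchange_type == "DIRECT":
        return binding_key == routing_key
    if exchange_type == "TOPIC":
        return topic_matches(binding_key, routing_key)
    raise ValueError(f"unsupported exchange type: {exchange_type}")


print(is_routed("DIRECT", "sensor.temp", "sensor.temp"))        # True
print(is_routed("TOPIC", "sensor.*.temp", "sensor.line1.temp")) # True
print(is_routed("TOPIC", "sensor.#", "sensor.line1.temp.raw"))  # True
print(is_routed("TOPIC", "sensor.*", "sensor.line1.temp"))      # False
```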

**Routing keys**

When you use **DIRECT** or **TOPIC** as the exchange type, specify the routing key (or multiple routing keys) in the **Routing Keys** table.

![Routing Keys table in Setup tab of AMQP Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-6d6f55601c76ea300a49719fd26a0ec6e92b685e%2FPDI_TransStep_AMQP-Consumer_Routing-Keys_Table.png?alt=media)

{% hint style="info" %}
If you set **Exchange type** to **DIRECT** and leave **Exchange name** blank, the value you set for **Queue name** is used as the routing key (regardless of whether you specify routing keys in the table).
{% endhint %}
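
The rule in the note above can be expressed as a small helper. This Python sketch is illustrative only: `effective_routing_keys` is a hypothetical name, and the function simply restates the documented behavior rather than reproducing PDI's code.

```python
def effective_routing_keys(exchange_name, exchange_type, queue_name, routing_keys):
    """Return the routing keys that actually apply to the queue."""
    uses_default_exchange = exchange_type == "DIRECT" and not exchange_name.strip()
    if uses_default_exchange:
        # The default exchange routes by queue name; table entries are ignored.
        return [queue_name]
    return list(routing_keys)


print(effective_routing_keys("", "DIRECT", "sensor-readings", ["ignored.key"]))
# ['sensor-readings']
print(effective_routing_keys("plant", "DIRECT", "sensor-readings", ["line1", "line2"]))
# ['line1', 'line2']
```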

After you run the transformation, the routing key configuration is bound to the queue. Even if you later remove routing keys from the table, the binding persists in the AMQP broker.

For details about verifying the queue’s bindings, see the RabbitMQ `list_bindings` documentation: <https://www.rabbitmq.com/rabbitmqctl.8.html#list_bindings>

**Headers**

When you use **HEADERS** as the exchange type, specify the header **Name** and **Value** pairs in the **Headers** table. Only string values are accepted.

![Headers table](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-ec6658c4920f7bca39edf165245ef608d10dc7a8%2FPDI_TransStep_AMQP-Consumer_Headers_Table.png?alt=media)

There are two options for specifying headers:

* **Match all headers**: For a message to be delivered, the producer message must contain all the header key/value pairs specified in the AMQP Consumer step. (The producer message can contain additional headers.)
* **Match any header**: For a message to be delivered, at least one header key/value pair must match between producer and consumer.
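
The difference between the two options can be sketched as follows. This Python example is a simplified model with hypothetical names (`headers_match` and the sample header values); in RabbitMQ terms, the two options correspond to the `x-match` binding argument values `all` and `any`.

```python
def headers_match(binding_headers, message_headers, match_all):
    """Compare the consumer's header pairs with the headers on a message."""
    hits = [message_headers.get(name) == value for name, value in binding_headers.items()]
    return all(hits) if match_all else any(hits)


binding = {"region": "emea", "kind": "temperature"}
message = {"region": "emea", "kind": "pressure", "unit": "kPa"}

print(headers_match(binding, message, match_all=True))   # False: "kind" differs
print(headers_match(binding, message, match_all=False))  # True: "region" matches
```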

After you run the transformation, the header configuration is bound to the queue. Even if you later remove headers from the table, the binding persists in the AMQP broker.

For details about verifying the queue’s bindings, see the RabbitMQ `list_bindings` documentation: <https://www.rabbitmq.com/rabbitmqctl.8.html#list_bindings>

#### Security tab

![Security tab](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-98dd341e38136f288caac798c10af99a8f9f7193%2FPDITransStep_AMQPConsumer_SecurityTab.png?alt=media)

The **Security** tab defines authentication credentials and (optionally) SSL properties for the AMQP broker.

* **Username**: The user name required to access the AMQP broker.
* **Password**: The password associated with **Username**.
* **Use secure protocol**: Select this option to define SSL properties for the connection.
* **SSL properties**: SSL configuration details, including context algorithm, keystore and truststore paths, passwords, and types.

#### Batch tab

![Batch tab in AMQP Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-d9ce98132a9311d1f5bd0d99e4426ded17dbdb1b%2FPDI_AMQP_consumer_step_batch_tab_new_prefetch_options.png?alt=media)

Use this tab to designate how many messages to consume before processing.

Message consumption is triggered by either:

* **Duration (ms)**, or
* **Number of records**

The step passes the collected records to the child transformation as soon as either threshold is reached. If you set either option to `0`, PDI ignores that option.
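
The interaction between the two thresholds can be modeled with a small collector. This Python sketch is not PDI's implementation: `BatchCollector`, `offer`, and `poll` are hypothetical names, timestamps are passed in explicitly so the example is deterministic, and it assumes that the duration window opens when the first record of a batch arrives.

```python
class BatchCollector:
    """Collects records until a count or time threshold is reached."""

    def __init__(self, duration_ms, number_of_records):
        if duration_ms <= 0 and number_of_records <= 0:
            raise ValueError("Duration (ms) or Number of records must be greater than 0")
        self.duration_ms = duration_ms
        self.number_of_records = number_of_records
        self.records = []
        self.started_ms = None

    def offer(self, record, now_ms):
        """Add a record; return a finished batch or None."""
        if not self.records:
            self.started_ms = now_ms  # assumption: the window opens with the first record
        self.records.append(record)
        return self.poll(now_ms)

    def poll(self, now_ms):
        """Return the batch if a threshold is reached, else None (stands in for a timer)."""
        if not self.records:
            return None
        # A threshold of 0 is ignored, as described above.
        full = 0 < self.number_of_records <= len(self.records)
        expired = self.duration_ms > 0 and now_ms - self.started_ms >= self.duration_ms
        if full or expired:
            batch, self.records = self.records, []
            return batch
        return None


collector = BatchCollector(duration_ms=1000, number_of_records=3)
for timestamp, record in [(0, "a"), (10, "b"), (20, "c"), (100, "d")]:
    print(timestamp, collector.offer(record, timestamp))
print(1100, collector.poll(1100))  # the duration threshold releases the partial batch
```

In this run, the third record releases a full batch by count, and the fourth record is released later by the duration threshold.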

* **Duration (ms)**: The amount of time (in milliseconds) that the step collects records before executing the child transformation.

  If this option is `0`, **Number of records** triggers consumption. Either **Duration (ms)** or **Number of records** must be greater than `0` to run the transformation.
* **Number of records**: The number of records to collect before executing the child transformation.

  If this option is `0`, **Duration (ms)** triggers consumption. Either **Duration (ms)** or **Number of records** must be greater than `0` to run the transformation.
* **Maximum concurrent batches**: The maximum number of batches used to collect records at the same time.

  Use this option only when the consumer step cannot keep pace with the incoming stream. Your environment must have adequate CPU and memory for the specified value.
* **Message prefetch limit**: The maximum number of incoming messages that this step queues for processing.

  Setting this value forces the broker to manage backpressure for messages that exceed the specified limit. The default is `100000`.
* **Ack management**: How acknowledgements are managed:
  * **Ack when message received**: Automatically acknowledges message receipt as soon as the broker delivers the message.

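    **Important:** Because the broker treats a message as handled as soon as it is acknowledged, messages can be lost if the transformation stops before it processes them.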
  * **Ack when batch completed**: Acknowledges to the broker only after the batch is processed.

    **Important:** Use this option to ensure that all messages are consumed by this step.
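
The trade-off between the two acknowledgement modes can be illustrated with a toy model. This Python sketch does not talk to a broker: `run_batch`, the mode strings `on_receive` and `on_batch_complete`, and the failing callback are hypothetical stand-ins for the behavior described above.

```python
def run_batch(batch, ack_mode, process):
    """Process one batch and return the messages the broker would still redeliver."""
    # Until a message is acknowledged, the broker keeps it for redelivery.
    unacked = list(batch)
    if ack_mode == "on_receive":
        unacked.clear()  # acknowledged on delivery, before any processing
    try:
        process(batch)
    except RuntimeError:
        return unacked  # processing failed; only unacked messages can be redelivered
    if ack_mode == "on_batch_complete":
        unacked.clear()  # acknowledged only after the batch succeeded
    return unacked


def failing_child_transformation(batch):
    raise RuntimeError("simulated failure while processing the batch")


messages = ["m1", "m2", "m3"]
print(run_batch(messages, "on_receive", failing_child_transformation))         # []
print(run_batch(messages, "on_batch_complete", failing_child_transformation))  # ['m1', 'm2', 'm3']
```

With acknowledgement on receipt, the failed batch leaves nothing to redeliver, so its messages are lost. With acknowledgement on batch completion, all three messages remain available.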

#### Fields tab

![Fields tab in AMQP Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-c317399b4e5622663fe19373652e77c56b3b81ea%2FPDITransStep_AMQPConsumer_FieldsTab.png?alt=media)

Use this tab to define the fields in the record format.

* **Input name**: The input name is provided by the AMQP Consumer step. By default, the following input names are assigned:
  * `message`: The individual message contained in a record.
  * `queue name`: The queue that records are published to and received from.
  * `routing key`: The routing key associated with the exchange type.
  * `exchange name`: The exchange name that messages are received from.
* **Output name**: A substitute output field name. The data type must match the **Type** of the input field.
* **Type**: The data format for streaming records: **String** or **Binary**. The default is **String**.
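
As a rough picture of what these settings produce, the following Python sketch builds one output row from a delivered message. Everything here is an assumption made for illustration: `build_record`, the sample delivery values, the renamed `payload` field, and the choice of UTF-8 when converting between **String** and **Binary**.

```python
def build_record(delivery, fields):
    """Build one output row from a delivery using Fields tab style settings."""
    row = {}
    for input_name, (output_name, field_type) in fields.items():
        value = delivery[input_name]
        if field_type == "String" and isinstance(value, bytes):
            value = value.decode("utf-8")  # assumption: payloads are UTF-8 text
        elif field_type == "Binary" and isinstance(value, str):
            value = value.encode("utf-8")
        row[output_name] = value
    return row


# Input name -> (output name, type); only "message" is renamed here.
fields = {
    "message": ("payload", "String"),
    "queue name": ("queue name", "String"),
    "routing key": ("routing key", "String"),
    "exchange name": ("exchange name", "String"),
}
delivery = {
    "message": b'{"temp": 21.5}',
    "queue name": "sensor-readings",
    "routing key": "sensor.line1.temp",
    "exchange name": "plant",
}
print(build_record(delivery, fields))
```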

#### Result fields tab

![Result Fields tab in AMQP Consumer](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-b4796b14e8f6acf5df0cff6c7bcf4024dd9406de%2FPDITransStep_AMQPConsumer_ResultFieldsTab.png?alt=media)

Use this tab to select the step in the child transformation that streams records back to the parent transformation.

This option lets records processed in the child transformation be passed downstream to other steps in the parent transformation.

* **Return fields from**: Select the step (from the child transformation) that streams fields back to the parent transformation. The returned field values are available to downstream steps in the parent transformation.

### See also

* [AMQP Producer](https://docs.pentaho.com/pdia-data-integration/pdi-transformation-steps-reference-overview/amqp-producer)


