AMQP Consumer

The Advanced Message Queuing Protocol (AMQP) Consumer step receives streaming data from an AMQP message producer through an AMQP 0-9-1 compatible broker. You can configure this step to use an existing AMQP message queue or create a new one.

You can also set up the AMQP Consumer step to continuously ingest streaming data from either an AMQP message or broker. The parent AMQP Consumer step runs a child transformation that executes according to the message batch size or duration, letting you process a continuous stream of records in near real time.

The child transformation must start with the Get records from stream step. You can also select a step in the child transformation to stream records back to the parent transformation.

circle-info

Because the AMQP Consumer step continuously ingests streaming data, you might want to use the Abort step in either the parent or child transformation to stop consuming records for specific workflows.

For example, you can run the parent transformation on a timed schedule, or abort the child transformation if sensor data exceeds a preset range.

Before you begin

Before you use the AMQP Consumer step, be aware of the following conditions:

  • This step uses and requires the AMQP 0-9-1 messaging protocol.

  • You must have an AMQP 0-9-1 compatible broker (such as RabbitMQarrow-up-right) available before you configure this step.

  • You can use the AMQP Consumer step alone to ingest messages from any AMQP producer or broker. The AMQP Producer step is not required.

  • If you use both AMQP Consumer and AMQP Producer (in the same transformation or in separate transformations), some of the AMQP Consumer settings must match settings in AMQP Producer.

Step name and child transformation

AMQP Consumer dialog box
  • Step name: Specify the unique name of the AMQP Consumer step on the canvas. You can customize the name or leave it as the default.

  • Transformation: Specify the child transformation to execute by doing one of the following:

    • Enter its path.

    • Select Browse to select an existing child transformation.

    • Select New to create and save a new child transformation. For details, see Create a child transformation.

    Note: The selected child transformation must start with the Get records from stream step.

    If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path.

    • Example: If the current transformation path is /home/admin/transformation.ktr and you select a transformation in /home/admin/path/sub.ktr, then the path is converted to ${Internal.Entry.Current.Directory}/path/sub.ktr.

    If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.

    Transformations that were previously specified by reference are automatically converted to be specified by transformation name in the Pentaho Repository.

Create a child transformation

If you do not already have a child transformation, you can create one while setting up the AMQP Consumer step.

When you select New, a new child transformation is created and opened in a new canvas tab. It automatically includes the required Get records from stream step.

In addition, the Get records from stream step is automatically configured to match the fields and data types you specify in the parent step’s Fields tab.

1

Create the transformation

In the AMQP Consumer step, select New.

The Save As dialog box appears.

2

Save the transformation

Navigate to the location where you want to save the child transformation, enter a file name, and then select Save.

3

Edit the new transformation

Select the New Transformation tab to view and edit the child transformation. (Optional) Continue to build the child transformation and save it.

4

Return to the AMQP Consumer step

Return to the AMQP Consumer step when finished.

Options

The AMQP Consumer step requires you to specify options and parameters on the Setup, Security, Batch, Fields, and Result fields tabs.

Setup tab

Setup tab in AMQP Consumer

In the Setup tab, specify the connection details and AMQP entity names:

  • Broker connection URI

  • Queue name

  • Exchange name

  • Exchange type

  • Routing keys or headers (depending on exchange type)

When the queue does not already exist, the AMQP Consumer step creates the queue the first time you run the transformation.

New queues default to the following properties:

  • Durable

  • Non-auto-delete

  • Non-exclusive

circle-info

As a best practice, run the AMQP Consumer step first (to create the queue and bindings), then start producing messages.

Define the following options:

  • Connection: The URI address of the AMQP broker that this step connects to. For details, see RabbitMQ URI specificationarrow-up-right.

  • Queue name: The name of the new queue to create and consume from.

    If you specify a queue name that already exists on the broker, but its parameter settings differ from the default parameters above (or if the specified queue has a different Exchange type), the transformation aborts.

  • Exchange name: The exchange name to bind the queue to.

    If the exchange name does not exist, it is created with the following default properties:

    • Durable

    • Non-auto-delete

    If you leave Exchange name blank, use the DEFAULT exchange by setting Exchange type to DIRECT.

    If you use AMQP Producer, it must also have a blank exchange name.

Exchange type

Set Exchange type to the exchange type pattern that the exchange uses:

  • DIRECT: Routes messages to queues based on the message routing key.

  • DEFAULT: To use the default exchange type, do not specify an Exchange name and set Exchange type to DIRECT.

  • FANOUT: Routes messages to all queues that are bound to the fanout exchange. The routing key is ignored.

  • TOPIC: Routes messages to one or more queues based on a match between a message routing key and the pattern used to bind a queue to an exchange.

  • HEADERS: Routes messages using key/value pairs expressed as message headers.

Routing keys

When you use DIRECT or TOPIC as the exchange type, specify the routing key (or multiple routing keys) in the Routing Keys table.

Routing Keys table in Setup tab of AMQP Consumer
circle-info

If you set Exchange type to DIRECT and leave Exchange name blank, the value you set for Queue name is used as the routing key (regardless of whether you specify routing keys in the table).

After you run the transformation, the routing key configuration is bound to the queue. Even if you later remove routing keys from the table, the binding persists in the AMQP broker.

For details about verifying the queue’s bindings, see the RabbitMQ list_bindings documentation: https://www.rabbitmq.com/rabbitmqctl.8.html#list_bindingsarrow-up-right

Headers

When you use HEADERS as the exchange type, specify the header Name and Value pairs in the Headers table. Only string values are accepted.

Headers table

There are two options for specifying headers:

  • Match all headers: For a message to be delivered, the producer message must contain all the header key/value pairs specified in the AMQP Consumer step. (The producer message can contain additional headers.)

  • Match any header: For a message to be delivered, at least one header key/value pair must match between producer and consumer.

After you run the transformation, the header configuration is bound to the queue. Even if you later remove headers from the table, the binding persists in the AMQP broker.

For details about verifying the queue’s bindings, see the RabbitMQ list_bindings documentation: https://www.rabbitmq.com/rabbitmqctl.8.html#list_bindingsarrow-up-right

Security tab

Security tab

The Security tab defines authentication credentials and (optionally) SSL properties for the AMQP broker.

  • Username: The user name required to access the AMQP broker.

  • Password: The password associated with Username.

  • Use secure protocol: Select this option to define SSL properties for the connection.

  • SSL properties: SSL configuration details, including context algorithm, keystore and truststore paths, passwords, and types.

Batch tab

Batch tab in AMQP Consumer

Use this tab to designate how many messages to consume before processing.

Message consumption is triggered by either:

  • Duration (ms), or

  • Number of records

Consumption starts when either threshold is reached. If you set either option to 0, PDI ignores that option.

  • Duration (ms): The amount of time (in milliseconds) that the step collects records before executing the child transformation.

    If this option is 0, Number of records triggers consumption. Either Duration (ms) or Number of records must be greater than 0 to run the transformation.

  • Number of records: The number of records to collect before executing the child transformation.

    If this option is 0, Duration (ms) triggers consumption. Either Duration (ms) or Number of records must be greater than 0 to run the transformation.

  • Maximum concurrent batches: The maximum number of batches used to collect records at the same time.

    Use this option only when the consumer step cannot keep pace with the incoming stream. Your environment must have adequate CPU and memory for the specified value.

  • Message prefetch limit: The maximum number of incoming messages that this step queues for processing.

    Setting this value forces the broker to manage backpressure for messages that exceed the specified limit. The default is 100000.

  • Ack management: How acknowledgements are managed:

    • Ack when message received: Automatically acknowledges message receipt as soon as the broker delivers the message.

      Important: Messages can be lost before they are consumed.

    • Ack when batch completed: Acknowledges to the broker only after the batch is processed.

      Important: Use this option to ensure that all messages are consumed by this step.

Fields tab

Fields tab in AMQP Consumer

Use this tab to define the fields in the record format.

  • Input name: The input name is provided by the AMQP Consumer step. By default, the following input names are assigned:

    • message: The individual message contained in a record.

    • queue name: The queue that records are published to and received from.

    • routing key: The routing key associated with the exchange type.

    • exchange name: The exchange name that messages are received from.

  • Output name: A substitute output field name. The data type must match the Type of the input field.

  • Type: The data format for streaming records: String or Binary. The default is String.

Result fields tab

Result Fields tab in AMQP Consumer

Use this tab to select the step in the child transformation that streams records back to the parent transformation.

This option lets records processed in the child transformation be passed downstream to other steps in the parent transformation.

  • Return fields from: Select the step (from the child transformation) that streams fields back to the parent transformation. The returned field values are available to downstream steps in the parent transformation.

See also

Last updated

Was this helpful?