# Kafka consumer

The PDI client pulls [streaming data](/pdia-data-integration/extracting-data-into-pdi/streaming-analytics.md) from Kafka through a Kafka transformation.

The parent **Kafka consumer** step runs a child transformation that executes according to message batch size or duration, letting you process a continuous stream of records in near real time.

The child transformation must start with the [Get records from stream](/pdia-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream.md) step.

In the Kafka consumer step itself, you can define:

* How many messages to accept for processing
* The record format and data types
* How to return fields from the child transformation back to the parent transformation

Kafka records are stored in topics, where a topic is a category to which records are published. Each topic is divided into logs known as partitions, and Kafka scales topic consumption by distributing a topic's partitions among the consumers in a consumer group.
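
The PDI step manages partition assignment for you, but you can see how a topic is divided into partitions with the plain Kafka Java client. The following is a minimal sketch, assuming the standard `kafka-clients` library is on the classpath; the broker address and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ListPartitions {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // A topic's records are split across these partitions.
            for (PartitionInfo p : consumer.partitionsFor("example-topic")) {
                System.out.printf("topic=%s partition=%d leader=%s%n",
                        p.topic(), p.partition(), p.leader());
            }
        }
    }
}
```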

Before you use the Kafka consumer step, configure a named connection for your distribution. For details, see [Connecting to a Hadoop cluster with the PDI client](/pdia-data-integration/extracting-data-into-pdi/connecting-to-a-hadoop-cluster-with-the-pdi-client-article.md).

{% hint style="info" %}
Because the Kafka consumer step continuously ingests streaming data, you may want to use the [Abort](/pdia-data-integration/pdi-transformation-steps-reference-overview/abort.md) step in either the parent or child transformation to stop consuming records for specific workflows.
{% endhint %}

You can stop consumer ingestion by entering a stop timestamp on the **Offset Settings** tab.

### Step name and child transformation

The Kafka consumer step requires definitions for setup, batch, fields, result fields, Kafka-specific options, and offset settings.

![Kafka consumer dialog box](/files/PpzS8TbH1KEmVn1jLBTm)

* **Step name**: Specify the unique name of the step on the canvas. Default: `Kafka consumer`.
* **Transformation**: Child transformation to execute.

  You can:

  * Enter the transformation path.
  * Select **Browse** to select an existing child transformation.
  * Select **New** to create and save a new child transformation.

  The child transformation must start with the **Get records from stream** step.

  If the selected transformation shares the same root path as the current transformation, PDI automatically inserts `${Internal.Entry.Current.Directory}` in place of the common root path.

  Example:

  * Current transformation: `/home/admin/transformation.ktr`
  * Selected transformation: `/home/admin/path/sub.ktr`
  * Converted path: `${Internal.Entry.Current.Directory}/path/sub.ktr`

  If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.

#### Create and save a new child transformation

If you do not already have a child transformation, you can create one while setting up the Kafka consumer step.

When you select **New**, PDI generates the required **Get records from stream** step in a new canvas tab. The step’s fields and types match the **Fields** tab configuration in the parent Kafka consumer step.

1. In the Kafka consumer step, select **New**.
2. In the Save As dialog box, select a location and enter a file name.
3. Select **Save**.
4. Select the new transformation tab to view and edit the child transformation.
5. When finished, return to the Kafka consumer step.

### Options

The Kafka consumer step includes the following tabs:

* **Setup**
* **Batch**
* **Fields**
* **Result fields**
* **Options**
* **Offset Settings**
* **Security**

#### Setup tab

![Kafka consumer Setup tab](/files/BqGUzQ9UG0XLvDXH8uMY)

Use this tab to define connections, the topics to subscribe to, and the consumer group.

* **Connection**: Connection type:

  * **Direct**: Specify **Bootstrap servers**.
  * **Cluster**: Specify a **Hadoop cluster** configuration.

  For details, see [Connecting to a Hadoop cluster with the PDI client](/pdia-data-integration/extracting-data-into-pdi/connecting-to-a-hadoop-cluster-with-the-pdi-client-article.md).
* **Topics**: Kafka topics to consume from. Add all topics you want to consume.
* **Consumer group**: Consumer group identifier.

  Each Kafka consumer step starts a single thread. When part of a consumer group, each consumer is assigned a subset of topic partitions.
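
For reference, the Setup fields correspond to the basic configuration of a plain Kafka Java consumer: **Bootstrap servers** maps to `bootstrap.servers`, **Consumer group** to `group.id`, and **Topics** to the subscription list. A minimal sketch with placeholder values (PDI performs this wiring internally):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SetupEquivalent {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // "Bootstrap servers" (placeholder)
        props.put("group.id", "pdi-example-group");          // "Consumer group" (placeholder)
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "Topics": all topics to consume from.
            consumer.subscribe(Arrays.asList("example-topic"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```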

#### Batch tab

![Kafka consumer Batch tab](/files/DvoA8uOKc8raMhPd7aPd)

Use this tab to specify how many messages to collect before processing a batch. A batch is processed when either the **Duration (ms)** or the **Number of records** threshold is reached.

If you set either value to `0`, PDI ignores that setting. At least one of **Duration (ms)** or **Number of records** must be greater than `0` for the child transformation to run.

* **Duration (ms)**: Time (in milliseconds) to collect records before executing the child transformation.
* **Number of records**: Number of records to collect before executing the child transformation.
* **Maximum concurrent batches**: Maximum number of batches to collect at the same time. Default: `1`.

  Use this option only when the consumer cannot keep pace with the stream rate and the environment has adequate CPU and memory.
* **Message prefetch limit**: Limit for incoming messages to queue for processing. Default: `100000`.
* **Offset management**:
  * **Commit when record read**: Commit offset when a record is read.
  * **Commit when batch completed**: Commit offsets after the batch is processed.
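
Conceptually, the batch settings behave like a poll loop that hands off whatever has accumulated once either threshold is crossed. The sketch below illustrates only that trigger logic with the plain Kafka Java client; it is not PDI's implementation, and the broker, topic, and threshold values are placeholders:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchTrigger {
    public static void main(String[] args) {
        long durationMs = 5000;     // "Duration (ms)"; 0 would disable this trigger
        int numberOfRecords = 1000; // "Number of records"; 0 would disable this trigger

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "pdi-example-group");          // placeholder consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            List<ConsumerRecord<String, String>> batch = new ArrayList<>();
            long batchStart = System.currentTimeMillis();

            while (true) {
                consumer.poll(Duration.ofMillis(100)).forEach(batch::add);
                boolean durationHit = durationMs > 0
                        && System.currentTimeMillis() - batchStart >= durationMs;
                boolean countHit = numberOfRecords > 0 && batch.size() >= numberOfRecords;
                if ((durationHit || countHit) && !batch.isEmpty()) {
                    processBatch(batch);    // stand-in for running the child transformation
                    consumer.commitSync();  // analogous to "Commit when batch completed"
                    batch.clear();
                    batchStart = System.currentTimeMillis();
                }
            }
        }
    }

    private static void processBatch(List<ConsumerRecord<String, String>> batch) {
        System.out.println("Processing " + batch.size() + " records");
    }
}
```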

#### Fields tab

![Kafka consumer Fields tab](/files/HzamWX3NXXJvyoJjaazN)

Use this tab to define the record format.

* **Input name**: Incoming fields received from Kafka streams. Default inputs include:
  * `key`: Determines message distribution to partitions. If no key is present, messages are randomly distributed.
  * `message`: The message value.
  * `topic`: Topic name.
  * `partition`: Partition number.
  * `offset`: Sequential ID that uniquely identifies the record within the partition.
  * `timestamp`: Time the message is received on the server.
* **Output name**: Output field name.
* **Type**: Data type for streaming (applies to `key` and `message`). Options include String, Boolean, Number, Integer, and Binary.
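
These default inputs correspond to the metadata that the Kafka client exposes on each record. As an illustration (placeholder broker, group, and topic), the same fields are available as accessors on `ConsumerRecord` in the plain Java client:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RecordFields {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "pdi-example-group");          // placeholder consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                // key, message, topic, partition, offset, and timestamp inputs.
                System.out.printf("key=%s message=%s topic=%s partition=%d offset=%d timestamp=%d%n",
                        r.key(), r.value(), r.topic(), r.partition(), r.offset(), r.timestamp());
            }
        }
    }
}
```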

#### Result fields tab

![Kafka consumer Results Fields tab](/files/qTmiH2n3Ie0AZ82iiffE)

Use this tab to select the step from the child transformation that streams fields back to the parent transformation.

* **Return fields from**: Step name in the child transformation that returns fields to the parent transformation.

#### Options tab

![Kafka consumer Options tab](/files/KFaHUihjdawYRJgnHoQJ)

Use this tab to send properties to the broker.

* Option values can be encrypted.
* Some common properties are provided for convenience.
* You can enter any Kafka property.

For property details, see the [Apache Kafka documentation](https://kafka.apache.org/documentation/).
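
In other words, the Options tab accepts the same consumer properties you would set on a plain Kafka Java client. A minimal sketch, with `auto.offset.reset` and `max.poll.records` shown purely as examples and the broker address as a placeholder:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExtraOptions {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "pdi-example-group");          // placeholder consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        // Any documented consumer property can be added the same way, for example
        // where to start when no committed offset exists and how many records a
        // single poll may return.
        props.put("auto.offset.reset", "latest");
        props.put("max.poll.records", "500");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println("Consumer created with extra options: " + props);
        }
    }
}
```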

#### Offset Settings tab

![Kafka consumer Offset Settings tab](/files/OouuSP0F1gmkh0rq9qgl)

Use this tab to stop the Kafka consumer when the message offset timestamp reaches the timestamp you specify.

If you do not specify a value on this tab, the Kafka consumer step runs in normal (continuous) mode.

* **Offset timestamp**: End timestamp at which to stop the consumer.
* **Timestamp format**: Format for the offset timestamp.

  If you provide an epoch value, timestamp format is not required.

**Modes**

You can run the Kafka consumer step in three modes:

1. **Infinite loop**: If **Offset timestamp** is empty, the consumer runs indefinitely.
2. **End timestamp**: If **Offset timestamp** is set, the consumer stops when it reaches the specified time.
3. **Read data between two timestamps**: Use Kafka consumer with the [Kafka Offset](/pdia-data-integration/pdi-job-entries-reference-overview/kafka-offset.md) job.
   * Kafka Offset job’s **Offset timestamp** = start time
   * Kafka consumer’s **Offset timestamp** = end time

Example: If the Kafka Offset job's **Offset timestamp** is `23/08/02 07:03:00` and the Kafka consumer's **Offset timestamp** is `23/08/04 07:03:00`, the consumer reads the 48-hour period between the two timestamps.

{% hint style="info" %}
The Kafka Offset job resets offsets for all partitions in the consumer group topic according to the job **Offset timestamp**.
{% endhint %}
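
Under the hood, reading between two timestamps relies on Kafka's timestamp-to-offset lookup. The sketch below shows how the plain Java client would resolve a start timestamp with `offsetsForTimes`, seek each partition to it, and stop once record timestamps pass the end of the window. It illustrates the underlying API only, not PDI's implementation; the broker, topic, and epoch-millisecond timestamps are placeholders:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class TimeWindowRead {
    public static void main(String[] args) {
        long startTs = 1690000000000L; // placeholder start of the window, epoch milliseconds
        long endTs   = 1690172800000L; // placeholder end of the window, epoch milliseconds

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "pdi-example-group");          // placeholder consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign all partitions of the topic explicitly so they can be seeked.
            List<TopicPartition> partitions = consumer.partitionsFor("example-topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            // Resolve the start timestamp to an offset for every partition.
            Map<TopicPartition, Long> query = new HashMap<>();
            partitions.forEach(tp -> query.put(tp, startTs));
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(query);
            startOffsets.forEach((tp, offset) -> {
                if (offset != null) {
                    consumer.seek(tp, offset.offset());
                }
            });

            // Read forward until record timestamps pass the end of the window.
            boolean done = false;
            while (!done) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
                    if (r.timestamp() > endTs) {
                        done = true;
                        break;
                    }
                    System.out.printf("offset=%d timestamp=%d value=%s%n",
                            r.offset(), r.timestamp(), r.value());
                }
            }
        }
    }
}
```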

#### Security

You can implement security using SSL, SASL, or SASL SSL.

**SSL**

1. On the **Setup** tab, select **Direct** and set **Bootstrap servers** to `${KAFKA_ssl_url}`.
2. On the **Options** tab, add these properties:

   | Option                    | Value                     |
   | ------------------------- | ------------------------- |
   | `auto.offset.reset`       | `latest`                  |
   | `ssl.key.password`        | `$[Key password]`         |
   | `ssl.keystore.location`   | `$[Path to Key store]`    |
   | `ssl.keystore.password`   | `$[Key store password]`   |
   | `ssl.truststore.location` | `$[Path to Trust store]`  |
   | `ssl.truststore.password` | `$[Trust store Password]` |
   | `ssl.protocol`            | `TLSv1.2`                 |
   | `security.protocol`       | `SSL`                     |
3. Select **OK**.
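
These options are standard Kafka client security settings rather than PDI-specific values. As an illustration, the equivalent configuration in a plain Java consumer, with placeholder paths, passwords, and broker address:

```java
import java.util.Properties;

public class SslConsumerProperties {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.example.com:9093"); // placeholder SSL listener
        props.put("group.id", "pdi-example-group");                 // placeholder consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        // SSL settings; replace the stores and passwords with your own values,
        // then pass the properties to new KafkaConsumer<>(props).
        props.put("security.protocol", "SSL");
        props.put("ssl.protocol", "TLSv1.2");
        props.put("ssl.keystore.location", "/path/to/keystore.jks");
        props.put("ssl.keystore.password", "keystore-password");
        props.put("ssl.key.password", "key-password");
        props.put("ssl.truststore.location", "/path/to/truststore.jks");
        props.put("ssl.truststore.password", "truststore-password");
        props.put("auto.offset.reset", "latest");

        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```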

**SASL**

SASL security requires `krb5.conf` and a Kerberos principal.

1. Copy `krb5.conf` to `${JAVA_HOME}/conf/security`.
2. Run `kinit ${KERBEROS_PRINCIPAL_KAFKA}` to obtain a Kerberos TGT.
3. Copy `${KERBEROS_PRINCIPAL_KAFKA}.keytab` from the server to the workstation where PDI is installed.
4. On the **Setup** tab, select **Direct** and set **Bootstrap servers** to `${KAFKA_SASL_PLAINTEXT_URL}`.
5. On the **Options** tab, add these properties:

   | Option                       | Value                            |
   | ---------------------------- | -------------------------------- |
   | `auto.offset.reset`          | `latest`                         |
   | `security.protocol`          | `SASL_PLAINTEXT`                 |
   | `sasl.mechanism`             | `GSSAPI`                         |
   | `sasl.kerberos.service.name` | `${KERBEROS_KAFKA_SERVICE_NAME}` |
   | `sasl.jaas.config`           | `${SASL_JAAS_CONFIG}`            |
6. Select **OK**.

Sample `${SASL_JAAS_CONFIG}`:

```
com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true doNotPrompt=true keyTab="Path to ${KERBEROS_PRINCIPAL_KAFKA}.keytab" principal="${Principal created in Kerberos for Kafka}";
```

**SASL SSL**

1. On the **Setup** tab, select **Direct** and set **Bootstrap servers** to `${KAFKA_KERBEROS_SSL_URL}`.
2. On the **Options** tab, add these properties:

   | Option                       | Value                            |
   | ---------------------------- | -------------------------------- |
   | `auto.offset.reset`          | `latest`                         |
   | `ssl.truststore.location`    | `$[Path to Trust store]`         |
   | `ssl.truststore.password`    | `$[Trust store Password]`        |
   | `ssl.keystore.location`      | `$[Key store location]`          |
   | `ssl.keystore.password`      | `$[Key store password]`          |
   | `ssl.key.password`           | `$[Key password]`                |
   | `security.protocol`          | `SASL_SSL`                       |
   | `sasl.mechanism`             | `PLAIN`                          |
   | `sasl.kerberos.service.name` | `${KERBEROS_KAFKA_SERVICE_NAME}` |
   | `sasl.jaas.config`           | `${SASL_JAAS_CONFIG}`            |
3. Select **OK**.

For details about Kafka Kerberos connectivity, see [Confluent documentation](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_gssapi.html).

### See also

* [Kafka Producer](/pdia-data-integration/pdi-transformation-steps-reference-overview/kafka-producer.md)
* [Kafka Offset](/pdia-data-integration/pdi-job-entries-reference-overview/kafka-offset.md)

