# Kafka consumer

The PDI client pulls [streaming data](https://github.com/pentaho/documentation/blob/main/PDIA/9.3/PDI/Streaming%20Analytics/Streaming%20analytics=GUID-27004CDD-BC78-457A-ABB5-1683D9AB3FBE=3=en=.md) from Kafka through a Kafka transformation. The parent Kafka Consumer step runs a child transformation (sub-transformation) that executes according to message batch size or duration, letting you process a continuous stream of records in near real time. The child transformation must start with the [Get records from stream](/pdia-data-integration/9.3-data-integration/pdi-transformation-steps-reference-overview/get-records-from-stream.md) step.

You can configure the Kafka Consumer step to continuously ingest streaming data from your Kafka server. Depending on your setup, you can execute the transformation within PDI or within the Adaptive Execution Layer (AEL), using Spark as the processing engine.

If you are using Spark as the processing engine, you must execute the child transformation according to **Duration (ms)** only.
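The batching rule described above can be illustrated with a minimal Python sketch. It is not PDI code: the function name `batch_records`, the `(arrival_ms, payload)` record shape, and the convention that a threshold of 0 disables that trigger are all assumptions made for illustration. The sketch closes a batch when either the record count or the elapsed time is reached, whichever comes first.

```python
from typing import Iterable, Iterator, List, Tuple

# Hypothetical record shape for this sketch: (arrival time in ms, payload).
Record = Tuple[int, str]


def batch_records(
    records: Iterable[Record], batch_size: int, duration_ms: int
) -> Iterator[List[Record]]:
    """Group records into batches closed by count or by elapsed time.

    A threshold of 0 disables that trigger (an assumption of this sketch).
    """
    batch: List[Record] = []
    window_start = 0
    for arrival_ms, payload in records:
        # Close the open batch if this record arrives after the time window.
        if batch and duration_ms > 0 and arrival_ms - window_start >= duration_ms:
            yield batch
            batch = []
        if not batch:
            window_start = arrival_ms  # a new window starts with its first record
        batch.append((arrival_ms, payload))
        # Close the batch as soon as it reaches the record-count threshold.
        if batch_size > 0 and len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush whatever is left when the stream ends


stream = [(0, "a"), (10, "b"), (20, "c"), (500, "d"), (1200, "e")]
by_count = [[p for _, p in b] for b in batch_records(stream, 2, 1000)]
by_time = [[p for _, p in b] for b in batch_records(stream, 0, 100)]
```

Setting `batch_size` to 0 in this sketch mirrors the Spark case, where only the duration decides when the child transformation runs.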

In the Kafka Consumer step itself, you can define the number of messages to accept for processing, as well as the data formats used to stream activity data and system metrics. You can set up this step to collect monitored events, track user consumption of data streams, and monitor alerts.

Additionally, from the Kafka Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This allows records processed by a Kafka Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.

Kafka records are stored within topics. A topic is a category to which records are published. Each topic is divided into a set of logs known as partitions. Kafka scales topic consumption by distributing partitions among the members of a consumer group. A consumer group is a set of consumers sharing a common group identifier.
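To make the idea of distributing partitions among a consumer group concrete, here is a simplified Python sketch. It uses a plain round-robin rule. Kafka's actual assignment strategy is configurable and more involved, so treat the function `assign_partitions` and the consumer names as hypothetical illustrations only.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over the consumers of one group, round-robin.

    Simplified illustration; not Kafka's real assignor implementation.
    """
    members = sorted(consumers)
    assignment: Dict[str, List[int]] = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        # Each partition is owned by exactly one consumer in the group.
        assignment[members[index % len(members)]].append(partition)
    return assignment


two_consumers = assign_partitions([0, 1, 2, 3, 4, 5], ["consumer-a", "consumer-b"])
three_consumers = assign_partitions([0, 1], ["consumer-a", "consumer-b", "consumer-c"])
```

In the second call there are more consumers than partitions, so one consumer receives no partitions. Adding consumers beyond the partition count does not increase throughput for that topic.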

Before using the Kafka Consumer step, you must configure a named connection for your distribution. For information on named connections, see [Connecting to a Hadoop cluster with the PDI client](/pdia-data-integration/9.3-data-integration/advanced-topics-pentaho-data-integration-overview/connecting-to-a-hadoop-cluster-with-the-pdi-client-article.md).

**Note:** Since the Kafka Consumer step continuously ingests streaming data, you may want to use the [Abort](/pdia-data-integration/9.3-data-integration/pdi-transformation-steps-reference-overview/abort.md) step in either the parent or child transformation to stop consuming records from Kafka for specific workflows. For example, you can run the parent transformation on a timed schedule, or abort the child transformation if sensor data exceeds a preset range.
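The sensor example in the note can be sketched in Python as follows. This is only an analogy for the stopping condition, not the Abort step's implementation: the function `consume_until_out_of_range` and the sample readings are hypothetical.

```python
from typing import Iterable, List


def consume_until_out_of_range(readings: Iterable[float], low: float, high: float) -> List[float]:
    """Process readings until one falls outside the range [low, high]."""
    processed: List[float] = []
    for value in readings:
        if value < low or value > high:
            break  # stand-in for the Abort step: stop consuming further records
        processed.append(value)
    return processed


kept = consume_until_out_of_range([20.5, 21.0, 85.2, 22.0], low=0.0, high=50.0)
```

Readings that arrive after the out-of-range value are never processed, which is the behavior you get when the transformation stops consuming from Kafka.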

