Kafka consumer

The PDI client pulls streaming data from Kafka through a transformation that contains the Kafka consumer step.

The parent Kafka consumer step runs a child transformation once per batch, as defined by message count or duration, letting you process a continuous stream of records in near real time.

The child transformation must start with the Get records from stream step.

In the Kafka consumer step itself, you can define:

  • How many messages to accept for processing

  • The record format and data types

  • How to return fields from the child transformation back to the parent transformation

Kafka stores records in topics; a topic is a category to which records are published. Topics are divided into logs known as partitions. Kafka scales topic consumption by distributing partitions among the consumers in a consumer group.

Before you use the Kafka consumer step, configure a named connection for your distribution. For details, see Connecting to a Hadoop cluster with the PDI client.

Note: Because the Kafka consumer step continuously ingests streaming data, you may want to use the Abort step in either the parent or child transformation to stop consuming records for specific workflows.

You can also stop consumer ingestion by entering an end timestamp on the Offset Settings tab.

Step name and child transformation

The Kafka consumer step requires definitions for setup, batch, fields, result fields, Kafka-specific options, and offset settings.

Kafka consumer dialog box
  • Step name: Specify the unique name of the step on the canvas. Default: Kafka consumer.

  • Transformation: Child transformation to execute.

    You can:

    • Enter the transformation path.

    • Select Browse to select an existing child transformation.

    • Select New to create and save a new child transformation.

    The child transformation must start with the Get records from stream step.

    If the selected transformation shares the same root path as the current transformation, PDI automatically inserts ${Internal.Entry.Current.Directory} in place of the common root path.

    Example:

    • Current transformation: /home/admin/transformation.ktr

    • Selected transformation: /home/admin/path/sub.ktr

    • Converted path: ${Internal.Entry.Current.Directory}/path/sub.ktr

    If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.

Create and save a new child transformation

If you do not already have a child transformation, you can create one while setting up the Kafka consumer step.

When you select New, PDI generates the required Get records from stream step in a new canvas tab. The step’s fields and types match the Fields tab configuration in the parent Kafka consumer step.

  1. In the Kafka consumer step, select New.

  2. In the Save As dialog box, select a location and enter a file name.

  3. Select Save.

  4. Select the new transformation tab to view and edit the child transformation.

  5. When finished, return to the Kafka consumer step.

Options

The Kafka consumer step includes the following tabs:

  • Setup

  • Batch

  • Fields

  • Result fields

  • Options

  • Offset Settings

  • Security

Setup tab

Kafka consumer Setup tab

Use this tab to define connections, the topics to subscribe to, and the consumer group.

  • Connection: Select the connection type:

    • Direct: Specify Bootstrap servers.

    • Cluster: Specify a Hadoop cluster configuration.

    For details, see Connecting to a Hadoop cluster with the PDI client.

  • Topics: Kafka topics to consume from. Add all topics you want to consume.

  • Consumer group: Consumer group identifier.

    Each Kafka consumer step starts a single thread. When part of a consumer group, each consumer is assigned a subset of topic partitions.
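
The Setup tab values correspond to standard Kafka client settings. As a rough sketch (not PDI code; the broker addresses, group name, and topic names below are made-up placeholders), a Direct connection, consumer group, and topic list map onto a plain Java consumer roughly as follows:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SetupMappingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Direct connection: the Bootstrap servers value (hosts and ports are placeholders).
            props.put("bootstrap.servers", "broker1:9092,broker2:9092");
            // Consumer group: partitions of the subscribed topics are shared among its members.
            props.put("group.id", "pdi-consumer-group");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Topics: subscribe to every topic you want to consume.
                consumer.subscribe(Arrays.asList("topicA", "topicB"));
            }
        }
    }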

Batch tab

Kafka consumer Batch tab

Use this tab to specify how many messages to collect, and for how long, before each batch is processed. A batch is processed when either Duration (ms) or Number of records is reached, whichever comes first; a sketch after this list illustrates the rule.

If you set either value to 0, PDI ignores that setting. To run the child transformation, either Duration (ms) or Number of records must be greater than 0.

  • Duration (ms): Time (in milliseconds) to collect records before executing the child transformation.

  • Number of records: Number of records to collect before executing the child transformation.

  • Maximum concurrent batches: Maximum number of batches to collect at the same time. Default: 1.

    Use this option only when the consumer cannot keep pace with the stream rate and the environment has adequate CPU and memory.

  • Message prefetch limit: Maximum number of incoming messages to queue while awaiting processing. Default: 100000.

  • Offset management:

    • Commit when record read: Commit offset when a record is read.

    • Commit when batch completed: Commit offsets after the batch is processed.
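
To make the batching rule concrete, the following sketch shows the general idea in plain Java Kafka client terms. It is a simplified illustration under assumed semantics (it ignores the prefetch limit, concurrent batches, and offset commits), not PDI's actual implementation, and the method name is hypothetical:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class BatchingSketch {

        // Collect records until either the duration or the record count is reached,
        // whichever comes first. A value of 0 disables that limit; at least one of
        // the two values must be greater than 0, as described above.
        static List<ConsumerRecord<String, String>> collectBatch(
                Consumer<String, String> consumer, long durationMs, long numberOfRecords) {

            List<ConsumerRecord<String, String>> batch = new ArrayList<>();
            long deadline = System.currentTimeMillis() + durationMs;

            while (true) {
                if (durationMs > 0 && System.currentTimeMillis() >= deadline) {
                    break;
                }
                if (numberOfRecords > 0 && batch.size() >= numberOfRecords) {
                    break;
                }
                ConsumerRecords<String, String> polled = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : polled) {
                    batch.add(record);
                }
            }
            return batch; // in PDI, each completed batch is passed to the child transformation
        }
    }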

Fields tab

Kafka consumer Fields tab

Use this tab to define the record format.

  • Input name: Incoming fields received from Kafka streams. Default inputs include:

    • key: Determines message distribution to partitions. If no key is present, messages are randomly distributed.

    • message: The message value.

    • topic: Topic name.

    • partition: Partition number.

    • offset: Sequential ID that uniquely identifies the record within the partition.

    • timestamp: Time the message is received on the server.

  • Output name: Output field name.

  • Type: Data type for streaming (applies to key and message). Options include String, Boolean, Number, Integer, and Binary.
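
For orientation, the default input names correspond to the attributes of a Kafka record. The sketch below is illustrative only, using the plain Java client API rather than anything PDI-specific:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class FieldMappingSketch {
        // Each incoming row corresponds to one Kafka record; the default input names
        // map onto the record's attributes roughly as follows.
        static void describe(ConsumerRecord<String, String> record) {
            String key = record.key();            // key
            String message = record.value();      // message
            String topic = record.topic();        // topic
            int partition = record.partition();   // partition
            long offset = record.offset();        // offset
            long timestamp = record.timestamp();  // timestamp (epoch milliseconds)
            System.out.printf("%s %s %s %d %d %d%n",
                    key, message, topic, partition, offset, timestamp);
        }
    }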

Result fields tab

Kafka consumer Results Fields tab

Use this tab to select the step from the child transformation that streams fields back to the parent transformation.

  • Return fields from: Step name in the child transformation that returns fields to the parent transformation.

Options tab

Kafka consumer Options tab

Use this tab to send properties to the broker.

  • Option values can be encrypted.

  • Some common properties are provided for convenience.

  • You can enter any Kafka property.

For property details, see the Apache Kafka documentation.

Offset Settings tab

Kafka consumer Offset Settings tab

Use this tab to stop the Kafka consumer when the message offset timestamp reaches the timestamp you specify.

Kafka consumer runs in normal mode if you do not specify a value on this tab.

  • Offset timestamp: End timestamp at which to stop the consumer.

  • Timestamp format: Format for the offset timestamp.

    If you provide an epoch value, timestamp format is not required.
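
For example, assuming a Timestamp format of yy/MM/dd HH:mm:ss (the style used in the dates of the Modes example below), the following sketch converts a formatted timestamp into its epoch equivalent. Whether the step expects an epoch value in seconds or milliseconds is not stated here, so treat the unit as an assumption:

    import java.text.SimpleDateFormat;

    public class OffsetTimestampExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical pattern matching the dates in the Modes example (23/08/04 07:03:00).
            SimpleDateFormat fmt = new SimpleDateFormat("yy/MM/dd HH:mm:ss");
            long epochMillis = fmt.parse("23/08/04 07:03:00").getTime();
            // Prints the equivalent epoch value in milliseconds; an epoch Offset timestamp
            // needs no Timestamp format.
            System.out.println(epochMillis);
        }
    }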

Modes

You can run Kafka consumer in three modes:

  1. Infinite loop: If Offset timestamp is empty, the consumer runs indefinitely.

  2. End timestamp: If Offset timestamp is set, the consumer stops when it reaches the specified time.

  3. Read data between two timestamps: Use Kafka consumer with the Kafka Offset job.

    • Kafka Offset job’s Offset timestamp = start time

    • Kafka consumer’s Offset timestamp = end time

Example: If the Kafka Offset job's Offset timestamp is 23/08/02 07:03:00 and the Kafka consumer's Offset timestamp is 23/08/04 07:03:00, the consumer reads the 48-hour period between the two timestamps.

Note: The Kafka Offset job resets the offsets for all partitions of the consumer group's topics according to the job's Offset timestamp.

Security

You can implement security using SSL, SASL, or SASL SSL.

SSL

  1. On the Setup tab, select Direct and set Bootstrap servers to ${KAFKA_ssl_url}.

  2. On the Options tab, add these properties:

    Option                      Value
    auto.offset.reset           latest
    ssl.key.password            $[Key password]
    ssl.keystore.location       $[Path to Key store]
    ssl.keystore.password       $[Key store password]
    ssl.truststore.location    $[Path to Trust store]
    ssl.truststore.password     $[Trust store Password]
    ssl.protocol                TLSv1.2
    security.protocol           SSL

  3. Select OK.
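
If you want to verify the SSL settings outside PDI, the same property names work with a plain Java Kafka client. The following is a minimal connectivity check, with placeholder broker address, paths, and passwords:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SslConnectivityCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9093"); // placeholder SSL listener
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("security.protocol", "SSL");
            props.put("ssl.protocol", "TLSv1.2");
            props.put("ssl.truststore.location", "/path/to/truststore.jks");
            props.put("ssl.truststore.password", "trust-store-password");
            props.put("ssl.keystore.location", "/path/to/keystore.jks");
            props.put("ssl.keystore.password", "key-store-password");
            props.put("ssl.key.password", "key-password");

            // Listing topics forces a TLS handshake with the broker.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.listTopics().keySet().forEach(System.out::println);
            }
        }
    }

If this check fails with an SSL handshake error, recheck the keystore and truststore values before adjusting the step configuration.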

SASL

SASL security requires krb5.conf and a Kerberos principal.

  1. Copy krb5.conf to ${JAVA_HOME}/conf/security.

  2. Run kinit ${KERBEROS_PRINCIPAL_KAFKA} to obtain a Kerberos TGT.

  3. Copy ${KERBEROS_PRINCIPAL_KAFKA}.keytab from the server to the workstation where PDI is installed.

  4. On the Setup tab, select Direct and set Bootstrap servers to ${KAFKA_SASL_PLAINTEXT_URL}.

  5. On the Options tab, add these properties:

    Option                        Value
    auto.offset.reset             latest
    security.protocol             SASL_PLAINTEXT
    sasl.mechanism                GSSAPI
    sasl.kerberos.service.name    ${KERBEROS_KAFKA_SERVICE_NAME}
    sasl.jaas.config              ${SASL_JAAS_CONFIG}

  6. Select OK.

Sample ${SASL_JAAS_CONFIG} (the exact value depends on your Kerberos environment; the entry below uses placeholder paths and principal names):
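
    com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/path/to/${KERBEROS_PRINCIPAL_KAFKA}.keytab"
        principal="${KERBEROS_PRINCIPAL_KAFKA}";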

SASL SSL

  1. On the Setup tab, select Direct and set Bootstrap servers to ${KAFKA_KERBEROS_SSL_URL}.

  2. On the Options tab, add these properties:

    Option                        Value
    auto.offset.reset             latest
    ssl.truststore.location       $[Path to Trust store]
    ssl.truststore.password       $[Trust store Password]
    ssl.keystore.location         $[Key store location]
    ssl.keystore.password         $[Key store password]
    ssl.key.password              $[Key password]
    security.protocol             SASL_SSL
    sasl.mechanism                PLAIN
    sasl.kerberos.service.name    ${KERBEROS_KAFKA_SERVICE_NAME}
    sasl.jaas.config              ${SASL_JAAS_CONFIG}

  3. Select OK.

For details about Kafka Kerberos connectivity, see the Confluent documentation.
