Kafka Producer

The Kafka Producer step publishes messages in near-real time to a single Kafka topic.

Kafka records are organized into topics: a topic is the category to which records are published. Each topic is divided into one or more ordered logs called partitions.
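For context, a topic and its partitions are typically created ahead of time with Kafka's own command-line tools. A minimal sketch, assuming a broker reachable at broker1:9092 and placeholder topic and partition counts:

    kafka-topics.sh --create --bootstrap-server broker1:9092 \
      --topic my-topic --partitions 3 --replication-factor 1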

Before you use the Kafka Producer step, configure a named connection for your distribution. For details, see Connecting to a Hadoop cluster with the PDI client.

Step name

  • Step name: Specify the unique name of the step on the canvas. Default: Kafka Producer.

Options

The Kafka Producer step includes these tabs:

  • Setup

  • Options (configuration properties)

  • Security

Setup tab

  • Connection: Connection type.

    • Direct: Specify Bootstrap servers.

    • Cluster: Specify a Hadoop cluster configuration.

    In a Hadoop cluster configuration, you can specify host names and ports for HDFS, Job Tracker, security, and other big data cluster components.

  • Client ID: Unique client identifier used to identify and set up a durable connection path to the server.

  • Topic: Topic to publish to.

  • Key Field: Field to use as the message key (optional).

    Kafka distributes messages to partitions based on their keys, so records that share a key are written to the same partition. If no key is present, the producer's default partitioner spreads messages across partitions. The sketch after this list shows a keyed send.

  • Message Field: Field that contains the message value.
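The Setup fields correspond to standard Kafka producer settings. The following Java sketch shows an equivalent direct connection; the broker addresses, client ID, topic, key, and message value are all placeholder assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SetupTabSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Bootstrap servers (Direct connection): comma-separated host:port pairs
            props.put("bootstrap.servers", "broker1:9092,broker2:9092");
            // Client ID: identifies this producer to the brokers
            props.put("client.id", "pdi-kafka-producer");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Topic, Key Field, and Message Field map to the record's topic,
                // key, and value; a keyed record is routed to a partition derived
                // from its key.
                producer.send(new ProducerRecord<>("my-topic", "my-key", "my-message"));
            }
        }
    }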

Options tab

Use this tab to configure Kafka producer properties.

  • Property values can be encrypted.

  • You can use PDI environment variables, kettle.properties variables, and parameter values.
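For example, a property value can reference an entry defined in kettle.properties; the variable name here is hypothetical:

    # kettle.properties (hypothetical entry)
    KAFKA_COMPRESSION=snappy

    Option: compression.type
    Value:  ${KAFKA_COMPRESSION}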

For property details, see the Apache Kafka documentation.

Security

You can implement security using SSL, SASL, or SASL SSL.

Using SSL

  1. On the Setup tab, select Direct and set Bootstrap servers to ${KAFKA_ssl_url}.

  2. On the Options tab, add these properties:

    Option                       Value
    compression.type             none
    ssl.truststore.location      $[Path to Trust store]
    ssl.truststore.password      $[Password]
    ssl.keystore.location        $[Path to Key store]
    ssl.keystore.password        $[Key store password]
    ssl.key.password             $[Key password]
    security.protocol            SSL
    ssl.protocol                 TLSv1.2

  3. Select OK.
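The truststore and keystore referenced above are standard Java keystore (JKS) files. As a sketch, assuming the broker's CA certificate has been exported to a PEM file named ca-cert.pem, a truststore could be built with keytool:

    keytool -importcert -alias kafka-ca -file ca-cert.pem \
      -keystore kafka.truststore.jks -storepass changeit -noprompt

The resulting file path and store password are the values that ssl.truststore.location and ssl.truststore.password expect.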

Using SASL

SASL security requires the Kerberos configuration file krb5.conf and a Kerberos principal.

  1. Copy krb5.conf to ${JAVA_HOME}/conf/security.

  2. Run kinit ${KERBEROS_PRINCIPAL_KAFKA} to obtain a Kerberos TGT.

  3. Copy ${KERBEROS_PRINCIPAL_KAFKA}.keytab from the server to the workstation where PDI is installed.

  4. On the Setup tab, select Direct and set Bootstrap servers to ${KAFKA_SASL_PLAINTEXT_URL}.

  5. On the Options tab, add these properties:

    Option                        Value
    compression.type              none
    security.protocol             SASL_PLAINTEXT
    sasl.mechanism                GSSAPI
    sasl.kerberos.service.name    ${KERBEROS_KAFKA_SERVICE_NAME}
    sasl.jaas.config              ${SASL_JAAS_CONFIG}

  6. Select OK.

A sample ${SASL_JAAS_CONFIG} value for the GSSAPI mechanism follows; the keytab path and principal are placeholders for the values from steps 2 and 3:
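    com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/path/to/${KERBEROS_PRINCIPAL_KAFKA}.keytab"
      principal="${KERBEROS_PRINCIPAL_KAFKA}";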

Using SASL SSL

  1. On the Setup tab, select Direct and set Bootstrap servers to ${KAFKA_KERBEROS_SSL_URL}.

  2. On the Options tab, add these properties:

    Option                        Value
    compression.type              none
    security.protocol             SASL_SSL
    sasl.mechanism                PLAIN
    sasl.kerberos.service.name    ${KERBEROS_KAFKA_SERVICE_NAME}
    sasl.jaas.config              ${SASL_JAAS_CONFIG}
    ssl.truststore.location       $[Path to Trust store]
    ssl.truststore.password       $[Password]
    ssl.keystore.location         $[Path to Key store]
    ssl.keystore.password         $[Key store password]
    ssl.key.password              $[Key password]
    ssl.protocol                  TLSv1.2

  3. Select OK.
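Note that this configuration sets sasl.mechanism to PLAIN, so the ${SASL_JAAS_CONFIG} value takes a different form than the GSSAPI sample above. A minimal sketch with placeholder credentials:

    org.apache.kafka.common.security.plain.PlainLoginModule required
      username="client-user"
      password="client-password";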

For details about Kafka Kerberos connectivity, see the Confluent documentation.
