# Kafka Producer

The **Kafka Producer** step publishes messages in near real time to one Kafka topic.

Kafka stores records in topics. A topic is a category to which records are published. Each topic is divided into logs known as partitions.

Before you use the Kafka Producer step, configure a named connection for your distribution. For details, see [Connecting to a Hadoop cluster with the PDI client](/pdia-data-integration/extracting-data-into-pdi/connecting-to-a-hadoop-cluster-with-the-pdi-client-article.md).

### Step name

* **Step name**: Specify the unique name of the step on the canvas. Default: `Kafka Producer`.

### Options

The Kafka Producer step includes these tabs:

* **Setup**
* **Options** (configuration properties)
* **Security**

#### Setup tab

![Kafka Producer step](/files/M8lpAdrxdWvh3TS8SAM1)

* **Connection**: Connection type.

  * **Direct**: Specify **Bootstrap servers**.
  * **Cluster**: Specify a **Hadoop cluster** configuration.

  In a Hadoop cluster configuration, you can specify host names and ports for HDFS, Job Tracker, security, and other big data cluster components.
* **Client ID**: Unique client identifier used to identify and set up a durable connection path to the server.
* **Topic**: Topic to publish to.
* **Key Field**: Optional key field.

  Kafka can distribute messages to partitions based on their keys. If no key is present, messages are randomly distributed to partitions.
* **Message Field**: Field that contains the message value.
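
The key-based distribution described for **Key Field** can be illustrated with a minimal sketch. It assumes a simple hash-and-modulo scheme and uses `zlib.crc32` as a stand-in for the hash that the Kafka client actually applies, so the partition numbers it produces will not match a real cluster. The function name `choose_partition` is hypothetical.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a record key to a partition index in [0, num_partitions)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # zlib.crc32 is only a stand-in for the hash used by the Kafka client.
    return zlib.crc32(key) % num_partitions


# Records that share a key map to the same partition.
first = choose_partition(b"customer-42", 6)
second = choose_partition(b"customer-42", 6)
assert first == second
```

Because the mapping depends only on the key and the partition count, all messages with the same key keep their relative order within one partition.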

#### Options tab

![Options tab](/files/0smRw4fHvzupbAgWe4nS)

Use this tab to configure Kafka producer properties.

* Property values can be encrypted.
* You can use PDI environment variables, `kettle.properties` variables, and parameter values.

For property details, see the [Apache Kafka documentation](https://kafka.apache.org/documentation/).
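
As a rough illustration of how a `${NAME}` reference in a property value resolves to a variable value, the sketch below performs a plain text substitution. It is not PDI's implementation. The helper `resolve`, the variable names, and the choice to leave unknown names untouched are assumptions made for this example.

```python
import re

# Matches ${NAME} references inside a property value.
_VAR = re.compile(r"\$\{([^}]+)\}")


def resolve(value: str, variables: dict) -> str:
    """Replace each ${NAME} with variables[NAME]; leave unknown names as-is."""
    return _VAR.sub(lambda m: variables.get(m.group(1), m.group(0)), value)


# Hypothetical variables, such as ones defined in kettle.properties.
variables = {"KAFKA_HOST": "broker1.example.com", "KAFKA_PORT": "9093"}
print(resolve("${KAFKA_HOST}:${KAFKA_PORT}", variables))
# prints broker1.example.com:9093
```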

#### Security

You can implement security using SSL, SASL, or SASL SSL.

**Using SSL**

1. On the **Setup** tab, select **Direct** and set **Bootstrap servers** to `${KAFKA_ssl_url}`.
2. On the **Options** tab, add these properties:

   | Option                    | Value                    |
   | ------------------------- | ------------------------ |
   | `compression.type`        | `none`                   |
   | `ssl.truststore.location` | `$[Path to Trust store]` |
   | `ssl.truststore.password` | `$[Password]`            |
   | `ssl.keystore.location`   | `$[Path to Key store]`   |
   | `ssl.keystore.password`   | `$[Key store password]`  |
   | `ssl.key.password`        | `$[Key password]`        |
   | `security.protocol`       | `SSL`                    |
   | `ssl.protocol`            | `TLSv1.2`                |
3. Select **OK**.
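
Before entering the SSL options, it can help to check that none are missing. The sketch below is a hypothetical helper, not part of PDI or Kafka. It only checks for the option names listed in the table above, and the paths and passwords are placeholders.

```python
# SSL option names taken from the table above.
TABLE_SSL_KEYS = (
    "security.protocol",
    "ssl.truststore.location",
    "ssl.truststore.password",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
)


def missing_ssl_options(props: dict) -> list:
    """Return the SSL option names that are absent or empty in props."""
    return [key for key in TABLE_SSL_KEYS if not props.get(key)]


props = {
    "security.protocol": "SSL",
    "ssl.truststore.location": "/path/to/truststore.jks",  # placeholder path
    "ssl.truststore.password": "changeit",  # placeholder password
}
print(missing_ssl_options(props))
# prints ['ssl.keystore.location', 'ssl.keystore.password', 'ssl.key.password']
```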

**Using SASL**

SASL security requires the Kerberos configuration file `krb5.conf` and a Kerberos principal.

1. Copy `krb5.conf` to `${JAVA_HOME}/conf/security`.
2. Run `kinit ${KERBEROS_PRINCIPAL_KAFKA}` to obtain a Kerberos TGT.
3. Copy `${KERBEROS_PRINCIPAL_KAFKA}.keytab` from the server to the workstation where PDI is installed.
4. On the **Setup** tab, select **Direct** and set **Bootstrap servers** to `${KAFKA_SASL_PLAINTEXT_URL}`.
5. On the **Options** tab, add these properties:

   | Option                       | Value                            |
   | ---------------------------- | -------------------------------- |
   | `compression.type`           | `none`                           |
   | `security.protocol`          | `SASL_PLAINTEXT`                 |
   | `sasl.mechanism`             | `GSSAPI`                         |
   | `sasl.kerberos.service.name` | `${KERBEROS_KAFKA_SERVICE_NAME}` |
   | `sasl.jaas.config`           | `${SASL_JAAS_CONFIG}`            |
6. Select **OK**.

Sample `${SASL_JAAS_CONFIG}`:

```
com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true doNotPrompt=true keyTab="Path to ${KERBEROS_PRINCIPAL_KAFKA}.keytab" principal="${Principal created in Kerberos for Kafka}";
```
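
The sample value above follows a fixed pattern, so it can be assembled from a keytab path and a principal. The following is a minimal sketch of that assembly. The function `build_jaas_config`, the example path, and the example principal are hypothetical, and the sketch leaves `debug` off by default.

```python
def build_jaas_config(keytab_path: str, principal: str, debug: bool = False) -> str:
    """Return a single-line Krb5LoginModule JAAS entry for keytab-based login."""
    for name, value in (("keytab_path", keytab_path), ("principal", principal)):
        # A double quote would break the quoted option values below.
        if '"' in value:
            raise ValueError(f"{name} must not contain a double quote")
    options = [
        "useKeyTab=true",
        "storeKey=true",
        f"debug={'true' if debug else 'false'}",
        "doNotPrompt=true",
        f'keyTab="{keytab_path}"',
        f'principal="{principal}"',
    ]
    return (
        "com.sun.security.auth.module.Krb5LoginModule required "
        + " ".join(options)
        + ";"
    )


# Hypothetical keytab path and principal.
print(build_jaas_config("/etc/security/keytabs/kafka.keytab", "kafka-client@EXAMPLE.COM"))
```

The resulting string is what you would store in the variable referenced by `sasl.jaas.config`.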

**Using SASL SSL**

1. On the **Setup** tab, select **Direct** and set **Bootstrap servers** to `${KAFKA_KERBEROS_SSL_URL}`.
2. On the **Options** tab, add these properties:

   | Option                       | Value                            |
   | ---------------------------- | -------------------------------- |
   | `compression.type`           | `none`                           |
   | `security.protocol`          | `SASL_SSL`                       |
   | `sasl.mechanism`             | `PLAIN`                          |
   | `sasl.kerberos.service.name` | `${KERBEROS_KAFKA_SERVICE_NAME}` |
   | `sasl.jaas.config`           | `${SASL_JAAS_CONFIG}`            |
   | `ssl.truststore.location`    | `$[Path to Trust store]`         |
   | `ssl.truststore.password`    | `$[Password]`                    |
   | `ssl.keystore.location`      | `$[Path to Key store]`           |
   | `ssl.keystore.password`      | `$[Key store password]`          |
   | `ssl.key.password`           | `$[Key password]`                |
   | `ssl.protocol`               | `TLSv1.2`                        |
3. Select **OK**.

For details about Kafka Kerberos connectivity, see the [Confluent documentation](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_gssapi.html).

### See also

* [Kafka Consumer](/pdia-data-integration/pdi-transformation-steps-reference-overview/kafka-consumer.md)

