Kafka Producer
When you configure a Kafka Producer, you define connection information, the partition strategy, and data format to use. You can also configure Kafka Producer to determine the topic to write to at runtime.
The Kafka Producer passes data to partitions in the Kafka topic based on the partition strategy that you choose. You can optionally write a batch of records to the Kafka cluster as a single message. When you want the destination to send responses to a microservice origin within a microservice pipeline, you specify the type of response to send.
You can add Kafka configuration properties as needed. You can also configure the destination to use Kafka security features.
You can configure the Kafka Producer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas which uses Kafka as its underlying storage mechanism.
Broker List
The Kafka Producer connects to Kafka based on the topic and associated brokers that you specify. To ensure a connection in case a specified broker goes down, list as many brokers as possible.
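As a minimal sketch of why listing several brokers helps, assume the broker list uses the usual comma-separated host:port form. The `parse_broker_list` and `first_reachable` helpers and the host names are hypothetical and are not part of Data Collector:

```python
def parse_broker_list(broker_list):
    """Split a comma-separated 'host:port' string into (host, port) tuples."""
    brokers = []
    for entry in broker_list.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


def first_reachable(brokers, is_up):
    """Return the first broker for which is_up(broker) is true, else None."""
    for broker in brokers:
        if is_up(broker):
            return broker
    return None


# Hypothetical host names, for illustration only.
brokers = parse_broker_list("kafka-01:9092, kafka-02:9092, kafka-03:9092")

# Simulate the first broker being down: another listed broker is still usable.
down = {("kafka-01", 9092)}
chosen = first_reachable(brokers, lambda broker: broker not in down)
```

With only one broker listed, the same outage would leave no broker to connect to.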
Runtime Topic Resolution
Kafka Producer can write a record to the topic based on an expression. When Kafka Producer evaluates a record, it calculates the expression based on record values and writes the record to the resulting topic.
When performing runtime topic resolution, Kafka Producer can write to any topic by default. You can create a white list of topics to limit the number of topics Kafka Producer attempts to use. When you create a white list, any record that resolves to an unlisted topic is sent to the stage for error handling. Use a white list when record data might resolve to invalid topic names.
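The behavior described above can be sketched roughly as follows. This is an illustration only: records are modelled as plain dictionaries, the topic expression as an ordinary Python function, and the `resolve_topic` and `route` helpers and the topic names are hypothetical rather than Data Collector APIs.

```python
def resolve_topic(record, topic_expression, allowed_topics=None):
    """Evaluate the expression for one record and check the white list."""
    topic = topic_expression(record)
    if allowed_topics is not None and topic not in allowed_topics:
        raise LookupError(f"topic {topic!r} is not in the white list")
    return topic


def route(records, topic_expression, allowed_topics=None):
    """Group records by resolved topic; collect unlisted ones as errors."""
    written, errors = {}, []
    for record in records:
        try:
            topic = resolve_topic(record, topic_expression, allowed_topics)
        except LookupError as exc:
            # Stands in for passing the record to the stage for error handling.
            errors.append((record, str(exc)))
            continue
        written.setdefault(topic, []).append(record)
    return written, errors


records = [
    {"region": "eu", "amount": 10},
    {"region": "us", "amount": 7},
    {"region": "??", "amount": 1},  # resolves to an unlisted topic
]
written, errors = route(
    records,
    lambda record: "sales-" + record["region"],
    allowed_topics={"sales-eu", "sales-us"},
)
```

Without the white list, the third record would be written to whatever topic name its data happens to produce.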
Partition Strategy
The partition strategy determines how to write data to Kafka partitions. You can use a partition strategy to balance the workload or to write data semantically.
- Round-Robin - Writes each record to a different partition using a cyclical order. Use for load balancing.
- Random - Writes each record to a different partition using a random order. Use for load balancing.
- Expression - Writes each record to a partition based on the results of the partition expression. Use to perform semantic partitioning.
- Default - Writes each record using the default partition strategy that Kafka provides.
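To make the difference between the first three strategies concrete, here is a minimal sketch. The function names are hypothetical, and hashing the expression result into a partition number is an assumption made for illustration, not a statement of how the stage implements expression partitioning.

```python
import itertools
import random
import zlib


def round_robin_partitioner(num_partitions):
    """Cycle through partitions 0..n-1 regardless of record content."""
    counter = itertools.count()
    return lambda record: next(counter) % num_partitions


def random_partitioner(num_partitions, seed=None):
    """Pick a partition at random for every record."""
    rng = random.Random(seed)
    return lambda record: rng.randrange(num_partitions)


def expression_partitioner(num_partitions, expression):
    """Derive the partition from record data (assumption: hash of the result)."""
    def choose(record):
        key = str(expression(record)).encode("utf-8")
        return zlib.crc32(key) % num_partitions
    return choose


records = [{"customer": name} for name in ["ann", "bob", "ann", "cy", "bob"]]

round_robin = round_robin_partitioner(3)
cyclical = [round_robin(record) for record in records]

by_customer = expression_partitioner(3, lambda record: record["customer"])
semantic = [by_customer(record) for record in records]
```

Round-robin and random spread records evenly without looking at them, while the expression-based sketch sends records that share a value, here the same customer, to the same partition.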
Send Microservice Responses
The Kafka Producer destination can send responses to a microservice origin when you use the destination in a microservice pipeline. When sending responses, the destination can send one of the following:
- All successfully written records.
- Responses from the destination system - For information about the possible responses, see the documentation for the destination system.
Additional Kafka Properties
You can add custom Kafka configuration properties to the Kafka Producer destination.
When you add a Kafka configuration property, enter the exact property name and the value. The stage does not validate the property names or values.
Several properties are defined by default. You can edit or remove these properties as necessary.
- key.serializer.class
- metadata.broker.list
- partitioner.class
- producer.type
- serializer.class
Enabling Security
You can configure the Kafka Producer to connect securely to Kafka through SSL/TLS, Kerberos, or both.
In Data Collector Edge pipelines, the Kafka Producer destination supports only SSL/TLS.
Enabling SSL/TLS
Perform the following steps to enable the Kafka Producer to use SSL/TLS to connect to Kafka.
In Data Collector Edge pipelines, only the security.protocol, ssl.truststore.location, and ssl.keystore.location Kafka configuration properties are valid. For the truststore and keystore locations, enter an absolute path for the truststore and keystore files that use the PEM format. In Data Collector Edge pipelines, do not add any of the other SSL Kafka properties listed in these instructions.
- To use SSL/TLS to connect, first make sure Kafka is configured for SSL/TLS as described in the Kafka documentation.
- On the General tab of the stage, set the Stage Library property to the appropriate Apache Kafka version.
- On the Kafka tab, add the security.protocol Kafka configuration property and set it to SSL.
- Then add and configure the following SSL Kafka properties:
  - ssl.truststore.location
  - ssl.truststore.password
  When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
  - ssl.keystore.location
  - ssl.keystore.password
  - ssl.key.password
  Some brokers might require adding the following properties as well:
  - ssl.enabled.protocols
  - ssl.truststore.type
  - ssl.keystore.type
  For details about these properties, see the Kafka documentation.
For example, the following properties allow the stage to use SSL/TLS to connect to Kafka with client authentication:
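One such set of properties is sketched here as a Python dictionary, purely for illustration. All paths and passwords are placeholders, and the `missing_ssl_properties` helper is hypothetical. In the stage itself you enter each name and value as a Kafka configuration property on the Kafka tab.

```python
# Placeholder values only: replace the paths and passwords with your own.
ssl_properties = {
    "security.protocol": "SSL",
    "ssl.truststore.location": "/path/to/truststore.jks",
    "ssl.truststore.password": "<truststore password>",
    # Needed only when the broker sets ssl.client.auth to "required".
    "ssl.keystore.location": "/path/to/keystore.jks",
    "ssl.keystore.password": "<keystore password>",
    "ssl.key.password": "<key password>",
}

TRUSTSTORE_KEYS = ("ssl.truststore.location", "ssl.truststore.password")
CLIENT_AUTH_KEYS = (
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
)


def missing_ssl_properties(props, client_auth):
    """List the SSL properties named in the steps above that props lacks."""
    required = ("security.protocol",) + TRUSTSTORE_KEYS
    if client_auth:
        required += CLIENT_AUTH_KEYS
    return [key for key in required if not props.get(key)]
```

Because the stage does not validate property names or values, a small check like this can catch a forgotten keystore property before the pipeline runs.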

Enabling Kerberos (SASL)
When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to Kafka. Perform the following steps to enable the Kafka Producer destination to use Kerberos to connect to Kafka.
Not valid in Data Collector Edge pipelines. In Data Collector Edge pipelines, the Kafka Producer destination supports only SSL/TLS.
- To use Kerberos, first make sure Kafka is configured for Kerberos as described in the Kafka documentation.
- Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
- Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
  - RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
    For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
    Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
      -Djava.security.auth.login.config=<JAAS config path>
    Modify environment variables using the method required by your installation type.
  - RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
    For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
  - Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.
    If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.
    If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.
    Add the following KafkaClient login section to the appropriate field as follows:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="<principal name>/_HOST@<realm>";
      };
    For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="sdc/_HOST@EXAMPLE.COM";
      };
    Cloudera Manager generates the appropriate keytab path and host name.
- On the General tab of the stage, set the Stage Library property to the appropriate Apache Kafka version.
- On the Kafka tab, add the security.protocol Kafka configuration property, and set it to SASL_PLAINTEXT.
- Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.
For example, the following Kafka properties enable connecting to Kafka with Kerberos:
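The two properties named in the steps above are written out here as a short Python sketch for illustration. The `to_properties_text` helper is hypothetical and simply renders name=value pairs. In the stage you add each pair as a Kafka configuration property on the Kafka tab.

```python
kerberos_properties = {
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.kerberos.service.name": "kafka",
}


def to_properties_text(props):
    """Render the properties as name=value lines, one per property."""
    return "\n".join(f"{name}={value}" for name, value in props.items())


kerberos_text = to_properties_text(kerberos_properties)
```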
Enabling SSL/TLS and Kerberos
You can enable Kafka Producer to use SSL/TLS and Kerberos to connect to Kafka.
Not valid in Data Collector Edge pipelines. In Data Collector Edge pipelines, the Kafka Producer destination supports using only SSL/TLS to connect to Kafka.
- Make sure Kafka is configured to use SSL/TLS and Kerberos (SASL) as described in the Kafka documentation.
- Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
- Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
  - RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
    For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
    Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
      -Djava.security.auth.login.config=<JAAS config path>
    Modify environment variables using the method required by your installation type.
  - RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
    For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
  - Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.
    If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.
    If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.
    Add the following KafkaClient login section to the appropriate field as follows:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="<principal name>/_HOST@<realm>";
      };
    For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="sdc/_HOST@EXAMPLE.COM";
      };
    Cloudera Manager generates the appropriate keytab path and host name.
- On the General tab of the stage, set the Stage Library property to the appropriate Apache Kafka version.
- On the Kafka tab, add the security.protocol property and set it to SASL_SSL.
- Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.
- Then add and configure the following SSL Kafka properties:
  - ssl.truststore.location
  - ssl.truststore.password
  When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
  - ssl.keystore.location
  - ssl.keystore.password
  - ssl.key.password
  Some brokers might require adding the following properties as well:
  - ssl.enabled.protocols
  - ssl.truststore.type
  - ssl.keystore.type
  For details about these properties, see the Kafka documentation.
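The three security setups differ mainly in the value of security.protocol. The following minimal sketch summarizes that choice. The `security_properties` helper is hypothetical, and the PLAINTEXT fallback is the Kafka client default rather than a value this document asks you to set.

```python
def security_properties(use_ssl, use_kerberos):
    """Return the protocol-related properties for the chosen security setup."""
    if use_ssl and use_kerberos:
        protocol = "SASL_SSL"
    elif use_kerberos:
        protocol = "SASL_PLAINTEXT"
    elif use_ssl:
        protocol = "SSL"
    else:
        # Kafka client default; not configured explicitly in these steps.
        protocol = "PLAINTEXT"

    props = {"security.protocol": protocol}
    if use_kerberos:
        # Kerberos setups also name the Kafka service principal.
        props["sasl.kerberos.service.name"] = "kafka"
    return props


combined = security_properties(use_ssl=True, use_kerberos=True)
```

The truststore and keystore properties listed above still need to be added separately whenever SSL/TLS is part of the setup.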
Data Formats
The Kafka Producer destination writes data to Kafka based on the data format that you select.
In Data Collector Edge pipelines, the destination supports only the Binary, JSON, SDC Record, and Text data formats.
- Avro - The destination writes records based on the Avro schema.
- Binary - The stage writes binary data to a single field in the record.
- Delimited - The destination writes records as delimited data. When you use this data format, the root field must be list or list-map.
- JSON - The destination writes records as JSON data. You can use one of the following formats:
  - Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
  - Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
- Protobuf - Writes one record in a message. Uses the user-defined message type and the definition of the message type in the descriptor file to generate the message.
- SDC Record - The destination writes records in the SDC Record data format.
- Text - The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use. When necessary, merge record data into the field earlier in the pipeline.
- XML - The destination creates a valid XML document for each record. The destination requires the record to have a single root field that contains the rest of the record data. For details and suggestions for how to accomplish this, see Record Structure Requirement.
  The destination can include indentation to produce human-readable documents. It can also validate that the generated XML conforms to the specified schema definition. Records with invalid schemas are handled based on the error handling configured for the destination.
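For the JSON data format described above, the difference between the Array and Multiple objects layouts can be sketched as follows. The helper names are hypothetical, and separating the objects with newlines is an assumption made for readability, not a documented guarantee of the output.

```python
import json


def to_json_array(records):
    """Array layout: one JSON array whose elements are the records."""
    return json.dumps(records)


def to_json_multiple_objects(records):
    """Multiple objects layout: one JSON object per record, one after another."""
    # Assumption: objects are separated by newlines.
    return "\n".join(json.dumps(record) for record in records)


records = [{"id": 1, "name": "ann"}, {"id": 2, "name": "bob"}]

as_array = to_json_array(records)
as_objects = to_json_multiple_objects(records)
```

A consumer reads the array layout with a single JSON parse, whereas the multiple objects layout is parsed one object at a time.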
Configuring a Kafka Producer
Configure a Kafka Producer to write data to a Kafka cluster.