Kafka Consumer
The Kafka Consumer origin reads data from a single topic in an Apache Kafka cluster. To use multiple threads to read from multiple topics, use the Kafka Multitopic Consumer.
When you configure a Kafka Consumer, you configure the consumer group name, topic, and ZooKeeper connection information.
You can configure the Kafka Consumer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas which uses Kafka as its underlying storage mechanism.
You can add additional Kafka configuration properties as needed. You can also configure the origin to use Kafka security features.
Kafka Consumer includes record header attributes that enable you to use information about the record in pipeline processing.
Offset Management
The first time that a Kafka Consumer origin identified by a consumer group receives messages from a topic, an offset entry is created for that consumer group and topic. The offset entry is created in ZooKeeper or Kafka, depending on your Kafka version and broker configuration.
- No stored offset
- When the consumer group and topic combination does not have a previously stored offset, the Kafka Consumer origin by default receives messages sent to the topic after the pipeline starts, processing data from all partitions and ignoring any existing messages in the topic.
- Previously stored offset
- When the consumer group and topic combination has a previously stored offset, the Kafka Consumer origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.
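If you need to check the offsets stored for a consumer group, you can use the Kafka command line tools rather than Data Collector. For example, when offsets are stored in Kafka, a command like the following describes the stored offsets for a group; the broker address and group name shown here are illustrative:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group sdc-kafka-consumer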
Setting the Initial Offset
By default, when the consumer group and topic combination does not have a previously stored offset, the Kafka Consumer origin reads only messages received after the pipeline starts. To read the topic from the beginning instead, set an initial offset for the consumer group:
- On the Kafka tab, click the Add icon to add a new Kafka configuration property. You can use simple or bulk edit mode to add configuration properties.
- For the property name, enter auto.offset.reset.
- Set the value for the auto.offset.reset property to earliest.
For more information about auto.offset.reset, see the Apache Kafka documentation.
For more information about adding custom Kafka configuration properties, see Additional Kafka Properties.
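For example, with the property added, the Kafka configuration for the origin includes a name-value pair equivalent to the following:
auto.offset.reset=earliest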
Additional Kafka Properties
You can add custom Kafka configuration properties to the Kafka Consumer.
When you add a Kafka configuration property, enter the exact property name and value. The Kafka Consumer does not validate the property names or values. Note that the origin manages the following properties itself and ignores any user-defined values for them:
- auto.commit.enable
- group.id
- zookeeper.connect
Record Header Attributes
The Kafka Consumer origin creates record header attributes that include information about where each record originated, such as the topic, partition, and offset. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
- avroSchema - When processing Avro data, provides the Avro schema.
- offset - The offset where the record originated.
- partition - The partition where the record originated.
- topic - The topic where the record originated.
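For example, the following expression returns the originating topic for a record, or the illustrative default value "unknown" when the attribute is not present:
${record:attributeOrDefault('topic', 'unknown')}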
Enabling Security
You can configure the Kafka Consumer origin to connect securely through SSL/TLS, Kerberos, or both.
Enabling SSL/TLS
Perform the following steps to enable the Kafka Consumer origin to use SSL/TLS to connect to Kafka. You can use the same steps to configure a Kafka Producer.
- To use SSL/TLS to connect, first make sure Kafka is configured for SSL/TLS as described in the Kafka documentation.
- On the General tab of the stage, set the Stage Library property to the appropriate Apache Kafka version.
- On the Kafka tab, add the security.protocol Kafka configuration property and set it to SSL.
- Then add and configure the following SSL Kafka properties:
- ssl.truststore.location
- ssl.truststore.password
When the Kafka broker requires client authentication - that is, when the ssl.client.auth broker property is set to "required" - also add and configure the following properties:
- ssl.keystore.location
- ssl.keystore.password
- ssl.key.password
Some brokers might also require the following properties:
- ssl.enabled.protocols
- ssl.truststore.type
- ssl.keystore.type
For details about these properties, see the Kafka documentation.
For example, the following properties allow the stage to use SSL/TLS to connect to Kafka with client authentication:
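(The property values below are illustrative; use the truststore and keystore paths and passwords for your environment.)
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>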

Enabling Kerberos (SASL)
When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to Kafka.
Perform the following steps to enable the Kafka Consumer origin to use Kerberos to connect to Kafka:
- To use Kerberos, first make sure Kafka is configured for Kerberos as described in the Kafka documentation.
- Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
- Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
- RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="<keytab path>"
      principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/etc/security/keytabs/sdc.keytab"
      principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
  Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
  -Djava.security.auth.login.config=<JAAS config path>
  Modify environment variables using the method required by your installation type.
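  For example, for a tarball installation you might add the option to the file that defines Data Collector environment variables; the JAAS configuration path shown here is illustrative:
  export SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Djava.security.auth.login.config=/etc/sdc/kafka_client_jaas.conf"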
- RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="<keytab path>"
      principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/etc/security/keytabs/sdc.keytab"
      principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
- Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.
If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.
If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.
Add the following KafkaClient login section to the appropriate field:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="_KEYTAB_PATH"
      principal="<principal name>/_HOST@<realm>";
  };
  For example:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="_KEYTAB_PATH"
      principal="sdc/_HOST@EXAMPLE.COM";
  };
Cloudera Manager generates the appropriate keytab path and host name.
- On the General tab of the stage, set the Stage Library property to the appropriate Apache Kafka version.
- On the Kafka tab, add the security.protocol Kafka configuration property, and set it to SASL_PLAINTEXT.
- Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.
For example, the following Kafka properties enable connecting to Kafka with Kerberos:
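(The properties below show only the Kerberos-related settings from the previous steps.)
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka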
Enabling SSL/TLS and Kerberos
You can enable the Kafka Consumer origin to use SSL/TLS and Kerberos to connect to Kafka.
- Make sure Kafka is configured to use SSL/TLS and Kerberos (SASL) as described in the Kafka documentation.
- Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
- Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
- RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="<keytab path>"
      principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/etc/security/keytabs/sdc.keytab"
      principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
  Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
  -Djava.security.auth.login.config=<JAAS config path>
  Modify environment variables using the method required by your installation type.
- RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="<keytab path>"
      principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/etc/security/keytabs/sdc.keytab"
      principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
- Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.
If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.
If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.
Add the following KafkaClient login section to the appropriate field:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="_KEYTAB_PATH"
      principal="<principal name>/_HOST@<realm>";
  };
  For example:
  KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="_KEYTAB_PATH"
      principal="sdc/_HOST@EXAMPLE.COM";
  };
Cloudera Manager generates the appropriate keytab path and host name.
- On the General tab of the stage, set the Stage Library property to the appropriate Apache Kafka version.
- On the Kafka tab, add the security.protocol property and set it to SASL_SSL.
- Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.
- Then add and configure the following SSL Kafka properties:
- ssl.truststore.location
- ssl.truststore.password
When the Kafka broker requires client authentication - that is, when the ssl.client.auth broker property is set to "required" - also add and configure the following properties:
- ssl.keystore.location
- ssl.keystore.password
- ssl.key.password
Some brokers might also require the following properties:
- ssl.enabled.protocols
- ssl.truststore.type
- ssl.keystore.type
For details about these properties, see the Kafka documentation.
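For example, the following properties allow the stage to use Kerberos and SSL/TLS to connect to Kafka; the truststore path and password are illustrative:
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>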
Data Formats
The Kafka Consumer origin processes data differently based on the data format. Kafka Consumer can process the following types of data:
- Avro
- Generates a record for every message. Includes a "precision" and "scale" field attribute for each Decimal field. For more information about field attributes, see Field Attributes.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Datagram
- Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and syslog messages.
- Delimited
- Generates a record for each delimited line. You can use the following delimited format types:
- Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Tab-Separated Values - File that includes tab-separated values.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
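For example, for messages with the following hypothetical structure, setting the delimiter element to msg generates one record for each <msg> element:
  <root>
      <msg>
          <field>value1</field>
      </msg>
      <msg>
          <field>value2</field>
      </msg>
  </root>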
Configuring a Kafka Consumer
Configure a Kafka Consumer origin to read messages from a Kafka cluster.