Kafka Multitopic Consumer
Supported pipeline types:
- Data Collector
When you configure a Kafka Multitopic Consumer, you configure the consumer group name and the brokers to use. You also specify the topics to process and the number of threads to use. In Kafka, make sure that the partition assignment strategy is configured appropriately.
You can configure the origin to produce a single record when a message includes multiple objects. And you can add additional Kafka configuration properties as needed, including Kafka security features.
When processing Avro data, you can configure the Kafka Multitopic Consumer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas that uses Kafka as its underlying storage mechanism.
The Kafka Multitopic Consumer origin includes record header attributes that enable you to use information about the record in pipeline processing.
Offset Management
The first time that a Kafka Multitopic Consumer origin identified by a consumer group receives messages from a topic, an offset entry is created for that consumer group and topic. The offset entry is created in Kafka.
- No stored offset
- When the consumer group and topic combination does not have a previously stored offset, the Kafka Multitopic Consumer origin uses the Auto Offset Reset property to determine the first message to read. You can set the origin to read messages in the topic starting from the earliest message, latest message, or a particular timestamp. The default setting is the earliest message, which results in the origin reading all existing messages in the topic.
- Previously stored offset
- When the consumer group and topic combination has a previously stored offset, the Kafka Multitopic Consumer origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.
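When no offset is stored and you configure the origin to start from a particular timestamp, the timestamp is typically expressed in milliseconds since the Unix epoch - an assumption about the unit, so verify it against the stage properties. For example, starting from January 1, 2019 00:00 UTC corresponds to a value of 1546300800000.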
Multithreaded Processing
The Kafka Multitopic Consumer origin performs parallel processing and enables the creation of a multithreaded pipeline. The Kafka Multitopic Consumer origin uses multiple concurrent threads based on the Number of Threads property and the partition assignment strategy defined in the Kafka cluster.
When performing multithreaded processing, the Kafka Multitopic Consumer origin checks the list of topics to process and creates the specified number of threads. Each thread connects to Kafka and creates a batch of data from a partition assigned by the broker based on the Kafka partition assignment strategy. Then, it passes the batch to an available pipeline runner.
A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors and destinations in the pipeline and represents all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.
For example, say you set the Number of Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. The threads are assigned to different partitions based on the Kafka partition assignment strategy. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview. For more information about the Kafka partition assignment strategies, see the Kafka documentation.
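The assignment strategy itself is a standard Kafka consumer setting rather than a property of this stage. As an illustration only - whether you define it on the Kafka side or pass it through the stage's additional Kafka configuration properties depends on your deployment - the values take this form:
# range assignment, the default in many Kafka versions
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
# round-robin assignment across all subscribed topic partitions
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor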
Additional Kafka Properties
You can add custom Kafka configuration properties to the Kafka Multitopic Consumer.
When you add a Kafka configuration property, enter the exact property name and the value. The Kafka Multitopic Consumer does not validate the property names or values.
Note that the origin provides values for the following Kafka configuration properties and ignores any user-defined values for them:
- auto.commit.interval.ms
- bootstrap.servers
- enable.auto.commit
- group.id
- max.poll.records
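For example, you might pass standard Kafka consumer settings such as the following to tune fetch and session behavior. The property names are standard Kafka consumer properties; the values shown are illustrative only:
session.timeout.ms=30000
max.partition.fetch.bytes=2097152
fetch.max.wait.ms=500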
Record Header Attributes
The Kafka Multitopic Consumer origin creates record header attributes that include information about where each record originated, such as the topic, partition, and offset. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
- avroSchema - When processing Avro data, provides the Avro schema.
- offset - The offset where the record originated.
- partition - The partition where the record originated.
- topic - The topic where the record originated.
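For example, to reference the source topic and partition of a record in a stage that supports the expression language, you can use expressions like the following; the default value shown for the partition is illustrative:
${record:attribute('topic')}
${record:attributeOrDefault('partition', '0')}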
Enabling Security
You can configure the Kafka Multitopic Consumer origin to connect securely to Kafka through SSL/TLS, Kerberos, or both.
Enabling SSL/TLS
- To use SSL/TLS to connect, first make sure Kafka is configured for SSL/TLS as described in the Kafka documentation.
- On the Kafka tab, add the security.protocol Kafka configuration property and set it to SSL.
- Then add and configure the following SSL Kafka properties:
- ssl.truststore.location
- ssl.truststore.password
When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
- ssl.keystore.location
- ssl.keystore.password
- ssl.key.password
Some brokers might require adding the following properties as well:
- ssl.enabled.protocols
- ssl.truststore.type
- ssl.keystore.type
For details about these properties, see the Kafka documentation.
For example, the following properties allow the stage to use SSL/TLS to connect to Kafka with client authentication:
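A representative set of values takes the following form; the file paths and passwords are placeholders that you would replace with values for your environment:
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>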

Enabling Kerberos (SASL)
When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab.
- To use Kerberos, first make sure Kafka is configured for Kerberos as described in the Kafka documentation.
- Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
- Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
- RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="<keytab path>"
    principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/sdc.keytab"
    principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
  Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
  -Djava.security.auth.login.config=<JAAS config path>
  Modify environment variables using the method required by your installation type.
- RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="<keytab path>"
    principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/sdc.keytab"
    principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
- Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.
  If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.
  If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.
  Add the following KafkaClient login section to the appropriate field:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="<principal name>/_HOST@<realm>";
  };
  For example:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="sdc/_HOST@EXAMPLE.COM";
  };
  Cloudera Manager generates the appropriate keytab path and host name.
- On the Kafka tab, add the security.protocol Kafka configuration property, and set it to SASL_PLAINTEXT.
- Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.
For example, the following Kafka properties enable connecting to Kafka with Kerberos:
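Both values come directly from the steps above:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka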
Enabling SSL/TLS and Kerberos
You can enable the Kafka Multitopic Consumer origin to use SSL/TLS and Kerberos to connect to Kafka.
- Make sure Kafka is configured to use SSL/TLS and Kerberos (SASL) as described in the Kafka documentation.
- Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
- Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
- RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="<keytab path>"
    principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/sdc.keytab"
    principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
  Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
  -Djava.security.auth.login.config=<JAAS config path>
  Modify environment variables using the method required by your installation type.
- RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="<keytab path>"
    principal="<principal name>/<host name>@<realm>";
  };
  For example:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/sdc.keytab"
    principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
  };
- Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.
  If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.
  If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.
  Add the following KafkaClient login section to the appropriate field:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="<principal name>/_HOST@<realm>";
  };
  For example:
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="sdc/_HOST@EXAMPLE.COM";
  };
  Cloudera Manager generates the appropriate keytab path and host name.
- On the Kafka tab, add the security.protocol property and set it to SASL_SSL.
- Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.
- Then add and configure the following SSL Kafka properties:
- ssl.truststore.location
- ssl.truststore.password
When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
- ssl.keystore.location
- ssl.keystore.password
- ssl.key.password
Some brokers might require adding the following properties as well:
- ssl.enabled.protocols
- ssl.truststore.type
- ssl.keystore.type
For details about these properties, see the Kafka documentation.
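For example, the following properties combine the Kerberos and SSL/TLS settings described above; the file paths and passwords are placeholders that you would replace with values for your environment:
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>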
Data Formats
The Kafka Multitopic Consumer origin processes data differently based on the data format. Kafka Multitopic Consumer can process the following types of data:
- Avro
- Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Datagram
- Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and syslog messages.
- Delimited
- Generates a record for each delimited line. You can use the following delimited format types:
- Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- Tab-Separated Values - File that includes tab-separated values.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
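As an illustration with hypothetical element names, if you set the delimiter element to msg, each msg element in the following document becomes one record:
<root>
  <msg><id>1</id><text>first record</text></msg>
  <msg><id>2</id><text>second record</text></msg>
</root>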