Google Pub/Sub Subscriber
The Google Pub/Sub Subscriber origin consumes messages from a Google Pub/Sub subscription.
When you configure the origin, you define the Google Pub/Sub subscription ID to receive messages from. You also define the project and credentials provider to use to connect to Google Pub/Sub. The origin can retrieve credentials from the Google Application Default Credentials or from a Google Cloud service account credentials file.
The Google Pub/Sub Subscriber origin can use multiple threads to enable parallel processing of data from a Google Pub/Sub subscription.
When available, the Google Pub/Sub Subscriber origin includes user-defined message attributes in record header attributes.
Credentials
When the Google Pub/Sub Subscriber origin consumes messages from a Google Pub/Sub subscription, it must pass credentials to Google Pub/Sub. Configure the origin to retrieve the credentials from the Google Application Default Credentials or from a Google Cloud service account credentials file.
Default Credentials Provider
When configured to use the Google Application Default Credentials, the origin checks for the credentials file defined in the GOOGLE_APPLICATION_CREDENTIALS environment variable. If the environment variable doesn't exist and Data Collector is running on a virtual machine (VM) in Google Cloud Platform (GCP), the origin uses the built-in service account associated with the virtual machine instance.
For more information about the default credentials, see Google Application Default Credentials in the Google Developer documentation.
Complete the following steps to define the credentials file in the environment variable:
- Use the Google Cloud Platform Console or the gcloud command-line tool to create a Google service account and have your application use it for API access. For example, to use the command-line tool, run the following commands:
gcloud iam service-accounts create my-account
gcloud iam service-accounts keys create key.json --iam-account=my-account@my-project.iam.gserviceaccount.com
- Store the generated credentials file on the Data Collector machine.
- Add the GOOGLE_APPLICATION_CREDENTIALS environment variable to the appropriate file and point it to the credentials file. Modify environment variables using the method required by your installation type.
Set the environment variable as follows:
export GOOGLE_APPLICATION_CREDENTIALS="/var/lib/sdc-resources/keyfile.json"
- Restart Data Collector to enable the changes.
- On the Credentials tab for the stage, select Default Credentials Provider for the credentials provider.
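Before restarting Data Collector, it can help to confirm that the environment variable points to a usable key file. The following is a minimal sketch, not part of Data Collector: the check_key_file helper is a hypothetical name, and it assumes the usual layout of a Google service account key file, which contains "type", "client_email", and "private_key" entries.

```python
import json
import os


def check_key_file(path):
    """Return a list of problems found in a service account key file.

    Hypothetical helper for illustration; an empty list means the file
    looks like a service account key.
    """
    if not path:
        return ["GOOGLE_APPLICATION_CREDENTIALS is not set"]
    if not os.path.isfile(path):
        return [f"{path} does not exist or is not a regular file"]
    try:
        with open(path, encoding="utf-8") as handle:
            key = json.load(handle)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a subclass of ValueError.
        return [f"{path} is not readable JSON: {exc}"]
    if not isinstance(key, dict):
        return [f"{path} does not contain a JSON object"]

    problems = []
    if key.get("type") != "service_account":
        problems.append('"type" is not "service_account"')
    for field in ("client_email", "private_key"):
        if not key.get(field):
            problems.append(f'missing "{field}"')
    return problems


if __name__ == "__main__":
    # Report anything wrong with the file named by the environment variable.
    for problem in check_key_file(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")):
        print(problem)
```

Run the script in the same environment that starts Data Collector, so that it sees the same value for the variable. It prints nothing when the file passes the checks.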
Service Account Credentials File (JSON)
When configured to use the Google Cloud service account credentials file, the origin checks for the file defined in the origin properties.
Complete the following steps to use the service account credentials file:
- Generate a service account credentials file in JSON format.
Use the Google Cloud Platform Console or the gcloud command-line tool to generate and download the credentials file. For more information, see generating a service account credential in the Google Cloud Platform documentation.
- Store the generated credentials file on the Data Collector machine.
As a best practice, store the file in the Data Collector resources directory, $SDC_RESOURCES.
- On the Credentials tab for the stage, select Service Account Credentials File for the credentials provider and enter the path to the credentials file.
Multithreaded Processing
The Google Pub/Sub Subscriber origin can perform parallel processing and enables the creation of a multithreaded pipeline. The origin uses multiple concurrent threads based on the Num Pipeline Runners property.
When you start the pipeline, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors and destinations in the pipeline and performs all pipeline processing after the origin.
Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order in which batches are written to destinations is not guaranteed.
For example, say you set the Num Pipeline Runners property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independently of the batches processed by other pipeline runners, so batches may be written in a different order than they were read.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
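The ordering behavior described above can be illustrated with a small Python sketch. This is a simplified model under stated assumptions, not how Data Collector is implemented: run_pipeline and the in-memory queue are hypothetical stand-ins, the "processing" just uppercases each record, and the runners pull batches from a shared queue instead of receiving them from origin threads.

```python
import queue
import threading

NUM_PIPELINE_RUNNERS = 5  # mirrors the Num Pipeline Runners value in the example


def run_pipeline(batches, num_runners=NUM_PIPELINE_RUNNERS):
    """Process batches on num_runners threads and return them in write order."""
    pending = queue.Queue()
    for batch in batches:
        pending.put(batch)

    written = []  # stands in for the pipeline destination
    lock = threading.Lock()

    def runner():
        while True:
            try:
                batch = pending.get_nowait()
            except queue.Empty:
                return  # no more data, so this runner stops
            # Records are processed in sequence, so order within a batch is kept.
            processed = [record.upper() for record in batch]
            with lock:
                written.append(processed)

    threads = [threading.Thread(target=runner) for _ in range(num_runners)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return written


if __name__ == "__main__":
    batches = [[f"b{n}-r{i}" for i in range(3)] for n in range(10)]
    for batch in run_pipeline(batches):
        print(batch)
```

Every batch arrives with its records in their original order, but the order of the batches themselves can differ from run to run, which is why downstream systems should not rely on batch order.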
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Record Header Attributes
The Google Pub/Sub Subscriber origin includes user-defined message attributes in record header attributes when they are available. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.
A Google Pub/Sub message contains a payload and optional attributes that describe the payload content. If the Google Pub/Sub Subscriber origin consumes a message with optional attributes, the origin includes the message attributes in record header attributes.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
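As a rough illustration of the mapping, the following Python sketch models message attributes being copied into record header attributes and then read back with a fallback value. The function names are hypothetical stand-ins, not the Data Collector API, and the fallback rule (use the default when the attribute is missing or empty) is an assumption about how record:attributeOrDefault behaves.

```python
def to_header_attributes(message_attributes):
    """Copy user-defined message attributes into record header attributes."""
    # Header attributes are modeled here as a plain string-to-string mapping.
    return {str(k): str(v) for k, v in (message_attributes or {}).items()}


def attribute_or_default(header_attributes, name, default):
    """Return the named attribute, or default when it is missing or empty."""
    value = header_attributes.get(name)
    return value if value not in (None, "") else default


if __name__ == "__main__":
    header = to_header_attributes({"region": "eu-west", "source": "sensor-7"})
    print(attribute_or_default(header, "region", "unknown"))
    print(attribute_or_default(header, "tenant", "unknown"))
```

A message published without attributes yields an empty mapping, so every lookup falls back to its default.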
Data Formats
The Google Pub/Sub Subscriber origin processes data differently based on the data format. Google Pub/Sub Subscriber can process the following types of data:
- Avro
- Generates a record for every message. Includes a "precision" and "scale" field attribute for each Decimal field. For more information about field attributes, see Field Attributes.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Delimited
- Generates a record for each delimited line. You can use the following delimited format types:
- Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Tab-Separated Values - File that includes tab-separated values.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
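To make the JSON case in the list above concrete, the following Python sketch shows one way a payload can yield a record per JSON object, whether it holds several concatenated objects or a single array. It is an illustration only, not the origin's parser: json_records is a hypothetical name, and detecting the array case from a leading "[" is an assumption made for brevity.

```python
import json


def json_records(payload):
    """Yield one record per JSON object found in payload."""
    decoder = json.JSONDecoder()
    text = payload.strip()
    if text.startswith("["):
        # Single JSON array: each element becomes a record.
        yield from json.loads(text)
        return
    pos = 0
    while pos < len(text):
        # raw_decode parses one value and returns the index where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        yield obj
        # Skip whitespace between consecutive objects.
        while pos < len(text) and text[pos].isspace():
            pos += 1


if __name__ == "__main__":
    for record in json_records('{"id": 1}\n{"id": 2}'):
        print(record)
    for record in json_records('[{"id": 1}, {"id": 2}]'):
        print(record)
```

Both calls in the usage block produce the same two records, one per JSON object.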
Configuring a Google Pub/Sub Subscriber Origin
Configure a Google Pub/Sub Subscriber origin to consume messages from a Google Pub/Sub subscription.