Hadoop FS Standalone
Supported pipeline types:
|
The files to be processed must all share a file name pattern and be fully written. The origin can use multiple threads to enable the parallel processing of files. Use the Hadoop FS Standalone origin only in pipelines configured for standalone execution mode. To read from HDFS in cluster execution mode, use the Hadoop FS origin.
When you configure the Hadoop FS Standalone origin, you define the directory to use, read order, file name pattern, file name pattern mode, and the first file to process. You can use glob patterns or regular expressions to define the file name pattern that you want to use.
When using the last-modified timestamp read order, you can configure the origin to read from subdirectories. To use multiple threads for processing, specify the number of threads to use. You can also enable reading compressed files. After processing a file, the Hadoop FS Standalone origin can keep, archive, or delete the file.
When the pipeline stops, the Hadoop FS Standalone origin notes where it stops reading. When the pipeline starts again, the origin continues processing from where it stopped by default. You can reset the origin to process all requested files.
The origin generates record header attributes that enable you to use the origins of a record in pipeline processing.
When necessary, you can enable Kerberos authentication. You can also specify a Hadoop user to impersonate, define a Hadoop configuration file directory, and add Hadoop configuration properties as needed.
The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
File Directory
To define the directory that the Hadoop FS Standalone origin reads files from, enter an absolute directory. Use a glob pattern to include wildcards and define multiple directories to read files from.
/hr/employees/CA/SFO
/hr/employees/CA/SJC
/hr/employees/CA/LAX
/hr/employees/WA/SEA
Folders to Read | File Directory Defined |
---|---|
/hr/employees/CA/SFO /hr/employees/CA/SJC /hr/employees/CA/LAX |
/hr/employees/CA/* |
/hr/employees/CA/SFO /hr/employees/CA/SJC |
/hr/employees/CA/S* |
/hr/employees/CA/SFO /hr/employees/CA/SJC /hr/employees/WA/SEA |
/hr/employees/*/S* |
For more information about glob patterns, see the Oracle Java documentation.
File Name Pattern and Mode
Use a file name pattern to define the files that the Hadoop FS Standalone origin processes. You can use either a glob pattern or a regular expression to define the file name pattern.
The Hadoop FS Standalone origin processes files based on the file name pattern mode, file
name pattern, and specified directory. For example, if you specify a
/logs/weblog/
directory, glob mode, and *.json
as the file name pattern, the origin processes all files with the "json" extension
in the /logs/weblog/
directory.
The origin processes files in order based on the specified read order.
For more information about glob syntax, see the Oracle Java documentation. For more information about regular expressions, see Regular Expressions Overview.
Read Order
The Hadoop FS Standalone origin reads files in ascending order based on the timestamp or file name:
- Last Modified Timestamp
- The Hadoop FS Standalone origin can read files in ascending order based on the last modified timestamp associated with the file. When the origin reads from a secondary location - not the directory where the files are created and written - the last-modified timestamp should be when the file is moved to the directory to be processed.
- Lexicographically Ascending File Names
- The Hadoop FS Standalone origin can read files in lexicographically ascending order based on file names. Note that lexicographically ascending order reads the numbers 1 through 11 as follows:
Multithreaded Processing
The Hadoop FS Standalone origin uses multiple concurrent threads to process data based on the Number of Threads property.
Each thread reads data from a single file, and each file can have a maximum of one thread read from it at a time. The file read order is based on the configuration for the Read Order property.
As the pipeline runs, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors and destinations in the pipeline and performs all pipeline processing after the origin.
Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.
For example, say you configure the origin to read files from a directory using 5 threads and the Last Modified Timestamp read order. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners.
The Hadoop FS Standalone origin assigns a thread to each of the five oldest files in the directory. Each thread processes its assigned file, passing batches of data to the origin. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
After each thread completes processing a file, it continues to the next file based on the last-modified timestamp, until all files are processed.
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Reading from Subdirectories
When using the Last Modified Timestamp read order, the Hadoop FS Standalone origin can read files in subdirectories of the specified file directory.
When you configure the origin to read from subdirectories, it reads files from all subdirectories. It reads files in ascending order based on timestamp, regardless of the location of the file within the directory.
File Name
|
Directory
|
Last Modified Timestamp
|
log-1.json
|
/logs/west/
|
APR 24 2016 14:03:35
|
log-0054.json
|
/logs/east/
|
APR 24 2016 14:05:03
|
log-0055.json
|
/logs/west/
|
APR 24 2016 14:45:11
|
log-2.json
|
/logs/
|
APR 24 2016 14:45:11
|
Post-Processing Subdirectories
When the Hadoop FS Standalone origin reads from subdirectories, it uses the subdirectory structure when archiving files during post-processing.
You can archive files when the origin completes processing a file or when it cannot fully process a file.
File Name
|
Archive Directory
|
log-1.json
|
/processed/logs/west/
|
log-0054.json
|
/processed/logs/east/
|
log-0055.json
|
/processed/logs/west/
|
log-2.json
|
/processed/logs/
|
First File for Processing
Configure a first file for processing when you want Hadoop FS Standalone to ignore one or more existing files in the directory.
When you define a first file to process, the Hadoop FS Standalone origin starts processing with the specified file and continues based on the read order and file name pattern. When you do not specify a first file, the origin processes all files in the directory that match the file name pattern.
For example, say the Hadoop FS Standalone origin reads files based on last-modified timestamp. To ignore all files older than a particular file, use that file name as the first file to process.
Similarly, say you have the origin reading files based on lexicographically ascending file names, and the file directory includes the following files: web_001.log, web_002.log, web_003.log.
If you configure web_002.log as the first file, the origin reads web_002.log and continues to web_003.log. It skips web_001.log.
Reading from Azure Data Lake Storage
The Hadoop FS Standalone origin can read data directly from Azure Data Lake Storage using the ADL protocol provided by Hadoop. The origin can connect using Azure Active Directory Service Principal or refresh-token authentication.
- Configure Azure credentials in one of the
following ways:
- If the Azure credentials are defined in the HDFS
configuration file
core-site.xml
, configure access to the file.- On the Hadoop FS tab, configure the Hadoop FS Configuration Directory property to point to the directory that includes the file.
For information about defining the Azure credentials in the
core-site.xml
file, see the Hadoop documentation. - If the credentials are not defined in the
core-site.xml
file, use Hadoop FS configuration properties to pass Azure credentials:- On the Hadoop FS tab, click
the Add icon to add new
Hadoop FS configuration properties, for the
required credentials.
You can use simple or bulk edit mode to add configuration properties.
- Enter the property names and values for the
credential provider that you want to use. To connect using Azure Active Directory refresh tokens, configure the following properties:
fs.adl.oauth2.access.token.provider.type
- Set to RefreshToken.fs.adl.oauth2.client.id
- Set to the application ID.fs.adl.oauth2.refresh.token
- Set to an OAuth2 refresh token generated through Azure Active Directory. For information on generating a refresh token, see the Microsoft documentation.
fs.adl.oauth2.access.token.provider.type
- Set toClientCredential
.fs.adl.oauth2.refresh.url
- Set to the OAuth 2 token endpoint URL for Azure Active Directory.fs.adl.oauth2.client.id
- Set to the application ID.fs.adl.oauth2.credential
- Set to a generated application key.
For tips on locating and generating the required information in the Azure portal, see the Hadoop documentation.
- On the Hadoop FS tab, click
the Add icon to add new
Hadoop FS configuration properties, for the
required credentials.
- If the Azure credentials are defined in the HDFS
configuration file
- In the origin, on the Hadoop FS tab, configure
the Hadoop FS URI property using the following
structure:
adl://<account name>.azuredatalakestore.net/
In the URI, <account name> is the name of the Azure Data Lake Storage account.
Reading from Azure HDInsight
You can use the HDP stage libraries to access Azure blob storage using the WASB protocol. This enables the Hadoop FS Standalone origin to read from Azure HDInsight.
To read from an Azure HDInsight cluster, Data Collector can be installed anywhere. It can be installed on a node in the HDInsight cluster or outside of the cluster entirely.
- On the General tab of the Hadoop FS Standalone origin, for the Stage Library property, select the HDP stage library version 2.4 or later.
- Configure Azure credentials in one of the following ways:
- If the Azure credentials are defined in the HDFS configuration file
core-site.xml
, configure the origin to access the file.- On the Hadoop FS tab, configure the Hadoop FS Configuration Directory property to point to the directory that includes the file.
- If the credentials are not defined in the
core-site.xml
file, use a Hadoop FS configuration property to pass the Azure credentials:- In the origin, on the Hadoop FS tab, click
the Add icon to add a new Hadoop FS
configuration property.
You can use simple or bulk edit mode to add configuration properties.
- Enter the following property name, using the Azure storage
account name for <storage account
name>:
For example, if the storage account name isfs.azure.account.key.<storage account name>.blob.core.windows.net
sdchd
, then enter the following name for the property:fs.azure.account.key.sdchd.blob.core.windows.net
Tip: You can find the Azure storage account name on the Access Keys page in the Microsoft Azure portal. To view the page in the Microsoft Azure portal, click . A page like the following appears, with the storage account name and access keys: - For the value of the Hadoop FS Configuration property, enter an
access key value for the Azure storage account. You can use any
valid key.Tip: The account key value also displays on the Access Keys page. For example, on the image above, you could use either the key1 or key2 value.
- In the origin, on the Hadoop FS tab, click
the Add icon to add a new Hadoop FS
configuration property.
- If the Azure credentials are defined in the HDFS configuration file
- In the origin, on the Hadoop FS tab, configure the
Hadoop FS URI property using the following
structure:
<wasb[s]>://<container name>@<storage account name>.blob.core.windows.net/<path to files>
In the URI, <container name> is the Azure container name. And <storage account name> is the same Azure storage account name that you used for the Hadoop FS configuration property.
For example, for asdc-hd
container in a storage account namedsdchd
, with all files in a files directory, you would define the Hadoop FS URI as follows:wasbs://sdc-hd@sdchd.blob.core.windows.net/files
Tip: You can find the container name and storage account name on the Essentials page in the Microsoft Azure portal. For a standard storage account, in the Microsoft Azure portal, click . For a blob storage account, click .A page like the following displays with the container name and storage account name:
Though the host name for the Hadoop FS URI is<storage account name>.blob.core.windows.net
, you can alternatively use the host name of the Azure blob service endpoint as the hostname for the Hadoop FS URI.
Example
The following image shows how to configure the Hadoop FS Standalone origin to read from HDInsight using the Azure account information in the examples above:
Record Header Attributes
The Hadoop FS Standalone origin creates record header attributes that include information about the originating file for the record.
When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
- avroSchema - When processing Avro data, provides the Avro schema.
- baseDir - Base directory containing the file where the record originated.
- filename - Provides the name of the file where the record originated.
- file - Provides the file path and file name where the record originated.
- mtime - Provides the last-modified time for the file.
- offset - Provides the file offset in bytes. The file offset is the location in the file where the record originated.
- atime - Provides the last accessed time.
- isDirectory - Indicates if the file is a directory.
- isSymbolicLink - Indicates if the file is a symbolic link.
- size - Provides the file size.
- owner - Provides the file owner.
- group - Provides the group associated with the file.
- blocksize - Provides the block size of the file.
- replication - Provides the replication of the file.
- isEncrypted - Indicates if the file is encrypted.
Event Generation
The Hadoop FS Standalone origin can generate events that you can use in an event stream. When you enable event generation, the origin generates event records each time the origin starts or completes reading a file. It can also generate events when it completes processing all available data and the configured batch wait time has elapsed.
- With the Pipeline Finisher executor to
stop the pipeline and transition the pipeline to a Finished state when
the origin completes processing available data.
When you restart a pipeline stopped by the Pipeline Finisher executor, the origin continues processing from the last-saved offset unless you reset the origin.
For an example, see Case Study: Stop the Pipeline.
- With the Email executor to send a custom email
after receiving an event.
For an example, see Case Study: Sending Email.
- With a destination to store event information.
For an example, see Case Study: Event Storage.
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Event Records
Record Header Attribute | Description |
---|---|
sdc.event.type | Event type. Uses one of the following types:
|
sdc.event.version | Integer that indicates the version of the event record type. |
sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The Hadoop FS Standalone origin can generate the following types of event records:
- new-file
- The Hadoop FS Standalone origin generates a new-file event record when it starts processing a new file.
- finished-file
- The Hadoop FS Standalone origin generates a finished-file event record when it finishes processing a file.
- no-more-data
- The Hadoop FS Standalone origin generates a no-more-data event record when the origin completes processing all available records and the number of seconds configured for Batch Wait Time elapses without any new files appearing to be processed.
Buffer Limit and Error Handling
The Hadoop FS Standalone origin passes each record to a buffer. The size of the buffer determines the maximum size of the record that can be processed. Decrease the buffer limit when memory on the Data Collector machine is limited. Increase the buffer limit to process larger records when memory is available.
- Discard
- The origin discards the record and all remaining records in the file, and then continues processing the next file.
- Send to Error
- With a buffer limit error, the origin cannot send the record to the pipeline
for error handling because it is unable to fully process the record.
Instead, the origin creates a message stating that a buffer overrun error occurred. The message includes the file and offset where the buffer overrun error occurred. The information displays in the pipeline history.
If an error directory is configured for the stage, the origin moves the file to the error directory and continues processing the next file.
- Stop Pipeline
- The origin stops the pipeline and creates a message stating that a buffer overrun error occurred. The message includes the file and offset where the buffer overrun error occurred. The information displays in the pipeline history.
Kerberos Authentication
You can use Kerberos authentication to connect to HDFS. When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to HDFS. By default, Data Collector uses the user account who started it to connect.
The Kerberos principal and keytab are defined in the Data Collector
configuration file, $SDC_CONF/sdc.properties
. To use Kerberos
authentication, configure all Kerberos properties in the Data Collector
configuration file, and then enable Kerberos in the Hadoop FS Standalone origin.
For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication in the Data Collector documentation.
HDFS Properties and Configuration Files
- HDFS configuration files
- You can use the following HDFS configuration files with the Hadoop FS
Standalone origin:
- core-site.xml
- hdfs-site.xml
- Individual properties
- You can configure individual HDFS properties in the origin. To add an HDFS
property, you specify the exact property name and the value. The Hadoop FS
Standalone origin does not validate the property names or
values.Note: Individual properties override properties defined in HDFS configuration files.
Impersonation User
Data Collector can either use the currently logged in Data Collector user or a user configured in the Hadoop FS Standalone origin to read from HDFS.
A Data Collector configuration property can be set that requires using the currently logged in Data Collector user. When this property is not set, you can specify a user in the origin. For more information about Hadoop impersonation and the Data Collector property, see Hadoop Impersonation Mode in the Data Collector documentation.
Note that the origin uses a different user account to connect to HDFS. By default, Data Collector uses the user account who started it to connect to external systems. When using Kerberos, Data Collector uses the Kerberos principal.
- On Hadoop, configure the user as a proxy user and
authorize the user to impersonate a Hadoop user.
For more information, see the Hadoop documentation.
- In the Hadoop FS Standalone origin, on the Connection tab, configure the Impersonation User property.
Data Formats
- Avro
- Generates a record for every Avro record. The origin includes the Avro schema in the
avroSchema
record header attribute. It also includes aprecision
andscale
field attribute for each Decimal field. - Delimited
- Generates a record for each delimited line. You can use the
following delimited format types:
- Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- Tab-Separated Values - File that includes tab-separated values.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- Whole File
- Streams whole files from the origin system to the destination system. You can specify a transfer rate or use all available resources to perform the transfer.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.