Snowflake
Supported pipeline types: Data Collector
As recommended by Snowflake, the Snowflake destination stages data to one of the Snowflake-supported cloud platforms: Amazon S3 or Microsoft Azure. Then, the destination sends a command to Snowflake to process the staged data. Writing directly to Snowflake using the JDBC Producer destination is possible, but not recommended for performance reasons.
You can use the Snowflake destination to write new data or change data capture (CDC) data to Snowflake. When processing new data, the destination can load data to Snowflake using the COPY command or Snowpipe. When processing CDC data, the destination uses the MERGE command.
The Snowflake destination writes data from record fields to table columns based on matching names. The destination can compensate for data drift by creating new columns and tables in Snowflake when new fields and table references appear in records.
When you configure the Snowflake destination, you specify the Snowflake region, account and connection information and the number of connections to use to write to Snowflake. You can also define additional Snowflake connection properties as needed.
You configure the Snowflake warehouse, database, schema, and tables to use, and optionally enable properties to handle data drift. You specify load method properties and staging details, and optionally define advanced properties for Amazon S3 or Microsoft Azure.
You can configure the root field for the row, and any first-level fields that you want to exclude from the record. You can also configure the destination to replace missing fields or fields containing invalid data types with the specified default values, and to replace newline characters in string fields with a specified character.
Before you use the Snowflake destination, you must install the Snowflake stage library and complete other prerequisite tasks. The Snowflake stage library is an Enterprise stage library that is free for development purposes only. For information about purchasing the stage library for use in production, contact StreamSets.
When using the Snowflake destination, StreamSets recommends configuring the Snowflake warehouse to auto-resume upon receiving new queries.
Sample Use Cases
Here are a couple of common scenarios for using the Snowflake destination:
- Replicating a database
- Say you want to replicate data being written to five tables in an Oracle database schema. You want to write both existing data and incoming CDC data to Snowflake.
To do this, you create two pipelines, one to load the existing data and one to process incoming data, as follows:
- First pipeline for replicating data - The first pipeline uses the multithreaded JDBC Multitable Consumer origin to read from the tables that you want to replicate. To take advantage of Snowflake’s bulk load abilities, you configure the origin to use a very large batch size - somewhere between 20,000 and 50,000 records per batch. You set the number of threads to five to read data from all five tables concurrently, and you increase the connection pool size to five to allow writing to five tables in Snowflake concurrently.
In the pipeline, you use as many processors as needed to process the data. Then, you configure the Snowflake destination to load the data into Snowflake.
If you wanted the data to be immediately available after being written to Snowflake, you would use the default COPY command load method. But since you can tolerate a bit of latency, you use the faster, cheaper Snowpipe to load the data. Using Snowpipe requires performing some prerequisite steps in Snowflake.
After the initial load is complete, you stop the first pipeline and start the second pipeline to process incoming CDC data.
- Second pipeline for CDC data - In the second pipeline, you use the Oracle CDC Client origin and the Snowflake destination. You configure this origin to use a very large batch size as well, somewhere between 20,000 and 50,000 records per batch.
In the destination, you select the Use CDC property to perform CRUD operations when writing to Snowflake. This results in the destination using the MERGE command to load data into Snowflake. You specify a field in the records that contains the table name to use when writing to Snowflake, and you define the key columns for each table.
To improve performance, you also increase the connection pool size. For more information, see Performance Optimization.
- Offloading from Hadoop
- Say you have a Hadoop data lake that you want to move into Snowflake. In this case, you only need one pipeline that includes the multithreaded Hadoop FS Standalone origin, all of the processors that you need, and the Snowflake destination.
Prerequisites
Before you configure the Snowflake destination, complete the following prerequisites:
- Install the Snowflake stage library.
- Create a Snowflake external stage for staging data.
- To use Snowpipe, complete the Snowpipe prerequisites, as well.
Install the Snowflake Stage Library
You must install the Snowflake stage library before using the Snowflake destination. The Snowflake stage library includes the Snowflake JDBC driver that the destination uses to access Snowflake.
The Snowflake stage library is an Enterprise stage library that is free for development purposes only. For information about purchasing the stage library for use in production, contact StreamSets.
You can install the Enterprise stage library using Package Manager for a tarball Data Collector installation or as a custom stage library for a tarball, RPM, or Cloudera Manager Data Collector installation.
Supported Versions
| Data Collector Version | Supported Stage Library Version |
| --- | --- |
| Data Collector 3.8.x | Snowflake Enterprise Library 1.0.1 or 1.0.2 |
| Data Collector 3.7.x | Snowflake Enterprise Library 1.0.1 |
Installing with Package Manager
You can use Package Manager to install the Snowflake stage library on a tarball Data Collector installation.
Installing as a Custom Stage Library
You can install the Snowflake Enterprise stage library as a custom stage library on a tarball, RPM, or Cloudera Manager Data Collector installation.
Create a Snowflake External Stage
To write data to Snowflake, the Snowflake destination stages data to one of the Snowflake cloud service providers, Amazon S3 or Microsoft Azure. Before using the destination in a pipeline, you must create a Snowflake external stage.
- Amazon S3
- To stage data in Amazon S3, create the Snowflake external stage in a bucket in the same S3 region that hosts your Snowflake virtual warehouse. For example, if your Snowflake warehouse is in AWS US West, then create the Snowflake external stage in a bucket in the AWS US West region.
- Microsoft Azure
- To stage data in Microsoft Azure, complete the following tasks:
- Configure Snowflake authentication for the Microsoft Azure Blob Storage container that you want to use.
You can use an SAS token or an Azure account name and key for authentication. For information about configuring SAS token authentication, see Configuring an Azure Container for Loading Data.
- Create a Snowflake external stage in the container.
When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Use a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.
For example, the following URL creates an external stage named sdc-externalstage in azure://myaccount.blob.core.windows.net/mycontainer/load/ and loads all staged data to Snowflake:
azure://myaccount.blob.core.windows.net/mycontainer/load/sdc-externalstage/
You can create an Azure stage using the Snowflake web interface or SQL, as sketched after this list. For more information, see Creating an Azure Stage in the Snowflake documentation.
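The exact statements depend on your cloud provider and security setup, but a minimal sketch of creating an external stage with Snowflake SQL might look like the following. The stage names, bucket, container, and credential values are placeholders, not values required by the destination:
```sql
-- Hypothetical S3 external stage; replace the bucket path and key pair
-- with values from your own AWS account.
CREATE STAGE sdc_s3_stage
  URL = 's3://mybucket/load/sdc-externalstage/'
  CREDENTIALS = (AWS_KEY_ID = '<access key id>' AWS_SECRET_KEY = '<secret access key>');

-- Hypothetical Azure external stage secured with a SAS token.
CREATE STAGE sdc_azure_stage
  URL = 'azure://myaccount.blob.core.windows.net/mycontainer/load/sdc-externalstage/'
  CREDENTIALS = (AZURE_SAS_TOKEN = '<sas token>');
```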
Snowpipe Prerequisites
When processing new data, you can use Snowpipe, the Snowflake continuous ingestion engine, to load data to Snowflake tables. You cannot use Snowpipe to process CDC data. Before using Snowpipe, complete the following prerequisite tasks:
- In Snowflake, create a pipe for Snowpipe to use to load data.
For more information, see Create a Pipe in the Snowflake documentation.
- In Snowflake, generate a private key PEM and a public key PEM.
For details, see Using Key-Pair Authentication in the Snowflake documentation. You do not need to generate JSON Web Tokens (JWT) as described in Step 5.
- In Snowflake, assign the public key to the Snowflake user account configured in the destination.
You can use the Snowflake console or the ALTER USER command, as sketched after this list.
- Optionally, to secure the private key PEM and password, use runtime resources or credential stores.
- When you configure the destination, enter the private key PEM and password, and the public key PEM. Or, use a runtime resource or credential store function to reference the information.
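As a rough illustration of the Snowflake-side steps, the statements below create a pipe and assign a public key to a user. The pipe, stage, table, user, and key values are placeholder assumptions; follow the Snowflake documentation linked above for the exact requirements in your account:
```sql
-- Hypothetical pipe that Snowpipe uses to load staged files into a target table.
CREATE PIPE sdc_pipe AS
  COPY INTO sales_db.public.orders
  FROM @sdc_s3_stage;

-- Assign the generated public key PEM to the Snowflake user that the
-- destination connects with. The key value shown is a placeholder.
ALTER USER sdc_user SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...';
```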
Load Methods
The Snowflake destination can load data to Snowflake using the following methods:
- COPY command for new data
- The COPY command, the default load method, performs a bulk synchronous load to Snowflake, treating all records as INSERTS. Use this method to write new data to Snowflake tables.
- Snowpipe for new data
- Snowpipe, the Snowflake continuous ingestion service, performs an asynchronous load to Snowflake, treating all records as INSERTS. Use this method to write new data to Snowflake tables. When needed, you can configure the destination to use a custom Snowflake endpoint.
- MERGE command for CDC data
- Like the COPY command, the MERGE command performs a bulk synchronous load to Snowflake. But instead of treating all records as INSERT, it inserts, updates, and deletes records as appropriate. Use this method to write change data capture (CDC) data to Snowflake tables using CRUD operations.
For more information about Snowpipe or the COPY or MERGE commands, see the Snowflake documentation.
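For context only, a bulk load with the COPY command looks roughly like the following in Snowflake SQL. The destination issues the command for you; the table, stage path, and file format shown here are placeholder assumptions:
```sql
-- Load staged files into a table, treating every row as an insert.
COPY INTO sales_db.public.orders
  FROM @sdc_s3_stage/orders/
  FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"');
```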
Performance Optimization
Use the following tips to optimize for performance and cost-effectiveness when using the Snowflake destination:
- Increase the batch size
- The maximum batch size is determined by the origin in the pipeline and typically has a default value of 1,000 records. To take advantage of Snowflake's bulk loading abilities, increase the maximum batch size in the pipeline origin to 20,000-50,000 records. Be sure to increase the Data Collector Java heap size as needed.
- Use multiple threads
- When writing to Snowflake using Snowpipe or the COPY command, you can use multiple threads to improve performance when you include a multithreaded origin in the pipeline. When Data Collector resources allow, using multiple threads enables processing multiple batches of data concurrently.
- Enable additional connections to Snowflake
- When writing to multiple Snowflake tables using the COPY or MERGE commands, increase the number of connections that the Snowflake destination makes to Snowflake. Each additional connection allows the destination to write to an additional table, concurrently.
Row Generation
When writing a record to a table, the Snowflake destination includes all record fields in the resulting row, by default. The destination uses the root field, /, as the basis for the resulting row.
You can configure the Row Field property to specify a map or list-map field in the record as the basis for the row. The resulting record includes only the data from the specified map or list-map field and excludes all other record data. Use this functionality when the data that you want to write to Snowflake exists in a single map or list-map field within the record.
If you want to use the root field, but do not want to include all fields in the resulting row, you can configure the destination to ignore all specified first-level fields.
The Snowflake destination converts all map or list-map fields within the specified root field to the Snowflake Variant data type. The Snowflake destination fully supports the Variant data type.
By default, records with missing fields or with invalid data types in fields are treated as error records. You can configure the destination to replace missing fields and data of invalid types with user-defined default values. Then, you specify the default values to use for each data type. You can also configure the destination to replace newline characters in string fields with a replacement character.
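Because map and list-map data lands in Variant columns, you can query the nested values downstream with Snowflake path notation. A minimal sketch, assuming a hypothetical orders table with a Variant column named address:
```sql
-- Extract nested values from a Variant column created from a map field.
SELECT order_id,
       address:city::STRING AS city,
       address:zip::STRING  AS zip
FROM sales_db.public.orders;
```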
Writing to Multiple Tables
You can use the Snowflake destination to write to multiple tables within a Snowflake schema. To write to multiple tables, you specify a field in the record that specifies the table to write to.
For example, say you have Snowflake tables named after departments in your company, such as Operations, Sales, and Marketing. Also, the records being processed have a dept field with matching values. You can configure the Snowflake destination to write records to the various tables by entering the following expression in the Table property: ${record:value('/dept')}.
When using the COPY or MERGE command to load data, you can configure the Snowflake destination to automatically create tables when a new value appears in the specified field. For example, if the dept field suddenly includes an Engineering department, the destination can create a new Engineering table in Snowflake for the new data. For more information, see Creating Columns and Tables for Data Drift.
When using the COPY or MERGE command to write to multiple Snowflake tables, you might also increase the number of connections that the destination uses for the write. For more information, see Performance Optimization.
Creating Columns and Tables for Data Drift
You can configure the Snowflake destination to automatically compensate for changes in column or table requirements, also known as data drift.
When enabled for data drift, the Snowflake destination creates new columns in Snowflake tables when new fields appear in records. For example, if a record suddenly includes a new Address2 field, the destination creates a new Address2 column in the target table.
By default, the destination creates new columns based on the data in the new fields, such as creating a Double column for decimal data. You can, however, configure the destination to create all new columns as Varchar.
When data drift is enabled, you can also configure the destination to create new tables as needed. For example, say the destination writes data to tables based on the region name in the Region field. When a new SW-3 region shows up in a record, the destination creates a new SW-3 table in Snowflake and writes the record to the new table.
To enable the automatic creation of new columns, select the Data Drift Enabled property on the Snowflake tab. Then, to enable the creation of new tables, select the Table Auto Create property.
Generated Data Types
When creating new tables or creating new columns in existing tables, the Snowflake destination uses field names to generate the new column names and maps record field data types to Snowflake column data types as follows:
| Record Field Data Type | Snowflake Column Data Type |
| --- | --- |
| Byte Array | Binary |
| Char | Char |
| String | Varchar |
| Byte, Integer, Long, Short | Number |
| Decimal, Double, Float | Double |
| Boolean | Boolean |
| Date | Date |
| Datetime | Timestampntz |
| Time | Time |
| Zoned Datetime | Timestamptz |
| Map, List-Map | Variant |
The Snowflake destination fully supports the Variant data type.
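To illustrate these mappings, a record with Integer, String, Decimal, Datetime, and Map fields would produce a table roughly like the one below. The destination creates the table for you when Table Auto Create is enabled; the table and column names here are hypothetical:
```sql
CREATE TABLE sales_db.public.orders (
  order_id  NUMBER,          -- from an Integer field
  customer  VARCHAR,         -- from a String field
  total     DOUBLE,          -- from a Decimal field
  placed_at TIMESTAMP_NTZ,   -- from a Datetime field
  address   VARIANT          -- from a Map field
);
```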
Define the CRUD Operation
The Snowflake destination can insert, update, or delete data when you configure the destination to process CDC data. When processing CDC data, the destination uses the MERGE command to write data to Snowflake.
The destination looks for the CRUD operation to perform in the sdc.operation.type record header attribute. The attribute can contain one of the following numeric values:
- 1 for INSERT
- 2 for DELETE
- 3 for UPDATE
If your pipeline includes a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
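The destination builds and runs the MERGE statement for you; the sketch below only illustrates how insert, update, and delete operations map onto MERGE clauses. The target table, staging source, key column, and operation column are placeholder assumptions:
```sql
MERGE INTO sales_db.public.orders AS tgt
USING sales_db.public.orders_staging AS src
  ON tgt.order_id = src.order_id                     -- key column defined in the destination
WHEN MATCHED AND src.op_type = 'DELETE' THEN DELETE
WHEN MATCHED AND src.op_type = 'UPDATE' THEN
  UPDATE SET tgt.customer = src.customer, tgt.total = src.total
WHEN NOT MATCHED AND src.op_type = 'INSERT' THEN
  INSERT (order_id, customer, total)
  VALUES (src.order_id, src.customer, src.total);
```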
AWS Credentials
When the Snowflake destination stages data on Amazon S3, it must pass credentials to Amazon Web Services. Use AWS access key pairs to pass the credentials. The Snowflake destination does not support using IAM roles at this time.
To use AWS access key pairs, specify the Access Key ID and Secret Access Key properties in the destination.
Configuring a Snowflake Destination
Configure a Snowflake destination to write data to Snowflake tables. Before you use the destination in a pipeline, complete the required prerequisites.