PostgreSQL JDBC Table

The PostgreSQL JDBC Table origin reads data from a PostgreSQL table. The origin can read all of the columns from a table or only the specified columns from a table. To read from one or more tables using a custom query, use the JDBC Query origin.

When you configure the PostgreSQL JDBC Table origin, you specify database connection information and any additional JDBC configuration properties you want to use. You can also use a connection to configure the origin.

You configure the table to read, and optionally define the columns to read from the table. You specify the offset column and the number of partitions used to read from the database table. The data type allowed for the offset column depends on the specified number of partitions. You can also configure an additional predicate for the query.

You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream stages efficiently. You can also configure the origin to skip tracking offsets, which enables reading the entire data set each time you start the pipeline.

You can optionally configure advanced properties, such as the fetch size, custom offset queries, and the JDBC driver to bundle with the pipeline.

StreamSets has tested this origin on PostgreSQL 9.6.2 with the PostgreSQL 42.2.5 driver.

PostgreSQL JDBC Driver

The PostgreSQL JDBC Table origin includes the PostgreSQL JDBC driver, so you do not need to install a driver before using the origin.

If you want to use a custom driver, you can install it as an external library for the JDBC stage library.

By default, Transformer bundles a JDBC driver into the launched Spark application so that the driver is available on each node in the cluster. If you prefer to manually install an appropriate JDBC driver on each Spark node, you can configure the stage to skip bundling the driver on the Advanced tab of the stage properties.

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel.

For the PostgreSQL JDBC Table origin, Transformer determines the partitioning based on the number of partitions that you configure for the origin. Spark creates one connection to the database for each partition.

Spark uses these partitions throughout the pipeline unless a processor causes Spark to shuffle the data. When you need to change the partitioning in the pipeline, use the Repartition processor.

Consider the following when you define the number of partitions for the origin to use:
  • The size and configuration of the cluster.
  • The amount of data being processed.
  • The number of concurrent connections that can be made to the database.

If the pipeline fails because the origin encounters an out-of-memory error, you likely need to increase the number of partitions for the origin.

Offset Column Requirement

The PostgreSQL JDBC Table origin uses a single offset column. The offset column should contain unique, incremental values, and should not contain null values. By default, the origin does not process records with null offset values. The offset column must also be of a supported data type.

When a table includes a single primary key column, the origin uses that column as the offset column by default.

You can configure the origin to use a different offset column. You might specify an alternate offset column when the table uses a composite key or when the data type of the primary key column is not supported.

The PostgreSQL JDBC Table origin uses the offset column to perform two tasks:
Create partitions
When creating partitions, the origin determines the data to be processed and then divides the data into partitions based on ranges of offset values.

For example, say you have rows with integer offsets from 1 to 1000 and you configure the origin to create two partitions. The first partition might include records with offsets from 1-500, and the second partition, the offsets from 501-1000.

Track processing
The origin tracks processing using values in the offset column. When reading the last row in a batch, the origin saves the value from the offset column. In the subsequent batch, the origin starts reading from the following row.
For example, if the first batch in a pipeline includes offsets 1-250 based on an integer offset column, the origin creates the next batch starting from offset 251. Similarly, say a batch pipeline stops after processing offset 250. The next time the pipeline runs, the origin begins processing with offset 251, unless you reset the offset.
Note: By default, the origin does not process records with null offset values. You can configure the origin to process those records.
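
The two tasks above can be sketched in a few lines of Python. This is a minimal illustration for integer offsets only, not the origin's actual implementation: the function names split_offset_range and next_batch_start are hypothetical, and the even split is an assumption, since the origin only promises partitions based on ranges of offset values.

```python
from typing import List, Tuple


def split_offset_range(min_offset: int, max_offset: int,
                       num_partitions: int) -> List[Tuple[int, int]]:
    """Split an inclusive integer offset range into contiguous inclusive sub-ranges.

    Assumption: ranges are split as evenly as possible. The real origin
    may choose different boundaries.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if max_offset < min_offset:
        return []
    total = max_offset - min_offset + 1
    # Never create more partitions than there are offset values.
    parts = min(num_partitions, total)
    base, extra = divmod(total, parts)
    ranges = []
    start = min_offset
    for i in range(parts):
        # The first `extra` partitions take one additional offset value.
        size = base + (1 if i < extra else 0)
        end = start + size - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


def next_batch_start(last_saved_offset: int) -> int:
    """Return the first integer offset that the next batch should read."""
    return last_saved_offset + 1


print(split_offset_range(1, 1000, 2))  # [(1, 500), (501, 1000)]
print(next_batch_start(250))           # 251
```

With offsets 1 to 1000 and two partitions, the sketch reproduces the ranges 1-500 and 501-1000 from the example, and a saved offset of 250 leads to the next batch starting at 251.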

Supported Offset Data Types

The supported data types for an offset column differ based on the number of partitions that you want the origin to use when reading the data.

The following table lists the PostgreSQL data types that are supported for the offset column:
Partitions Supported Offset Data Type
One partition
  • Char
  • Text
  • Varchar
One or more partitions
  • Bigint
  • Date
  • Decimal
  • Double Precision
  • Float
  • Integer
  • Numeric
  • Real
  • Smallint
  • Time
  • Timestamp
  • Timestamp with Time Zone
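
As a quick way to reason about the table, the following Python sketch checks whether a column type can serve as the offset column for a given number of partitions. The helper name is_supported_offset_type is hypothetical, and the sketch assumes that the types listed for one or more partitions are also valid with a single partition.

```python
# Types copied from the table above, normalized to lowercase.
SINGLE_PARTITION_ONLY = {"char", "text", "varchar"}
ANY_PARTITION_COUNT = {
    "bigint", "date", "decimal", "double precision", "float", "integer",
    "numeric", "real", "smallint", "time", "timestamp",
    "timestamp with time zone",
}


def is_supported_offset_type(pg_type: str, num_partitions: int) -> bool:
    """Return True if pg_type is allowed as an offset type for num_partitions."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    # Normalize case and collapse repeated whitespace.
    name = " ".join(pg_type.lower().split())
    if name in ANY_PARTITION_COUNT:
        return True
    # String types can only be used when the origin reads with one partition.
    return name in SINGLE_PARTITION_ONLY and num_partitions == 1


print(is_supported_offset_type("Varchar", 1))    # True
print(is_supported_offset_type("Varchar", 4))    # False
print(is_supported_offset_type("Timestamp", 4))  # True
```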

Null Offset Value Handling

By default, the PostgreSQL JDBC Table origin does not process records with null offset values. You can configure the origin to process those records by enabling the Partition for NULL property on the Advanced tab.

When you enable the Partition for NULL property, the origin queries the table for rows with null offset values, then groups the resulting records into a single partition. As a result, when the table includes null offset values, each batch of data contains a partition of records with null offset values.

Note: Null offset values preclude the origin from differentiating between records, so duplicate null offset records are likely to be included in each null partition and, therefore, written with each batch. Take care when using the Partition for NULL property, especially in streaming pipelines.
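
The duplicate behavior described in the note can be illustrated with a small simulation. This is a hedged sketch, not the origin's code: read_batch is a hypothetical function that models a table as a list of (offset, value) tuples and assumes that every batch re-reads all rows with null offsets when Partition for NULL is enabled.

```python
from typing import List, Optional, Tuple

Row = Tuple[Optional[int], str]


def read_batch(rows: List[Row], last_offset: Optional[int],
               partition_for_null: bool) -> Tuple[List[Row], Optional[int]]:
    """Simulate one batch: return (records, new last-saved offset)."""
    # Rows with non-null offsets are read only if they are newer than the saved offset.
    new_rows = [r for r in rows
                if r[0] is not None and (last_offset is None or r[0] > last_offset)]
    new_rows.sort(key=lambda r: r[0])
    # Assumption: rows with null offsets cannot be tracked, so all of them
    # are read again in every batch.
    null_rows = [r for r in rows if r[0] is None] if partition_for_null else []
    new_last = new_rows[-1][0] if new_rows else last_offset
    return new_rows + null_rows, new_last


table: List[Row] = [(1, "a"), (2, "b"), (None, "x")]
batch1, saved = read_batch(table, None, partition_for_null=True)
table.append((3, "c"))
batch2, saved = read_batch(table, saved, partition_for_null=True)

print(batch1)  # [(1, 'a'), (2, 'b'), (None, 'x')]
print(batch2)  # [(3, 'c'), (None, 'x')]
```

In the simulation, the record with the null offset appears in both batches, which is why a downstream destination would receive it more than once.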

Default Offset Queries

The PostgreSQL JDBC Table origin uses two offset queries to determine the offset values to use when querying the database. The default queries work for most cases. On the rare occasion when you want to produce different results, you can configure custom offset queries to override the default queries.

The origin uses the following offset queries, by default:
Min/max offset query
This query returns the minimum and maximum values in the offset column. The origin uses these values to process all existing data in the table.
The origin uses this query to create the initial batch for a pipeline – that is, the first time that a batch pipeline runs or the first batch of a streaming pipeline. The origin also uses this query after you reset the offsets for a pipeline.
Max offset query
This query returns the maximum offset in the offset column. The origin uses this value along with the last-saved offset to process all new data that arrived since processing the last batch.
The origin uses this query to create subsequent batches for a pipeline – that is, the second and subsequent runs of a batch pipeline or the second and subsequent batches of a streaming pipeline.
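
The following Python sketch shows how the two default queries might work together. It is an illustration under stated assumptions: the exact SQL that the origin issues is not documented here, so the MIN/MAX statements are hypothetical equivalents, the orders table and id column are invented, and an in-memory SQLite database stands in for PostgreSQL.

```python
import sqlite3

# Assumption: SQLite stands in for PostgreSQL, and "orders"/"id" are made-up names.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT)")
conn.executemany("INSERT INTO orders (id, item) VALUES (?, ?)",
                 [(i, f"item-{i}") for i in range(1, 251)])

# Initial batch: a min/max offset query determines the full range of existing data.
min_offset, max_offset = conn.execute(
    "SELECT MIN(id), MAX(id) FROM orders").fetchone()
first_batch = conn.execute(
    "SELECT id FROM orders WHERE id BETWEEN ? AND ? ORDER BY id",
    (min_offset, max_offset)).fetchall()
last_saved = first_batch[-1][0]  # offset saved after the batch

# New rows arrive between batches.
conn.executemany("INSERT INTO orders (id, item) VALUES (?, ?)",
                 [(i, f"item-{i}") for i in range(251, 301)])

# Subsequent batch: a max offset query plus the last-saved offset bound the new data.
(new_max,) = conn.execute("SELECT MAX(id) FROM orders").fetchone()
second_batch = conn.execute(
    "SELECT id FROM orders WHERE id > ? AND id <= ? ORDER BY id",
    (last_saved, new_max)).fetchall()

print(min_offset, max_offset, last_saved)  # 1 250 250
print(second_batch[0][0], new_max)         # 251 300
```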

Custom Offset Queries

Configure custom offset queries to override the default offset queries for the PostgreSQL JDBC Table origin.

The origin allows configuring the following custom offset queries:
Custom Min/Max Query

Returns the minimum and maximum value to use as offsets when querying the database. Configure this query to override the default min/max query that generates the first batch processed by the pipeline.

The custom min/max query is a Spark SQL query that returns one row with two values: the first is the minimum offset, the second is the maximum offset. The origin uses the return values to determine the range of data to query. The offset range is inclusive.

For example, say your custom query returns 10 for min and 200 for max, and the origin is in a batch pipeline. The first time that the pipeline runs, the origin processes all records with offsets between 10 and 200, inclusive. The second time it runs, it begins processing after the 200 offset value.

Custom Max Query
Returns a maximum value to use as an offset when querying the database. Configure this query to override the default max query that generates the max offset to process after the first batch.

While a custom min/max query defines the range of data to process in the first batch of the pipeline, a custom max query defines the maximum offset to process throughout the pipeline run.

The custom max query is a Spark SQL query that returns a single value. Note that the origin continues to check for new data to process, even after processing the custom max offset value.

Specify a custom max query along with the custom min/max query to define a range of data for the pipeline to process, such as data generated in 2019.

For example, say you want to process only the data with offsets 1000 to 8000, inclusive, and you want the first batch to process about one thousand records. To do this, you configure the custom min/max query to return 1000 and 2000. This sets the lower boundary of the data that is processed and defines the size of the first batch. To set the upper boundary of the data that is processed, you configure the custom max query to return 8000.

In the first batch, the origin processes records with offsets between 1000 and 2000, inclusive. In the second batch, the origin processes any new records with offsets between 2001 and 8000, inclusive. Now, say the last record in the second batch has an offset value of 2500. Then, the third batch processes any new records with offsets between 2501 and 8000, and so on.
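
The batch boundaries in this example can be traced with a short Python sketch. The function batch_range is hypothetical and only models the rule described above: the first batch uses the custom min/max result, and each later batch runs from the offset after the last saved value up to the custom max value. It assumes integer offsets and that the first batch ends at offset 2000.

```python
from typing import Optional, Tuple


def batch_range(last_saved: Optional[int],
                custom_min_max: Tuple[int, int],
                custom_max: int) -> Tuple[int, int]:
    """Return the inclusive (low, high) offset range for the next batch."""
    if last_saved is None:
        # First batch: the custom min/max query defines the whole range.
        return custom_min_max
    # Later batches: start after the last saved offset, stop at the custom max.
    return (last_saved + 1, custom_max)


custom_min_max = (1000, 2000)  # result of the custom min/max query
custom_max = 8000              # result of the custom max query

first = batch_range(None, custom_min_max, custom_max)
second = batch_range(2000, custom_min_max, custom_max)  # first batch ended at 2000
third = batch_range(2500, custom_min_max, custom_max)   # second batch ended at 2500

print(first)   # (1000, 2000)
print(second)  # (2001, 8000)
print(third)   # (2501, 8000)
```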

PostgreSQL Data Types

The following table lists the PostgreSQL data types that the PostgreSQL JDBC Table origin supports and the Transformer data types they are converted to.

PostgreSQL data types not listed in the table are not supported.

PostgreSQL Data Type                                              Transformer Data Type
Bigint                                                            Long
Bit, Bit Varying, Bytea                                           Binary
Boolean                                                           Boolean
Box, Circle, Interval, Line, Lseg, Path, Point, Polygon           String
Char, Text, Varchar                                               String
Cidr, Inet, Macaddr                                               String
Date                                                              Date
Daterange, Int4range, Int8range, Numrange, Tsrange, Tstzrange     String
Decimal, Numeric                                                  Decimal
Double Precision, Money                                           Double
Integer                                                           Integer
Json, Jsonb, Uuid, Xml                                            String
Real                                                              Float
Smallint                                                          Short
Time, Time with Time Zone, Timestamp, Timestamp with Time Zone    Timestamp
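
If you want to check a table definition against this list before building a pipeline, the mapping can be expressed as a simple lookup. The sketch below is illustrative only: transformer_type is a hypothetical helper, the type names are lowercase forms of the entries in the table, and Macaddr is assumed to be the intended spelling of the MAC address type.

```python
# Transformer type -> PostgreSQL types, copied from the table above.
_GROUPS = {
    "Long": ["bigint"],
    "Binary": ["bit", "bit varying", "bytea"],
    "Boolean": ["boolean"],
    "String": [
        "box", "circle", "interval", "line", "lseg", "path", "point", "polygon",
        "char", "text", "varchar", "cidr", "inet", "macaddr",
        "daterange", "int4range", "int8range", "numrange", "tsrange", "tstzrange",
        "json", "jsonb", "uuid", "xml",
    ],
    "Date": ["date"],
    "Decimal": ["decimal", "numeric"],
    "Double": ["double precision", "money"],
    "Integer": ["integer"],
    "Float": ["real"],
    "Short": ["smallint"],
    "Timestamp": ["time", "time with time zone", "timestamp",
                  "timestamp with time zone"],
}

# Invert the groups into a flat PostgreSQL type -> Transformer type lookup.
TYPE_MAP = {pg: tr for tr, pgs in _GROUPS.items() for pg in pgs}


def transformer_type(pg_type: str) -> str:
    """Return the Transformer data type for a supported PostgreSQL type."""
    name = " ".join(pg_type.lower().split())
    try:
        return TYPE_MAP[name]
    except KeyError:
        # Types not listed in the table are not supported by the origin.
        raise ValueError(f"Unsupported PostgreSQL data type: {pg_type}") from None


print(transformer_type("Jsonb"))             # String
print(transformer_type("Double Precision"))  # Double
```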

Configuring a PostgreSQL JDBC Table Origin

Configure a PostgreSQL JDBC Table origin to read data from a PostgreSQL table.
  1. On the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Load Data Only Once Reads data in a single batch and caches the results for reuse. Use to perform lookups in streaming execution mode pipelines.

    When using the origin to perform lookups, do not limit the batch size. All lookup data should be read in a single batch.

    This property is ignored in batch execution mode.

    Cache Data Caches processed data so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

    Available when Load Data Only Once is not enabled. When the origin loads data once, it also caches data.

    Skip Offset Tracking Skips tracking offsets.

    In a streaming pipeline, the origin reads all available data with each batch.

    In a batch pipeline, the origin reads all available data each time the pipeline starts.

  2. On the Connection tab, configure the following properties:
    Connection Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    JDBC Connection String

    Connection string used to connect to the database.

    Use the connection string format required by the database:
    • PostgreSQL - jdbc:postgresql://<host>:<port>/<dbname>

      For example: jdbc:postgresql://33333:2222/mydb

    • Amazon Aurora PostgreSQL - jdbc:postgresql:aws://<instance_name>.<cluster_id>.<aws_region>.rds.amazonaws.com:<port>/

      For example: jdbc:postgresql:aws://instance.1234abcd.us-west-1.rds.amazonaws.com:1234/

    Note: If you include the JDBC credentials in the connection string, use a user account created for the origin. The user must have the necessary permissions or privileges for the database.
    Use Credentials

    Enables entering credentials. Use when you do not include credentials in the JDBC connection string.

    Username PostgreSQL username.

    The specified user must have the necessary permissions or privileges for the database.

    Password PostgreSQL password.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    Additional JDBC Configuration Properties Additional JDBC configuration properties to use.

    To add properties, click Add and define the JDBC property name and value. You can use simple or bulk edit mode to configure the properties.

    Use the property names and values as expected by JDBC.

  3. On the Table tab, configure the following properties:
    Table Property Description
    Schema Name of the schema in which the table is located. Define when the database requires a schema.
    Table Database table to read.
    Offset Column Table column used to create partitions and track processed rows. The offset column must be of a supported type.

    The offset column should contain unique, incremental values, and should not contain null values. By default, the origin does not process records with null offset values.

    By default, the origin uses the primary key column as the offset column. Specify another column as needed.

    Number of Partitions Number of partitions used when reading a batch.

    Default value is 10.

    Columns to Read Columns to read from the table. If you specify no columns, the origin reads all the columns in the table.

    Click the Add icon to specify an additional column. You can use simple or bulk edit mode to configure the columns.

    Additional Predicate Additional predicate to include in the WHERE clause of the query. Use Spark SQL syntax.
  4. On the Advanced tab, configure the following properties:
    Advanced Property Description
    Specify Fetch Size Enables specifying a fetch size.
    Fetch Size Suggests the number of rows that the JDBC driver should fetch with each database round-trip.

    Use 0 to skip defining a fetch size.

    For more information about configuring a fetch size, see the database documentation.

    Partition for NULL Enables processing records with null offset values. Generates a partition in each batch that includes all records with null offset values.
    Note: Enabling this property is likely to result in the processing of duplicate records.
    Use Custom Min/Max Query Enables overriding the default min/max offset query that the origin uses to generate the initial batch for the pipeline.
    Custom Min/Max Query Custom Spark SQL query that returns the minimum and maximum offset values to read, in this order.
    Use Custom Max Query Enables overriding the default max query that the origin uses to generate subsequent batches for the pipeline.
    Custom Max Query Custom Spark SQL query that returns the maximum offset value to read.
    JDBC Driver JDBC driver to include with the pipeline:
    • Bundle driver from connection string - Transformer bundles a JDBC driver with the pipeline.

      Select this option to use the PostgreSQL JDBC driver included with the origin. Bundling a driver ensures that the pipeline can run on all Spark cluster nodes.

    • Bundle custom driver - Transformer bundles the specified driver with the pipeline.

      Select this option to use a third-party driver that you installed on Transformer as an external library. Bundling a driver ensures that the pipeline can run on all Spark cluster nodes.

    • Do not bundle driver - Transformer does not include a driver with the pipeline.

      Select this option in the rare case that each node of the Spark cluster includes a compatible JDBC driver for the pipeline to use.

    Driver Class Name Class name of the custom JDBC driver to use.
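
The connection string formats listed for the JDBC Connection String property on the Connection tab can be assembled with small helpers such as the following. This is a sketch only: the function names are hypothetical, the host db.example.com and port 5432 are placeholder values, and no validation of the inputs is attempted.

```python
def postgresql_jdbc_url(host: str, port: int, dbname: str) -> str:
    """Build a connection string in the jdbc:postgresql://<host>:<port>/<dbname> format."""
    return f"jdbc:postgresql://{host}:{port}/{dbname}"


def aurora_postgresql_jdbc_url(instance_name: str, cluster_id: str,
                               aws_region: str, port: int) -> str:
    """Build a connection string in the Amazon Aurora PostgreSQL format."""
    return (f"jdbc:postgresql:aws://{instance_name}.{cluster_id}."
            f"{aws_region}.rds.amazonaws.com:{port}/")


# Placeholder values for illustration only.
print(postgresql_jdbc_url("db.example.com", 5432, "mydb"))
print(aurora_postgresql_jdbc_url("instance", "1234abcd", "us-west-1", 1234))
```

The second call produces the same string as the Amazon Aurora PostgreSQL example in the property description.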