Product Solutions Support
Try MemSQL

CREATE PIPELINE

The CREATE PIPELINE clause creates a new pipeline in a MemSQL database.

CREATE PIPELINE [IF NOT EXISTS] pipeline_name AS
  LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration | azure_blob_configuration }
    [BATCH_INTERVAL milliseconds]
    [WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
  [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS]
  INTO TABLE table_name
  [FIELDS | COLUMNS]
  [TERMINATED BY 'string'
    [[OPTIONALLY] ENCLOSED BY 'char']
    [ESCAPED BY 'char']
  ]
  [LINES
    [STARTING BY 'string']
    [TERMINATED BY 'string']
  ]
  [ (column_name, ... ) ]
  [ON DUPLICATE KEY UPDATE column_name = expression, [...]]

  kafka_configuration:
    KAFKA 'kafka_topic_endpoint'

  s3_configuration:
    S3 { 'bucket-name' | 'bucket-name/object-name' | 'bucket-name/prefix/object-name' }
      CREDENTIALS 'credentials_json'
      [CONFIG 'configuration_json']
      
  filesystem_configuration:
  FS 'path'
  
  azure_blob_configuration:
    AZURE { ‘container-name’ | 'container-name/object-name' | 'container-name/prefix/object-name' }
      CREDENTIALS 'credentials_json'
      [CONFIG 'configuration_json']

Kafka Pipeline Syntax

The following example statement demonstrate how to create a Kafka pipeline using the minimum required syntax:

Minimum Required Kafka Pipeline Syntax:

CREATE PIPELINE mypipeline AS
  LOAD DATA KAFKA '127.0.0.1/my-topic'
  INTO TABLE `my_table`;

This statement creates a new pipeline named my_pipeline, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and will start ingesting data into my_table. For more information about Kafka Pipelines, see Kafka Pipelines Overview.

S3 Pipeline Syntax

The following example statement demonstrate how to create an S3 pipeline using the minimum required syntax:

Minimum Required S3 Pipeline Syntax:

CREATE PIPELINE mypipeline AS
  LOAD DATA S3 'my-bucket-name'
    CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
  INTO TABLE `my_table`

This statement creates a new pipeline named my_pipeline, uses an S3 bucket named my-bucket-name as the data source, and will start ingesting the bucket’s objects into my_table. For more information about S3 Pipelines, see S3 Pipelines Overview.

Note that no CONFIG clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG clause is specified, MemSQL will automatically use the us-east-1 region, also known as US Standard in the Amazon S3 console. To specify a different region, such as us-west-1, include a CONFIG clause as shown:

S3 Pipeline Using Specified Region:

CREATE PIPELINE mypipeline AS
  LOAD DATA S3 'my-bucket-name'
    CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
    CONFIG '{"region": "us-west-1"}'
  INTO TABLE `my_table`

Azure Blob Pipeline Syntax

The following example statement demonstrate how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.

Minimum Required Azure Pipeline Syntax:

CREATE PIPELINE mypipeline AS
LOAD DATA AZURE 'my-container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key":
"my_account_key"}'
INTO TABLE `my_table`

This statement creates a new pipeline named my_pipeline, uses an Azure Blob container named my-container-name as the data source, and will start ingesting the bucket’s objects into my_table. For more information about Azure Pipelines, see Azure Blob Pipelines Overview

Note that no CONFIG clause is required to create an Azure pipeline.

Each of the clauses in a CREATE PIPELINE statement are described below.

Filesystem Pipeline Syntax

The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:

CREATE PIPELINE my_pipeline
AS LOAD DATA FS '/path/to/files/*'
INTO TABLE `my_table`
FIELDS TERMINATED BY ',';

This statement creates a new pipeline named my_pipeline, uses a directory as the data source, and will start ingesting data into my_table. For more information about Filesystem Pipelines, see Filesystem Pipelines Overview.

LOAD DATA

The CREATE PIPELINE clause uses standard LOAD DATA syntax options, including its error handling options. For more information, see the LOAD DATA page.

AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.

WITH TRANSFORM

Info

Transforms are currently a preview feature of Pipelines. Their syntax and functionality may change in future MemSQL versions.

Additionally, transforms are not supported in MemSQL Cloud.

Pipeline source data can be transformed by specifying an executable program. The data is transformed after the extraction process and before it is loaded into the database. For more information, see Transforms.

WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py','')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '-arg1 -arg1')
Was this article useful?