Transforms

A transform is an optional user-defined program that receives data from a pipeline’s extractor and outputs modified data in the format the pipeline expects: JSON, Avro, or CSV. Transforms can be written in any language, but the host Linux distribution of each MemSQL node must have the dependencies required to execute the transform. For example, if you write a transform in Python, Python must be installed and configured on the node’s Linux distribution before the transform can run.

In a pipeline’s lifecycle, a transform is referenced with an http:// or file:// URI in the CREATE PIPELINE statement. During pipeline creation, the cluster’s master aggregator distributes the transform executable to each leaf node in the cluster. Each leaf node then executes the transform every time a batch partition is processed. For the complete transform syntax, see the CREATE PIPELINE reference documentation.

When the CREATE PIPELINE statement is executed, the transform must be accessible at the specified file system or network endpoint. If the transform is unavailable, pipeline creation will fail. An example CREATE PIPELINE statement with a transform is shown below:

CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA '192.168.1.100:9092/my-topic'
WITH TRANSFORM ('http://www.memsql.com/my-transform.tar.gz', 'my-executable.py', '')
INTO TABLE t

Creating a Transform

There are a few important things to consider when creating a transform. First, the host Linux distribution must have the required dependencies to execute the transform. If the necessary dependencies aren’t installed on your node, the transform will fail to run.

Depending on the language and platform you choose, virtual machine overhead may greatly reduce a pipeline’s performance. As described in the section above, a transform is executed every time a batch partition is processed, which can be many times per second. Any startup or runtime overhead slows each execution of the transform and therefore degrades the performance of the entire pipeline.

One of the most important considerations when creating a transform is how data passes from the pipeline’s extractor to the transform. When data is extracted from the source, it is streamed to the transform over stdin as bytes. However, the bytes are encoded differently for different extractors. While an S3 pipeline streams raw bytes from the extractor to the transform, a Kafka pipeline streams byte length encoded data. See the next section to understand how to transform data in a Kafka pipeline.
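For an extractor that streams raw bytes, such as S3, a transform can read stdin directly with no framing to decode. The following is a minimal sketch, not part of MemSQL: the function name `drop_blank_lines` is hypothetical, and it assumes the source holds one record per line.

```python
def drop_blank_lines(src, dst):
    """Copy newline-delimited records from src to dst, skipping blank lines.

    Hypothetical helper: src and dst are binary streams, and the input is
    assumed to contain one record per line.
    """
    for line in src:
        if line.strip():
            # Normalize the line ending so every record ends in one newline.
            dst.write(line.rstrip(b"\r\n") + b"\n")
```

In a Python 3 transform script this could be called as `drop_blank_lines(sys.stdin.buffer, sys.stdout.buffer)`, which uses the underlying binary streams so that no text decoding is applied to the data.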

The final step of a transform is to write the transformed data to stdout in the format specified in the CREATE PIPELINE statement. For example, it may need to write CSV with the specified FIELDS TERMINATED BY string, or it may need to write “raw stream” Avro with the specified schema. The bytes written to stdout are interpreted as if by a LOAD DATA query with the same options as those given in the CREATE PIPELINE statement.
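As an illustration of writing CSV output with a chosen field terminator, the sketch below uses Python’s standard csv module. The helper name `emit_rows` is hypothetical, and the delimiter is assumed to match the pipeline’s FIELDS TERMINATED BY string. Note that csv.writer accepts only a single-character delimiter, and whether its default quoting matches your pipeline’s LOAD DATA options is something to verify for your own configuration.

```python
import csv
import io


def emit_rows(rows, out, delimiter=","):
    """Write rows to out as CSV using the given single-character delimiter.

    Hypothetical helper: the delimiter is assumed to match the pipeline's
    FIELDS TERMINATED BY setting.
    """
    writer = csv.writer(out, delimiter=delimiter, lineterminator="\n")
    for row in rows:
        writer.writerow(row)


# Demonstrate with an in-memory buffer; a transform would pass sys.stdout.
buffer = io.StringIO()
emit_rows([[1, "alice"], [2, "bob"]], buffer, delimiter="\t")
print(repr(buffer.getvalue()))  # prints '1\talice\n2\tbob\n'
```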

Kafka Pipeline Byte Length Encoding

For a Kafka pipeline, any data streamed to the transform is byte length encoded: each message is preceded by eight bytes that indicate the length of the message in bytes. Your transform should process the stdin stream in the following way:

  1. Read the first eight bytes to determine the byte length of the actual message data that follows.
  2. Read the number of bytes indicated by Step 1, which is the actual message data, then convert it to a string and store it until the entire stdin stream has been read.
  3. If the stream still contains more data, read eight more bytes to determine the byte length of the next message, then repeat Step 2.
  4. Continue repeating this process until all data from stdin has been read.

Consider the following Python function, which reads eight bytes from stdin to determine the byte length of each message and then yields the message itself:

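import struct
import sys

# Read raw bytes: sys.stdin.buffer on Python 3, sys.stdin itself on Python 2.
STDIN = getattr(sys.stdin, "buffer", sys.stdin)


def input_stream():
    """
        Consume STDIN and yield each record that is received from MemSQL
    """
    while True:
        byte_len = STDIN.read(8)
        if len(byte_len) == 8:
            # "=Q" is an 8-byte unsigned integer in native byte order.
            byte_len = struct.unpack("=Q", byte_len)[0]
            result = STDIN.read(byte_len)
            yield result
        else:
            # Anything other than a clean end of stream is a truncated header.
            assert len(byte_len) == 0, byte_len
            return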

Once the length of the message has been determined, the message itself can be transformed programmatically.
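As a hedged illustration of transforming each message after it has been read, the sketch below decodes a length-prefixed stream and uppercases every message. The names `frame`, `read_messages`, and `uppercase_messages` are hypothetical, and the length prefix is assumed to be an 8-byte unsigned integer in native byte order, consistent with the function above on a 64-bit Linux host. The `frame` helper exists only to build test input locally without a Kafka source.

```python
import io
import struct

# Assumption: 8-byte unsigned length prefix in native byte order.
LENGTH_FORMAT = "=Q"
LENGTH_SIZE = struct.calcsize(LENGTH_FORMAT)


def frame(message):
    """Prefix message bytes with their length (hypothetical test helper)."""
    return struct.pack(LENGTH_FORMAT, len(message)) + message


def read_messages(stream):
    """Yield each length-prefixed message from a binary stream."""
    while True:
        header = stream.read(LENGTH_SIZE)
        if not header:
            return  # clean end of stream
        if len(header) != LENGTH_SIZE:
            raise ValueError("truncated length header")
        (length,) = struct.unpack(LENGTH_FORMAT, header)
        body = stream.read(length)
        if len(body) != length:
            raise ValueError("truncated message body")
        yield body


def uppercase_messages(src, dst):
    """Write each message from src to dst in uppercase, one per line."""
    for message in read_messages(src):
        dst.write(message.upper() + b"\n")


# Local check with in-memory streams standing in for stdin and stdout.
source = io.BytesIO(frame(b"a,1") + frame(b"b,2"))
sink = io.BytesIO()
uppercase_messages(source, sink)
print(sink.getvalue())  # prints b'A,1\nB,2\n'
```

Taking the streams as parameters, rather than reading sys.stdin directly, keeps the transform logic testable with in-memory buffers before it is deployed to a node.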

Example Kafka Transform

The following transform is written in Python and reads CSV-formatted data from an extractor without modifying it. For each message, the transform reads an eight-byte length prefix from stdin, reads that many bytes, and then writes the contents of the message to stdout unchanged.

#!/usr/bin/python

import struct
import sys

# Binary views of stdin and stdout: the .buffer attribute on Python 3,
# the stream itself on Python 2.
STDIN = getattr(sys.stdin, "buffer", sys.stdin)
STDOUT = getattr(sys.stdout, "buffer", sys.stdout)


def input_stream():
    """
        Consume STDIN and yield each record that is received from MemSQL
    """
    while True:
        byte_len = STDIN.read(8)
        if len(byte_len) == 8:
            # "=Q" is an 8-byte unsigned integer in native byte order.
            byte_len = struct.unpack("=Q", byte_len)[0]
            result = STDIN.read(byte_len)
            yield result
        else:
            # Anything other than a clean end of stream is a truncated header.
            assert len(byte_len) == 0, byte_len
            return


def log(message):
    """
        Log an informational message to stderr which will show up in MemSQL in
        the event of transform failure.
    """
    sys.stderr.write(message + "\n")


def emit(message):
    """
        Emit a record (as bytes) back to MemSQL by writing it to STDOUT.  The
        record should be formatted as JSON, Avro, or CSV as it will be parsed
        by LOAD DATA.
    """
    STDOUT.write(message + b"\n")

log("Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log("End transform")

Iterating on a Transform

There are two workflows for iterating on a transform script, depending on how the pipeline is being created. If a pipeline is being created using the MemSQL Ops UI, iterating on the transform is possible by re-uploading the transform file and re-running Test Pipeline inside the Create Pipeline modal. The transform file will then be managed by MemSQL Ops.

If the pipeline is being created directly with a CREATE PIPELINE statement, update the transform by running ALTER PIPELINE pipeline_name RELOAD TRANSFORM. This command reloads the transform from the URI specified in the original CREATE PIPELINE statement.
