A transform is an optional user-defined program that receives data from a pipeline’s extractor and outputs modified data in the format the pipeline expects: JSON, Avro, or CSV. Transforms can be written in any language, but the host Linux distribution of each MemSQL node must have the dependencies required to execute the transform. For example, if you write a transform in Python, Python must be installed and configured on the node’s Linux distribution before the transform can be executed.
In a pipeline’s lifecycle, a transform is referenced by a URI (such as an http:// or file:// URI) when executing a CREATE PIPELINE statement. During pipeline creation, a cluster’s master aggregator distributes the transform executable to each leaf node in the cluster. Each leaf node then executes the transform every time a batch partition is processed. For complete transform syntax, see the CREATE PIPELINE reference documentation.
When the CREATE PIPELINE statement is executed, the transform must be accessible at the specified file system or network endpoint. If the transform is unavailable, pipeline creation will fail. An example CREATE PIPELINE statement with a transform is shown below:
CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA '192.168.1.100:9092/my-topic'
WITH TRANSFORM ('http://www.memsql.com/my-transform.tar.gz', 'my-executable.py', '')
INTO TABLE t
Creating a Transform
There are a few important things to consider when creating a transform. First, the host Linux distribution must have the dependencies required to execute the transform; if they aren’t installed on your node, the transform will fail to run. Second, because transforms must be executable, the transform file should begin with a shebang so that the appropriate interpreter is invoked. The shebang depends on the programming language used in your transform (e.g. #!/usr/bin/env python3 for Python 3 or #!/usr/bin/env ruby for Ruby).
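Both requirements (an executable file that starts with a shebang) can be checked locally before the transform is referenced in a pipeline. The following Python 3 code is a minimal sketch and not part of MemSQL; the helper name check_transform_file and the throwaway sample script are hypothetical.

```python
import os
import stat
import tempfile


def check_transform_file(path):
    """Return a list of problems found with a transform script (hypothetical helper)."""
    problems = []
    with open(path, "rb") as f:
        first_line = f.readline()
    if not first_line.startswith(b"#!"):
        problems.append("missing shebang on the first line")
    if not os.access(path, os.X_OK):
        problems.append("file is not executable (try chmod +x)")
    return problems


if __name__ == "__main__":
    # Demonstration on a throwaway script; a real transform path would go here.
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as tmp:
        tmp.write("#!/usr/bin/env python3\nprint('ok')\n")
    print(check_transform_file(tmp.name))  # the execute bit is not set yet
    os.chmod(tmp.name, os.stat(tmp.name).st_mode | stat.S_IXUSR)
    print(check_transform_file(tmp.name))
    os.remove(tmp.name)
```

This only inspects the file itself; it does not verify that the interpreter named in the shebang is installed on each node.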
Depending on your language and platform, virtual machine overhead may greatly reduce a pipeline’s performance. As described in the section above, a transform is executed every time a batch partition is processed, which can be many times per second, so any virtual machine overhead slows each execution of the transform and degrades the performance of the entire pipeline.
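Because this cost is paid on every execution, it can help to measure how long the runtime for your language takes just to start and exit. The following Python 3 code is a rough sketch that assumes the command under test is available on the local machine. The helper name average_startup_seconds is hypothetical, and the measurement covers only process startup, not the work your transform does.

```python
import subprocess
import sys
import time


def average_startup_seconds(command, runs=5):
    """Average wall-clock time to start `command` and wait for it to exit."""
    start = time.perf_counter()
    for _ in range(runs):
        subprocess.run(command, check=True)
    return (time.perf_counter() - start) / runs


if __name__ == "__main__":
    # Times an empty Python program; substitute the runtime your transform uses.
    seconds = average_startup_seconds([sys.executable, "-c", "pass"])
    print("average startup: %.1f ms" % (seconds * 1000))
```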
One of the most important considerations when creating a transform is how data is passed from the pipeline’s extractor to the transform. When data is extracted from the source, it’s streamed to the transform as bytes over stdin. However, the bytes are encoded differently for different extractors: an S3 pipeline streams raw bytes from the extractor to the transform, while a Kafka pipeline streams byte length encoded data. See the next section to understand how to transform data in a Kafka pipeline.
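For an extractor that streams raw bytes, such as S3, a transform can read stdin directly with no framing. The following Python 3 code is a minimal sketch. It assumes the source data is newline-delimited CSV and that the only change wanted is to drop blank lines; the function name transform is illustrative.

```python
#!/usr/bin/env python3
import sys


def transform(in_stream, out_stream):
    """Copy lines from in_stream to out_stream, skipping blank lines."""
    for line in in_stream:
        if line.strip():
            out_stream.write(line)


if __name__ == "__main__":
    # Use the binary buffers so bytes pass through without re-encoding.
    transform(sys.stdin.buffer, sys.stdout.buffer)
```

Keeping the logic in a function that takes two streams makes it possible to test the transform with in-memory buffers before running it in a pipeline.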
The final step of a transform is to write the transformed data to stdout in the format specified in the CREATE PIPELINE statement. For example, it may need to write CSV with the specified FIELDS TERMINATED BY string, or it may need to write “raw stream” Avro with the specified schema. The bytes written to stdout will be interpreted as if by a LOAD DATA query with the same options as those given to the CREATE PIPELINE statement.
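To illustrate matching the output to the pipeline’s load options, the Python 3 sketch below writes rows as CSV with a configurable field terminator using Python’s csv module. It assumes a pipeline declared with FIELDS TERMINATED BY '|'; that delimiter, the sample rows, and the helper name write_rows are assumptions for illustration.

```python
import csv
import sys


def write_rows(rows, out, field_terminator="|"):
    """Write rows to a text stream as CSV using the given field terminator."""
    writer = csv.writer(out, delimiter=field_terminator, lineterminator="\n")
    writer.writerows(rows)


if __name__ == "__main__":
    # The terminator must match the one declared in the CREATE PIPELINE statement.
    write_rows([[1, "alice"], [2, "bob"]], sys.stdout)
```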
Kafka Pipeline Byte Length Encoding
For a Kafka pipeline, any data streamed to the transform is byte length encoded: each message is preceded by eight bytes that indicate the length of the message in bytes. Your transform should process the stdin stream in the following way:
1. Read the first eight bytes to determine the byte length of the actual message data that follows.
2. Read the number of bytes indicated by Step 1, which is the actual message data, then convert it to a string and store it until the entire stdin stream has been read.
3. If the stream still contains more data, read eight more bytes to determine the byte length of the next message, then repeat Step 2.
4. Continue repeating this process until all data from stdin has been read.
The example in the next section implements this process.
Example Kafka Transform
The following transform is written in Python and reads CSV-formatted data from an extractor without modifying it. This transform reads the first eight bytes of stdin to determine the length of the message, and then simply writes the contents of the message to stdout. The code example works for both Python 2 and Python 3.
#!/usr/bin/python

import struct
import sys

# On Python 3 the standard streams are text-mode, so use their binary buffers.
binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer


def input_stream():
    """
    Consume STDIN and yield each record that is received from MemSQL
    """
    while True:
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            # struct.unpack returns a tuple, so take its only element.
            byte_len = struct.unpack("L", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            # Anything other than a clean end of stream is a truncated header.
            assert len(byte_len) == 0, byte_len
            return


def log(message):
    """
    Log an informational message to stderr which will show up in MemSQL
    in the event of transform failure.
    """
    binary_stderr.write(message + b"\n")


def emit(message):
    """
    Emit a record back to MemSQL by writing it to STDOUT. The record
    should be formatted as JSON, Avro, or CSV as it will be parsed by
    LOAD DATA.
    """
    binary_stdout.write(message + b"\n")


log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")
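To exercise a Kafka transform like the one above without a running pipeline, you can generate input in the same framing and pipe it to the script. The Python 3 sketch below is a test aid built on assumptions, not a MemSQL tool. It assumes the length prefix is an 8-byte unsigned integer in the machine’s native byte order, mirroring the struct format used in the example on 64-bit Linux. The helper names frame_messages and parse_frames are hypothetical.

```python
import io
import struct
import sys

# Assumption: 8-byte unsigned length in native byte order.
# "Q" is always 8 bytes in struct's native mode.
LENGTH_FORMAT = "Q"


def frame_messages(messages):
    """Prefix each message (bytes) with its length and concatenate the results."""
    return b"".join(struct.pack(LENGTH_FORMAT, len(m)) + m for m in messages)


def parse_frames(stream):
    """Yield each message from a binary stream of length-prefixed messages."""
    while True:
        header = stream.read(8)
        if not header:
            return
        if len(header) != 8:
            raise ValueError("truncated length prefix")
        (length,) = struct.unpack(LENGTH_FORMAT, header)
        yield stream.read(length)


if __name__ == "__main__":
    # Emit two framed CSV records on stdout for piping into a transform.
    sys.stdout.buffer.write(frame_messages([b"1,alice", b"2,bob"]))
```

For example, if this script were saved as make_input.py (a hypothetical name), running python3 make_input.py | ./my-executable.py would feed two records to the transform. parse_frames can be used in a test to confirm that the framing round-trips.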
Iterating on a Transform
There are two workflows for iterating on a transform script, depending on how the pipeline is being created. If a pipeline is being created using the MemSQL Ops UI, iterating on the transform is possible by re-uploading the transform file and re-running Test Pipeline inside the Create Pipeline wizard. The transform file will then be managed by MemSQL Ops.
If the pipeline is being created directly with a CREATE PIPELINE statement, updating the transform is then possible by running ALTER PIPELINE pipeline_name RELOAD TRANSFORM. This command reloads the transform from the URI specified in the original CREATE PIPELINE statement.