Connecting to Spark (Beta 3 Connector)


Alert

The MemSQL Spark Connector 3.0 is a beta release to be used for testing and development and is not intended for production use.

Apache Spark is an open-source data processing framework. Spark excels at iterative computation and includes numerous libraries for statistical analysis, graph computations, and machine learning. The MemSQL Spark Connector allows you to connect your Spark and MemSQL environments. You can use MemSQL and Spark together to accelerate workloads by taking advantage of the computational power of Spark in tandem with the fast ingest and persistent storage that MemSQL has to offer.

The MemSQL Spark Connector integrates with Apache Spark 2.3 and supports both data loading and extraction from database tables and Spark DataFrames.

The connector is implemented as a native Spark SQL plugin, and supports Spark’s DataSource API. Spark SQL supports operating on a variety of data sources through the DataFrame interface, and the DataFrame API is the widely used framework for how Spark interacts with other systems.

In addition, the connector is a true Spark data source; it integrates with the Catalyst query optimizer, supports robust SQL pushdown, and leverages MemSQL LOAD DATA to accelerate ingest from Spark via compression.

You can download the Spark Connector from its GitHub repository and from Maven Central. The group is com.memsql and the artifact is memsql-spark-connector_2.11.

This topic discusses how to configure and start using the MemSQL Spark Connector 3.0.

Note: We’ve made significant changes between the Spark Connector 3.0 and Spark Connector 2.0. Please see the Migrating between the Spark Connector 2.0 and the Spark Connector 3.0 section.

Getting Started

The Spark Connector 3.0 library requires Apache Spark 2.0+. The connector has been compiled and tested against Spark version 2.3.4 and Scala version 2.11.

  • You need a MemSQL cluster running version 6.8 or higher, and a running Apache Spark environment.
  • You can find the latest version of the connector on Maven Central and spark-packages.org. The group is com.memsql and the artifact is memsql-spark-connector_2.11.
  • We release two versions of the memsql-spark-connector, one per Spark version. An example version number is 3.0.0-beta-spark-2.3.4, which is the 3.0.0-beta version of the connector, compiled and tested against Spark 2.3.4. Make sure you are using the most recent version of the beta.
  • In addition to adding the memsql-spark-connector, you will also need to have the MariaDB JDBC driver installed. This library is tested against the following MariaDB driver version:

    "org.mariadb.jdbc" % "mariadb-java-client"  % "2.+"
    

Installation

Inside your project definition, add a dependency for the MemSQL Spark Connector using either sbt or Maven. For Maven, navigate to the MemSQL Spark Connector on Maven Central, select the latest version, and copy the dependency information.

As an example, you would copy the following into your pom.xml to add the dependency. Substitute the beta version and Spark version according to the release you chose from the repository; we recommend using the latest version of both whenever possible.

<dependency>
  <groupId>com.memsql</groupId>
  <artifactId>memsql-spark-connector_2.11</artifactId>
  <version><insert-beta-version>-spark-<insert-spark-version></version>
</dependency>
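
If you build with sbt instead, the equivalent dependency declarations might look like the following sketch. The connector version shown is the example version from above; substitute the latest beta and Spark versions you chose.

libraryDependencies += "com.memsql" % "memsql-spark-connector_2.11" % "3.0.0-beta-spark-2.3.4"
libraryDependencies += "org.mariadb.jdbc" % "mariadb-java-client" % "2.+"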

Configuration Settings

The MemSQL Spark Connector leverages Spark SQL's Data Sources API. The connection to MemSQL is controlled by the configuration settings below.

The memsql-spark-connector is configurable both globally (via Spark options) and locally (when constructing a DataFrame). The global and local options use the same names; however, the global options are prefixed with spark.datasource.memsql.:

Option | Description
ddlEndpoint (required) | Hostname or IP address of the MemSQL Master Aggregator in the format host[:port] (port is optional). Example: master-agg.foo.internal:3308 or master-agg.foo.internal
dmlEndpoints | Hostname or IP address of MemSQL Aggregator nodes to run queries against, in the format host[:port],host[:port],... (port is optional; separate multiple hosts with commas). Example: child-agg:3308,child-agg2 (default: ddlEndpoint)
user (required) | MemSQL username.
password (required) | MemSQL password.
query | The query to run (mutually exclusive with dbtable).
dbtable | The table to query (mutually exclusive with query).
database | If set, all connections will default to using this database (default: empty).
truncate | Truncate instead of drop an existing table during Overwrite (default: false).
loadDataCompression | Compress data on load; one of GZip, LZ4, or Skip (default: GZip).
disablePushdown | Disable SQL pushdown when running queries (default: false).
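
For instance, a read can target either a whole table (dbtable) or an arbitrary SQL statement (query); the two options are mutually exclusive. The following is a minimal sketch, where the endpoint, credentials, and table/query are placeholders for your own cluster.

// Load an entire table by name
val tableDF = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .option("dbtable", "foo.bar")
    .load()

// Push an arbitrary query instead of a table
val queryDF = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .option("query", "SELECT id, firstname FROM foo.bar WHERE id > 1")
    .load()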

Examples

Example 1: Configuring the memsql-spark-connector globally

spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.memsql.user", "admin")
spark.conf.set("spark.datasource.memsql.password", "s3cur3-pa$$word")

Example 2: Reading a MemSQL table into a DataFrame

val df = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .load("foo")

Example 3: Reading a MemSQL table into a DataFrame and applying Data Frame operations

val x = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .load("foo")
    .withColumn("hello", lit(2))
    .filter(col("id") > 1)
    .limit(1000)
    .groupBy(col("id"))
    .agg(count("*"))

Example 4: Configuring the memsql-spark-connector using an external table in Spark SQL

CREATE TABLE bar USING memsql OPTIONS ('ddlEndpoint'='memsql-master.cluster.internal','dbtable'='foo.bar')
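
Once the external table is registered, it can be queried with ordinary Spark SQL. A minimal sketch (the table name bar comes from the statement above):

spark.sql("SELECT * FROM bar WHERE id > 1").show()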

Example 5: Using the Spark write API to save a Data Frame to MemSQL

df.write
    .format("memsql")
    .option("loadDataCompression", "LZ4")
    .option("truncate", "false")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

If the target table (“bar” in the example above) does not exist, the memsql-spark-connector will automatically attempt to create it. If you specify SaveMode.Overwrite and the target table already exists, it will be recreated or truncated before the load. Specify truncate = true to truncate rather than re-create.

Specifying keys for tables created by the Spark Connector

When creating a table, the memsql-spark-connector will read options prefixed with tableKey. These options must be formatted in a specific way in order to correctly specify the keys.

Info

The default table type is a MemSQL columnstore. If you want to use a rowstore table, you will need to specify a primary key using the tableKey option.

To explain, we will refer to the following example:

df.write
    .format("memsql")
    .option("tableKey.primary", "id")
    .option("tableKey.key.created_firstname", "created, firstname")
    .option("tableKey.unique", "username")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

In this example, we are creating three keys:

  1. A primary key on the id column
  2. A regular key on the combination of the created and firstname columns, with the key name created_firstname
  3. A unique key on the username column

Note on (2): any key can optionally specify a name; just put it after the key type. Key names must be unique.

To change the default columnstore sort key, you can specify it explicitly:

df.write
    .option("tableKey.columnstore", "id")

You can also customize the shard key like so:

df.write
    .option("tableKey.shard", "id, lastname")

Data Type Conversions

When saving a DataFrame to MemSQL, each DataFrame column type is converted to the following MemSQL type:

Spark Type | MemSQL Type
LongType | BIGINT
IntegerType | INT
ShortType | SMALLINT
FloatType | FLOAT
DoubleType | DOUBLE
ByteType | TINYINT
StringType | TEXT
BinaryType | BLOB
DecimalType | DECIMAL
BooleanType | BOOLEAN
TimestampType | TIMESTAMP
DateType | DATE

When reading a MemSQL table as a Spark DataFrame, each MemSQL column type is converted to the following Spark type:

MemSQL Type | Spark Type
TINYINT | ByteType
SMALLINT | ShortType
INTEGER | IntegerType
BIGINT (signed) | LongType
DOUBLE | DoubleType
NUMERIC | DecimalType
REAL | DoubleType
DECIMAL | DecimalType
TIMESTAMP | TimestampType
DATE | DateType
TIME | TimestampType
CHAR, VARCHAR | StringType
BOOLEAN, BIT | BooleanType
BLOB, BINARY | BinaryType
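
Because of this mapping, you can influence the column types the connector creates by casting DataFrame columns before writing. The following is a minimal sketch; df, the column names, and the target table are placeholders.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DecimalType, ShortType}

val typed = df
    .withColumn("quantity", col("quantity").cast(ShortType))    // written as SMALLINT
    .withColumn("price", col("price").cast(DecimalType(10, 2))) // written as DECIMAL

typed.write
    .format("memsql")
    .mode(SaveMode.Overwrite)
    .save("foo.bar")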

Supported Operations

For all supported operations, refer to Spark’s data source APIs.

SQL Pushdown

The memsql-spark-connector has extensive support for rewriting Spark SQL and DataFrame operation query plans into standalone MemSQL queries. This allows most of the computation to be pushed into the MemSQL distributed system without any manual intervention. The SQL rewrites are enabled automatically but can be disabled using the disablePushdown option. We also support partial pushdown in the case where certain parts of a query can be evaluated in MemSQL and certain parts need to be evaluated in Spark.

SQL Pushdown is either enabled or disabled on the entire Spark Session. If you want to run multiple queries in parallel with different values of disablePushdown, make sure to run them on separate Spark Sessions.
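
For example, to disable pushdown for an entire session before running queries (using the global option name from the configuration table above), and to re-enable it afterwards:

spark.conf.set("spark.datasource.memsql.disablePushdown", "true")
// ... run the queries that should be evaluated entirely in Spark ...
spark.conf.set("spark.datasource.memsql.disablePushdown", "false")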

We currently support most of the primary Logical Plan nodes in Spark SQL including: Project, Filter, Aggregate, Window, Join, Limit, Sort.

We also support most Spark SQL expressions. A full list of supported operators/functions can be found in the file ExpressionGen.scala.

The best place to look for examples of fully supported queries is in the tests. Check out this file as a starting point: SQLPushdownTest.scala.

Load Balancing Requests

DML and DDL endpoints in the configuration refer to your child aggregators and your master aggregator, respectively. Use the DDL endpoint (master aggregator) for running DDL operations (e.g. CREATE TABLE). Use DML endpoints (child aggregators) to read data and perform other DML operations. When you provide both endpoints to the connector, JDBC will automatically load balance non-DDL queries across your aggregators.

Security Considerations

  • The spark user must have access to the Master Aggregator.
  • See the permissions matrix for details on MemSQL-specific operations and their required privileges.

Debugging

SQL Pushdown

If you want to know whether your queries are being pushed down, df.explain() shows which parts of the query were pushed down, and it can also be used for debugging if you encounter an issue. Additionally, if you pass the argument true, you will get much more output, including the pre- and post-optimization passes.
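
A minimal sketch (the endpoint, credentials, and table are placeholders for your own cluster):

val df = spark.read
    .format("memsql")
    .option("ddlEndpoint", "memsql-master.cluster.internal")
    .option("user", "admin")
    .load("foo")

df.explain()     // physical plan only; shows which operations were pushed down to MemSQL
df.explain(true) // extended output: parsed, analyzed, and optimized logical plans plus the physical plan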

Other

In addition, the memsql-spark-connector outputs a lot of helpful information when the TRACE log level is enabled for the com.memsql.spark package. You can do this in your log4j configuration by adding the following line:

log4j.logger.com.memsql.spark=TRACE

Make sure not to leave it enabled, since it generates a huge amount of tracing output.

Migrating between the Spark Connector 2.0 and the Spark Connector 3.0

You may have previously used the MemSQL Spark Connector 2.0. There are many enhancements between the two versions, and the sections below describe the differences in configuration and functionality between the MemSQL Spark Connector versions 3.0 and 2.0.

Configuration Differences

  • If you are only using the Spark reader API, moving to version 3.0 of the connector only requires a change in the configuration options. Please see below for details.
  • If you are using the previous Spark connector to write data to MemSQL, the existing saveToMemSQL function is deprecated and has been replaced with Spark’s df.write and .save() functions.

Configuration Comparisons

2.0 Option | Related 3.0 Option (if applicable) | Details
masterHost | ddlEndpoint | Master host to connect to.
N/A | dmlEndpoints | Spark Connector 3.0 allows you to specify DML endpoints for load balancing non-DDL queries. Load balancing is supported through a different mechanism in the 2.0 connector.
masterPort | N/A | In the 3.0 version, the port is specified in the DDL/DML endpoints, respectively.
user | user | User to connect with.
password | password | Password for the user.
defaultDatabase | database | Database to connect to.
N/A | query | The query to run (optional in 3.0).
N/A | dbtable | The table to query (optional in 3.0).
defaultSaveMode | N/A | This applies to Spark streaming in version 2.0 and allows a user to specify options for overriding duplicate keys. Spark streaming is not available in 3.0.
disablePartitionPushdown | N/A | There is no partition pushdown in Spark Connector 3.0.
defaultCreateMode | N/A | Controls whether databases and tables are created if they don’t exist. In 3.0, the connector automatically creates a table if it doesn’t exist, but it will not create a database.
CompressionType | loadDataCompression | Compression options (there are more compression options available in the 3.0 connector).
defaultInsertBatchSize | N/A | This only applies to limiting insert batches when using multi-insert statements. It does not apply to the 3.0 connector because it uses LOAD DATA exclusively.
N/A | truncate | Spark Connector 3.0 allows you to specify whether to truncate or drop an existing table during overwrite.
N/A | disablePushdown | Controls whether SQL pushdown is enabled or disabled. The previous connector did not support robust SQL pushdown.

Functionality Differences

Version 3.0 of the MemSQL Spark Connector contains the following functionality that is not available in version 2.0:

  • Robust SQL pushdown for most query shapes and operations, including partial pushdown via a deep Spark Catalyst integration
  • Supports using LZ4 compression to accelerate writing data into MemSQL
  • Wraps the JDBC connector directly to stay current with the latest bug fixes and performance optimizations
  • Re-designed from the ground up to be ready for an easy Spark 3.0 migration

Version 2.0 of the MemSQL Spark Connector contains the following functionality that is not available in version 3.0:

  • Reading directly from MemSQL partitions for certain query shapes in a non-consistent way
  • The saveToMemSQL() function to write to MemSQL; this is replaced with using df.write directly
  • Adding indexes to automatically created tables; this can be done via a JDBC query
  • Falling back to batch-inserts to support merging-on-write; this will eventually be re-enabled via advanced LOAD DATA options
  • No support for a duplicate key update specification (planned); JDBC will just drop/reload data with the new specification
  • No formal Spark streaming support