The MemSQL Spark Connector 3.0 is a beta release to be used for testing and development and is not intended for production use.
Apache Spark is an open-source data processing framework. Spark excels at iterative computation and includes numerous libraries for statistical analysis, graph computations, and machine learning. The MemSQL Spark Connector allows you to connect your Spark and MemSQL environments. You can use MemSQL and Spark together to accelerate workloads by taking advantage of the computational power of Spark in tandem with the fast ingest and persistent storage that MemSQL offers.
The MemSQL Spark Connector integrates with Apache Spark 2.3 and supports both data loading and extraction from database tables and Spark DataFrames.
The connector is implemented as a native Spark SQL plugin and supports Spark's DataSource API. Spark SQL supports operating on a variety of data sources through the DataFrame interface, and the DataFrame API is the standard framework through which Spark interacts with other systems.
In addition, the connector is a true Spark data source; it integrates with the Catalyst query optimizer, supports robust SQL pushdown, and leverages MemSQL LOAD DATA to accelerate ingest from Spark via compression.
This topic discusses how to configure and start using the MemSQL Spark Connector 3.0.
Note: We’ve made significant changes between the Spark Connector 3.0 and Spark Connector 2.0. Please see the Migrating between the Spark Connector 2.0 and the Spark Connector 3.0 section.
The Spark Connector 3.0 library requires Apache Spark 2.0+. The connector has been compiled and tested against Spark version 2.3.4 and Scala version 2.11.
- You need a running MemSQL cluster, version 6.8 or higher, and a running Apache Spark environment.
- You can find the latest version of the connector on Maven Central and spark-packages.org. The group is `com.memsql` and the artifact is `memsql-spark-connector_2.11`.
- We release two versions of the memsql-spark-connector, one per supported Spark version. An example version number is 3.0.0-beta-spark-2.3.4, which is the 3.0.0-beta version of the connector, compiled and tested against Spark 2.3.4. Make sure you are using the most recent version of the beta.
In addition to adding the memsql-spark-connector, you will also need to have the MariaDB JDBC driver installed. This library is tested against the following MariaDB driver version:
"org.mariadb.jdbc" % "mariadb-java-client" % "2.+"
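For example, with sbt the connector and driver dependencies together might look like the following sketch. The version numbers shown are the examples cited on this page; substitute the latest beta and the build matching your Spark version:

```scala
// build.sbt (sketch) — versions shown are examples from this page;
// use the most recent beta and the build matching your Spark version.
libraryDependencies ++= Seq(
  "com.memsql"      % "memsql-spark-connector_2.11" % "3.0.0-beta-spark-2.3.4",
  "org.mariadb.jdbc" % "mariadb-java-client"        % "2.+"
)
```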
Inside your project definition, you must add a dependency for the MemSQL Spark Connector using either sbt or Maven. Using Maven, navigate to MemSQL's Spark Connector in the Maven repository, select the latest version, and copy the dependency information. As an example, you would copy the following from the Maven repository to add the dependency. The beta version and Spark version depend on which release you choose from the repository; we recommend using the latest version of both whenever possible.
```xml
<dependency>
    <groupId>com.memsql</groupId>
    <artifactId>memsql-spark-connector_2.11</artifactId>
    <version>3.0.0-beta-spark-2.3.4</version>
</dependency>
```
The MemSQL Spark Connector leverages Spark SQL’s Data Sources API. The connection to MemSQL relies on the following Spark configuration settings:
The memsql-spark-connector is configurable globally via Spark options and locally when constructing a DataFrame. The global and local options use the same names; however, the global options have the prefix `spark.datasource.memsql.`.
| Option | Description |
| --- | --- |
| `ddlEndpoint` (required) | Hostname or IP address of the MemSQL Master Aggregator, in the format `host[:port]`. |
| `dmlEndpoints` | Hostname or IP address of MemSQL aggregator nodes to run queries against, in the format `host[:port],host[:port],...`. |
| `user` (required) | MemSQL username. |
| `password` (required) | MemSQL password. |
| `query` | The query to run (mutually exclusive with `dbtable`). |
| `dbtable` | The table to query (mutually exclusive with `query`). |
| `database` | If set, all connections will default to using this database (default: empty). |
| `truncate` | Truncate instead of drop an existing table during Overwrite (default: `false`). |
| `loadDataCompression` | Compress data on load; one of `GZip`, `LZ4`, or `Skip` (default: `GZip`). |
| `disablePushdown` | Disable SQL pushdown when running queries (default: `false`). |
Example 1: Configuring the memsql-spark-connector globally
```scala
spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.memsql.user", "admin")
spark.conf.set("spark.datasource.memsql.password", "s3cur3-pa$$word")
```
Example 2: Reading a MemSQL table into a DataFrame
```scala
val df = spark.read
  .format("memsql")
  .option("ddlEndpoint", "memsql-master.cluster.internal")
  .option("user", "admin")
  .load("foo")
```
Example 3: Reading a MemSQL table into a DataFrame and applying DataFrame operations

```scala
val x = spark.read
  .format("memsql")
  .option("ddlEndpoint", "memsql-master.cluster.internal")
  .option("user", "admin")
  .load("foo")
  .withColumn("hello", lit(2))
  .filter(col("id") > 1)
  .limit(1000)
  .groupBy(col("id"))
  .agg(count("*"))
```
Example 4: Configuring the memsql-spark-connector using an external table in Spark SQL
```sql
CREATE TABLE bar USING memsql OPTIONS ('ddlEndpoint'='memsql-master.cluster.internal', 'dbtable'='foo.bar')
```
Example 5: Using the Spark write API to save a DataFrame to MemSQL

```scala
df.write
  .format("memsql")
  .option("loadDataCompression", "LZ4")
  .option("truncate", "false")
  .mode(SaveMode.Overwrite)
  .save("foo.bar") // in format: database.table
```
If the target table (`foo.bar` in the example above) does not exist, the memsql-spark-connector will automatically attempt to create it. If you specify `SaveMode.Overwrite` and the target table already exists, it will be dropped and recreated, or truncated, before the load. Specify `truncate = true` to truncate rather than re-create.
Specifying keys for tables created by the Spark Connector
When creating a table, the memsql-spark-connector reads options prefixed with `tableKey`. These options must be formatted in a specific way in order to correctly specify the keys. The default table type is a MemSQL columnstore. If you want to use a rowstore table, you will need to specify a primary key using the `tableKey.primary` option.
To explain, we will refer to the following example:

```scala
df.write
  .format("memsql")
  .option("tableKey.primary", "id")
  .option("tableKey.key.created_firstname", "created, firstname")
  .option("tableKey.unique", "username")
  .mode(SaveMode.Overwrite)
  .save("foo.bar") // in format: database.table
```
In this example, we are creating three keys:
- A primary key on the `id` column
- A regular key on the combination of the `created` and `firstname` columns, with the key name `created_firstname`
- A unique key on the `username` column

Note on (2): any key can optionally specify a name; just put it after the key type. Key names must be unique.
To change the default columnstore sort key, you can specify it explicitly:

```scala
df.write
  .option("tableKey.columnstore", "id")
```
You can also customize the shard key like so:

```scala
df.write
  .option("tableKey.shard", "id, lastname")
```
Data Type Conversions
When saving a DataFrame to MemSQL, each DataFrame column type is converted to the following MemSQL type:
|Spark Type||MemSQL Type|
When reading a MemSQL table as a Spark DataFrame, each MemSQL column type is converted to the following Spark type:
For all supported operations, refer to Spark’s data source APIs.
The memsql-spark-connector has extensive support for rewriting Spark SQL and DataFrame operation query plans into standalone MemSQL queries. This allows most of the computation to be pushed into the MemSQL distributed system without any manual intervention. The SQL rewrites are enabled automatically but can be disabled using the `disablePushdown` option. We also support partial pushdown in the case where certain parts of a query can be evaluated in MemSQL and certain parts need to be evaluated in Spark.
SQL pushdown is either enabled or disabled on the entire Spark session. If you want to run multiple queries in parallel with different values of `disablePushdown`, make sure to run them on separate Spark sessions.
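Because pushdown is session-wide, one way to compare behavior is to disable it on a separate session. This is a sketch that assumes `spark` is an existing SparkSession already configured with the MemSQL connector options:

```scala
// Sketch: compare pushdown behavior across two sessions.
// Assumes `spark` is an existing SparkSession configured for MemSQL.
val sessionNoPushdown = spark.newSession()
sessionNoPushdown.conf.set("spark.datasource.memsql.disablePushdown", "true")

// Same read on different sessions: the first plan is pushed down to
// MemSQL, the second is evaluated in Spark.
val pushed   = spark.read.format("memsql").load("foo")
val unpushed = sessionNoPushdown.read.format("memsql").load("foo")
```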
We currently support most of the primary logical plan nodes in Spark SQL.
We also support most Spark SQL expressions. A full list of supported operators/functions can be found in the file ExpressionGen.scala.
The best place to look for examples of fully supported queries is in the tests. Check out this file as a starting point: SQLPushdownTest.scala.
Load Balancing Requests
DML and DDL endpoints in the configuration refer to your child aggregators and your Master Aggregator, respectively. Use the DDL endpoint (Master Aggregator) for running DDL operations (e.g., `CREATE TABLE`). Use DML endpoints (child aggregators) to read data and perform other DML operations. When you provide both endpoints to the connector, JDBC automatically load balances non-DDL queries across your aggregators.
- The spark user must have access to the Master Aggregator.
- See the permissions matrix for details on MemSQL-specific operations and their required privileges.
If you want to know whether your queries are pushed down, `df.explain()` shows which parts of the query were pushed down, and can also be used for debugging if you encounter an issue. Additionally, if you pass the argument `true` (i.e., `df.explain(true)`), you will get much more output, including pre- and post-optimization passes.
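For example (a sketch, assuming `df` is a DataFrame loaded through the connector):

```scala
// Physical plan only; work that was pushed down appears as a single
// MemSQL relation rather than separate Spark operators.
df.explain()

// Extended output: parsed, analyzed, optimized, and physical plans,
// including pre- and post-optimization passes.
df.explain(true)
```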
In addition, the memsql-spark-connector outputs a lot of helpful information when the TRACE log level is enabled for the
com.memsql.spark package. You can do this in your log4j configuration by adding the following line:
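The exact line depends on your logging setup; assuming a log4j 1.x `log4j.properties` file, an entry of this shape enables TRACE for the connector's package:

```
# Hypothetical log4j.properties entry enabling TRACE for the connector
log4j.logger.com.memsql.spark=TRACE
```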
Make sure not to leave it in place, since it generates a huge amount of tracing output.
Migrating between the Spark Connector 2.0 and the Spark Connector 3.0
You may have previously used the MemSQL Spark Connector 2.0. There are many enhancements between the two versions, and the sections below describe the differences in configuration and functionality between the MemSQL Spark Connector versions 3.0 and 2.0.
- If you are only using the Spark reader API, moving to version 3.0 of the connector only requires a change in the configuration options. Please see below for details.
- If you are using the previous Spark connector to write data to MemSQL, note that the existing `saveToMemSQL` function is deprecated and has been replaced with Spark's `df.write` API.
| 2.0 Option | Related 3.0 Option (if applicable) | Details |
| --- | --- | --- |
| `masterHost` | `ddlEndpoint` | Master host to connect to. |
| N/A | `dmlEndpoints` | Spark Connector 3.0 allows you to specify DML endpoints for load balancing of non-DDL queries. Load balancing is supported through a different mechanism in Spark Connector 2.0. |
| `masterPort` | N/A | In the 3.0 version, the port is specified in the DDL/DML endpoint, respectively. |
| `user` | `user` | User to connect with. |
| `password` | `password` | Password for the user. |
| `defaultDatabase` | `database` | Database to connect to. |
| N/A | `query` | The query to run (optional in 3.0). |
| N/A | `dbtable` | The table to query (optional in 3.0). |
| `defaultSaveMode` | N/A | Spark streaming is not available in 3.0. This option is for Spark streaming in version 2.0 and allows a user to specify options for overriding duplicate keys. |
| `disablePartitionPushdown` | N/A | No partition pushdown in Spark Connector 3.0. |
| `defaultCreateMode` | N/A | Controls whether databases and tables are created if they don't exist. In 3.0, the connector automatically creates a table if it doesn't exist, but will not create a database. |
| `CompressionType` | `loadDataCompression` | Compression options (there are more compression options available in the 3.0 connector). |
| `defaultInsertBatchSize` | N/A | Only valid for limiting insert batches using multi-insert statements. Not applicable in the 3.0 connector because it uses LOAD DATA exclusively. |
| N/A | `truncate` | Spark Connector 3.0 allows you to specify whether to truncate or drop an existing table during overwrite. |
| N/A | `disablePushdown` | Controls whether SQL pushdown is enabled or disabled. The previous connector did not support robust SQL pushdown. |
Version 3.0 of the MemSQL Spark Connector contains the following functionality that is not available in version 2.0:
- Robust SQL pushdown for most query shapes and operations, including partial pushdown via a deep Spark Catalyst integration
- Supports using LZ4 compression to accelerate writing data into MemSQL
- Wraps the JDBC connector directly to stay current with the latest bug fixes and performance optimizations
- Re-designed from the ground up to be ready for an easy Spark 3.0 migration
Version 2.0 of the MemSQL Spark Connector contains the following functionality that is not available in version 3.0:
- Reading directly from MemSQL partitions for certain query shapes in a non-consistent way
- The `saveToMemSQL()` function to write to MemSQL; this is replaced with Spark's `df.write` API
- Adding indexes to automatically created tables; this can be done via a JDBC query
- Falling back to batch inserts to support merging-on-write; this will eventually be re-enabled via advanced
- Duplicate key update specification (support planned); the connector will just drop/reload data with the new specification
- Formal Spark streaming support