Spark Connector

The MemSQL Spark Connector integrates with Apache Spark 2.0 and 2.1. It supports both loading data from MemSQL tables into Spark DataFrames and saving Spark DataFrames to MemSQL tables.

You can download the Spark Connector from its GitHub repository. Scala reference documentation for the Spark Connector is available here.

This topic discusses how to configure and start using the Spark Connector 2.0. For information on version 1.0 of the Spark Connector, see the 1.33 branch on GitHub.

Info

The 79-page MemSQL Spark Connector Guide covers how to design, build, and deploy Spark applications using the MemSQL Spark Connector. Inside, you will find code samples to help you get started and performance recommendations for your production-ready Apache Spark and MemSQL implementations. Download the free MemSQL Spark Connector Guide today.

Prerequisites

The Spark Connector 2.0 library requires Apache Spark 2.0+ and has been primarily tested against Spark version 2.0.2. For support with Spark 1.x, please check the 1.x branch.

Installation

Inside your project definition, add a dependency for the MemSQL Connector using either sbt or Maven:

sbt Configuration:

libraryDependencies += "com.memsql" %% "memsql-connector" % "2.0.2"

Maven Configuration:

<dependency>
    <groupId>com.memsql</groupId>
    <artifactId>memsql-connector_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

Configuration Settings

The MemSQL Spark Connector leverages Spark SQL’s Data Sources API. The connection to MemSQL relies on the following Spark configuration settings:

Setting Name                           Default Value
spark.memsql.host                      localhost
spark.memsql.port                      3306
spark.memsql.user                      root
spark.memsql.password                  None
spark.memsql.defaultDatabase           None
spark.memsql.defaultSaveMode           "error" (see description below)
spark.memsql.disablePartitionPushdown  false
spark.memsql.defaultCreateMode         DatabaseAndTable

When data is loaded to a MemSQL table, defaultCreateMode specifies whether the connector will create the database and/or table if they don't already exist. The possible values are DatabaseAndTable, Table, and Skip. The user will need the corresponding create permissions if the value is not Skip.

Note that the MemSQL credentials must be the same on all nodes to take advantage of partition pushdown, which queries the leaf nodes directly.
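
As an illustration of the settings above, the following sketch sets every key from the table explicitly on a SparkConf. The host, password, and database values are placeholders carried over from the other examples on this page, and the choice of Skip for defaultCreateMode is only an example, not a recommendation.

```scala
import org.apache.spark.SparkConf

// Every key below comes from the settings table; the values for host,
// password, and defaultDatabase are placeholders for illustration only.
val memsqlConf = new SparkConf()
  .setAppName("MemSQL Spark Connector Example")
  .set("spark.memsql.host", "10.0.0.190")
  .set("spark.memsql.port", "3306")
  .set("spark.memsql.user", "root")
  .set("spark.memsql.password", "foobar")
  .set("spark.memsql.defaultDatabase", "customers")
  .set("spark.memsql.defaultSaveMode", "error")
  .set("spark.memsql.disablePartitionPushdown", "false")
  // Skip: the connector never creates the database or table,
  // so the user does not need create permissions.
  .set("spark.memsql.defaultCreateMode", "Skip")
```

Any setting that is left out keeps the default value shown in the table.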

Loading Data from MemSQL

The following example creates a DataFrame from the table illinois in the database customers. To use the library, pass in com.memsql.spark.connector as the format parameter to ensure that Spark calls the MemSQL Spark Connector code. The path option is the full path of the table using the syntax $database_name.$table_name. If there is only a table name, the connector will look for the table in the default database set in the configuration.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
	.setAppName("MemSQL Spark Connector Example")
	.set("spark.memsql.host", "10.0.0.190")
	.set("spark.memsql.password", "foobar")
	.set("spark.memsql.defaultDatabase", "customers")
val spark = SparkSession.builder().config(conf).getOrCreate()

val customersFromIllinois = spark
	.read
	.format("com.memsql.spark.connector")
	.options(Map("path" -> ("customers.illinois")))
	.load()
// customersFromIllinois is now a Spark DataFrame which represents the specified MemSQL table
// and can be queried using Spark DataFrame query functions

// count the number of rows
println(s"The number of customers from Illinois is ${customersFromIllinois.count()}")

// print out the DataFrame
customersFromIllinois.show()
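
The path rule described above (a full $database_name.$table_name, or a bare table name resolved against the default database) can be sketched with two small helpers. Both tablePath and loadTable are hypothetical names introduced here for illustration; they are not part of the connector.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: builds the value of the "path" option.
// With a database it yields "database.table"; without one it yields the bare
// table name, which the connector looks up in spark.memsql.defaultDatabase.
def tablePath(table: String, database: Option[String] = None): String =
  database.map(db => s"$db.$table").getOrElse(table)

// Hypothetical helper: loads a MemSQL table as a DataFrame via the connector.
def loadTable(spark: SparkSession, table: String,
              database: Option[String] = None): DataFrame =
  spark.read
    .format("com.memsql.spark.connector")
    .options(Map("path" -> tablePath(table, database)))
    .load()
```

With spark.memsql.defaultDatabase set to customers, loadTable(spark, "illinois") and loadTable(spark, "illinois", Some("customers")) should refer to the same table.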

Instead of specifying a MemSQL table as a path option, you can also create a DataFrame from a SQL query by using the query option. This option minimizes the amount of data transferred from MemSQL to Spark, and pushes down distributed computations to MemSQL instead of Spark. For best performance, either specify the database name using the option database, or ensure a default database is set in the Spark configuration. Either of these settings enables the connector to query the MemSQL leaf nodes directly, instead of going through the master aggregator.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
	.setAppName("MemSQL Spark Connector Example")
	.set("spark.memsql.host", "10.0.0.190")
	.set("spark.memsql.password", "foobar")
val spark = SparkSession.builder().config(conf).getOrCreate()

val customersFromIllinois = spark
	.read
	.format("com.memsql.spark.connector")
	.options(Map("query" -> ("select age_group, count(*) from customers.illinois where number_of_orders > 3 GROUP BY age_group"),
				 "database" -> "customers"))
	.load()

customersFromIllinois.show()
// +-----------+---------+
// | age_group | count(*)|
// +-----------+---------+
// |  13-18    |   128   |
// |  19-25    |   150   |
// |  26+      |   140   |
// +-----------+---------+

Saving Data to MemSQL

Similarly, use Spark SQL’s Data Sources API to save a DataFrame to MemSQL. To save a DataFrame in the MemSQL table students:

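...

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumes sc (SparkContext) and sqlContext (SQLContext) are already in scope
val rdd = sc.parallelize(Array(Row("John Smith", 12), Row("Jane Doe", 13)))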
val schema = StructType(Seq(StructField("Name", StringType, false),
                            StructField("Age", IntegerType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df
	.write
	.format("com.memsql.spark.connector")
	.mode("error")
	.save("people.students")

The mode specifies how to handle duplicate keys when the MemSQL table has a primary key. If unspecified, the default is error, which means that if a row with the same primary key already exists in MemSQL’s people.students table, an error will be thrown. Other save modes include:

Save Mode String   Description
"error"            MemSQL will error when encountering a record with duplicate keys.
"ignore"           MemSQL will ignore records with duplicate keys and, without rolling back, continue inserting records with unique keys.
"overwrite"        MemSQL will replace the existing record with the new record.
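
To make the choice of save mode explicit at the call site, a write can be wrapped in a small helper such as the sketch below. The names supportedModes and saveWithMode are hypothetical, and restricting the mode to the three strings in the table above is a design choice of this sketch rather than a connector requirement.

```scala
import org.apache.spark.sql.DataFrame

// The three save mode strings listed in the table above.
val supportedModes = Set("error", "ignore", "overwrite")

// Hypothetical helper: writes df to a MemSQL table with an explicit save mode.
// The mode is checked before any data is written.
def saveWithMode(df: DataFrame, table: String, mode: String): Unit = {
  require(supportedModes.contains(mode), s"Unsupported save mode: $mode")
  df.write
    .format("com.memsql.spark.connector")
    .mode(mode)
    .save(table)
}
```

For example, saveWithMode(df, "people.students", "overwrite") would replace existing rows that share a primary key with the incoming rows.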

The second interface to save data to MemSQL is via the saveToMemSQL implicit function on a DataFrame you wish to save:

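...

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumes sc (SparkContext) and sqlContext (SQLContext) are already in scope
val rdd = sc.parallelize(Array(Row("John Smith", 12), Row("Jane Doe", 13)))
val schema = StructType(Seq(StructField("Name", StringType, false),
                            StructField("Age", IntegerType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df.saveToMemSQL("people.students")
      // The database name can be omitted if "spark.memsql.defaultDatabase" is set
      // in the Spark configuration (inspect it with df.sqlContext.sparkContext.getConf.getAll)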

Types

When saving a DataFrame from Spark to MemSQL, the SparkType of each DataFrame column will be converted to the following MemSQL type:

SparkType       MemSQL Type
ShortType       SMALLINT
FloatType       FLOAT
DoubleType      DOUBLE
LongType        BIGINT
IntegerType     INT
BooleanType     BOOLEAN
StringType      TEXT
BinaryType      BLOB
DecimalType     DECIMAL
TimestampType   TIMESTAMP
DateType        DATE
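
As a concrete reading of the conversion table above, the following sketch declares a DataFrame schema and notes, per column, the MemSQL type the table says it would be saved as. The schema and its column names are hypothetical.

```scala
import org.apache.spark.sql.types._

// Hypothetical schema; each trailing comment gives the MemSQL column type
// listed for that SparkType in the conversion table above.
val orderSchema = StructType(Seq(
  StructField("order_id", LongType, nullable = false),       // BIGINT
  StructField("quantity", IntegerType, nullable = false),    // INT
  StructField("price", DoubleType, nullable = false),        // DOUBLE
  StructField("gift", BooleanType, nullable = false),        // BOOLEAN
  StructField("customer", StringType, nullable = true),      // TEXT
  StructField("ordered_on", DateType, nullable = true),      // DATE
  StructField("updated_at", TimestampType, nullable = true)  // TIMESTAMP
))
```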

When reading a MemSQL table as a Spark DataFrame, the MemSQL column type will be converted to the following SparkType:

MemSQL Type          SparkType
TINYINT, SMALLINT    ShortType
INTEGER              IntegerType
BIGINT (signed)      LongType
DOUBLE, NUMERIC      DoubleType
REAL                 FloatType
DECIMAL              DecimalType
TIMESTAMP            TimestampType
DATE                 DateType
TIME                 StringType
CHAR, VARCHAR        StringType
BIT, BLOB, BINARY    BinaryType

The MemSQL Spark Connector 2.0 does not support GeoSpatial or JSON MemSQL types because Spark 2.0 currently disables user-defined types (see JIRA issue). These types, when read, will become BinaryType.

Changes from MemSQL Spark Connector 1.x

While the MemSQL Spark Connector 1.x relied on Spark SQL experimental developer APIs, the MemSQL Spark Connector 2.0 uses only the official and stable APIs for loading data from an external data source documented here.

In certain cases, the Spark Connector 2.0 can “push down” distributed computations to MemSQL. This means that instead of having Spark perform a transformation (e.g. filter, join, etc.) on the data it retrieved from MemSQL, you can let MemSQL perform the operation and pass only the result to Spark. The MemSQL Spark Connector 2.0 supports column and filter pushdown; if you would like to push down joins or aggregates, consider including them explicitly in the user-specified query option. For example, instead of:

val people = spark.read.format("com.memsql.spark.connector").options(Map("path" -> ("db.people"))).load()
val department = spark.read.format("com.memsql.spark.connector").options(Map("path" -> ("db.department"))).load()
val result = people.join(department, people("deptId") === department("id"))

Use the following instead:

val result = spark
	.read
	.format("com.memsql.spark.connector")
	.options(Map("query" -> ("select * from people join department on people.deptId = department.id")))
	.load()
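
Column and filter pushdown, by contrast, need no query option: they apply to ordinary DataFrame operations. The sketch below shows the kind of select and filter that are candidates for pushdown. The helper name adultNames and the columns name and age are hypothetical, and whether a given filter is actually pushed down depends on the connector.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: keeps two columns and filters on one of them.
// Both steps are plain DataFrame operations with no MemSQL-specific calls.
def adultNames(people: DataFrame): DataFrame =
  people
    .select("name", "age")      // column selection
    .filter(col("age") >= 18)   // simple comparison filter
```

Applied to a DataFrame loaded with the path option (for example db.people), calling explain() on the result prints the physical plan Spark will run, which is one way to inspect how the read is planned.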