Concurrent Multi-Insert Examples


Python

Dependencies

  • memsql Python client library (pip install memsql)

Code

#!/usr/bin/python

import os
import sys
import time
import threading
import argparse

from memsql.common import database

# Command-line options: connection details first, then workload settings.
parser = argparse.ArgumentParser()

parser.add_argument("--host", default=None, help="The hostname of the MemSQL node to connect to")
parser.add_argument("--port", default=None, type=int, help="The port of the MemSQL node to connect to")
parser.add_argument("--user", default="root", help="The user of the MemSQL node to connect to")
parser.add_argument("--password", default="", help="The password of the MemSQL node to connect to")

parser.add_argument("--database", default="simple_benchmark", help="The database to use - note: this database should not exist")

parser.add_argument("--num-workers", type=int, default=10, help="The number of insert threads")
parser.add_argument("--time", type=int, default=30, help="The number of seconds to run the benchmark for")

# Parsed at module level so the functions below can read `options` directly.
options = parser.parse_args()

# Placeholders; the __main__ block fills these in from --host/--port (or defaults).
HOST = None
PORT = None

# Name of the table the benchmark inserts into.
TABLE = "tbl"
# Number of rows carried by each multi-row INSERT statement.
BATCH_SIZE = 5000

# Pre-generate the workload query
QUERY_TEXT = "INSERT INTO %s (val) VALUES %s" % (TABLE, ",".join(["(1)"] * BATCH_SIZE))

def get_connection(host=None, port=None, db=options.database):
    """ Open and return a fresh database connection.

    ``host`` and ``port`` fall back to the module-level HOST and PORT when
    they are left as None; ``db`` names the database to connect to.  The
    user and password always come from the command-line options.
    """
    target_host = HOST if host is None else host
    target_port = PORT if port is None else port

    connect_args = {
        "host": target_host,
        "port": target_port,
        "user": options.user,
        "password": options.password,
        "database": db,
    }
    return database.connect(**connect_args)

class InsertWorker(threading.Thread):
    """ A simple daemon thread which runs the batch insert query in a loop.

    The loop ends once the shared ``stopping`` event is set.  If the
    connection or an insert raises, the exception is stored on
    ``self.exception`` (None while the worker is healthy) and re-raised so
    the thread still terminates with the usual traceback.
    """

    def __init__(self, stopping):
        super(InsertWorker, self).__init__()
        self.stopping = stopping
        self.daemon = True
        self.exception = None

    def run(self):
        try:
            with get_connection() as conn:
                while not self.stopping.is_set():
                    conn.execute(QUERY_TEXT)
        except Exception as exc:
            # Keep the failure visible to whoever joins this thread.
            self.exception = exc
            raise

def test_connection():
    """ Ping the server once; report and exit with status 1 if that fails. """
    try:
        with get_connection(db="information_schema") as probe:
            probe.ping()
    except database.MySQLError:
        failure_report = (
            "Unable to connect to MemSQL with provided connection details.",
            "Please verify that MemSQL is running @ %s:%s" % (HOST, PORT),
        )
        for line in failure_report:
            print(line)
        sys.exit(1)

def setup_test_db():
    """ Provision the database and table used by the benchmark.

    Connects through ``information_schema``, creates the database named by
    ``--database`` and then the workload table inside it.  Any MySQLError
    raised by CREATE DATABASE is reported as "already exists" and ends the
    process with exit status 1.
    """
    db_name = options.database

    with get_connection(db="information_schema") as conn:
        print('Creating database %s' % db_name)

        try:
            # raises when a database with this name is already present
            conn.query('CREATE DATABASE %s' % db_name)
        except database.MySQLError:
            for line in (
                "Database %s already exists - since we drop the database at" % db_name,
                "the end of this script, please specify an un-used database",
                "with the --database flag.",
            ):
                print(line)
            sys.exit(1)

        for statement in (
            'USE %s' % db_name,
            'CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT PRIMARY KEY, val INT)' % TABLE,
        ):
            conn.query(statement)

def warmup():
    """ Execute the workload query once before the timed run starts. """
    print('Warming up workload')
    with get_connection() as warm_conn:
        warm_conn.execute(QUERY_TEXT)

def run_benchmark():
    """ Run a set of InsertWorkers and record their performance.

    Starts ``--num-workers`` threads, lets them insert for ``--time``
    seconds, then signals them to stop and waits for each one to finish its
    in-flight batch.  Rows that were already in the table before the workers
    started (for example from the warmup insert) are excluded from the
    reported count, and throughput is computed from the measured wall-clock
    time rather than the requested duration.
    """

    def count_rows():
        # A short-lived connection keeps the counting independent of workers.
        with get_connection() as conn:
            return conn.get("SELECT COUNT(*) AS count FROM %s" % TABLE).count

    stopping = threading.Event()
    workers = [InsertWorker(stopping) for _ in range(options.num_workers)]

    # Baseline so pre-existing rows are not credited to the workers.
    baseline = count_rows()

    print('Launching %d workers' % options.num_workers)
    print('Workload will take approximately %d seconds.' % options.time)

    started = time.time()
    for worker in workers:
        worker.start()
    time.sleep(options.time)

    print('Stopping workload')

    stopping.set()
    for worker in workers:
        worker.join()
    elapsed = time.time() - started

    count = count_rows() - baseline
    # Guard against a zero-length run instead of dividing by zero.
    rate = count / elapsed if elapsed > 0 else 0.0

    print("%d rows inserted using %d threads" % (count, options.num_workers))
    print("%.1f rows per second" % rate)

def cleanup():
    """ Drop the benchmark database; database errors are deliberately ignored. """
    drop_statement = 'DROP DATABASE IF EXISTS %s' % options.database
    try:
        with get_connection() as conn:
            conn.query(drop_statement)
    except database.MySQLError:
        # best effort only: nothing to do if the drop cannot be performed
        pass

if __name__ == '__main__':
    HOST = options.host or "127.0.0.1"
    PORT = options.port or 3306

    # Only drop the database if this run created it: setup_test_db() exits
    # when the name is already taken, and a pre-existing database must
    # never be removed by the benchmark.
    created_database = False

    try:
        test_connection()
        setup_test_db()
        created_database = True
        warmup()
        run_benchmark()
    except KeyboardInterrupt:
        print("Interrupted... exiting...")
    finally:
        # Drop the database at the end of the script, as the messages and
        # the --database help text promise.
        if created_database:
            cleanup()

Bash

Dependencies

  • mysql client program
  • Bourne-Again Shell (bash)

Code

#!/bin/bash

# Connection settings handed to the mysql client.
MHOST="127.0.0.1"
MPORT="3306"
MUSER="root"
# Database argument for the mysql client; stays empty until the test
# database has been created further down.
MDB=""
# Number of background insert loops to launch.
NUM_WORKERS=128
# Number of statements memsql_exec_multi sends per mysql invocation.
BATCH_SIZE=256

# Run the SQL text given in $1 through the mysql client, using the
# connection settings (and optional database) configured above.
memsql_exec()
{
        local sql="$1"
        mysql -h $MHOST -P $MPORT -u $MUSER $MDB -e "$sql"
}

# Send BATCH_SIZE copies of the statement in $1 to the server in a single
# mysql invocation, one ';'-terminated statement per line.
memsql_exec_multi()
{
        local statement="$1" batch="" b
        for ((b = 0; b < BATCH_SIZE; b++)); do
                batch+="$statement;"$'\n'
        done
        # Drop the final newline so the text matches a command substitution.
        batch="${batch%$'\n'}"
        mysql -h $MHOST -P $MPORT -u $MUSER $MDB -e "$batch"
}

# --- Setup: database first, then switch the client over to it ---
echo "Creating database test"
memsql_exec "CREATE DATABASE IF NOT EXISTS test"
MDB="test"

echo "Creating table tbl"
memsql_exec "CREATE TABLE IF NOT EXISTS tbl (id INT AUTO_INCREMENT PRIMARY KEY)"

# --- Launch: one background subshell per worker, PIDs kept for shutdown ---
echo "Launching $NUM_WORKERS workers"
sleep 1
declare -a WORKERS
for ((worker = 0; worker < NUM_WORKERS; worker++)); do
        (
                while true; do
                        echo "Worker $worker inserting"
                        memsql_exec_multi "INSERT INTO tbl VALUES (NULL)"
                done
        ) &
        WORKERS[worker]=$!
done

# Let the workload run.
sleep 10

# --- Shutdown: stop every worker and reap it quietly ---
for worker in "${!WORKERS[@]}"; do
        pid=${WORKERS[worker]}
        echo "Killing worker $worker"
        kill $pid
        wait $pid 2>/dev/null
done
echo "Cleaning up"
sleep 1
memsql_exec "DROP DATABASE test"

Java

Dependencies

  • JDBC library (package libmysql-java on Debian-based distributions)

Code

import java.sql.*;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * Resets a scratch "test" database and then issues single-row inserts from a
 * pool of worker threads until the pool is shut down.
 */
public class Sample {
    private static final String dbClassName = "com.mysql.jdbc.Driver";
    private static final String CONNECTION = "jdbc:mysql://127.0.0.1:3306/";
    private static final String USER = "root";
    private static final String PASSWORD = "";

    /** Opens a new connection using the configured URL and credentials. */
    private static Connection openConnection() throws SQLException {
        Properties credentials = new Properties();
        credentials.put("user", USER);
        credentials.put("password", PASSWORD);
        return DriverManager.getConnection(CONNECTION, credentials);
    }

    /** Executes one SQL statement and releases the statement afterwards. */
    private static void executeSQL(Connection conn, String sql) throws SQLException {
        try (Statement statement = conn.createStatement()) {
            statement.execute(sql);
        }
    }

    /** Drops and recreates the "test" database together with its table. */
    private static void resetEnvironment() throws SQLException {
        final String[] statements = {
            "DROP DATABASE IF EXISTS test",
            "CREATE DATABASE test",
            "USE test",
            "CREATE TABLE tbl (id INT AUTO_INCREMENT PRIMARY KEY)"
        };
        try (Connection conn = openConnection()) {
            for (String statement : statements) {
                executeSQL(conn, statement);
            }
        }
    }

    /** Inserts rows on a private connection until the thread is interrupted. */
    private static void worker() {
        try (Connection conn = openConnection()) {
            executeSQL(conn, "USE test");
            while (!Thread.interrupted()) {
                executeSQL(conn, "INSERT INTO tbl VALUES (NULL)");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException {
        Class.forName(dbClassName);
        resetEnvironment();

        // One pool thread per submitted worker, so all of them run at once.
        final int workerCount = 20;
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        Runnable insertTask = new Runnable() {
            @Override
            public void run() {
                worker();
            }
        };
        for (int submitted = 0; submitted < workerCount; submitted++) {
            pool.submit(insertTask);
        }

        Thread.sleep(20000);

        // shutdownNow() interrupts the workers, which ends their insert loops.
        pool.shutdownNow();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        if (!terminated) {
            System.err.println("Pool did not terminate");
        }
    }
}

C# / .NET Core

Dependencies

  • MySql.Data NuGet package (dotnet add package MySql.Data)

Code

using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.MySqlClient; // dotnet add package MySql.Data

namespace MemSQLTest
{
  /// <summary>
  /// Multi-threaded batch-insert benchmark: creates a scratch database, runs
  /// NUM_WORKERS insert threads for WORKLOAD_TIME seconds, prints the row
  /// count and drops the database again.
  /// </summary>
  public class MemSQLTest
  {
    /**
    * Tweak the following globals to fit your environment
    * ###################################################
    */
    public const string HOST = "127.0.0.1";
    public const int PORT = 3306;
    public const string USER = "root";
    public const string PASSWORD = "";

    // Specify which database and table to work with.
    // Note: this database will be dropped at the end of this script
    public const string DATABASE = "test";
    public const string TABLE = "tbl";

    // The number of workers to run
    public const int NUM_WORKERS = 20;
    // Run the workload for this many seconds
    public const int WORKLOAD_TIME = 10; // seconds

    // Batch size to use
    public const int BATCH_SIZE = 5000;

    /**
    * Internal code starts here
    * #########################
    */

    // Control connection used for setup, warmup, stats and cleanup. It is
    // kept in a field so CleanupTestDb can dispose it instead of leaking it.
    private IDbConnection dbConnection;
    // Command bound to the control connection; reused for every control query.
    private IDbCommand dbCommand;
    // Pre-generated multi-row INSERT statement shared by warmup and workers.
    private string insertCommand;

    /// <summary>
    /// Opens the control connection, creates its command object and
    /// pre-generates the batch INSERT statement.
    /// </summary>
    private void GetDbCommand()
    {
      dbConnection = new MySqlConnection();
      dbConnection.ConnectionString = $"Server={HOST};Port={PORT};Uid={USER};Pwd={PASSWORD};";
      dbConnection.Open();
      dbCommand = dbConnection.CreateCommand();

      string[] _batch = new string[BATCH_SIZE];
      Array.Fill(_batch, "(DEFAULT)");
      insertCommand = $"INSERT INTO {TABLE} VALUES {string.Join(",", _batch)}";
    }

    /// <summary>Creates the database and table and selects the database.</summary>
    private void SetupTestDb()
    {
      dbCommand.CommandText = $"CREATE DATABASE IF NOT EXISTS {DATABASE}";
      dbCommand.ExecuteNonQuery();
      dbCommand.CommandText = $"USE {DATABASE}";
      dbCommand.ExecuteNonQuery();
      dbCommand.CommandText = $"CREATE TABLE {TABLE} (id int primary key auto_increment)";
      dbCommand.ExecuteNonQuery();
    }

    /// <summary>Runs the batch insert once on the control connection.</summary>
    private void Warmup()
    {
      Console.WriteLine("Warming up workload");
      dbCommand.CommandText = insertCommand;
      dbCommand.ExecuteNonQuery(); // FRAGILE: included in count, not included in time
    }

    /// <summary>Starts NUM_WORKERS threads and waits for all of them to finish.</summary>
    private void DoBenchmark()
    {
      Console.WriteLine($"Launching {NUM_WORKERS} workers for {WORKLOAD_TIME} sec");
      Thread[] workers = new Thread[NUM_WORKERS];
      for(int i = 0; i < NUM_WORKERS; i++)
      {
        workers[i] = new Thread(new ThreadStart(Worker));
        workers[i].Start();
      }
      Console.WriteLine($"{workers.Length} workers running...");
      for(int i = 0; i < NUM_WORKERS; i++)
      {
        workers[i].Join();
      }
    }

    /*
    // yields authentication error: https://bugs.mysql.com/bug.php?id=75917
    private async Task DoBenchmark()
    {
      List<Task> workers = new List<Task>();
      for(int i = 0; i < NUM_WORKERS; i++)
      {
        workers.Add(Task.Run(Worker));
      }
      Console.WriteLine($"{workers.Count} workers running...");
      await Task.WhenAll(workers);
    }
    */

    /// <summary>
    /// Thread body: opens a private connection and repeats the batch insert
    /// until WORKLOAD_TIME seconds have elapsed on this thread's stopwatch.
    /// </summary>
    private void Worker()
    {
      // Create another connection per thread
      using (IDbConnection conn = new MySqlConnection())
      {
        conn.ConnectionString = $"Server={HOST};Port={PORT};database={DATABASE};Uid={USER};Pwd={PASSWORD};SslMode=None;";
        conn.Open();

        using (IDbCommand insert = conn.CreateCommand())
        {
          insert.CommandText = insertCommand;
          Stopwatch stop = new Stopwatch();
          stop.Start();
          while(stop.ElapsedMilliseconds < WORKLOAD_TIME*1000)
          {
            insert.ExecuteNonQuery();
          }
        }
      }
    }

    /// <summary>Prints the total row count and the rows-per-second figure.</summary>
    private void ShowStats()
    {
      dbCommand.CommandText = $"USE {DATABASE}";
      dbCommand.ExecuteNonQuery();
      dbCommand.CommandText = $"SELECT COUNT(*) FROM {TABLE}";
      using (IDataReader reader = dbCommand.ExecuteReader())
      {
        long count = 0;
        while(reader.Read())
        {
          count = (long)reader["COUNT(*)"];
        }
        Console.WriteLine($"{count} rows inserted using {NUM_WORKERS} workers");
        Console.WriteLine($"{count / WORKLOAD_TIME} rows per second");
      }
    }

    /// <summary>
    /// Drops the test database (when a control command exists) and releases
    /// the control command and connection. Calling it again is a no-op.
    /// </summary>
    private void CleanupTestDb()
    {
      try
      {
        if (dbCommand != null)
        {
          Console.WriteLine("Cleaning up");
          dbCommand.CommandText = $"USE `information_schema`";
          dbCommand.ExecuteNonQuery();
          dbCommand.CommandText = $"DROP DATABASE IF EXISTS {DATABASE}";
          dbCommand.ExecuteNonQuery();
        }
      }
      finally
      {
        // Release the control command and connection even if the drop failed.
        if (dbCommand != null)
        {
          dbCommand.Dispose();
          dbCommand = null;
        }
        if (dbConnection != null)
        {
          dbConnection.Dispose();
          dbConnection = null;
        }
      }
    }

    /// <summary>
    /// Runs the whole benchmark. Returns 0 on success and 1 after any
    /// exception, attempting cleanup in both cases.
    /// </summary>
    public static int Main(string[] args)
    {
      MemSQLTest tester = new MemSQLTest();
      try
      {
        tester.GetDbCommand();
        tester.SetupTestDb();
        tester.Warmup();
        tester.DoBenchmark();
        tester.ShowStats();
        tester.CleanupTestDb();
        return 0;
      }
      catch (Exception ex)
      {
        Console.WriteLine($"ERROR: {ex.Message}, {ex.GetType()}, {ex.StackTrace}");
        try
        {
          tester.CleanupTestDb();
        }
        catch
        {
          // ignore error
        }
        return 1;
      }
    }

  }
}

C

Dependencies

  • C compiler (e.g. gcc)
  • pthreads library (present on most Linux distributions)
  • mysqlclient library, available from the libmysqlclient-dev package on Debian-based distributions.

Code

/* Compile with:
 *
 * cc multi_threaded_inserts.c -lmysqlclient -pthread -o mti
 */

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <mysql/mysql.h>

/* Connection settings shared by the main thread and every worker. */
static const char *host = "127.0.0.1";
static const char *user = "root";
static const char *passwd = "";
static const size_t port = 3306;

/* Number of concurrent insert threads. */
#define NUM_WORKERS 20

/* Cleared by main() to ask the workers to leave their insert loops. */
static volatile int keep_going = 1;

/* Thread entry point; defined after main(). */
void *insert_worker(void *worker_id);

/*
 * Creates the 'test' database and the 'tbl' table, runs NUM_WORKERS insert
 * threads for a fixed period, reports how many rows they inserted and drops
 * the database again.
 *
 * Returns 0 once the workload has run, and 1 if library initialization, the
 * connection or one of the setup queries fails.
 */
int main()
{
    /* How long the workers are left running. */
    const unsigned int run_seconds = 10;

    /* The client library has to be initialized before any thread uses it. */
    if (mysql_library_init(0, NULL, NULL))
    {
        printf("Could not initialize the MySQL client library!\n");
        return 1;
    }

    MYSQL conn;
    mysql_init(&conn);

    printf("Connecting to MemSQL...\n");
    if (mysql_real_connect(&conn, host, user, passwd, NULL, port, NULL, 0) != &conn)
    {
        printf("Could not connect to the MemSQL database!\n");
        goto failure;
    }

    printf("Creating database 'test'...\n");
    if (mysql_query(&conn, "create database test") || mysql_query(&conn, "use test"))
    {
        printf("Could not create 'test' database!\n");
        goto failure;
    }

    printf("Creating table 'tbl' in database 'test'...\n");
    if (mysql_query(&conn, "create table tbl (id bigint auto_increment primary key)"))
    {
        printf("Could not create 'tbl' table in the 'test' database!\n");
        goto failure;
    }

    /* NUM_WORKERS is an int constant, so it is printed with %d. */
    printf("Launching %d insert workers...\n", NUM_WORKERS);

    pthread_t workers[NUM_WORKERS];

    /* Only threads that actually started may be joined later. */
    size_t launched = 0;
    size_t i;
    for (i = 0; i < NUM_WORKERS; ++i)
    {
        /* The worker id travels in the pointer argument itself. */
        if (pthread_create(&workers[launched], NULL, &insert_worker, (void *)i))
        {
            printf("Could not start worker %zu!\n", i);
            continue;
        }
        ++launched;
    }

    printf("Running inserts for %u seconds...\n", run_seconds);
    sleep(run_seconds);
    keep_going = 0;

    size_t rows_inserted = 0;
    for (i = 0; i < launched; ++i)
    {
        /* pthread_join stores the thread's void * return value, which the
         * worker uses to carry its row count. */
        void *result = NULL;
        pthread_join(workers[i], &result);
        rows_inserted += (size_t)result;
    }

    printf("Inserted %zu rows. Cleaning up...\n", rows_inserted);

    if (mysql_query(&conn, "drop database test"))
    {
        printf("Could not drop the testing database 'test'!\n");
    }
    mysql_close(&conn);
    mysql_library_end();

    return 0;

failure:
    mysql_close(&conn);
    mysql_library_end();
    return 1;
}

/*
 * Thread body: opens a private connection to the 'test' database and issues
 * 8-row INSERT statements until keep_going is cleared.
 *
 * worker_id carries the numeric worker id in the pointer value. The return
 * value carries the number of rows this worker inserted, again encoded in
 * the pointer value. A connection or insert failure terminates the whole
 * process with exit status 1.
 */
void *insert_worker(void *worker_id)
{
    size_t id = (size_t) worker_id;

    MYSQL conn;
    mysql_init(&conn);
    if (mysql_real_connect(&conn, host, user, passwd, "test", port, NULL, 0) != &conn)
    {
        printf("Worker %zu could not connect to the MemSQL database! Aborting...\n", id);
        exit(1);
    }

    /* i counts inserted rows: each successful statement adds 8. */
    size_t i;
    for (i = 0; keep_going; i += 8)
    {
        if (mysql_query(&conn, "insert into tbl values (null), (null), (null),"
                        "(null), (null), (null), (null), (null)"))
        {
            printf("Worker %zu failed to insert data, aborting...\n", id);
            exit(1);
        }
    }

    mysql_close(&conn);
    /* Release the per-thread state the client library set up for this thread. */
    mysql_thread_end();

    return (void *)i;
}

Node.js

Dependencies

  • mysql package from npm (npm install mysql)

Code

const mysql = require('mysql'); // npm install mysql
const util = require('util');

/**
 * Tweak the following globals to fit your environment
 * ###################################################
 */
// Connection settings for the server the benchmark talks to
const HOST = '127.0.0.1';
const PORT = 3306;
const USER = 'root';
const PASSWORD = '';

// Specify which database and table to work with.
// Note: this database will be dropped at the end of this script
const DATABASE = 'test';
const TABLE = 'tbl';

// The number of workers to run
const NUM_WORKERS = 20;

// Run the workload for this many seconds
const WORKLOAD_TIME = 10;

// Batch size to use (number of rows in each generated INSERT statement)
const BATCH_SIZE = 5000;

/**
 * Internal code starts here
 * #########################
 */

// Stop flag: doBenchmark() sets it to true and each insertWorker() checks it
// after every insert.
let isDone = false;

// Promise-based sleep: the returned promise resolves after `ms` milliseconds.
function timeout(ms) {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms);
  });
}

// Build the multi-row insert once: BATCH_SIZE empty row constructors
const _batch = Array.from({ length: BATCH_SIZE }, () => '()').join(',');
const insertQuery = `INSERT INTO ${TABLE} VALUES ${_batch}`;

// Open a connection to `dbName` and resolve with it once connected.
// The connection's query() is replaced by a promise-returning version,
// so callers can simply `await conn.query(...)`.
function getConnection(dbName) {
  return new Promise((resolve, reject) => {
    const settings = {
      host: HOST,
      port: PORT,
      user: USER,
      password: PASSWORD,
      database: dbName
    };
    const conn = mysql.createConnection(settings);

    conn.connect(err => {
      if (err) {
        reject(err);
        return;
      }
      conn.query = util.promisify(conn.query);
      resolve(conn);
    });
  });
}

// Create the benchmark database and table (if missing), one statement at a time.
async function setupTestDb() {
  const conn = await getConnection('information_schema');
  const statements = [
    `CREATE DATABASE IF NOT EXISTS ${DATABASE}`,
    `USE ${DATABASE}`,
    `CREATE TABLE IF NOT EXISTS ${TABLE} (id INT AUTO_INCREMENT PRIMARY KEY)`
  ];
  for (const sql of statements) {
    await conn.query(sql);
  }
}

// One worker: runs the batch insert on its own connection, at least once,
// and repeats until the shared isDone flag is set.
async function insertWorker() {
  const conn = await getConnection(DATABASE);
  do {
    // each await hands control back to the event loop between batches
    await conn.query(insertQuery);
  } while (!isDone);
}

// Run the batch insert once before the timed workload starts.
async function warmup() {
  console.log('Warming up workload');
  const warmConn = await getConnection(DATABASE);
  // FRAGILE: these rows are included in the count but not in the time
  await warmConn.query(insertQuery);
}

// Start NUM_WORKERS insert loops, let them run for WORKLOAD_TIME seconds,
// then raise the stop flag and wait for every loop to finish.
async function doBenchmark() {
  console.log(`Launching ${NUM_WORKERS} workers for ${WORKLOAD_TIME} sec`);

  const workers = Array.from({ length: NUM_WORKERS }, () => insertWorker());
  console.log(`${workers.length} workers running...`);

  await timeout(WORKLOAD_TIME * 1000);

  console.log('Stopping workload');
  isDone = true;

  await Promise.all(workers);
}

// Report the table's row count and the resulting rows-per-second figure.
async function printStats() {
  const conn = await getConnection(DATABASE);
  const [{ count }] = await conn.query(`SELECT COUNT(*) AS count FROM ${TABLE}`);
  console.log(`${count} rows inserted using ${NUM_WORKERS} workers`);
  console.log(`${count / WORKLOAD_TIME} rows per second`);
}

// Drop the benchmark database.
// IF EXISTS keeps this safe on the error path in main(): when setup failed
// before the database was created, cleanup must not raise a second error.
async function cleanupTestDb() {
  console.log('Cleaning up');
  const conn = await getConnection('information_schema');
  await conn.query(`DROP DATABASE IF EXISTS ${DATABASE}`);
}

// Entry point: run every phase in order. On any failure, log it, attempt a
// cleanup and exit with status 1; otherwise exit with status 0.
async function main() {
  let exitCode = 0;
  try {
    await setupTestDb();
    await warmup();
    await doBenchmark();
    await printStats();
    await cleanupTestDb();
  } catch (err) {
    console.error('ERROR', err);
    try {
      await cleanupTestDb();
    } catch (err2) {
      console.error(err2);
    }
    exitCode = 1;
  }
  // exiting the process also releases all open connections
  process.exit(exitCode);
}

main();
Was this article useful?