Query Tuning Guide

Look for Hot Queries

You can look for frequently run queries and long-running queries on the Query Explorer Page or with SHOW PLANCACHE.

For example, you can use a query like the following to see the select queries with the highest average execution time. You can modify the query to filter by database, look for specific types of queries, or find the queries using the most total time or the highest memory use.

select Database_Name, Query_Text, Commits, Execution_Time, Execution_Time/Commits as avg_time_ms, Average_Memory_Use
from information_schema.plancache
where Query_Text like 'select%'
order by avg_time_ms desc;
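
As one possible variation, the sketch below ranks queries by total execution time within a single database instead of by average time. It uses only the plancache columns shown above. The database name 'db' is a placeholder, and the Commits > 0 filter is an added guard so that no average is computed for plans with no recorded commits.

```sql
-- Sketch: top 10 queries by total execution time in one database.
-- 'db' is a placeholder database name; substitute your own.
select Database_Name, Query_Text, Commits, Execution_Time,
       Execution_Time/Commits as avg_time_ms, Average_Memory_Use
from information_schema.plancache
where Database_Name = 'db'
  and Commits > 0
order by Execution_Time desc
limit 10;
```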

Check if Queries are Using Indexes

One important query performance consideration is adding appropriate indexes for your queries. You can use EXPLAIN to see whether queries are using indexes. We will show examples of a few simple cases to look for where indexes can greatly improve query performance.

Indexes for filters

Consider an example table:

create table t (a int, b int);

Suppose we are running queries like

select * from t where a=3;

EXPLAIN shows us that running the query against the current table schema requires a full TableScan, that is, scanning all the rows of t, which is unnecessarily expensive if only a small fraction of the rows in t have a = 3.

memsql> explain select * from t where a=3;
+-----------------------+
| EXPLAIN               |
+-----------------------+
| Project [t.a, t.b]    |
| Gather partitions:all |
| Project [t.a, t.b]    |
| Filter [t.a = 3]      |
| TableScan db.t        |
+-----------------------+

If we add an index, we can see that the query instead uses an Index Range Scan on the key a:

memsql> alter table t add index (a);
memsql> explain select * from t where a=3;
+---------------------------------------------+
| EXPLAIN                                     |
+---------------------------------------------+
| Project [t.a, t.b]                          |
| Gather partitions:all                       |
| Project [t.a, t.b]                          |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+

A query that filters on both a and b, however, cannot use the filter on b to reduce the number of rows scanned. As the plan shows, the scan uses a=3 only, and b=4 is applied afterwards as a separate Filter.

memsql> explain select * from t where a=3 and b=4;
+---------------------------------------------+
| EXPLAIN                                     |
+---------------------------------------------+
| Project [t.a, t.b]                          |
| Gather partitions:all                       |
| Project [t.a, t.b]                          |
| Filter [t.b = 4]                            |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+

Adding an index on (a, b) instead allows the query to scan more selectively:

memsql> alter table t add index (a, b);
memsql> explain select * from t where a=3 and b=4;
+------------------------------------------------------------+
| EXPLAIN                                                    |
+------------------------------------------------------------+
| Project [t.a, t.b]                                         |
| Gather partitions:all                                      |
| Project [t.a, t.b]                                         |
| IndexRangeScan db.t, KEY a_2 (a, b) scan:[a = 3 AND b = 4] |
+------------------------------------------------------------+

However, a query that filters on b only still does not match an index, since the query filters must match a prefix of the index column list to be able to effectively take advantage of the index:

memsql> explain select * from t where b=4;
+-----------------------+
| EXPLAIN               |
+-----------------------+
| Project [t.a, t.b]    |
| Gather partitions:all |
| Project [t.a, t.b]    |
| Filter [t.b = 4]      |
| TableScan db.t        |
+-----------------------+
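
If queries that filter only on b are common, one option is a separate index whose column list begins with b. The statements below are a sketch of how to test that on the example table. The resulting plan is not reproduced here, so run EXPLAIN yourself to confirm that the TableScan has been replaced by an index scan.

```sql
-- Sketch: add an index that starts with b, then re-check the plan.
alter table t add index (b);
explain select * from t where b=4;
```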

Indexes for group-by and order-by

Another class of cases where indexes can improve query performance is group-by and order-by.

Consider this example table:

create table t (a int, b int);

Consider the following query:

memsql> explain select a, sum(b) from t group by a;
+------------------------------------------------------+
| EXPLAIN                                              |
+------------------------------------------------------+
| Project [t.a, `sum(b)`]                              |
| HashGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| Gather partitions:all                                |
| Project [t.a, `sum(b)`]                              |
| HashGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a]      |
| TableScan db.t                                       |
+------------------------------------------------------+

Executing the above query requires a hash group-by: MemSQL builds a hash table with an entry for each group of a.

However, with an index on a, MemSQL is able to execute the query with a streaming group-by, because by scanning the index on a we can process all elements of a group consecutively.

memsql> alter table t add index (a);
memsql> explain select a, sum(b) from t group by a;
+-----------------------------------------------------------+
| EXPLAIN                                                   |
+-----------------------------------------------------------+
| Project [t.a, `sum(b)`]                                   |
| StreamingGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| GatherMerge [t.a] partitions:all                          |
| Project [t.a, `sum(b)`]                                   |
| StreamingGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a]      |
| TableScan db.t, KEY a (a)                                 |
+-----------------------------------------------------------+

Similarly, for order-by, without an index MemSQL needs to sort:

memsql> explain select * from t order by b;
+----------------------------------+
| EXPLAIN                          |
+----------------------------------+
| Project [t.a, t.b]               |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b]               |
| Sort [t.b]                       |
| TableScan db.t                   |
+----------------------------------+

With an index, MemSQL can avoid the need for a sort:

memsql> alter table t add index (b);
memsql> explain select * from t order by b;
+----------------------------------+
| EXPLAIN                          |
+----------------------------------+
| Project [t.a, t.b]               |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b]               |
| TableScan db.t, KEY b (b)        |
+----------------------------------+

Fanout vs Single-Partition Queries

MemSQL’s distributed architecture lets your queries take advantage of CPUs from many servers. This allows for very fast performance on aggregation queries which scan millions of rows. However, for transactional queries which select relatively few rows, fanning out to every partition is not ideal. Instead, it is best for each query to involve only a single partition. When a query has equality filters which completely match the shard key of the table, MemSQL can optimize it to require execution on a single partition only.

We will use this example table:

create table urls (
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  ...
  shard key (domain, path, time_sec)
);

The following query is single partition, because it has equality filters on all columns of the shard key, so the rows which match can only be on a single partition. In the explain, Gather partitions:single indicates MemSQL is using a single-partition plan.

memsql> explain SELECT status_code 
FROM   urls 
WHERE  domain = 'youtube.com' 
AND    path = '/watch?v=euh_uqxwk58'
AND    time_sec = 1;

+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| Gather partitions:single                                                                                                                              |
| Project [urls.status_code]                                                                                                                            |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58" AND time_sec = 1] |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.66 sec)

However, the following query, which does not filter on time_sec, does not match a single partition. It therefore requires selecting from all partitions, indicated in the explain by Gather partitions:all.

memsql> explain SELECT status_code 
FROM   urls 
WHERE  domain = 'youtube.com'
AND    path = '/watch?v=euh_uqxwk58';

+--------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------+
| Project [urls.status_code]                                                                                                           |
| Gather partitions:all                                                                                                                |
| Project [urls.status_code]                                                                                                           |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58"] |
+--------------------------------------------------------------------------------------------------------------------------------------+

To fix this, we could instead shard the table urls on domain. This would make it easier to write queries which route to a single partition. However, some domains will have many more pages than others, and this could lead to skew, with data spread unevenly across partitions. Choosing a shard key is often a balancing act: we want the least restrictive shard key possible while also ensuring that we do not have skew. A good compromise in this case would be to shard on (domain, path).
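
A sketch of that compromise follows, using the same columns as the example table (columns elided in the original definition are omitted here rather than guessed). With this shard key, the earlier query that filters on domain and path has equality filters on every shard key column, so it should be eligible for a single-partition plan. Check for Gather partitions:single in the EXPLAIN output to confirm.

```sql
-- Sketch: urls sharded on (domain, path) instead of (domain, path, time_sec).
-- Only the columns shown in the original example are listed.
create table urls (
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  shard key (domain, path)
);

explain SELECT status_code
FROM   urls
WHERE  domain = 'youtube.com'
AND    path = '/watch?v=euh_uqxwk58';
```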

Distributed Joins

MemSQL’s query execution architecture allows you to run arbitrary SQL queries on any table regardless of data distribution. However, you can often improve performance by optimizing your schema to minimize data movement during query execution.

Collocating Joins

Consider the following tables:

CREATE TABLE lineitem(
    l_orderkey int not null,
    l_linenumber int not null,
    ...
    primary key(l_orderkey, l_linenumber)
);

CREATE TABLE orders(
    o_orderkey int not null,
    ...
    primary key(o_orderkey)
);

When we join lineitem and orders with the current schema, we will perform a distributed join and repartition data from the lineitem table.

memsql> explain SELECT Count(*) 
FROM lineitem 
JOIN orders 
ON o_orderkey = l_orderkey ;

+---------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------+
| Project [`count(*)`]                                                                                                            |
| Aggregate [SUM(`count(*)`) AS `count(*)`]                                                                                       |
| Gather partitions:all est_rows:1                                                                                                |
| Project [`count(*)`] est_rows:1 est_select_cost:1762812                                                                         |
| Aggregate [COUNT(*) AS `count(*)`]                                                                                              |
| NestedLoopJoin                                                                                                                  |
| |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = r0.l_orderkey] est_table_rows:565020 est_filtered:565020 |
| TableScan r0 storage:list stream:no                                                                                             |
| Repartition [lineitem.l_orderkey] AS r0 shard_key:[l_orderkey] est_rows:587604                                                  |
| TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) est_table_rows:587604 est_filtered:587604                       |
+---------------------------------------------------------------------------------------------------------------------------------+

We can improve the performance of this query by adding an explicit shard key on l_orderkey to the lineitem table. Now we can perform a local join between lineitem and orders.

+--------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------+
| Project [`count(*)`]                                                                                               |
| Aggregate [SUM(`count(*)`) AS `count(*)`]                                                                          |
| Gather partitions:all                                                                                              |
| Project [`count(*)`]                                                                                               |
| Aggregate [COUNT(*) AS `count(*)`]                                                                                 |
| ChoosePlan                                                                                                         |
| |   :estimate                                                                                                      |
| |       SELECT COUNT(*) AS cost FROM test.lineitem                                                                 |
| |       SELECT COUNT(*) AS cost FROM test.orders                                                                   |
| |---NestedLoopJoin                                                                                                 |
| |   |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = lineitem.l_orderkey]                    |
| |   TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber)                                                |
| +---NestedLoopJoin                                                                                                 |
|     |---IndexRangeScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) scan:[l_orderkey = orders.o_orderkey] |
|     TableScan test.orders, PRIMARY KEY (o_orderkey)                                                                |
+--------------------------------------------------------------------------------------------------------------------+
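
The schema change behind this plan is not shown above. One way to declare it, as a sketch using the same shard key syntax as the urls example (elided columns are omitted), is:

```sql
-- Sketch: lineitem sharded on l_orderkey so that matching rows of
-- lineitem and orders are stored on the same partition.
CREATE TABLE lineitem(
    l_orderkey int not null,
    l_linenumber int not null,
    primary key(l_orderkey, l_linenumber),
    shard key(l_orderkey)
);
```

This assumes orders remains sharded on o_orderkey. The orders table declares no shard key, so this sketch relies on its primary key serving as the shard key.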

Reference Table Joins

Consider the following schema:

CREATE TABLE customer(
    c_custkey int not null,
    c_nationkey int not null,
    ...
    primary key(c_custkey),
    key(c_nationkey)
);

CREATE TABLE nation(
    n_nationkey int not null,
    ...
    primary key(n_nationkey)
);

With the current schema, when we join the customer and nation tables on nationkey, we have to broadcast the nation table every time the query is run.

memsql> explain SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;

+-------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                         |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                                                            |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                                                       |
| Gather partitions:all est_rows:1                                                                                                                |
| Project [`Count(*)`] est_rows:1 est_select_cost:1860408                                                                                         |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                                                              |
| NestedLoopJoin                                                                                                                                  |
| |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = r1.n_nationkey] est_table_rows:1856808 est_filtered:1856808 |
| TableScan r1 storage:list stream:no                                                                                                             |
| Broadcast [nation.n_nationkey] AS r1 est_rows:300                                                                                               |
| TableScan test.nation, PRIMARY KEY (n_nationkey) est_table_rows:300 est_filtered:300                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------+

We can avoid this broadcast by making nation a reference table. nation makes a good reference table because it is relatively small and changes rarely. While broadcasting such a small table will likely have a negligible effect on single-query latency, repeatedly doing so can have an outsize effect on concurrent workloads.

CREATE REFERENCE TABLE nation(
    n_nationkey int not null,
    ...
    primary key(n_nationkey)
);

memsql> explain SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;
+-------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                     |
+-------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                        |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                   |
| Gather partitions:all                                                                                       |
| Project [`Count(*)`]                                                                                        |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                          |
| ChoosePlan                                                                                                  |
| |   :estimate                                                                                               |
| |       SELECT COUNT(*) AS cost FROM test.customer                                                          |
| |       SELECT COUNT(*) AS cost FROM test.nation                                                            |
| |---NestedLoopJoin                                                                                          |
| |   |---IndexSeek test.nation, PRIMARY KEY (n_nationkey) scan:[n_nationkey = customer.c_nationkey]          |
| |   TableScan test.customer, PRIMARY KEY (c_custkey)                                                        |
| +---NestedLoopJoin                                                                                          |
|     |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = nation.n_nationkey] |
|     TableScan test.nation, PRIMARY KEY (n_nationkey)                                                        |
+-------------------------------------------------------------------------------------------------------------+

Joins on the Aggregator

Consider the following schema and row counts:

CREATE TABLE customer(
    c_custkey int not null,
    c_acctbal decimal(15,2) not null,
    primary key(c_custkey)
);
CREATE TABLE orders(
    o_orderkey int not null,
    o_custkey int not null,
    o_orderstatus varchar(20) not null,
    primary key(o_orderkey),
    key(o_custkey)
);

memsql> select count(*) from orders;
+----------+
| count(*) |
+----------+
|   429786 |
+----------+

memsql> select count(*) from orders where o_orderstatus = 'open';
+----------+
| count(*) |
+----------+
|     1000 |
+----------+

memsql> select count(*) from customer;
+----------+
| count(*) |
+----------+
|  1014726 |
+----------+

memsql> select count(*) from customer where c_acctbal > 100.0;
+----------+
| count(*) |
+----------+
|      988 |
+----------+
1 row in set (0.06 sec)

Note that while customer and orders are fairly large, when we filter on open orders and account balances greater than 100 we match relatively few rows. As a result, when we join orders and customer with these filters, we can perform the join on the aggregator. The explain shows this as a separate Gather operator for each of orders and customer, with a HashJoin operator above them.

memsql> explain SELECT o_orderkey
FROM   customer 
JOIN   orders 
where  c_acctbal > 100.0 
AND    o_orderstatus = 'open' 
AND    o_custkey = c_custkey;

+------------------------------------------------------------------+
| EXPLAIN                                                          |
+------------------------------------------------------------------+
| Project [orders.o_orderkey]                                      |
| HashJoin [orders.o_custkey = customer.c_custkey]                 |
| |---TempTable                                                    |
| |   Gather partitions:all                                        |
| |   Project [orders_0.o_orderkey, orders_0.o_custkey]            |
| |   Filter [orders_0.o_orderstatus = "open"]                     |
| |   TableScan test3.orders AS orders_0, PRIMARY KEY (o_orderkey) |
| TableScan 0tmp AS customer storage:list stream:yes               |
| TempTable                                                        |
| Gather partitions:all                                            |
| Project [customer_0.c_custkey]                                   |
| Filter [customer_0.c_acctbal > 100.0]                            |
| TableScan test3.customer AS customer_0, PRIMARY KEY (c_custkey)  |
+------------------------------------------------------------------+

By default, MemSQL will perform this optimization when each Gather pulls fewer than 120,000 rows to the aggregator. This threshold can be changed via the max_subselect_aggregator_rowcount variable. We can also manually disable this optimization for a single query via the leaf_pushdown hint, which forces the optimizer to perform as much work as possible on the leaves. The two examples below use the hint and then a lowered threshold; both produce the same leaf-side plan.

memsql> explain SELECT WITH(LEAF_PUSHDOWN=TRUE) o_orderkey 
FROM   customer 
JOIN   orders 
where  c_acctbal > 100.0 
AND    o_orderstatus = 'open' 
AND    o_custkey = c_custkey;

+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+

memsql> set max_subselect_aggregator_rowcount=500;

memsql> explain SELECT o_orderkey 
FROM   customer 
JOIN   orders 
where  c_acctbal > 100.0 
AND    o_orderstatus = 'open' 
AND    o_custkey = c_custkey;

+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+

Managing Concurrency for Distributed Joins

Queries which require cross-shard data movement, i.e. queries which involve Broadcasts and Repartitions, generally require much larger numbers of connections and threads than other queries. The concurrency of such queries is automatically managed by Workload Management.

In most scenarios, the default settings for workload management will schedule your workload appropriately to utilize cluster resources without exhausting connection and thread limits. If you encounter performance problems with distributed join queries run at high concurrency, see the Workload Management page for information about how to configure workload management settings.

Run ANALYZE

The ANALYZE command collects data statistics on a table to facilitate accurate query optimization. This is especially important for optimizing complex analytical queries. It is generally a best practice to ANALYZE all of your tables. See the ANALYZE page for more information.
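
As a minimal sketch, assuming the ANALYZE TABLE form of the command, statistics for the join example tables used earlier in this guide could be collected as follows. See the ANALYZE page for the options supported by your version.

```sql
-- Sketch: collect statistics for two of the example tables in this guide.
ANALYZE TABLE lineitem;
ANALYZE TABLE orders;
```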
