Understanding Operations on Encoded Data

MemSQL performs some query processing operations directly on encoded data in columnstore tables. Columnstore data is stored encoded in a number of different forms, including dictionary encoding, run-length encoding, and value encoding. For these encodings, certain operations, including filtering and aggregation, can be run very efficiently, through the use of special coding techniques, as well as Single-Instruction, Multiple-Data (SIMD) instructions on processors that support the Intel AVX2 instruction set.

SIMD support on your hardware is not required to benefit from operations on encoded data. Even without SIMD support, operations on encoded data may speed up query processing anywhere from a few times to 30 times. Use of SIMD may give an additional increase in performance. A gain of another 2 to 3 times due to SIMD is not unusual. Your results will depend on your data and queries. Portions of larger queries may be done without operating on encoded data, so the end-to-end speedup you experience may vary.

Operations on encoded data are performed automatically, by default. You don’t need to change any settings to benefit from them. The decision to use operations on encoded data is made by the query execution system at runtime, not by the query optimizer.

What Is Encoded Data and What Does It Mean to Operate on It?

MemSQL supports several different kinds of data encodings to help compress data. All columnstore data is said to be encoded in one way or another. Common encodings are run-length, dictionary, and LZ4. Only certain encodings can be processed directly, i.e. “operated on” directly. These include dictionary encoding, run-length encoding, and integer value encoding. For example, with dictionary encoding, for a segment of a string column with only 3 distinct values, a 2-bit ID number is stored for each string. The ID numbers are used as references into the dictionary. These IDs are packed tightly together when stored in the columnstore.

For example, consider this dictionary:

ID  value
0   red
1   green
2   blue

Using this dictionary, a sequence of strings can be represented as a bit vector of packed string IDs when it is stored in the column segment:

  • Strings: “red”, “blue”, “green”, “green”, “red”
  • String IDs: 0, 2, 1, 1, 0
  • Bit vector: 00 10 01 01 00 (2 bits per string ID)
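
The following Python sketch reproduces the example above. It is only an illustration of the idea, not MemSQL's storage format: the function names `dictionary_encode`, `bits_per_id`, and `pack_ids` are hypothetical, and the bit vector is rendered as text rather than as packed bytes.

```python
def dictionary_encode(values, dictionary):
    """Map each string to its ID in the dictionary (a list indexed by ID)."""
    ids_by_value = {value: i for i, value in enumerate(dictionary)}
    return [ids_by_value[v] for v in values]


def bits_per_id(dictionary_size):
    """Smallest number of bits that can hold IDs 0 .. dictionary_size - 1."""
    return max(1, (dictionary_size - 1).bit_length())


def pack_ids(ids, width):
    """Render the IDs as fixed-width bit groups, in storage order."""
    return " ".join(format(i, "0{}b".format(width)) for i in ids)


dictionary = ["red", "green", "blue"]  # IDs 0, 1, 2
strings = ["red", "blue", "green", "green", "red"]

ids = dictionary_encode(strings, dictionary)
width = bits_per_id(len(dictionary))

print(ids)                   # [0, 2, 1, 1, 0]
print(width)                 # 2
print(pack_ids(ids, width))  # 00 10 01 01 00
```

With three distinct values, two bits per ID are enough; a fifth distinct value would require three bits.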

As an example of operating directly on encoded data, MemSQL can perform a filter operation, say “t.a = ‘xyz’”, on a string dictionary-encoded column segment by first finding the result of the filter for every entry in the dictionary. Then, while scanning the segment, the query execution system simply takes the encoded ID number for each value in t.a and uses it to look up the result of the string comparison for that ID that was computed in the initial scan of the dictionary. That tends to be much quicker than doing an actual string comparison. Other kinds of operations can also be done directly on encoded data ID values, including the comparison of ID values needed to do group-by operations. The details of these are beyond the scope of this documentation.
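
As a rough illustration of the filter technique just described, the Python sketch below evaluates a predicate once per dictionary entry and then answers each row with a lookup by ID. The function name `filter_encoded` is hypothetical, and the real execution engine works on packed IDs in batches rather than on Python lists.

```python
def filter_encoded(ids, dictionary, predicate):
    """Return one boolean per row without comparing strings per row."""
    # Step 1: evaluate the predicate once for every dictionary entry.
    matches_by_id = [predicate(value) for value in dictionary]
    # Step 2: for each row, look up the precomputed result by encoded ID.
    return [matches_by_id[i] for i in ids]


dictionary = ["red", "green", "blue"]  # IDs 0, 1, 2
ids = [0, 2, 1, 1, 0]                  # red, blue, green, green, red

selected = filter_encoded(ids, dictionary, lambda s: s == "green")
print(selected)  # [False, False, True, True, False]
```

The string comparison runs three times (once per dictionary entry) regardless of how many rows the segment holds.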

Most query processing in MemSQL outside of columnstore scan is done row-at-a-time. Columnstore processing on encoded data is done in a vectorized fashion, where large batches of data from a column are processed in one or more relatively simple loops. These loops tend to be more friendly to modern CPUs than row-at-a-time processing, resulting in lower instruction count, better cache usage, and improved efficiency of the processor’s instruction pipeline.

Queries That Can Benefit from Operations on Encoded Data

Queries that can benefit from operations on encoded data are those that run over columnstore tables, and have one or more of these components:

  • Filters
  • Group-by
  • Aggregates
  • Expressions in aggregates
  • Expressions in group-by
  • Star joins on integer columns

Operations on encoded data can be done regardless of the number of distinct values of the encoded columns being processed. Performance tends to be better when the number of distinct values is smaller. This is because when dictionaries are smaller the columnstore data is compressed to a smaller size, and intermediate working lookup tables fit more easily into the cache memory of the processor.
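
To make the role of the "intermediate working lookup tables" concrete, here is a minimal Python sketch of a group-by count over dictionary IDs. It assumes one counter slot per dictionary entry; the name `encoded_group_by_count` is hypothetical and this is not MemSQL's actual implementation.

```python
def encoded_group_by_count(ids, dictionary):
    """Count rows per group using the encoded ID as an array index."""
    # One counter per distinct value: the smaller the dictionary,
    # the smaller this working table.
    counts = [0] * len(dictionary)
    for i in ids:
        counts[i] += 1
    # Decode the IDs back to strings only once, when emitting results.
    return {dictionary[i]: c for i, c in enumerate(counts) if c > 0}


dictionary = ["red", "green", "blue"]
ids = [0, 2, 1, 1, 0]

print(encoded_group_by_count(ids, dictionary))
# {'red': 2, 'green': 2, 'blue': 1}
```

The working table grows with the number of distinct values, which is one reason small dictionaries tend to stay in the processor cache.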

Here’s an example of a query that can benefit from operations on encoded data in several ways.

select
  dayofweek(shipdate), location,
  sum(price * (1 - discount)), avg(quantity)
from lineitem
where
  status = 'S'
group by 1, 2;

Here’s another example of a query that can benefit from join operations on encoded data, if the table f is a columnstore.

select d1.x, d2.y, sum(f.m)
from f, d1, d2
where f.d1_key = d1.key
and f.d2_key = d2.key
and d1.v > 100
and d2.v in (1, 2, 3) 
group by d1.x, d2.y

This query is a simple example of a star join. If you are using a star schema (with a fact table linked to multiple dimension tables), and the joins are on integer columns, you can benefit from operations on encoded data. Use of a group by clause and aggregate functions is required to benefit from join operations on encoded data.
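
The shape of the star join query above can be sketched in Python as a generic hash join followed by a group-by aggregation. This is a textbook formulation under assumed data layouts (dictionaries keyed by the integer join key, fact rows as tuples); it does not show how MemSQL implements encoded joins internally, and the name `star_join_sum` is hypothetical.

```python
def star_join_sum(fact_rows, d1, d2):
    """fact_rows: (d1_key, d2_key, m); d1: key -> (x, v); d2: key -> (y, v)."""
    # Build phase: hash only the dimension rows that pass their filters.
    d1_hash = {k: x for k, (x, v) in d1.items() if v > 100}
    d2_hash = {k: y for k, (y, v) in d2.items() if v in (1, 2, 3)}

    # Probe phase: scan the fact table once, joining on the integer keys
    # and aggregating sum(f.m) per (d1.x, d2.y) group.
    totals = {}
    for d1_key, d2_key, m in fact_rows:
        if d1_key in d1_hash and d2_key in d2_hash:
            group = (d1_hash[d1_key], d2_hash[d2_key])
            totals[group] = totals.get(group, 0) + m
    return totals


d1 = {1: ("a", 150), 2: ("b", 50)}   # key -> (x, v)
d2 = {10: ("p", 1), 20: ("q", 7)}    # key -> (y, v)
fact = [(1, 10, 5), (1, 10, 7), (2, 10, 100), (1, 20, 3)]

print(star_join_sum(fact, d1, d2))   # {('a', 'p'): 12}
```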

Examples

The following example shows encoded group-by in action:

create database db;
use db;
drop table if exists r;
drop table if exists is_tables;

/* Extract rows for 50 different tables into a scratch table, is_tables. */
create table is_tables as 
select * from information_schema.tables 
limit 50;

/* Create a table with a million rows, with every table name 
   appearing in most or all segments. */
create table r (index using clustered columnstore(n)) as
select s.*, (row_number () over (order by s.table_name) % 1000) as n
from (select t1.* 
      from is_tables t1, is_tables t2, is_tables t3, is_tables t4 
      limit 1000000) as s;

The table r created above is a columnstore with one million rows. You can see the encoding of the TABLE_NAME column using this query:

select count(*), encoding, column_name 
from information_schema.columnar_segments 
where table_name  = 'r' and database_name  = 'db' 
and column_name = 'TABLE_NAME'
group by column_name, encoding;

The result indicates that StringDictionary encoding is used for the column.

Now, run the following group-by/aggregate query that groups on the table_name column, in profile mode:

profile select table_name, count(*) from r group by table_name;

Finally, output the JSON profile for the query:

show profile json;

The result includes the following text:

"inputs":[
    {
        "executor":"ColumnStoreScan",
        "db":"db",
        "table":"r",
        ...
        "segments_scanned":{ "value":16, "avg":2.000000, "stddev":0.000000, "max":2, "maxPartition":0 },
        "segments_skipped":{ "value":0 },
        "segments_fully_contained":{ "value":0 },
        "segments_encoded_group_by":{ "value":16, "avg":2.000000, "stddev":0.000000, "max":2, "maxPartition":0 },
        "inputs":[]
    }

Notice the portion labeled “segments_encoded_group_by”. This part shows that 16 segments were processed in total and all of them were processed with encoded group-by operations. These operations are taking place directly on encoded data.

Now, here is another query (run in profile mode) that is similar to the first, but also has a filter on column table_name:

profile select table_name, count(*) from r 
where table_name like '%COLUMN%'
group by table_name;

show profile json;

The JSON profile for this query contains the text below:

"inputs":[
    {
        "executor":"ColumnStoreScan",
        "db":"db",
        "table":"r",
        ...
        "segments_scanned":{ "value":16, "avg":2.000000, "stddev":0.000000, "max":2, "maxPartition":0 },
        "segments_skipped":{ "value":0 },
        "segments_fully_contained":{ "value":0 },
        "segments_ops_compressed_data":{ "value":16, "avg":2.000000, "stddev":0.000000, "max":2, "maxPartition":0 },
        "segments_encoded_group_by":{ "value":16, "avg":2.000000, "stddev":0.000000, "max":2, "maxPartition":0 },
        "inputs":[]
    }

Notice that it includes sections for both “segments_ops_compressed_data” and “segments_encoded_group_by”. The “segments_ops_compressed_data” section indicates that 16 segments had filters applied on encoded data.

The final decision about use of operations on encoded data is made at runtime and depends on column encodings for each segment and (for group-by) the number of distinct values in the segment. So the segments_ops_compressed_data and segments_encoded_group_by values may be less than the number of segments scanned minus the number skipped. In this case, the “encoded_group_by_bailout” section will indicate the reasons that encoded group by was disabled as well as the number of segments disabled for each reason.
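
One way to check this comparison programmatically is sketched below in Python. It assumes you have saved the ColumnStoreScan object from the profile output as JSON; the fragment here is a trimmed copy of the fields shown earlier, and the helper name `encoded_group_by_coverage` is hypothetical.

```python
import json


def encoded_group_by_coverage(scan):
    """Return (segments using encoded group-by, segments eligible for it)."""
    scanned = scan["segments_scanned"]["value"]
    skipped = scan.get("segments_skipped", {}).get("value", 0)
    encoded = scan.get("segments_encoded_group_by", {}).get("value", 0)
    # Eligible segments: those scanned minus those skipped, as described above.
    return encoded, scanned - skipped


profile_fragment = """
{
  "executor": "ColumnStoreScan",
  "segments_scanned": {"value": 16},
  "segments_skipped": {"value": 0},
  "segments_encoded_group_by": {"value": 16}
}
"""

encoded, eligible = encoded_group_by_coverage(json.loads(profile_fragment))
print(encoded, eligible)  # 16 16
if encoded < eligible:
    print("Some segments bailed out of encoded group-by")
```

When the first number is lower than the second, look at the bailout section of the profile for the reasons.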

Encoded joins

This example shows encoded joins in action.

/* Create a simulated dimension table with 1000 rows with unique
   values for n, and around 100 categories. */
create table d as 
select n, max(concat('cat', n % 100)) as category
from r 
group by n;

/* Join dimension table d to "fact" table r on an integer column
   and group by category. This is a very simple star join. */
select d.category, count(*)
from r, d
where r.n = d.n
and d.category like 'cat1%'
group by d.category;

The above join query takes only 0.02 seconds on a two-core laptop. Yet it is fully scanning the million-row table r and joining it with 110 rows from d.
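
The figure of 110 rows can be checked with a few lines of Python. This sketch assumes, as the table definitions above imply, that n takes every value from 0 to 999 and that each row of d has category 'cat' followed by n % 100; the SQL filter like 'cat1%' is modeled with `startswith`.

```python
# One row of d per distinct n, with its category string.
categories = {n: "cat{}".format(n % 100) for n in range(1000)}

# Rows of d that pass the filter: d.category like 'cat1%'.
matching = [n for n, category in categories.items() if category.startswith("cat1")]

print(len(matching))                            # 110
print(len({categories[n] for n in matching}))   # 11
```

The 11 matching categories are cat1 and cat10 through cat19, each shared by 10 values of n.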

You can see that encoded joins are being used in the profile query execution plan for the above statement by using MemSQL Studio’s graphical plan display or show profile json output. On the HashJoin operator you will see the property "encoded_join_enabled":"yes". In addition, you can see that operations on encoded data are pushed to the ColumnStoreScan operator since it has the property segments_encoded_group_by with value set to 8. The HashJoin, ColumnStoreScan, and various GroupBy operators can work together for a star join with a sequence of one or more hash joins over the scan.

Example Performance Results

Operations on encoded data can yield some astonishingly short query execution times. Below are example queries and runtimes, on tables with 25 to 50 million rows of data, running on a single core.

query                                                                     time
select count(*) from y where b in ('thomas1','thomas3','thomas5')         0.424s
select count(*) from col320 group by b {320 groups}                       0.154s
select sum(j) from col10 group by b {10 groups}                           0.053s
select count(*) from col10 where c < 900000 group by b {90% selectivity}  0.114s
select count(*) from col10 where c < 100000 group by b {10% selectivity}  0.097s

Best-case performance in other internal tests has shown a processing rate of over one billion rows per second per core for queries like the following, when column a has eight or fewer distinct values, using a recent model high-performance Intel processor.

select a, count(*) from t group by a

These results can be extrapolated to much larger data sets on a large MemSQL cluster, although memory bandwidth may cap total throughput in rows per second per core across the cluster below the best-case numbers shown here. Your results will of course depend on your data, hardware, and queries.

Performance Considerations

Because operations on encoded data can process data so fast, their performance can become limited by the bandwidth of main memory (RAM). The bandwidth of RAM, in turn, is normally far higher than the bandwidth of your I/O system. For example, RAM bandwidth could be 50 GB/sec while the bandwidth of an SSD could be only 600 MB/sec.
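
To put these example bandwidths in perspective, the Python sketch below computes how long it would take just to move a given amount of data at each rate. The 10 GB data size is a made-up figure for illustration, and the function name `scan_seconds` is hypothetical.

```python
def scan_seconds(data_gb, bandwidth_gb_per_sec):
    """Lower bound on scan time if bandwidth is the only limit."""
    return data_gb / bandwidth_gb_per_sec


DATA_GB = 10          # hypothetical amount of columnstore data to scan
RAM_GB_PER_SEC = 50   # example RAM bandwidth from the text
SSD_GB_PER_SEC = 0.6  # example SSD bandwidth from the text (600 MB/sec)

ram_time = scan_seconds(DATA_GB, RAM_GB_PER_SEC)
ssd_time = scan_seconds(DATA_GB, SSD_GB_PER_SEC)

print("RAM: {:.2f} s".format(ram_time))              # RAM: 0.20 s
print("SSD: {:.2f} s".format(ssd_time))              # SSD: 16.67 s
print("Ratio: {:.0f}x".format(ssd_time / ram_time))  # Ratio: 83x
```

With these example numbers, reading from the SSD is roughly 83 times slower than reading from RAM.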

So, it is recommended that you configure your system so that little or no I/O is being done while processing columnstore data, under a steady query workload. You can do this by ensuring that the operating system’s file system buffer cache has enough main memory to hold the working set of your columnstore data, i.e. the segments of columns that are frequently accessed. A good rule of thumb is to have enough RAM in the file system buffer cache to hold at least 20% of your compressed columnstore data. If your workload does a lot of full table scans that touch all the columns of the table, you may want to consider adding more than that.
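
The rule of thumb above amounts to simple arithmetic, sketched here in Python. The 500 GB figure is a hypothetical example, and `min_buffer_cache_gb` is an illustrative helper, not a MemSQL tool.

```python
def min_buffer_cache_gb(compressed_columnstore_gb, percent=20):
    """Suggested minimum file system buffer cache, per the 20% rule of thumb."""
    return compressed_columnstore_gb * percent / 100


# Hypothetical node holding 500 GB of compressed columnstore data.
print(min_buffer_cache_gb(500))  # 100.0
```

A workload dominated by full table scans over all columns may justify a higher percentage.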

The Linux operating system will allocate available system memory to the file system buffer cache in an on-demand way, without the need to set any configuration knobs. Simply ensure that there is sufficient memory remaining after accounting for all the memory dedicated to MemSQL, the operating system, and any other applications running on the hardware.

Relevant Hints

MemSQL can perform group-by on encoded data using a HashGroupBy operation, and can also do a group-by on columnstore data using a StreamingGroupBy operation on the sort key of the columnstore. Due to the improvement of HashGroupBy with the introduction of operations on encoded data, it may be the case that HashGroupBy is faster than StreamingGroupBy, yet the query optimizer may choose StreamingGroupBy. If this occurs and it is important for your workload to tune the query to get better performance, you can use the disable_ordered_scan query hint. For example:

select sum(a) from t with (disable_ordered_scan=true) group by b

You can use EXPLAIN to see the plan for your query to check the type of group-by operation chosen.

Data Encodings Supported

Operations on encoded data are supported only for these cases:

  • String columns with string dictionary and string run-length encodings
  • Integer columns with value and run-length encodings

Group-by/aggregate operations on encoded data are supported for all these encodings. Filter acceleration on encoded data is supported for strings and integers.
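
Run-length encoding lends itself to the same kind of shortcut as dictionary encoding. The Python sketch below shows, in general terms, how a sum or a filtered count can be computed from (value, run length) pairs without expanding them. The function names are hypothetical and the sketch is not a description of MemSQL's internal format.

```python
def run_length_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return [(v, n) for v, n in runs]


def sum_encoded(runs):
    """Sum the column with one multiplication per run instead of one add per row."""
    return sum(v * n for v, n in runs)


def count_where_encoded(runs, predicate):
    """Count matching rows by testing each run's value once."""
    return sum(n for v, n in runs if predicate(v))


column = [7, 7, 7, 7, 3, 3, 9, 9, 9]
runs = run_length_encode(column)

print(runs)                                        # [(7, 4), (3, 2), (9, 3)]
print(sum_encoded(runs))                           # 61
print(count_where_encoded(runs, lambda v: v > 5))  # 7
```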

It is recommended that you allow MemSQL to choose automatically how to encode data for columnstores. However, in the rare event that it chooses an encoding that does not support operations on encoded data, and it is important to you for your application that operations on encoded data be performed, you may wish to force the encoding chosen. You can do this with the option '<encoding>' notation. For example:

create table t (a int, b varchar(50) option 'StringDictionary', 
  key(a) using clustered columnstore);

Operations Supported, and Limitations

The operations supported on encoded data, and their limitations, are summarized below:

  • Scan
    • Faster decoding for integer encoding
  • Filter
    • Filters on the following:
      • Strings for string dictionary and string run-length encoding
      • Integers for run-length encoding
    • Or-of-ands of filter expressions; filter expressions must involve a single string column for faster processing
    • Support for Bloom filters on string columns (elimination of rows with no matches for joins on a single string column)
  • Aggregates

    • Aggregates supported: sum, min, max, count, any
    • Aggregate input data types supported: all numeric
    • Aggregate expressions supported: multiple table columns allowed in a single expression
    • Aggregates encodings supported: faster processing for integer encoding but all encodings supported
  • Group-by:

    • Operations on encoded data are not done for aggregates without a group-by (a.k.a. scalar aggregates)
    • Group-by count star is very fast
    • Group-by:
      • multiple group-by columns are allowed
      • a mix of columns and expressions is allowed
      • group-by columns must only use these encodings: string dictionary, string run-length, integer, integer run-length
      • there is a limit of a few thousand on the number of distinct values per column, beyond which the system reverts to row-at-a-time processing, and the local aggregation of data for a row segment will output rows to the parent global aggregate operator
      • similarly, there is a limit of a few thousand on the total number of groups, beyond which the optimized group-by processing is not used
      • group-by on the sort key may not be as efficient as group-by on other columns, because an ordered group-by may be performed where a hash group-by would be faster
    • Group-by on expressions:
      • only one input column is allowed in a single expression
      • the expression result must be integer
      • each table column can be used only once in the set of group-by columns and expressions
  • Joins:

    • joins must be on some type of integer column, or another column type represented internally as an integer, such as datetime
    • the join or joins selected by the query optimizer must be of type HashJoin
    • the HashJoin (or a sequence of HashJoins) must appear over a ColumnStoreScan
    • a Group By operation and aggregate must be present in the query over the result of the join

Encoded group by bailout reasons

The final decision on whether to use encoded group by is made at runtime. The possible bailout reasons are as follows:

  • Unsupported column encoding
  • Aggregate not guaranteed to fit in aggregate type (the result may overflow the internal integer or decimal type)
  • Reached dynamic dictionary size limit (too many unique group by values)
  • Group by column cannot be placeholder blob (column does not yet exist in columnstore)
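
The size-limit bailout can be pictured with the following Python sketch, which counts rows per group until it sees too many distinct groups and then gives up so that a caller could fall back to another strategy. The limit of 4 is deliberately tiny and purely hypothetical (the text only says "a few thousand"), and `group_by_count_with_bailout` is an illustrative name, not a MemSQL function.

```python
GROUP_LIMIT = 4  # hypothetical; chosen small so the example is easy to follow


def group_by_count_with_bailout(ids, group_limit=GROUP_LIMIT):
    """Return (counts, None) on success or (None, reason) on bailout."""
    counts = {}
    for i in ids:
        if i not in counts and len(counts) >= group_limit:
            # Too many unique group-by values for the fast path.
            return None, "Reached dynamic dictionary size limit"
        counts[i] = counts.get(i, 0) + 1
    return counts, None


print(group_by_count_with_bailout([0, 1, 1, 2]))
# ({0: 1, 1: 2, 2: 1}, None)
print(group_by_count_with_bailout([0, 1, 2, 3, 4]))
# (None, 'Reached dynamic dictionary size limit')
```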

Additional Tuning Considerations

If you have upgraded from a MemSQL release earlier than 6.0, you can benefit immediately from operations on encoded data. There is no need to rebuild your columnstore tables. However, columnstore encoding choices have changed slightly in the 6.0 release, with dictionary encoding now chosen more often. So queries may run faster on newly loaded data. Encodings are chosen separately for each segment, so even if you don’t use OPTIMIZE on your columnstore tables, as new data is inserted and older data is removed, the encodings chosen will naturally evolve to be those preferred by the latest version of MemSQL.

If you find that operations on encoded data are not occurring as much as you expect them to, you can rebuild the columnstore table involved. Before doing this, query information_schema.columnar_segments as described earlier to check whether dictionary, run-length, or integer encodings are being used. If they are not, consider rebuilding the columnstore table.

One way to rebuild it is to use

create table newTable (index using clustered columnstore(<columns>)) as 
select *
from oldTable;

Then drop oldTable and rename newTable to oldTable. Alternatively, you can use OPTIMIZE TABLE… FULL. However, be aware that this can take significantly longer than loading the data in the first place, because it causes a single sorted run to be created.

Scalar count(*)

Because encoded group-by is so fast, and scalar aggregates are not done on encoded data, you may find the following unexpected behavior. A query like this:

select a, count(*) from t group by a;

may be faster than this:

select count(*) from t;

Although it is usually not necessary because count(*) scalar aggregates are so fast anyway, if you want the fastest possible count(*), consider this workaround:

select sum(t2.c) 
from (select a, count(*) as c from t group by a) as t2;

The subquery will be done with encoded group-by, so the containing query may run faster than a scalar count(*).

Aggregation of Decimal Types

When creating decimal type columns that are aggregated in queries, for best performance, it is recommended to use a precision of 18 digits or fewer if your application can accommodate this (i.e. the extra precision is not needed to represent meaningful information). Decimal values of 18 digits or fewer can be processed more efficiently than others because they can be handled internally as 64-bit integers during aggregation. Processing these 64-bit values takes much less time than interpreting decimal values of more than 18 digits, which must be handled in a more general way.
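
The 18-digit threshold follows from the range of a signed 64-bit integer, as the Python sketch below shows. It also illustrates the general idea of aggregating decimals as scaled integers. The helper names are hypothetical, and because Python integers never overflow, the sketch does not model the overflow checks a real engine would need.

```python
from decimal import Decimal

INT64_MAX = 2**63 - 1  # 9223372036854775807, which has 19 digits


def fits_in_int64(precision):
    """True if every unscaled value with this many digits fits in 64 bits."""
    return 10**precision - 1 <= INT64_MAX


def sum_as_scaled_integers(values, scale):
    """Sum decimals by converting each to an integer count of 10**-scale units."""
    factor = 10**scale
    total = sum(int(v * factor) for v in values)  # plain integer additions
    return Decimal(total) / factor


print(fits_in_int64(18))  # True
print(fits_in_int64(19))  # False

prices = [Decimal("19.99"), Decimal("12.50")]
print(sum_as_scaled_integers(prices, scale=2))  # 32.49
```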
