Analyzing Time Series Data With MemSQL

Time series data describe sequences of events, with each event labeled with a timestamp. Examples include sequences of events generated by utilities, energy production infrastructure, financial applications, software services, and Internet of Things (IoT) devices. This topic describes how to ingest, structure, and query time series information in MemSQL.

Storing Time Series Data

Time series data can be stored in MemSQL using rowstore or columnstore tables. Each row should have a time-valued attribute to hold the event time. This attribute should normally be declared with a data type of datetime(6), which provides microsecond resolution. Use the datetime type instead if fractional-second resolution is not needed. Avoid the timestamp data types for time series information, because timestamp columns can be updated automatically by the system, and typically you will want your application to provide the event time directly. By convention, the time attribute is commonly named ts.

Here’s an example of a table created to hold a time series for events coming from a wind turbine.

CREATE TABLE turbine_reading(
  tid int NOT NULL, -- turbine ID
  ts datetime(6) NOT NULL,
  rpm double,
  temperature double,
  vibration double,
  output double,
  wind_direction double,
  wind_speed double,
  SHARD KEY(tid),
  KEY(ts)
);

This table is a rowstore. Rowstore is a good starting point for managing time series data, as long as the data fits in available RAM. If your data is expected to become larger than RAM, use a columnstore table. Columnstores are disk-based and can thus handle larger data sets. In addition, they generally provide the best performance for analytical queries that process large amounts of data.

It is recommended to create a KEY on the timestamp column ts since it is common to query time series data by filtering on ranges of values. This KEY definition for a rowstore table will create an index on ts so that range filters on ts can be processed efficiently. The KEY definition on ts for a columnstore table will cause the table to be kept in order by ts so range filters on ts can also be processed efficiently using segment elimination.
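
As a rough sketch of the columnstore alternative described above, the same schema could be declared with a clustered columnstore key on ts. The table name turbine_reading_cs is hypothetical, and the sketch assumes a MemSQL version in which columnstore tables are declared with KEY ... USING CLUSTERED COLUMNSTORE.

```sql
-- Columnstore variant of turbine_reading (hypothetical table name).
-- The clustered columnstore key keeps the data ordered by ts, so range
-- filters on ts can benefit from segment elimination.
CREATE TABLE turbine_reading_cs(
  tid int NOT NULL, -- turbine ID
  ts datetime(6) NOT NULL,
  rpm double,
  temperature double,
  vibration double,
  output double,
  wind_direction double,
  wind_speed double,
  SHARD KEY(tid),
  KEY(ts) USING CLUSTERED COLUMNSTORE
);
```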

For time series data sets with large numbers of attributes, where the application commonly retrieves all attributes of a row and the data fits in the RAM you can make available, use a rowstore table. Rowstore query processing can efficiently seek to small numbers of rows in a narrow time range, and it can assemble rows with large numbers of attributes for return to the client application more efficiently than columnstore query processing.

The tables used to store time series events are very similar to fact tables used in data warehouses and data marts. The terms event table and fact table may be used interchangeably.

Descriptive Data

For descriptive property information about time series elements that is static from one element to the next, it’s recommended to normalize this information into another table. For example, information about individual turbines could be kept in a separate table like this:

CREATE REFERENCE TABLE turbine(
  tid int, 
  name varchar(60),
  model varchar(60),
  max_output double,
  latitude double,
  longitude double,
  PRIMARY KEY(tid)
);

For small collections of descriptive properties, use a reference table. For larger ones, use a standard (partitioned) table.

A descriptive data table like the one described above can be thought of as a dimension table that is linked to the fact table containing the time series events. Dimensional modelling concepts used in data warehouses also apply for time series data.

In examples below, we’ll use the following data in the table turbine:

INSERT INTO turbine VALUES
  (1, 'Hood River A', 'Volkswind Mega 5', 5.0, 47.130, 113.187),
  (2, 'Hood River B', 'Volkswind Mega 5+', 5.3, 47.141, 113.199);

Ingesting Time Series Data

For bulk loading of historical collections of time series data, use the LOAD DATA command. For ingesting time series data from files or Kafka queues, use pipelines. For single rows or smaller batches of rows coming from an application, you can use INSERT operations. MemSQL can load data very efficiently using any of these mechanisms.
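
The first two mechanisms might look like the following minimal sketch. The file path, Kafka broker address, topic name, and pipeline name are all hypothetical, and the input is assumed to be comma-separated with fields in the same order as the columns of turbine_reading.

```sql
-- Bulk load a historical CSV file (hypothetical path).
-- Assumes one header line and fields in table column order.
LOAD DATA INFILE '/data/turbine_reading_history.csv'
INTO TABLE turbine_reading
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 LINES;

-- Continuously ingest CSV messages from a Kafka topic
-- (hypothetical broker address and topic name).
CREATE PIPELINE turbine_reading_pipeline AS
LOAD DATA KAFKA 'kafka.example.com:9092/turbine_readings'
INTO TABLE turbine_reading
FIELDS TERMINATED BY ',';

-- A pipeline does not ingest anything until it is started.
START PIPELINE turbine_reading_pipeline;
```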

Querying Time Series Data

Continuing the wind turbine example from above, suppose the following data is added to the turbine_reading table:

INSERT turbine_reading VALUES
  (1, '2020-03-14 13:00:33', 10, 33, 100, 1000000, 90, 15),
  (1, '2020-03-14 13:00:34', 10, 33, 100, 1000000, 90, 15),
  (1, '2020-03-14 13:00:35', 11, 33, 105, 1050000, 91, 16),
  (1, '2020-03-14 13:00:36', 11, 33.1, 104, 1000000, 90, 16),
  (2, '2020-03-14 13:00:33', 18, 30, 170, 2000000, 0, 23),
  (2, '2020-03-14 13:00:34', 18, 30, 170, 2000000, 0, 23),
  (2, '2020-03-14 13:00:35', 18.5, 30, 176, 2050000, 0, 23.5),
  (2, '2020-03-14 13:00:36', 19, 30.1, 174, 2070000, 1, 23.6),
  (1, '2020-03-15 13:00:33', 11, 32, 99, 1010000, 45, 15.1),
  (1, '2020-03-15 13:00:34', 11, 32, 99, 1020000, 45, 15.2),
  (1, '2020-03-15 13:00:35', 12, 32.1, 101, 1030000, 45, 15.2),
  (1, '2020-03-15 13:00:36', 13, 32.15, 102, 1030000, 46, 15.2);

The following query illustrates how to compute a simple average aggregate over all time series values in the table.

-- average RPM by turbine
SELECT tid, AVG(rpm)
FROM turbine_reading
GROUP BY tid;

+-----+----------+
| tid | AVG(rpm) |
+-----+----------+
|   2 |   18.375 |
|   1 |   11.125 |
+-----+----------+
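
Because descriptive properties live in the turbine table, a join can label the aggregate with them. The following is a small sketch of that dimensional pattern; the alias avg_rpm is introduced here only for illustration.

```sql
-- average RPM by turbine, labeled with descriptive attributes
SELECT t.name, t.model, AVG(r.rpm) AS avg_rpm
FROM turbine_reading r
JOIN turbine t ON t.tid = r.tid
GROUP BY t.tid, t.name, t.model
ORDER BY t.tid;
```

With the sample data, this should return the same averages as the query above (11.125 for Hood River A and 18.375 for Hood River B).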

Time Bucketing

The following queries illustrate how to perform “time bucketing” to aggregate and group data for different time series by a fixed time interval. Bucketing by day can be easily accomplished by casting a high-resolution datetime(6) value to a date type. Bucketing by a number of seconds N can be done by first converting to a unix timestamp (number of seconds since the logical starting point of time or “epoch”), dividing the result by N with the integer division operator DIV, then multiplying again by N, and converting back to a timestamp value. Using DIV by N and then multiplying by N returns a number divisible by N; the remainder is eliminated. This provides a standardized time useful as the beginning of a time bucket.

-- Find high, low, and average output for each turbine, bucketed by day,
-- sorted by day.
SELECT tid, ts :> date, MIN(output), MAX(output), AVG(output)
FROM turbine_reading
GROUP BY 1, 2
ORDER BY 1, 2;

+-----+------------+-------------+-------------+-------------+
| tid | ts :> date | MIN(output) | MAX(output) | AVG(output) |
+-----+------------+-------------+-------------+-------------+
|   1 | 2020-03-14 |     1000000 |     1050000 |     1012500 |
|   1 | 2020-03-15 |     1010000 |     1030000 |     1022500 |
|   2 | 2020-03-14 |     2000000 |     2070000 |     2030000 |
+-----+------------+-------------+-------------+-------------+

-- Find high, low, and average output for each turbine, 
-- bucketed by three second intervals, sorted by interval start time.

SELECT tid, 
       from_unixtime(unix_timestamp(ts) DIV 3 * 3) as ts,
       MIN(output), MAX(output), AVG(output)
FROM turbine_reading
GROUP BY 1, 2
ORDER BY 1, 2;

+-----+---------------------+-------------+-------------+--------------------+
| tid | ts                  | MIN(output) | MAX(output) | AVG(output)        |
+-----+---------------------+-------------+-------------+--------------------+
|   1 | 2020-03-14 13:00:33 |     1000000 |     1050000 | 1016666.6666666666 |
|   1 | 2020-03-14 13:00:36 |     1000000 |     1000000 |            1000000 |
|   1 | 2020-03-15 13:00:33 |     1010000 |     1030000 |            1020000 |
|   1 | 2020-03-15 13:00:36 |     1030000 |     1030000 |            1030000 |
|   2 | 2020-03-14 13:00:33 |     2000000 |     2050000 | 2016666.6666666667 |
|   2 | 2020-03-14 13:00:36 |     2070000 |     2070000 |            2070000 |
+-----+---------------------+-------------+-------------+--------------------+
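
To see how the DIV arithmetic arrives at these bucket boundaries, it can help to select the intermediate values for a few rows. This is only an illustrative sketch; the column aliases are hypothetical, and the raw epoch values depend on the session time zone, so no output is shown.

```sql
-- Show each step of the three-second bucketing for turbine 1 on one day.
SELECT ts,
       unix_timestamp(ts) AS epoch_seconds,                -- seconds since the epoch
       unix_timestamp(ts) DIV 3 AS bucket_number,          -- remainder discarded
       unix_timestamp(ts) DIV 3 * 3 AS bucket_start_seconds,
       from_unixtime(unix_timestamp(ts) DIV 3 * 3) AS bucket_start
FROM turbine_reading
WHERE tid = 1
AND DATE(ts) = '2020-03-14'
ORDER BY ts;
```

Rows that share a bucket_number fall into the same bucket, which is why the readings at 13:00:33 through 13:00:35 are aggregated together in the result above while the reading at 13:00:36 starts a new bucket.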

For convenience, you can create time bucketing functions. For example, the following functions bucket by a given number of seconds and minutes, respectively.

DELIMITER //
CREATE OR REPLACE FUNCTION bucket_s(n int, ts datetime(6)) 
RETURNS datetime AS
BEGIN
  RETURN from_unixtime(unix_timestamp(ts) DIV n * n); 
END //

CREATE OR REPLACE FUNCTION bucket_m(n int, ts datetime(6)) 
RETURNS datetime AS
BEGIN
  RETURN from_unixtime(unix_timestamp(ts) DIV (60 * n) * (60 * n)); 
END //
DELIMITER ;

For example, you can use bucket_m(5, ts) to find the average time series value grouped by 5-minute intervals, as follows:

SELECT tid, bucket_m(5, ts), AVG(output)
FROM turbine_reading
GROUP BY 1, 2
ORDER BY 1, 2;
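
The bucket_s function can be used in the same way. As a sketch, the following query should group the sample data into the same three-second buckets as the from_unixtime expression shown earlier.

```sql
-- High, low, and average output per turbine in three-second buckets,
-- using the bucket_s helper function instead of inline arithmetic.
SELECT tid, bucket_s(3, ts) AS ts,
       MIN(output), MAX(output), AVG(output)
FROM turbine_reading
GROUP BY 1, 2
ORDER BY 1, 2;
```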

Smoothing

Time series can be smoothed using AVG as a windowed aggregate. For example, the following query yields output and the moving average of output over a two-element window, on a specified date.

SELECT tid, ts, output, AVG(output) OVER w
FROM turbine_reading
WHERE DATE(ts) = '2020-03-14'
WINDOW w as (PARTITION BY tid ORDER BY ts 
             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY 1, 2;

+-----+----------------------------+---------+--------------------+
| tid | ts                         | output  | AVG(output) OVER w |
+-----+----------------------------+---------+--------------------+
|   1 | 2020-03-14 13:00:33.000000 | 1000000 |            1000000 |
|   1 | 2020-03-14 13:00:34.000000 | 1000000 |            1000000 |
|   1 | 2020-03-14 13:00:35.000000 | 1050000 |            1025000 |
|   1 | 2020-03-14 13:00:36.000000 | 1000000 |            1025000 |
|   2 | 2020-03-14 13:00:33.000000 | 2000000 |            2000000 |
|   2 | 2020-03-14 13:00:34.000000 | 2000000 |            2000000 |
|   2 | 2020-03-14 13:00:35.000000 | 2050000 |            2025000 |
|   2 | 2020-03-14 13:00:36.000000 | 2070000 |            2060000 |
+-----+----------------------------+---------+--------------------+
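
The window frame controls how much smoothing is applied. As an illustrative variation (not part of the original example), the following sketch uses a centered three-element window that averages each reading with its immediate neighbors; the alias centered_avg is hypothetical.

```sql
-- Centered moving average: previous row, current row, and next row.
SELECT tid, ts, output, AVG(output) OVER w AS centered_avg
FROM turbine_reading
WHERE DATE(ts) = '2020-03-14'
WINDOW w AS (PARTITION BY tid ORDER BY ts
             ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
ORDER BY 1, 2;
```

At the first and last row of each partition, the frame contains only two rows, so the average there is taken over two values rather than three.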

Finding a Row Current AS OF a Point in Time

A common operation on time series data is to find the row that is current AS OF a point in time. You can do this with a query that uses ORDER BY and LIMIT as follows.

-- find turbine reading for tid 1 that is current
-- AS OF 2020-03-14 13:00:35.5
SELECT *
FROM turbine_reading
WHERE ts <= '2020-03-14 13:00:35.5'
AND tid = 1
ORDER BY ts DESC
LIMIT 1;

+-----+----------------------------+------+-------------+-----------+---------+----------------+------------+
| tid | ts                         | rpm  | temperature | vibration | output  | wind_direction | wind_speed |
+-----+----------------------------+------+-------------+-----------+---------+----------------+------------+
|   1 | 2020-03-14 13:00:35.000000 |   11 |          33 |       105 | 1050000 |             91 |         16 |
+-----+----------------------------+------+-------------+-----------+---------+----------------+------------+

You can use EXPLAIN to see the query plan for the query above. It is efficient because it seeks the index on ts and scans in reverse order.
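
For instance, the plan can be requested by prefixing the same query with EXPLAIN. The plan text varies by version and table definition, so it is not reproduced here.

```sql
-- Show the query plan for the AS OF lookup instead of running it.
EXPLAIN
SELECT *
FROM turbine_reading
WHERE ts <= '2020-03-14 13:00:35.5'
AND tid = 1
ORDER BY ts DESC
LIMIT 1;
```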

To find the current row for each turbine as of a specific point in time, you can use a stored procedure, as shown below.

DELIMITER //
CREATE OR REPLACE PROCEDURE get_turbine_readings_as_of(_ts datetime(6))
AS
DECLARE 
  q_turbines QUERY(tid int) = SELECT tid FROM turbine;
  a ARRAY(RECORD(tid int));
  _tid int;
BEGIN
  DROP TABLE IF EXISTS r;
  CREATE TEMPORARY TABLE r LIKE turbine_reading;

  a = COLLECT(q_turbines);
  FOR x IN a LOOP
    _tid = x.tid;
    INSERT INTO r
      SELECT *
      FROM turbine_reading t
      WHERE t.tid = _tid
      AND ts <= _ts
      ORDER BY ts DESC
      LIMIT 1;
  END LOOP;
  ECHO SELECT * FROM r ORDER BY tid;
  DROP TABLE r;
END //
DELIMITER ;

CALL get_turbine_readings_as_of('2020-03-14 13:00:35.5');

+-----+----------------------------+------+-------------+-----------+---------+----------------+------------+
| tid | ts                         | rpm  | temperature | vibration | output  | wind_direction | wind_speed |
+-----+----------------------------+------+-------------+-----------+---------+----------------+------------+
|   1 | 2020-03-14 13:00:35.000000 |   11 |          33 |       105 | 1050000 |             91 |         16 |
|   2 | 2020-03-14 13:00:35.000000 | 18.5 |          30 |       176 | 2050000 |              0 |       23.5 |
+-----+----------------------------+------+-------------+-----------+---------+----------------+------------+
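
As an alternative sketch, the same result can be expressed as a single query with the ROW_NUMBER window function. This is not the approach used above, and it may read more rows than the per-turbine index seeks in the stored procedure, so treat it as a convenience for smaller data sets. The aliases latest and rn are hypothetical.

```sql
-- Number each turbine's readings from newest to oldest as of the given
-- time, then keep only the newest one per turbine.
SELECT tid, ts, rpm, temperature, vibration, output,
       wind_direction, wind_speed
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY tid ORDER BY ts DESC) AS rn
  FROM turbine_reading
  WHERE ts <= '2020-03-14 13:00:35.5'
) latest
WHERE rn = 1
ORDER BY tid;
```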

Managing the Life Cycle of Time Series Data

You can manage the life cycle of time series data in two stages. First, if the data becomes larger than available memory, move it from a rowstore table to a columnstore table as it ages. Then, ultimately, remove data that is no longer needed using the DELETE statement.
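
A minimal sketch of such an aging process follows. The table name turbine_reading_history and both cutoff values are hypothetical, and the statements modify data, so they are meant to show the shape of the process rather than to be run against the sample tables as they stand.

```sql
-- Columnstore table for aged readings (hypothetical name).
CREATE TABLE turbine_reading_history(
  tid int NOT NULL, -- turbine ID
  ts datetime(6) NOT NULL,
  rpm double,
  temperature double,
  vibration double,
  output double,
  wind_direction double,
  wind_speed double,
  SHARD KEY(tid),
  KEY(ts) USING CLUSTERED COLUMNSTORE
);

-- Copy readings older than a fixed cutoff into the columnstore table.
INSERT INTO turbine_reading_history
SELECT * FROM turbine_reading
WHERE ts < '2020-03-15 00:00:00';

-- Remove the same rows from the rowstore table. Using the same literal
-- cutoff in both statements keeps the copied and deleted sets identical.
DELETE FROM turbine_reading
WHERE ts < '2020-03-15 00:00:00';

-- Eventually, delete history that is no longer needed.
DELETE FROM turbine_reading_history
WHERE ts < '2020-01-01 00:00:00';
```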

Interpolation

You may have a time series with gaps that you wish to fill, so that there is a data point at every point in time at your chosen time granularity. For example, you might want to have a data point every second. A common way to end up with missing points is to convert a time series with points at irregular intervals (an irregular time series) to one with data points at regular intervals (a regular time series) by bucketing data at your chosen interval. For example, if you have data points arriving at random approximately once every half second, there may be seconds with no data arriving. This can cause gaps when you bucket to one-second intervals.

You can interpolate missing points using a stored procedure. The following example illustrates this using a simple set of stock ticks, already bucketed to one-second intervals, in which some data points are missing.

DROP TABLE IF EXISTS tick;
CREATE TABLE tick(ts datetime(6), symbol varchar(5), 
   price numeric(18,4));
INSERT INTO tick VALUES
  ('2019-02-18 10:55:36.000000', 'ABC', 100.00),
  ('2019-02-18 10:55:37.000000', 'ABC', 102.00),
  ('2019-02-18 10:55:40.000000', 'ABC', 103.00),
  ('2019-02-18 10:55:42.000000', 'ABC', 104.00);

DELIMITER //
CREATE OR REPLACE PROCEDURE driver() AS
DECLARE 
  q query(ts datetime(6), symbol varchar(5), price numeric(18,4));
BEGIN
  q = SELECT ts, symbol, price FROM tick ORDER BY ts; 
  ECHO SELECT 'Input time series' AS message;
  ECHO SELECT * FROM q ORDER BY ts;
  ECHO SELECT 'Interpolated time series' AS message;
  CALL interpolate_ts(q);
END //
DELIMITER ;

DELIMITER //
CREATE OR REPLACE PROCEDURE interpolate_ts(
  q query(ts datetime(6), symbol varchar(5), price numeric(18,4)))
    -- Important: q must produce sorted output by ts
AS 
DECLARE
  c array(record(ts datetime(6), symbol varchar(5), price numeric(18,4)));
  r record(ts datetime(6), symbol varchar(5), price numeric(18,4));
  r_next record(ts datetime(6), symbol varchar(5), price numeric(18,4));
  n int;
  i int; 
  _ts datetime(6); _symbol varchar(5); _price numeric(18,4);
  time_diff int;
  delta numeric(18,4);
BEGIN
  DROP TABLE IF EXISTS tmp;
  CREATE TEMPORARY TABLE tmp LIKE tick;
  c = collect(q);
  n = length(c);
  IF n < 2 THEN
    ECHO SELECT * FROM q ORDER BY ts;
    return;
  END IF;
  
  i = 0;
  r = c[i];
  r_next = c[i + 1];

  WHILE (i < n) LOOP
    -- IF at last row THEN output it and exit
    IF i = n - 1 THEN
      _ts = r.ts; _symbol = r.symbol; _price = r.price;
      INSERT INTO tmp VALUES(_ts, _symbol, _price);
      i += 1;
      CONTINUE;
    END IF;
       
    time_diff = unix_timestamp(r_next.ts) - unix_timestamp(r.ts);

    IF time_diff <= 0 THEN
      RAISE user_exception("time series not sorted or has duplicate timestamps");
    END IF;

    -- output r
    _ts = r.ts; _symbol = r.symbol; _price = r.price;
    INSERT INTO tmp VALUES(_ts, _symbol, _price);

    IF time_diff = 1 THEN
      r = r_next; -- advance to next row
    ELSIF time_diff > 1 THEN
      -- output time_diff-1 rows by extending current row and interpolating price
      delta = (r_next.price - r.price) / time_diff;
      FOR j in 1..time_diff-1 LOOP
        -- advance by one second; DATE_ADD handles minute and hour rollover
        _ts = DATE_ADD(_ts, INTERVAL 1 SECOND); _price += delta;
        INSERT INTO tmp VALUES(_ts, _symbol, _price);
      END LOOP;
      r = r_next; -- advance to next row
    ELSE 
      RAISE user_exception("time series not sorted");
    END IF;

    i += 1;
    IF i < n - 1 THEN r_next = c[i + 1]; END IF;
  END LOOP;
  ECHO SELECT * FROM tmp ORDER BY ts; 
  DROP TABLE tmp;
END //
DELIMITER ;

The output of the driver() procedure is as follows:

memsql> CALL driver();
+-------------------+
| message           |
+-------------------+
| Input time series |
+-------------------+
1 row in set (0.02 sec)

+----------------------------+--------+----------+
| ts                         | symbol | price    |
+----------------------------+--------+----------+
| 2019-02-18 10:55:36.000000 | ABC    | 100.0000 |
| 2019-02-18 10:55:37.000000 | ABC    | 102.0000 |
| 2019-02-18 10:55:40.000000 | ABC    | 103.0000 |
| 2019-02-18 10:55:42.000000 | ABC    | 104.0000 |
+----------------------------+--------+----------+
4 rows in set (0.06 sec)

+--------------------------+
| message                  |
+--------------------------+
| Interpolated time series |
+--------------------------+
1 row in set (0.16 sec)

+----------------------------+--------+----------+
| ts                         | symbol | price    |
+----------------------------+--------+----------+
| 2019-02-18 10:55:36.000000 | ABC    | 100.0000 |
| 2019-02-18 10:55:37.000000 | ABC    | 102.0000 |
| 2019-02-18 10:55:38.000000 | ABC    | 102.3333 |
| 2019-02-18 10:55:39.000000 | ABC    | 102.6666 |
| 2019-02-18 10:55:40.000000 | ABC    | 103.0000 |
| 2019-02-18 10:55:41.000000 | ABC    | 103.5000 |
| 2019-02-18 10:55:42.000000 | ABC    | 104.0000 |
+----------------------------+--------+----------+
7 rows in set (0.16 sec)

The gaps between 37 and 40 seconds and 40 and 42 seconds have been filled in with data points that are linearly interpolated.
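
Before interpolating, it can be useful to check where the gaps are. The following sketch uses the LAG window function to compare each tick with the previous one and report intervals longer than one second; the aliases prev_ts, gap_seconds, and t are hypothetical.

```sql
-- List every pair of consecutive ticks that is more than one second apart.
SELECT prev_ts, ts,
       unix_timestamp(ts) - unix_timestamp(prev_ts) AS gap_seconds
FROM (
  SELECT ts, LAG(ts) OVER (ORDER BY ts) AS prev_ts
  FROM tick
  WHERE symbol = 'ABC'
) t
WHERE unix_timestamp(ts) - unix_timestamp(prev_ts) > 1
ORDER BY ts;
```

For the sample ticks, this should report the two gaps noted above. The first tick has no predecessor, so its prev_ts is NULL and the filter excludes it.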

Supplemental Material

Additional time series examples are given in the following MemSQL blog on time series: What MemSQL Can Do For Time Series Applications.

These examples include a method for creating candlestick charts with window functions, a general function for convenient time bucketing, and FIRST and LAST user-defined aggregate functions that can be used as regular aggregates, not just window functions.
