Timeseries in Redis with Streams

Redis is often thought of as a “data structure server”, providing a network interface to a few simple data structure primitives. Streams are the first major new general-purpose data structure since Redis introduced sorted sets many years ago. Let’s take a look at one of the major uses for this new structure: modeling time series data.

Streams: the new Redis data structure

Redis Streams represent an append-only timeseries of key-value pairs.

Any number of clients can write to a Stream and each time they do they receive a unique ascending ID of the item that was inserted into the timeseries.

Clients reading data can block while listening for new data coming in, can maintain a “bookmark” of the last message read for batch processing, or can be organized into more complex “consumer groups” for sharing workloads and acknowledging messages.

Streams are a large topic, so this post will just look briefly at how to use them to model timeseries data in Redis. The new data structure makes this dramatically simpler than previous approaches, such as modeling timeseries with the List or Sorted Set types.

For the code examples here we’ll use the ioredis client for Node.js:

var Redis = require('ioredis');
var redis = new Redis();

Sending data to a Stream

Streams are append-only, so usually the only details you need are the name of the Redis key that you want to write to and your set of key-value pairs.

I’m going to record measurements from my air quality sensor, sending a stream of key-value pairs to the site:pdx key. Specifically, I’m sending the current Air Quality Index and the temperature in Celsius:

redis.xadd('site:pdx', '*',
           'aqi', 37,
           'tempc', 5.1).then(function(id) {
  console.log("id:", id);
});

> id: 1527974818120-0

I send an XADD command whenever there is a new measurement to be recorded. The response is a unique, always-ascending ID that can be used in queries. The first part of the ID, 1527974818120, is the millisecond timestamp assigned by the Redis server. The second part of the ID is a sequence number that distinguishes entries written within the same millisecond, avoiding collisions when multiple clients write at the same time.

Supplying a * as the second argument in the write command, as shown above, tells Redis to store the data with its own timestamp.

It’s also possible to provide a timestamp when writing data, but this is usually not preferred: letting Redis choose the timestamp allows many clients to write data to a single Stream simultaneously without needing to coordinate ID selection and ordering; the Redis server handles those details for the clients.
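
Because the first part of the ID is a millisecond timestamp, a client can turn an ID back into a time. Here is a minimal sketch, assuming the default server-generated IDs; parseStreamId is a hypothetical helper name, not part of ioredis:

```javascript
// Split a stream ID such as '1527974818120-0' into its two parts.
// parseStreamId is a hypothetical helper, not an ioredis function.
function parseStreamId(id) {
  const parts = id.split('-');
  const ms = Number(parts[0]);   // milliseconds since the Unix epoch
  const seq = Number(parts[1]);  // sequence number within that millisecond
  return { ms: ms, seq: seq, date: new Date(ms) };
}

const parsed = parseStreamId('1527974818120-0');
console.log(parsed.ms, parsed.seq, parsed.date.toISOString());
```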

Simple reading

Once a few values are in the stream you can retrieve a range of values by supplying a range of IDs or timestamps. This could be used by an app displaying the most recent readings of my air quality metrics on a graph:

redis.xrange('site:pdx',
             '1527974818120-0',
             '+',
             'COUNT', 5).then(function(resp) {

  // resp now holds up to 5 readings; pass them to the open graph:
  // console.log(resp);
});

> [ [ '1543947167906-0', [ 'aqi', '31', 'tempc', '5.1' ] ],
> [ '1543947168312-0', [ 'aqi', '31', 'tempc', '5.3' ] ],
> [ '1543947168901-0', [ 'aqi', '31', 'tempc', '5.4' ] ],
> [ '1543947170033-0', [ 'aqi', '31', 'tempc', '5.4' ] ],
> [ '1543947171460-0', [ 'aqi', '31', 'tempc', '5.6' ] ] ]
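
Each entry in the reply is an ID followed by a flat list of alternating field names and values, all as strings. A graphing front end will probably want plain objects instead. One possible conversion is sketched below; entryToReading is a hypothetical helper, and it assumes every field holds a number, as in this air quality example:

```javascript
// Turn one XRANGE entry into a plain object.
// Assumes every field value is numeric, as in the air quality example.
function entryToReading(entry) {
  const id = entry[0];
  const flat = entry[1];                     // ['aqi', '31', 'tempc', '5.1']
  const reading = { id: id, time: new Date(Number(id.split('-')[0])) };
  for (let i = 0; i < flat.length; i += 2) {
    reading[flat[i]] = Number(flat[i + 1]);  // field name, then its value
  }
  return reading;
}

// Sample entries copied from the reply shown above.
const sample = [
  ['1543947167906-0', ['aqi', '31', 'tempc', '5.1']],
  ['1543947168312-0', ['aqi', '31', 'tempc', '5.3']]
];
const readings = sample.map(entryToReading);
console.log(readings[1].tempc); // prints 5.3
```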

Ranges of entries can be read from anywhere in the Stream. Redis can seek directly to an ID rather than scanning from the start, so a graphing system can query historical data without a significant performance penalty.
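
XRANGE also accepts a bare millisecond timestamp in place of a full ID, which makes time-window queries straightforward. Here is a sketch, where `redis` is expected to be an ioredis client like the one created earlier and readingsBetween is a hypothetical helper:

```javascript
// Fetch every entry written between two JavaScript Dates (inclusive).
// With an incomplete ID, Redis assumes sequence 0 for the start of the
// range and the highest possible sequence for the end.
// readingsBetween is a hypothetical helper; `redis` is an ioredis client.
function readingsBetween(redis, key, from, to) {
  return redis.xrange(key, String(from.getTime()), String(to.getTime()));
}

// Usage (needs a running Redis server):
// const now = new Date();
// const hourAgo = new Date(now.getTime() - 60 * 60 * 1000);
// readingsBetween(redis, 'site:pdx', hourAgo, now).then(console.log);
```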

Blocking and polling

Querying ranges of data is useful for graphs and historical monitoring, but sometimes you want to build a system that can respond immediately to data as it comes in. Redis streams are a great fit for this too, using XREAD:

redis.xread('BLOCK', 10000,
            'STREAMS', 'site:pdx', '$').then(function(resp) {

  // close the windows if aqi > 50
  console.log(resp);
});

A blocking operation like this will wait until data comes in or until the timeout expires (10000ms here). To maintain a continual poll, block as in the example above until data is available, then call the same XREAD command again each time data is received or the command times out.

To ensure no data is missed between connections to your stream, you can supply the last ID read off of your stream when reading from it, and that way you will pick up exactly where you left off.

In the case above I just wanted whatever data came in on the stream, so I used the special token ‘$’ to indicate “new data only”; the command looks only slightly different with a supplied ID:

redis.xread('BLOCK', 10000,
            'STREAMS', 'site:pdx', '1543947171460-0');
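
Putting these pieces together, a continual poll might start at ‘$’ and then resume from the last ID it saw, so nothing is missed between calls. The sketch below assumes `redis` is an ioredis client, that a timed-out XREAD resolves to null, and that the reply has the shape [[key, [[id, fields], ...]]]; pollStream, resumeId, onEntries, and keepGoing are hypothetical names:

```javascript
// Return the ID to resume from: the ID of the newest entry in an XREAD
// reply, or the previous ID when the call timed out (reply is null).
function resumeId(previousId, reply) {
  if (reply === null) return previousId;
  const entries = reply[0][1];           // reply: [[key, [[id, fields], ...]]]
  return entries[entries.length - 1][0];
}

// Poll one stream until keepGoing() returns false.
// `redis` is an ioredis client; onEntries and keepGoing are callbacks
// supplied by the caller (hypothetical names).
async function pollStream(redis, key, onEntries, keepGoing) {
  let lastId = '$';                      // first call: new data only
  while (keepGoing()) {
    const reply = await redis.xread('BLOCK', 10000, 'STREAMS', key, lastId);
    if (reply !== null) onEntries(reply[0][1]);
    lastId = resumeId(lastId, reply);    // later calls pick up where we left off
  }
  return lastId;
}
```

The loop keeps using ‘$’ only until the first entry arrives; after that it always passes a concrete ID.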

You can also read across many different streams at once, returning values from the first stream that receives data:

redis.xread('BLOCK', 10000,
            'STREAMS',
              'site:pdx', 'site:global',
              '1543947171460-0', '$');

In this example the command will return when any data more recent than ID 1543947171460-0 is written on site:pdx, or when any new data is written on site:global.

Consumer coordination

Once you have a lot of data flowing through a stream, you may want to have multiple copies of consumers processing the inbound messages. These consumers should pick up a message, take an action on it and then “acknowledge” that the work has been done. Redis streams provide primitives for these operations as well, which are too much to get into here; see the documentation for details.
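
As a rough illustration only, the pick-up, act, acknowledge cycle could look like the sketch below. It assumes `redis` is an ioredis client and that the consumer group already exists (for example, created once with redis.xgroup('CREATE', 'site:pdx', 'graphers', '$', 'MKSTREAM')). The group name graphers, the consumer name, and the processBatch and handle names are all hypothetical:

```javascript
// Read up to 10 undelivered messages for this consumer, handle each one,
// then acknowledge it so it leaves the group's pending list.
// processBatch and handle are hypothetical names; `redis` is an ioredis client.
async function processBatch(redis, key, group, consumer, handle) {
  const reply = await redis.xreadgroup(
    'GROUP', group, consumer,
    'COUNT', 10, 'BLOCK', 10000,
    'STREAMS', key, '>');                // '>' means messages never delivered before
  if (reply === null) return 0;          // timed out with nothing to do
  const entries = reply[0][1];
  for (const [id, fields] of entries) {
    await handle(id, fields);            // take an action on the message
    await redis.xack(key, group, id);    // acknowledge only after it succeeds
  }
  return entries.length;
}

// Usage (needs a running Redis server and an existing group):
// processBatch(redis, 'site:pdx', 'graphers', 'worker-1', function (id, fields) {
//   console.log(id, fields);
// });
```

A message whose handler throws is never acknowledged, so it stays visible to XPENDING and can later be taken over with XCLAIM.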

Quick-reference for stream commands

There are a lot of new Redis commands available for streams! Here’s a quick reference; you can find more detail in the official documentation.

Simple commands:

  • XADD: add an item (a bundle of key-value pairs) to a stream
  • XRANGE and XREVRANGE: select ranges or iterate over stream items
  • XREAD: fetch items more recent than some ID (optionally blocking)
  • XTRIM: discard old items to trim the stream
  • XDEL: remove a specific item from a stream
  • XLEN: count the items in a stream
  • XINFO: inspect stream metadata

Consumer group commands:

  • XGROUP: create, delete, or reset consumer groups, and remove their members
  • XREADGROUP: like XREAD (above), but receive messages using a consumer group
  • XPENDING: inspect messages that have been delivered to a consumer group but not yet acknowledged
  • XACK: acknowledge a message for a consumer group, removing it from the pending list
  • XCLAIM: take over a message from a dead consumer

Use Cases

The most commonly cited use case for Streams is IoT workloads, where sensors put data on a stream and consumers use it in many ways (analysis, archival to cold storage, display on graphs). Capped-size streams are a great fit here, letting you limit a stream's length so that its memory footprint stays predictable.
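
One way to cap a stream is to pass MAXLEN to XADD on every write. Here is a minimal sketch, assuming `redis` is an ioredis client; addCapped is a hypothetical helper, and the limit of 1000 entries in the usage note is an arbitrary example value:

```javascript
// Append a reading and let Redis trim the stream to roughly maxLen entries.
// The '~' asks for approximate trimming, which is cheaper than an exact cap,
// so the stream may briefly hold somewhat more than maxLen entries.
// addCapped is a hypothetical helper; `redis` is an ioredis client.
function addCapped(redis, key, maxLen, fields) {
  const args = [key, 'MAXLEN', '~', maxLen, '*'];
  Object.keys(fields).forEach(function (name) {
    args.push(name, fields[name]);       // flatten { aqi: 37 } into 'aqi', 37
  });
  return redis.xadd.apply(redis, args);
}

// Usage (needs a running Redis server):
// addCapped(redis, 'site:pdx', 1000, { aqi: 37, tempc: 5.1 }).then(console.log);
```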

Streams are also suitable for use in applications that previously used other Redis data structures. Queueing applications like Celery and Sidekiq could take advantage of Streams’ consumer groups to provide inspection of read receipts in a Redis-native way. There are lots of blog posts demonstrating simple chat apps using Redis pubsub which could be made more robust with Redis Streams, since pubsub does not retain messages after they are published to clients.

Ready to try it?

You can try Redis streams on a new Memetria server with just a few clicks.

Last updated 07 Dec 2018. Originally written by Brian P O'Rourke
