Introduction
Fin 2018, Redis version 5 sortait et on découvrait alors une nouvelle structure de données : les streams (flux).
Conformément à la philosophie de Redis, ce nouveau type est versatile et se plie à une gamme de problèmes variée. Ainsi vais-je présenter dans cette courte série de billets les streams sous trois facettes : en tant que Pub/Sub (ce billet), en tant que queue de tâches distribuée (messaging queue) et enfin en tant que tampon d’écriture devant une base de données (write buffer).
Avoir une connaissance préalable des patterns classiques d’un système distribué (Pub/Sub et Messaging Queues) n’est pas requis. Les concepts sont introduits au fil de la rédaction (de façon très sommaire, cependant).
Principes généraux des streams
Un stream est une séquence ordonnée d’éléments avec une particularité notable : chaque nouvel élément du stream doit toujours être apposé au bout de la séquence. Il n’est ainsi pas possible d’en ajouter un au début ou au milieu du stream. C’est une structure append-only.
Les éléments du stream sont des structures associatives clé-valeur simples (sans imbrication).
Redis ordonne les éléments d’un stream grâce à un ID unique assigné pour chacun d’entre eux. Cet ID ne peut que croître au fur et à mesure que l’on ajoute des éléments, même s’il n’y a aucun impératif quant à l’intervalle entre deux ID subséquents. Dans la pratique, on laisse Redis fixer automatiquement l’ID d’un nouvel élément. Il prend alors le timestamp Unix du moment, exprimé en millisecondes.
Ajoutons l’élément {"message": "Coucou", "auteur": "sgble"}
dans le stream dénommé mon_stream
. L’ID spécial *
indique à Redis de choisir lui-même un ID, comme évoqué précédemment.
Exemple (CLI Redis) :
127.0.0.1:6379> XADD mon_stream * message Coucou auteur sgble
"1615046454218-0"
127.0.0.1:6379> XADD mon_stream * message "Coucou 2" auteur sgble
"1615046464715-0"
Redis répond avec l’ID qu’il a choisi, soit par défaut le timestamp en millisecondes au moment de l’insertion. Mais on note que les ID ont un suffixe de la forme -0
. Ce suffixe permet de gérer le cas des multiples insertions qui auraient lieu sur la même milliseconde (ce qui est bien possible compte tenu la vélocité de Redis !).
Avec XRANGE
, on lit un intervalle entier entre deux ID dans un stream. La structure de données est optimisée pour être interrogée de cette façon1 :
127.0.0.1:6379> XRANGE mon_stream 0 + # intervalle des ID 0 à +
1) 1) "1615046454218-0"
2) 1) "message"
2) "Coucou"
3) "auteur"
4) "sgble"
2) 1) "1615046464715-0"
2) 1) "message"
2) "Coucou 2"
3) "auteur"
4) "sgble"
Le +
est un ID spécial qui désigne le dernier élément du stream. Dans l’exemple ci-dessus, il est donc question d’obtenir tous les éléments du stream, soit de 0
(ID minimal possible) jusqu’à +
.
Pour ne récupérer qu’un seul élément, on indique deux bornes identiques et égales à l’ID ciblé, par exemple :
127.0.0.1:6379> XRANGE mon_stream 1615046464715-0 1615046464715-0
1) 1) "1615046464715-0"
2) 1) "message"
2) "Coucou 2"
3) "auteur"
4) "sgble"
À première vue, les streams sont une sorte de simple structure pour organiser un journal d’évènements. Nous pourrions nous en tenir là et déjà imaginer quelques cas d’utilisation assimilés à de la journalisation partagée entre plusieurs systèmes, comme l’implémentation d’un système de messagerie ou d’événements, par exemple.
Mais allons plus loin encore et présentons d’autres commandes qui nous permettront de faire bien plus.
Pub/Sub : un système de surveillance
Esquissons une solution pour le problème suivant : un restaurant possède plusieurs chambres et réfrigérateurs dans lesquels des denrées doivent être entreposées dans un certain intervalle de température pour des raisons de qualité et d’hygiène. Chaque emplacement dispose d’un capteur tout-en-un qui peut transmettre à intervalle régulier diverses informations comme la température et le taux d’humidité dans un stream de Redis appelé room_temp
.
Pour chaque température rapportée, on veut exécuter certaines actions :
- vérifier que l’on est dans l’intervalle prévu sur la température et l’humidité ;
- à défaut, prévenir immédiatement les responsables par email ;
- enregistrer chaque mesure de façon durable dans une base de données SQL qui sert à tracer des graphiques plus tard.
Implémentons notre solution avec un modèle Pub/Sub (pour publishers et subscribers, soit éditeurs et abonnés en français). Les systèmes équipés de capteurs qui renseignent leur température dans Redis sont les éditeurs, tandis que les programmes chargés d’effectuer les diverses actions qui suivent sont les abonnés. Redis agit en qualité de broker (courtier) en mettant en lien les éditeurs et les abonnées.
L’architecture illustrée :
Du côté d’un publisher, une commande XADD
ajoute un élément pour rapporter les mesures. Voici un script en Python qui simule les mesures prises toute les 3 secondes en fixant aléatoirement la température et l’humidité pour l’exemple :
import time
import random
import redis
ROOM_ID = 'room-1' # À changer pour chaque instance du script
STREAM = 'room_state'
r = redis.Redis()
while True:
temp = random.normalvariate(mu=5, sigma=0.5)
humid = random.normalvariate(mu=30, sigma=2)
# XADD
r.xadd(STREAM, {
'room_id': ROOM_ID,
'temp': temp,
'humid': humid
})
time.sleep(3)
Afin de suivre les exemples en temps réel, on peut d’ores et déjà laisser une ou plusieurs instances de ce programme tourner en fond. Pour mieux suivre, on veillera à attribuer un ROOM_ID
unique à chaque instance.
Enfin, du côté des subscribers, c’est la commande XREAD
qui permet de se tenir informé en continu de ce qui survient, elle diffère quelque peu de la commande XRANGE
vue précédemment.
XREAD
permet d’écouter en continu un stream (optionnellement de façon bloquante) en indiquant un ID de début.
L’ID spécial $
désigne le dernier élément exclu, ce qui revient à indiquer que seuls les prochains éléments à survenir après l’instant où l’on commence l’écoute nous intéressent. Il est probable de commencer par écouter à partir de l’ID $
s’il n’existe de dernier ID connu au préalable. Mais autrement, on fixe généralement cet ID avec celui du dernier élément vu, comme nous le voyons !
import time
import redis
STREAM = 'room_state'
r = redis.Redis()
last_timestamp = '$' # Pas d'ID préalable connu, on commence par $
while True:
# Ici notre XREAD
events = r.xread({STREAM: last_timestamp}, block=0)
for ev in events:
_, data = ev
timestamp, info = data[0]
room_id = info[b'room_id']
temp = float(info[b'temp'])
humid = float(info[b'humid'])
if not 4 <= temp <= 6:
print(f'ALERT TEMP! {room_id}: {temp}%')
if not 24 <= humid <= 36:
print(f'ALERT HUMID! {room_id}: {humid}%')
last_timestamp = timestamp # On refixe l'ID à partir duquel on veut écouter
Et bien-sûr, n’oublions pas un autre programme qui lira lui aussi les données de façon concomitante :
import time
import redis
STREAM = 'room_state'
r = redis.Redis()
last_timestamp = '$'
sql = 'INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid);'
while True:
# Ici notre XREAD
events = r.xread({STREAM: last_timestamp}, block=0)
for ev in events:
_, data = ev
timestamp, info = data[0]
room_id = info[b'room_id']
temp = float(info[b'temp'])
humid = float(info[b'humid'])
# Simulation d'une exécution SQL
print('Req:', sql, [timestamp, room_id, temp, humid])
last_timestamp = timestamp
En faisant tourner ces deux programmes, on pourra bien observer qu’ils reçoivent en temps réel tous les deux les informations.
$ python abonné_alerte.py
%
b'1649107932476-0'
b'1649107933326-0'
b'1649107935481-0'
b'1649107936329-0'
ALERT TEMP! b'room-2': 6.12191050500779%
b'1649107938484-0'
b'1649107939334-0'
...
...
$ python abonné_sql.py
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107932476-0', b'room-2', 4.77177761522936, 30.35734886658456]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107933326-0', b'room-1', 5.708066609483226, 30.516484411754252]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107935481-0', b'room-2', 4.2003788955207435, 28.884695064157125]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107936329-0', b'room-1', 4.9264638938932865, 31.445630632770374]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107938484-0', b'room-2', 6.12191050500779, 31.110434314778836]
Req: INSERT INTO temp (t, room_id, temp, humid) VALUES (:t, :room_id, :temp, :humid); [b'1649107939334-0', b'room-1', 5.403059986341991, 27.360490108746863]
...
...
Si nous coupons le ou les programmes qui alimentent le flux de données, les deux programmes qui écoutent le stream se contentent de rester à l’écoute. Inversement, si rien n’écoute le stream, il continue malgré tout de s’alimenter des données reçues par les éditeurs, Redis enregistre donc bien les données qui pourront être interrogées ultérieurement.
Les publishers et les subscribers sont découplés grâce à l’intermédiaire qu’est Redis : les éditeurs et les abonnés n’ont aucun besoin de se connaître entre eux, ni de savoir ce qui existe dans le système.
À noter : la commande XREAD
permet d’écouter sur plus d’un stream à la fois, permettant ainsi de souscrire à plusieurs topics, en jargon Pub/Sub. Ici nous n’écoutons qu’un seul stream pour nos besoins, cependant.
Nous laissons à Redis le soin d’horodater les enregistrements lui-même par le biais de l’attribution automatique des ID en timestamp. Cependant, il convient de retenir que l’horodatage de Redis ne correspond pas à la date de l’évènement, mais plutôt à la date d’enregistrement de l’évènement dans Redis. La différence est subtile mais importante dans un système réel où on ne peut faire le pari d’un système globalement sans latence. Dans notre présentation, nous confondons les deux pour simplifier les exemples.
- L’implémentation de Redis utilise des arbres Radix qui permettent d’obtenir rapidement un intervalle complet d’éléments, vous trouverez plus d’informations dans cette présentation donnée par Salvatore Sanfilippo, le créateur de Redis : https://www.youtube.com/watch?v=Ty1rQuRJijk↩
Redis permet d’implémenter un Pub/Sub minimaliste et de surcroît une architecture découplée entre plusieurs éditeurs et plusieurs abonnés. Les subscribers peuvent recevoir le même message (broadcast), comme cela a été illustré.
Mais nous pourrions envisager et souhaiter un comportement essentiellement différent : plusieurs consommateurs (nouvelle terminologie pour subscriber dans ce contexte) qui consultent le même flux de messages, mais sans que l’un d’eux ne reçoive un même message qu’un autre. Cela est fort utile pour distribuer des tâches (souvent coûteuses en temps) parmi plusieurs workers parallèles qui font tous la même chose.
Toujours avec les streams, Redis a bien d’autres commandes en stock qui nous permettront d’implémenter un tel dispositif. Ce sera ainsi l’objet de notre prochain billet : les queues partagées.