Plongée au cœur de l'asynchrone en Python

Sans boire la tasse

Le modèle asynchrone a pris beaucoup d’ampleur dans les dernières versions de Python. La bibliothèque asyncio a été ajoutée en Python 3.4, ont suivi les mots-clés async et awaiten Python 3.5, et d’autres nouveautés dans les versions suivantes.

Grâce à nohar vous savez déjà comment fonctionnent les coroutines et la programmation asynchrone en Python. Mais vous êtes-vous déjà demandé comment Python gérait cela ?

Dans ce tutoriel, j’aimerais vous faire découvrir ce qui se cache derrière les mots-clés async et await, comment ils s’interfacent avec asyncio. Mais aussi de quoi est faite cette bibliothèque et comment on pourrait la réécrire.

Cet article présuppose une version de Python supérieure ou égale à 3.5. Une connaissance minimale du modèle asynchrone de Python et de la bibliothèque asyncio sont préférables.

Un monde de coroutines

Depuis Python 3.5, une coroutine se définit à l’aide des mots-clés async def :

async def simple_print(msg):
    print(msg)

Techniquement, simple_print n’est pas une coroutine. C’est en fait une fonction qui renvoie une nouvelle coroutine à chaque appel. Comme toute fonction, simple_print peut donc recevoir des arguments, qui seront utilisés par la coroutine et influeront sur son comportement.

>>> simple_print
<function simple_print at 0x7f0873895950>
>>> simple_print('Hello')
<coroutine object simple_print at 0x7f08738959e0>

Le contenu d’une coroutine ne s’exécute pas directement, il faut la lancer dans un environnement asynchrone. Par exemple avec un await utilisé depuis une autre coroutine.

Ici nous allons faire appel à asyncio, le moteur asynchrone de la bibliothèque standard. Il possède une méthode run permettant d’exécuter le contenu d’une coroutine.

>>> import asyncio
>>> asyncio.run(simple_print('Hello'))
Hello

Derrière cette simple ligne, asyncio se charge d’instancier une nouvelle boucle événementielle, de démarrer notre coroutine et d’attendre que celle-ci se termine. Si l’on omet les opérations de finalisation qu’ajoute asyncio.run, le code précédent est équivalent à :

>>> loop = asyncio.new_event_loop()
>>> asyncio.set_event_loop(loop)
>>> loop.run_until_complete(simple_print('Hello'))
Hello

Il s’agit donc d’une boucle événementielle, chargée d’exécuter et de cadencer les différentes tâches. La boucle est propre au moteur asynchrone utilisé, et permet une utilisation concurrente des tâches.

Mais de quoi est donc faite une coroutine ? Comment fait ce run_until_complete pour exécuter notre code ?

En inspectant l’objet renvoyé par simple_print, on remarque qu’il possède une méthode __await__.

>>> coro = simple_print('Hello')
>>> dir(coro)
['__await__', ...]

La coroutine serait donc un objet avec une méthode spéciale __await__. Nous voilà un peu plus avancés, plus qu’à en apprendre davantage sur cette méthode.

On voit qu’elle s’appelle sans arguments et qu’elle renvoie un objet de type coroutine_wrapper. Mais en inspectant à nouveau, on remarque que cet objet est un itérateur !

>>> aw = coro.__await__()
>>> aw
<coroutine_wrapper object at 0x7fcde8f30710>
>>> dir(aw)
[..., '__iter__', ..., '__next__', ..., 'send', 'throw']

Plus précisément, il s’agit ici d’un générateur, reconnaissable aux méthodes send et throw.

En résumé, les coroutines possèdent donc une méthode __await__ qui renvoie un itérateur. Cela semble logique si vous vous souvenez des articles donnés en introduction, qui montrent que la coroutine est un enrobage autour d’un générateur.

Les coroutines pouvant être converties en itérateurs, on comprend maintenant comment la boucle événementielle est capable de les parcourir. Une simple boucle for pourrait faire l’affaire, en itérant manuellement sur l’objet renvoyé par __await__.

>>> for _ in simple_print('Hello').__await__():
...     pass
... 
Hello

La coroutine présentée ici ne réalise aucune opération asynchrone, elle ne fait qu’afficher un message. Voici un exemple plus parlant d’une coroutine plus complexe faisant appel à d’autres tâches.

async def complex_work():
    await simple_print('Hello')
    await asyncio.sleep(0)
    await simple_print('World')

Le comportement est le même : itérer sur l’objet renvoyé par __await__ permet d’exécuter le corps de la coroutine.

>>> for _ in complex_work().__await__():
...     pass
... 
Hello
World

Mais avec cette simple boucle on ne voit pas clairement ce qui délimite les itérations. Impossible de savoir en voyant le code précédent combien la boucle a fait d’itérations (et donc à quel moment elle a repris la main).

Les itérateurs ne s’utilisent pas uniquement avec des boucles for, on peut aussi les parcourir pas à pas à l’aide de la fonction next. next renvoie à chaque appel l’élément suivant de l’itérateur, et lève une exception StopIteration en fin de parcours. C’est donc cette fonction que nous allons utiliser pour l’exécution, qui rendra visible chaque interruption de la tâche.

>>> it = complex_work().__await__()
>>> next(it)
Hello
>>> next(it)
World
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

Cela apparaît très clairement maintenant, notre boucle réalise deux itérations. Chaque interruption permet à la boucle de reprendre la main, de gérer les événements et de cadencer les tâches (choisir de les suspendre ou de les continuer), c’est ainsi qu’elle peut en exécuter plusieurs « simultanément » (de façon concurrente).

C’est ici l’expression await asyncio.sleep(0) qui est responsable de l’interruption dans notre itération, elle est similaire à un yield pour un générateur. await est l’équivalent du yield from, il délégue l’itération à une sous-tâche. Il ne provoque pas d’interruption en lui-même, celle-ci ne survient que si elle est déclenchée par la sous-tâche (nous verrons par la suite par quel moyen).

asyncio.sleep(0) est un cas particulier de sleep qui ne fait qu’une simple interruption, sans attente. Le comportement serait différent avec une durée non nulle en paramètre.

Attendez-moi !

À la découverte des tâches asynchrones

Les coroutines ne sont pas les seuls objets qui peuvent s’utiliser derrière le mot-clé await. Plus généralement on parle de tâches asynchrones (ou awaitables) pour qualifier ces objets.

Ainsi, un awaitable est un objet caractérisé par une méthode __await__ renvoyant un itérateur. Les coroutines sont un cas particulier de tâches asynchrones construites autour d’un générateur (avant Python 3.5, on créait d’ailleurs une coroutine à l’aide d’un décorateur — asyncio.coroutine — appliqué à une fonction génératrice).

Voici par exemple un équivalent à notre fonction complex_work. ComplexWork est ici une classe dont les instances sont des tâches asynchrones.

class ComplexWork:
    def __await__(self):
        print('Hello')
        yield
        print('World')

Avec le mot-clé yield, notre méthode __await__ devient une fonction génératrice et renvoie donc un itérateur. On utilise yield sans paramètre, les valeurs renvoyées lors de l’itération ne nous intéressent pas pour l’instant, seule l’exécution importe.

Nous pouvons exécuter notre tâche asynchrone dans une boucle événementielle asyncio :

>>> loop.run_until_complete(ComplexWork())
Hello
World

Et notre objet respecte le protocole établi : il est possible d’itérer sur le retour d'__await__.

>>> it = ComplexWork().__await__()
>>> next(it)
Hello
>>> next(it)
World
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

Synchronisation entre tâches

En pratique, il est assez peu fréquent d’avoir besoin de définir un awaitable autre qu’une coroutine. C’est néanmoins utile si l’on souhaite conserver un état associé à notre tâche, pour pouvoir interagir avec elle depuis l’extérieur en altérant cet état.

Prenons par exemple la classe Waiter qui suit, qui permet d’attendre un résultat.

class Waiter:
    def __init__(self):
        self.done = False

    def __await__(self):
        while not self.done:
            yield

Le principe est relativement simple : l’objet est initialisé avec un état booléen done à False et son générateur (__await__) s’interrompt continuellement tant que l’état ne vaut pas True. Cela bloque la tâche asynchrone appelante puisque la boucle événementielle itérera sur un générateur infini en attendant son changement d’état.
Une fois que cet état passe à True, le générateur prend fin et la tâche asynchrone est donc terminée. Ça permet alors à la boucle événementielle de reprendre l’exécution à la suite de cette tâche.

On utilise Waiter pour synchroniser deux tâches asynchrones. En effet, avec un objet waiter partagé entre deux tâches, une première peut attendre sur cet objet tandis qu’une seconde exécute un calcul avant de changer l’état du waiter (signalant que le calcul est terminé et permettant à la première tâche de continuer).

async def wait_job(waiter):
    print('start')
    await waiter # wait for count_up_to to be finished
    print('finished')

async def count_up_to(waiter, n):
    for i in range(n):
        print(i)
        await asyncio.sleep(0)
    waiter.done = True
>>> waiter = Waiter()
>>> loop.run_until_complete(asyncio.gather(
...     wait_job(waiter),
...     count_up_to(waiter, 10),
... ))
start
0
1
2
3
4
5
6
7
8
9
finished
[None, None]

Waiter permet donc ici à wait_job d’attendre la fin de l’exécution de count_up_to avant de continuer. Il est possible de faire varier le temps de sleep pour constater qu’il ne s’agit pas d’un hasard : la première tâche se met en pause tant que la seconde n’a pas terminé son traitement.

gather est un utilitaire d'asyncio servant à exécuter « simultanément » (en concurrence) plusieurs tâches asynchrones dans la boucle événementielle. La fonction renvoie la liste des résultats des sous-tâches (le [None, None] que l’on voit dans la fin de l’exemple, nos tâches ne renvoyant rien).

D’autres utilisations de Waiter sont possibles, à des fins de synchronisation, par exemple pour gérer des verrous (mutex) entre plusieurs tâches.

Boucle d'or et les trois tâches

Une première boucle événementielle

Après avoir défini différentes tâches aysnchrones, il serait intéressant de construire le moteur pour les exécuter, la boucle événementielle. Cette boucle se charge de cadencer et d’avancer dans les tâches, tout en tenant compte des événements qui peuvent survenir.

Nous nous appuyions jusque-là sur la boucle fournie par asyncio (asyncio.run, new_event_loop) et sur son environnement (sleep, gather), mais il va nous être nécessaire de nous en détacher pour bien comprendre comment s’agencent les tâches, et donc de recoder ces outils.

Nous avons déjà un algorithme basique de boucle événementielle, que nous suivons pour le moment manuellement, pour traiter une tâche :

  • Faire appel à __await__ pour récupérer l’itérateur associé.
  • Appeler continuellement next sur cet itérateur.
  • S’arrêter quand une exception StopIteration est levée.

Il nous est donc possible d’écrire cela sous la forme d’une fonction run_task prenant une unique tâche en paramètre.

def run_task(task):
    it = task.__await__()

    while True:
        try:
            next(it)
        except StopIteration:
            break

Ce premier prototype de boucle fonctionne, nous pouvons l’utiliser pour exécuter l’une de nos tâches.

>>> run_task(complex_work())
Hello
World

Mais il est assez limité, ne traitant pas du tout la question de l’exécution concurrente ou du cadencement. Pour l’améliorer, nous créons donc la fonction run_tasks, recevant une liste de tâches. Les itérateurs de ces tâches seront placés dans une file (FIFO) par la boucle, qui pourra alors à chaque itération récupérer la prochaine tâche à traiter et la faire avancer d’un pas. Après quoi, si la tâche n’est pas terminée, elle sera ajoutée en fin de file pour être continuée plus tard.

def run_tasks(*tasks):
    tasks = [task.__await__() for task in tasks]

    while tasks:
        # On prend la première tâche disponible
        task = tasks.pop(0)
        try:
            next(task)
        except StopIteration:
            # La tâche est terminée
            pass
        else:
            # La tâche continue, on la remet en queue de liste
            tasks.append(task)

On obtient maintenant une exécution réellement concurrente. Le mécanisme de file (algorithme type round-robin) permet de traiter toutes les tâches de la même manière, sans en laisser sur le carreau. Ce sont néanmoins les tâches qui contrôlent la cadence, choisissant explicitement quand elles rendent la main à la boucle (yield / await asyncio.sleep(0) ou équivalents), lui permettant de passer à la tâche suivante.

Pour nous assurer du bon fonctionnement, on peut tester notre fonction avec nos coroutines wait_job et count_up_to.

>>> waiter = Waiter()
>>> run_tasks(wait_job(waiter), count_up_to(waiter, 10))
start
0
1
2
3
4
5
6
7
8
9
finished

Construire un environnement asynchrone

Cependant, un moteur asynchrone n’est rien sans les utilitaires qui vont avec. Nous avons vu la fonction sleep pour asyncio qui permet de patienter un certain nombre de secondes, et il serait utile d’en avoir un équivalent dans notre environnement.

Vous me direz que l’on utilise déjà await asyncio.sleep(0) dans nos coroutines et que ça ne pose pas de problème particulier, mais c’est justement parce que le paramètre vaut 0. Une autre valeur provoquerait une erreur parce que ne serait pas gérée par notre boucle événementielle.

Commençons par une tâche élémentaire toute simple, qui nous servira à construire le reste. Pour rendre la main à la boucle événementielle, il est nécessaire d’avoir un itérateur qui produit une valeur. Mais nous ne pouvons pas le faire directement depuis nos coroutines avec un yield, il faut nécessairement passer par une autre tâche que l’on await.

Il nous serait pratique d’avoir une tâche interrupt, où un await interrupt() serait équivalent à un yield / await asyncio.sleep(0). C’est le cas avec la classe suivante.

class interrupt:
    def __await__(self):
        yield

La tâche est peu utile en elle-même, mais elle permet de construire autour d’elle un environnement de coroutines. Par exemple, on peut imaginer une coroutine qui rendrait la main à la boucle (et donc patienterait) tant qu’un temps (absolu) n’a pas été atteint.

import time

async def sleep_until(t):
    while time.time() < t:
        await interrupt()

Partant de là, une coroutine sleep se construit facilement en transformant une durée (temps relatif) en temps absolu.

async def sleep(duration):
    await sleep_until(time.time() + duration)

À titre d’exemple, voici une coroutine qui affiche des messages reçus en arguments, espacés par une certaine durée.

async def print_messages(*messages, sleep_time=1):
    for msg in messages:
        print(msg)
        await sleep(sleep_time)

On l’utilise ensuite avec run_tasks en instanciant deux coroutines pour bien voir que leurs messages s’intermêlent, et donc qu’il n’y a pas d’attente active : la boucle est capable de passer à la tâche suivante quand la première est bloquée, il lui suffit de rencontrer une interruption.

>>> run_tasks(
...     print_messages('foo', 'bar', 'baz'),
...     print_messages('aaa', 'bbb', 'ccc', sleep_time=0.7),
... )
foo
aaa
bbb
bar
ccc
baz

(Le résultat n’est pas très parlant ici vu qu’il manque de dynamisme, je vous invite à l’exécuter chez vous pour mieux vous en rendre compte.)

Interagir avec la boucle

La « boucle » que nous utilisons pour le moment ne permet aucune interaction : une fois lancée, il n’est par exemple plus possible d’ajouter de nouvelles tâches. Ça limite beaucoup les cas d’utilisation.

Pour remédier à cela, nous allons donc transformer notre fonction en classe afin de lui ajouter un état (la liste des tâches en cours) et une méthode add_task pour programmer de nouvelles tâches à la volée.

class Loop:
    def __init__(self):
        self.tasks = []

    def add_task(self, task):
        if hasattr(task, '__await__'):
            task = task.__await__()
        self.tasks.append(task)

    def run(self):
        while self.tasks:
            task = self.tasks.pop(0)
            try:
                next(task)
            except StopIteration:
                pass
            else:
                self.add_task(task)

Les deux premières lignes de la méthode add_task sont utiles pour reprogrammer une tâche déjà en cours (appel ligne 18), qui aura déjà été transformée en itérateur auparavant.

On peut aussi ajouter une méthode utilitaire, run_task, pour faciliter le lancement d’une tâche seule.

class Loop:
    [...]

    def run_task(self, task):
        self.add_task(task)
        self.run()

À l’utilisation, on retrouve le même comportement que précédemment.

>>> loop = Loop()
>>> loop.run_task(print_messages('foo', 'bar', 'baz'))
foo
bar
baz

Notre boucle possède maintenant un état, mais il n’est toujours pas possible d’interagir avec elle depuis nos tâches asynchrones, car nous n’avons aucun moyen de connaître la boucle en cours d’exécution.
Pour cela, nous ajoutons un attribut de classe current référençant la boucle en cous, réinitialisé à chaque run.

class Loop:
    [...]

    current = None

    def run(self):
        Loop.current = self
        [...]

Dans un environnement réel, il nous faudrait réinitialiser current à chaque tour de boucle dans le run, pour permettre à plusieurs boucles de coexister. Mais le code proposé ici ne l’est qu’à titre d’exemple, on notera aussi que le traitement n’est pas thread-safe.

D’autres utilitaires asynchrones

Cet attribut Loop.current va nous être d’une grande utilité pour réaliser notre propre coroutine gather. Pour rappel, cet outil permet de lancer plusieurs coroutines « simultanément » et d’attendre qu’elles soient toutes terminées.

On peut commencer par reprendre notre classe Waiter pour étendre son comportement. Plutôt que de n’avoir qu’un état booléen, on le remplace par un compteur, décrémenté à chaque notification. On le dote alors d’une méthode set pour le notifier. L’attente d’un objet Waiter se termine une fois qu’il a été notifié n fois.

class Waiter:
    def __init__(self, n=1):
        self.i = n

    def set(self):
        self.i -= 1

    def __await__(self):
        while self.i > 0:
            yield

À partir de ce Waiter il devient très facile de recoder gather. Il suffit en effet d’instancier un Waiter en lui donnant le nombre de tâches, d’ajouter ces tâches à la boucle courante à l’aide de Loop.current.add_task, et d’attendre le Waiter.

Une petite subtilité seulement : les tâches devront être enrobées dans une nouvelle coroutine afin qu’elles notifient le Waiter en fin de traitement.

async def gather(*tasks):
    waiter = Waiter(len(tasks))

    async def task_wrapper(task):
        await task
        waiter.set()

    for t in tasks:
        Loop.current.add_task(task_wrapper(t))
    await waiter

On constate bien l’exécution concurrente des tâches, il est possible de faire varier le temps de pause pour observer les changements.

>>> loop = Loop()
>>> loop.run_task(
...     gather(
...         print_messages('foo', 'bar', 'baz'),
...         print_messages('aaa', 'bbb', 'ccc', sleep_time=0.7),
...     )
... )
foo
aaa
bbb
bar
ccc
baz

Et contrairement à notre précédent run_tasks qui permettait déjà celà, gather peut s’utiliser partout derrière un await, permettant de construire de vrais workflows.

>>> async def workflow():
...     await gather(
...         print_messages('a', 'b'),
...         print_messages('c', 'd', 'e'),
...     )
...     await print_messages('f', 'g')
...
>>> loop.run_task(workflow())
a
c
b
d
e
f
g

Utilitaires réseau (sockets)

Oublions ce print_messages et venons-en à des cas d’utilisation plus concrets. Les environnements asynchrones sont particulièrement adaptés aux programmes qui réalisent beaucoup d’opérations d’entrée/sortie (I/O), tels que des applications réseau. Voici par exemple comment nous pourrions intégrer des sockets (connecteurs réseau) à notre moteur asynchrone.

Nous utiliserons pour cela le type socket de Python, ainsi que la fonction select qui nous permet de savoir quand un fichier est prêt en lecture et/ou écriture. Le principe est alors, pour chaque opération, de vérifier si la socket est prête avant d’exécuter le traitement, et d’interrompre la coroutine le cas échéant afin de réessayer plus tard. À ce moment-là, la boucle événementielle reprend la main et peut continuer ses autres opérations pour ne pas les bloquer.

On construit une classe AIOSocket, reprenant l’interface de socket. Notre classe sera appelée avec une socket déjà instanciée, il ne reste alors plus qu’à instancier les sélecteurs pour la surveiller en lecture et en écriture. Nous ajoutons les méthodes close et fileno pour respecter l’interface, ainsi que le protocole des gestionnaires de contexte.

import select

class AIOSocket:
    def __init__(self, socket):
        self.socket = socket
        self.pollin = select.epoll()
        self.pollin.register(self, select.EPOLLIN)
        self.pollout = select.epoll()
        self.pollout.register(self, select.EPOLLOUT)

    def close(self):
        self.socket.close()

    def fileno(self):
        return self.socket.fileno()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.socket.close()

Et maintenant les coroutines de connexion, sur le modèle donné plus haut : attendre que la socket soit prête et exécuter l’opération ensuite.

class AIOSocket:
    [...]

    async def bind(self, addr):
        while not self.pollin.poll():
            await interrupt()
        self.socket.bind(addr)

    async def listen(self):
        while not self.pollin.poll():
            await interrupt()
        self.socket.listen()

    async def connect(self, addr):
        while not self.pollin.poll():
            await interrupt()
        self.socket.connect(addr)

Toujours sur ce modèle, on ajoute ensuite les coroutines de lecture/écriture. On notera juste que la méthode accept d’une socket renvoie un couple (socket, adresse). Nous ignorons ici l’adresse et emballons la socket dans une instance AIOSocket, afin de renvoyer un objet du même type.

class AIOSocket:
    [...]

    async def accept(self):
        while not self.pollin.poll(0):
            await interrupt()
        client, _ = self.socket.accept()
        return self.__class__(client)

    async def recv(self, bufsize):
        while not self.pollin.poll(0):
            await interrupt()
        return self.socket.recv(bufsize)

    async def send(self, bytes):
        while not self.pollout.poll(0):
            await interrupt()
        return self.socket.send(bytes)

Enfin, on ajoute un utilitaire pour créer une socket asynchrone de toutes pièces, reprenant les paramètres et valeurs par défaut de socket.

import socket

def aiosocket(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
    return AIOSocket(socket.socket(family, type, proto, fileno))

Avec ces sockets asynchrones, nous pouvons facilement créer des coroutines représentant un client et un serveur. Dans l’exemple qui suit, le serveur gère un unique client et ne fait que lui renvoyer le message reçu en l’inversant.

async def server_coro():
    with aiosocket() as server:
        await server.bind(('localhost', 8080))
        await server.listen()
        with await server.accept() as client:
            msg = await client.recv(1024)
            print('Received from client', msg)
            await client.send(msg[::-1])

async def client_coro():
    with aiosocket() as client:
        await client.connect(('localhost', 8080))
        await client.send(b'Hello World!')
        msg = await client.recv(1024)
        print('Received from server', msg)
>>> loop = Loop()
>>> loop.run_task(gather(server_coro(), client_coro()))
Received from client b'Hello World!'
Received from server b'!dlroW olleH'

On constate bien que rien n’est bloquant, les deux coroutines ont pu s’exécuter en concurrence, rendant la main à la boucle quand les I/O étaient indisponibles.

No Future

Mécanisme des futures

Le moteur asynchrone du chapitre précédent est assez peu efficace, notamment sa fonction sleep. En effet : la tâche est bien interrompue le temps de l’attente, mais elle est reprogrammée par la boucle à chaque itération, pour rien. De même pour la tâche Waiter qui n’a normalement pas besoin d’être programmée tant que son compteur ne vaut pas zéro.

On sait qu’une tâche est suspendue car elle attend qu’une condition (temporelle ou autre) soit vraie. il serait alors intéressant que la boucle événementielle ait connaissance de cela et ne cadence que les tâches dont les préconditions sont remplies.

Pour éviter ce problème, asyncio utilise un mécanisme de futures. Une future est une tâche asynchrone spécifique, qui permet d’attendre un résultat qui n’a pas encore été calculé. La future ne peut être relancée par la boucle événementielle qu’une fois ce résultat obtenu.

Il se trouve que le yield utilisé dans nos tâches pour rendre la main à la boucle peut s’accompagner d’une valeur, comme dans tout générateur. Ici, il va nous servir à communiquer avec la boucle, pour lui indiquer la future en cours. C’est ce que fait asyncio.sleep avec une durée non nulle par exemple.

On peut commencer avec un prototype de future tout simple, sur le modèle de notre première classe Waiter.

class Future:
    def __await__(self):
        yield self
        assert self.done

Nous n’avons pas besoin de boucle ici, puisque la tâche ne devrait pas être programmée plus de deux fois : une première fois pour démarrer l’attente, et une seconde après que la condition soit remplie pour reprendre le travail de la tâche appelante. On place néanmoins un assert pour s’assurer que ce soit bien le cas.

Lorsque, depuis une coroutine, on fera un await Future(), la valeur passée au yield remontera le flux des appels jusqu’à la boucle événementielle, qui la recevra en valeur de retour de next. Ainsi, un yield self depuis la classe Future permettra à la boucle d’avoir accès à la future courante. C’est le seul moyen pour la boucle d’en avoir une référence, puisqu’elle ne connaît sinon que la tâche asynchrone englobante.

Pour améliorer notre classe Future, on va l’agrémenter d’une méthode set afin de signaler que le traitement est terminé. En plus de cela, la méthode se chargera aussi de reprogrammer notre tâche au niveau de la boucle événementielle (c’est à dire de l’ajouter à nouveau aux tâches à exécuter, afin qu’elle soit prise en compte à l’itération suivante).

Pour connaître la tâche à cadencer, on va utiliser l’attribut task de l’objet Future. Il n’existe pas encore pour le moment, mais sa valeur lui sera attribuée par la boucle événementielle lorsque la tâche sera interrompue.

class Future:
    def __init__(self):
        self._done = False
        self.task = None

    def __await__(self):
        yield self
        assert self._done

    def set(self):
        self._done = True
        if self.task is not None:
            Loop.current.add_task(self.task)

Intégration à la boucle événementielle

Notre tâche Future est maintenant complète, mais le reste du travail est à appliquer du côté de la boucle, pour qu’elle les traite correctement.

  • Premièrement, il faut que quand une tâche s’interrompt sur une future, la boucle définisse l’attribut task de la future comme convenu.
  • Ensuite, la boucle ne doit pas reprogrammer une telle tâche, puisque ça provoquerait un doublon lorsque la future sera notifiée.
  • Enfin, il est nécessaire de lier les futures à des événements, pour que l’appel à set et donc le déclenchement de la tâche soient automatiques.

On commence par les deux premiers points, faciles à ajouter à la méthode run de Loop.

class Loop:
    [...]

    def run(self):
        Loop.current = self
        while self.tasks:
            task = self.tasks.pop(0)
            try:
                result = next(task)
            except StopIteration:
                continue

            if isinstance(result, Future):
                result.task = task
            else:
                self.tasks.append(task)

Événements temporels

Pour le troisième point, on va formaliser l’idée d’événements. Les plus simples à mettre en place sont les événements temporels, et ce sont donc les seuls que nous allons traiter ici. En effet, la boucle a conscience du temps qui s’écoule et peut déclencher des actions en fonction de ça. Le but sera donc d’associer un temps à une future, et d’y faire appel dans la boucle.

Tout d’abord, on crée une classe TimeEvent associant ces deux éléments. On rend les objets de cette classe ordonnables, en implémentant les méthodes spéciales __eq__ (opérateur ==) et __lt__ (opérateur >) puis en appliquant le décorateur functools.total_ordering pour générer les méthodes des autres opérateurs d’ordre.
On a besoin que les objets soient ordonnables pour trouver facilement les prochains événements à déclencher.

from functools import total_ordering

@total_ordering
class TimeEvent:
    def __init__(self, t, future):
        self.t = t
        self.future = future

    def __eq__(self, rhs):
        return self.t == rhs.t

    def __lt__(self, rhs):
        return self.t < rhs

On intègre les événements temporels à notre boucle en la dotant d’une méthode call_later. Cette méthode reçoit un temps (absolu) et une future, les associe dans un objet TimeEvent qu’elle ajoute à la file des événements. On utilise pour la file un objet heapq qui permet de conserver un ensemble ordonné : le premier événement de la file sera toujours le prochain à exécuter.

heapq est un module fournissant des fonctions (heappush, heappop) qui s’appuient sur une liste (self.handlers dans le code qui suit) pour garder une file cohérente.

import heapq

class Loop:
    [...]

    def __init__(self):
        self.tasks = []
        self.handlers = []

    def call_later(self, t, future):
        heapq.heappush(self.handlers, TimeEvent(t, future))

Dans le cœur de la boucle (méthode run), il suffit alors de regarder l’événement en tête de file, et de le déclencher si besoin (si son temps est atteint). Déclencher l’événement signifie notifier la future qui lui est associée (appeler sa méthode set). L’effet sera donc immédiat, la future ajoutera la tâche suspendue aux tâches courantes, et celle-ci sera prise en compte par la boucle pendant l’itération. Le reste de la méthode run reste inchangé.

class Loop:
    [...]

    def run(self):
        Loop.current = self
        while self.tasks or self.handlers:
            if self.handlers and self.handlers[0].t <= time.time():
                handler = heapq.heappop(self.handlers)
                handler.future.set()

            if not self.tasks:
                continue
            task = self.tasks.pop(0)
            try:
                result = next(task)
            except StopIteration:
                continue

            if isinstance(result, Future):
                result.task = task
            else:
                self.tasks.append(task)

Utilisation des futures

Notre boucle gérant correctement les événements temporels, on peut maintenant réécrire sleep avec une future et un time-handler. Tout ce qu’a à faire sleep c’est de convertir une durée en temps absolu, instancier une future et l’ajouter à la boucle en appelant call_later.

import time

async def sleep(t):
    future = Future()
    Loop.current.call_later(time.time() + t, future)
    await future

Il suffit qu’une coroutine exécute await sleep(...) pour que tout le mécanisme se mette en place :

  • Une future est instanciée, un événement temporel lui est associé dans la boucle, réglé sur la durée demandée.
  • La coroutine est retirée de la liste des tâches à traiter.
  • La boucle continue son travail, en itérant sur les autres tâches, jusqu’à ce que l’événement temporel se produise.
  • Là, elle déclenche la notification de la future, la coroutine est donc rajoutée à la liste des tâches.
  • La boucle reprend alors l’exécution de la coroutine précédemment suspendue.
>>> async def foo():
...     print('before')
...     await sleep(5)
...     print('after')
...
>>> loop = Loop()
>>> loop.run_task(foo())
before
after

Notre boucle possède encore bien des défauts, comme celui de faire de l’attente active (bloquer le processeur) quand il n’y a rien à exécuter. L’implémentation d'asyncio est bien sûr plus évoluée que ce qui est présenté ici.

Et pour quelques outils de plus

async def et await ne sont pas les seuls mots-clés introduits par la version 3.5 de Python. Deux nouveaux blocs ont aussi été ajoutés : les boucles asynchrones (async for) et les gestionnaires de contexte asynchrones (async with).

Ils sont similaires à leurs équivalents synchrones mais utilisent des méthodes spéciales qui font appel à des coroutines. Et ils ne sont utilisables qu’au sein de coroutines (de la même manière qu'await).

Aussi, Python n’a pas arrêté d’évoluer après cette version 3.5, et de nouveaux outils pour la programmation asynchrones sont venus s’y ajouter depuis.

Itérables & générateurs asynchrones

Itérables asynchrones

Pour rappel, un itérable est un objet possédant une méthode __iter__ renvoyant un itérateur. Et un itérateur est un objet possédant une méthode __next__ qui renvoie le prochain élément à chaque appel. Plus d’informations à ce sujet ici.

Sur ce même modèle, un itérable asynchrone est un objet doté d’une méthode __aiter__ qui renvoie un itérateur asynchrone (__aiter__ est une méthode synchrone).
Et un itérateur asynchrone possède une méthode-coroutine __anext__, renvoyant le prochain élément et pouvant user de tous les outils asynchrones.

Un itérateur synchrone se termine quand sa méthode __next__ lève une exception StopIteration. Dans le cas des itérateurs asynchrones, c’est une exception AsyncStopIteration qui sera levée.

La boucle async for parcourant l’itérateur sera suspendue pendant les attentes (rendant la main à la boucle événementielle).

Le code qui suit présente la classe ARange, un itérable asynchrone qui produit des nombres à la manière de range, mais en se synchronisant sur un événement extérieur (ici un sleep(1)). ARange représente l’itérable et ArangeIterator l’itérateur associé (qui n’a jamais besoin d’être utilisé directement). ARange en elle-même n’a rien d’asynchrone, tout le code asynchrone se trouve dans la classe de l’itérateur.

class ARange:
    def __init__(self, stop):
        self.stop = stop

    def __aiter__(self):
        return ARangeIterator(self)


class ARangeIterator:
    def __init__(self, arange):
        self.arange = arange
        self.i = 0

    async def __anext__(self):
        if self.i >= self.arange.stop:
            raise StopAsyncIteration
        await sleep(1)
        i = self.i
        self.i += 1
        return i

Pour tester l’itérable dans notre environnement, définissons une simple coroutine utilisant un async for :

>>> async def test_for():
...     async for val in ARange(5):
...         print(val)
...
>>> loop = Loop()
>>> loop.run_task(test_for())
0
1
2
3
4

Générateurs asynchrones

Les choses se simplifient en Python 3.6 où il devient possible de définir des générateurs asynchrones. Il suffit d’un yield utilisé dans une coroutine pour la transformer en fonction génératrice asynchrone.

async def arange(stop):
    for i in range(stop):
        await sleep(1)
        yield i

arange s’utilise exactement de la même manière que la classe ARange précédente (remplacez ARange(5) par arange(5) dans l’exemple plus haut pour le vérifier), mais avec un code bien plus court.

En Python 3.6 la syntaxe async for devient aussi utilisable dans les listes / générateurs / ensembles / dictionnaires en intension, toujours depuis une coroutine.

>>> async def test_for():
...     print([x async for x in arange(5)])
...
>>> loop = Loop()
>>> loop.run_task(test_for())
[0, 1, 2, 3, 4]

Gestionnaires de contexte asynchrones

Un gestionnaire de contexte est défini par ses méthodes __enter__ et __exit__ permettant d’exécuter du code en entrée et en sortie de bloc with (voir ici).

Le gestionnaire de contexte asynchrone est défini sur le même modèle, avec des coroutines __aenter__ et __aexit__. Elles sont donc exécutées respectivement à l’entrée et à la sortie du bloc async with, utilisé dans une coroutine.

Par exemple, voici un gestionnaire de contexte permettant de créer un serveur autour de l’objet aiosocket que nous avons utilisé au chapitre 3. La coroutine __aenter__ se charge de démarrer le serveur (bind et listen), et __aexit__ le clôt (close).

class Server:
    def __init__(self, addr):
        self.socket = aiosocket()
        self.addr = addr

    async def __aenter__(self):
        await self.socket.bind(self.addr)
        await self.socket.listen()
        return self.socket

    async def __aexit__(self, *args):
        self.socket.close()

Encore une fois, nous définissons une coroutine pour pouvoir tester notre objet. On reprend le même principe que précédemment d’un serveur qui renvoie juste l’inverse du message reçu. Et l’on réutilise client_coro pour jouer le rôle du client.

>>> async def test_with():
...     async with Server(('localhost', 8080)) as server:
...         with await server.accept() as client:
...             msg = await client.recv(1024)
...             print('Received from client', msg)
...             await client.send(msg[::-1])
...
>>> loop = Loop()
>>> loop.run_task(gather(test_with(), client_coro()))
Received from client b'Hello World!'
Received from server b'!dlroW olleH'

Cette fois-ci c’est Python 3.7 qui est venu simplifier les choses, en ajoutant le support des gestionnaires de contexte asynchrones à la contextlib. Ainsi, il devient possible d’utiliser un décorateur asynccontextmanager pour transformer un générateur asynchrone en gestionnaire de contexte asynchrone. L’instruction yield permet de séparer le code d’initialisation de celui de fermeture du bloc async with.

from contextlib import asynccontextmanager

@asynccontextmanager
async def server(addr):
    socket = aiosocket()
    try:
        await socket.bind(addr)
        await socket.listen()
        yield socket
    finally:
        socket.close()

server s’utilise de la même manière que la classe Server précédente.


Bien sûr, tout ce qui est présenté dans cet article ne l’est qu’à titre d’exemple. Le but est d’étudier comment fonctionne dans les grandes lignes un moteur asynchrone, et en particulier asyncio, mais pas de le remplacer. Si vous êtes intéressé par une alternative à asyncio, jetez un œil du côté de trio qui utilise une approche différente.

L’objet de cet article a été présenté à Bordeaux lors de la PyConFr 2019, dont vous pouvez retrouver les diapositives sur Github et la conférence sur Youtube :

Ces contenus pourraient vous intéresser

6 commentaires

En ces temps de confinement, ça fait toujours du bien de piquer une tête !

Ah, et j’ajoute que les exemples de code des différentes sections sont accessibles ici : https://github.com/entwanne/article_python_plongee_asynchrone/tree/master/examples

Merci pour ton article, intéressant et très complet. Désolé, je vais me concentrer sur ce qui aurait pu être mieux à mon sens. Mais ça n’enlève rien à ton super travail. ^^

J’ai tout de suite pensé à https://github.com/AndreLouisCaron/a-tale-of-event-loops qui reconstruit aussi une event loop depuis zéro. C’est une approche différente et donc complémentaire, dont tu peux t’inspirer par endroits. Notamment, passer par une Future pour implémenter sleep() semble inutilement compliqué.

Et sinon, trois autres remarques :

  • J’aurais renommé Waiter en BusyWaiter et expliqué dès le début à quel point il est lent.
  • epoll() ne fonctionne que sous Linux, je conseille le module selectors qui fonctionne aussi sous Windows et macOS.
  • J’aurais aimé voir la gestion des exceptions dans gather(), c’est un point qui est trop souvent oublié et qui fait que asyncio.create_task et future.add_done_callback sont à bannir dans asyncio (plus de détails dans https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).

Merci pour ton retour, je regarderai l’article que tu cites. Je l’ai survolé pour l’instant mais j’ai l’impression que les timers utilisés sont assez sembables à mon implémentation des time handlers pour sleep. La future étant juste le moyen de l’encapsuler.

Par rapport à tes remarques :

  • Je voulais garder le tout le plus simple et montrer que le code qui suivait n’était qu’une amélioration, de façon à ce qu’à l’utilisation ce soit transparent quelle que soit l’implémentation du Waiter.
  • En effet, j’avais pensé aux sélecteurs mais ça me paraissait plus compliqué à mettre en œuvre avec leur mécanisme de callbacks. Je verrai pour leur donner une seconde chance.
  • Oui c’est un point auquel je n’ai pas du tout pensé, il faudrait que je regarde comment c’est géré, et si ça peut être décrit facilement ici.

Excellent article, merci @entwanne !

J’aurais aimé voir la gestion des exceptions dans gather(), c’est un point qui est trop souvent oublié et qui fait que asyncio.create_task et future.add_done_callback sont à bannir dans asyncio (plus de détails dans https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).

Quentin

La partie que j’ai mise en gras m’a fait tiquer, et je m’apprêtais à écrire une réponse un peu lapidaire… avant de me souvenir du dernier projet sur lequel j’ai travaillé en Python, avec du asyncio bien vénère, jusqu’à l’année dernière. :D

En effet, le billet cité en lien (quoi que l’auteur ne se prend quand même pas pour de la m… en se comparant à Dijkstra) aborde un problème sur lequel j’ai eu l’occasion de passer une grosse nuit blanche à réfléchir pour lui trouver une solution simple et structurante, car il s’agit d’une douloureuse lacune d'asyncio : celui-ci nous donne asyncio.ensure_future sans nous donner de mécanisme solide pour s’assurer que toutes les coroutines seront terminées ou annulées à la sortie du programme. Mon boulot en tant que lead aura été de créer une abstraction qui réponde à ce problème sans interdire quoi que ce soit aux développeurs de mon équipe, et je m’aperçois que cette abstraction revient ni plus ni moins à la même chose que ce que Trio appelle une nursery (sauf que dans le contexte de mon framework ça s’appelait un Scheduler, mais c’est pas la question…).

Pour en revenir à la formulation qui m’a fait tiquer : non, il ne faut rien bannir, et ce n’est pas non plus ce que fait Trio. Ce que le billet appelle un "go statement" répond à un besoin bien réel : si je développe dans un contexte concurrent, alors je veux absolument pouvoir être capable lancer une tâche et l’oublier dans le contexte d’où je l’ai lancée (fire and forget) tout en ayant la certitude que quelqu’un, plus haut, s’assurera que cette tâche sera terminée ou annulée en temps voulu. C’est à ce besoin que répond (partiellement) asyncio avec create_task. C’est une façon parfaitement intuitive de penser son code, et il n’y a aucune raison de vouloir l’interdire, sinon cela revient à ce que j’appelle "chier dans les bottes des développeurs", c’est-à-dire leur imposer de faire des contorsions et se mettre à penser d’une manière qui ne leur est pas naturelle. Si asyncio y répond mal, c’est simplement parce qu’il ne fournit pas, par défaut, de poignée sur un contexte (/scope/nursery/scheduler/waitgroup…) qui permette soit d’attendre que toutes ces tâches soient terminées, soit que les erreurs soient propagées : même gather n’y répond pas (il ne permet pas de faire du fire and forget, c’est-à-dire de rajouter dynamiquement une nouvelle tâche au groupe en cours de route). En créant une telle poignée explicite (et franchement, ça ne casse pas non plus trois pattes à un canard…) à laquelle il suffit de passer le résultat de l’appel à create_task, on règle le problème, et on respecte du même coup le zen de Python (explicit is better than implicit).

Pour le coup, le billet cité en lien prend Go en disant "regardez ce qu’il ne faut pas faire" et je trouve que ce faisant, il est à côté de la plaque, car Go fournit dans sa bibliothèque standard des outils qui permettent de faire ça depuis longtemps avec le type WaitGroup, et le package (provisionnel) x/sync/errgroup, dont l’implémentation d’une version qui prend en compte l’annulation lorsqu’une erreur se produit dans une tâche concurrente est complètement triviale. Outre le fait que le problème soit mal posé (toute sa comparaison entre go et goto est un gigantesque bullshit qu’il contredit lui-même plus loin en présentant une solution qui ne change absolument rien à son prétendu problème), je trouve qu’il est très malhonnête de la part de l’auteur de ce billet de ne pas parler de ces solutions et de se poser en inventeur génial d’un concept novateur… que j’ai moi-même dû pondre de mon côté en une nuit il y a deux ans (et je ne suis certainement pas le seul à l’avoir fait !), sans pour autant écrire un billet où je remets en question la moitié des langages de programmation de la Terre.

Enfin, j’aimerais conclure en disant que ce sujet dépasse d’assez loin le cadre de cet article, car il s’agit, au fond, de donner une réflexion sur les bonnes pratiques à adopter lorsque l’on conçoit un programme concurrent, alors qu’ici le principe de l’article est plutôt de décortiquer, techniquement, les mécanismes internes qui rendent la programmation asynchrone possible en Python.

+1 -0

Je suis récemment tombé sur cet article de A. Jesse Jiryu Davis et Guido van Rossum, qui date de l'époque de Python 3.5 et aborde le même sujet sous un angle différent (bottom-up plutôt que top-down comme ici).

On y voit par exemple que le tout est construit autour du mécanisme des futures, pour faire la liaison entre un résultat et la reprise de l’exécution, alors que dans l’approche de mon article elles ont surtout un rôle de bouche-trou (une brique ajoutée pour éviter des itérations inutiles de la boucle)

Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte