Execution concurrente et traitement progressif des résultats

a marqué ce sujet comme résolu.

Bonjour à tous!

J’utilise depuis longtemps Python pour le traitement de données. D’habitude je charge les données depuis des fichiers sur le disque dur pour ensuite les traiter. Pour ça je n’ai pas de soucis.

Par contre, aujourd’hui, je fais face à un problème un peu différent : je récupère mes données en lançant un grand nombre de commande (appel à un programme extérieure).

Pour faire ça de manière séquentielle et synchrone je n’ai aucun soucis mais comme je dois lancer un grand nombre de commande qui dure chacune assez longtemps (plusieurs minutes), j’essaye de faire tourner tout ça de manière concurrente et asynchrone.

Pour le moment je suis arrivé à cette version qui fonctionne parfaitement mais qui présente à mes yeux encore des défauts :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import random

def generate_input(n):
    return range(n)

async def get_data(i):
    sleep_time = random.randint(2,10)
    print("Get data {i} ({sleep_time}s)".format(i=i,sleep_time=sleep_time))
    job = await asyncio.create_subprocess_exec("sleep",str(sleep_time))
    res = await job.wait()
    print("Get data {i} finished".format(i=i))
    return i

async def get_datas(n):
    jobs = []
    for i in generate_input(n):
        job = get_data(i)
        jobs.append(job)
    results,_ = await asyncio.wait(jobs)
    return [result.result() for result in results]

Je lance alors la récupération des données de la manière suivante :

1
2
3
loop = asyncio.get_event_loop()
results = loop.run_until_complete(get_datas(10))
print(results)

et qui me donne la sortie suivante :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
Get data 0 (9s)
Get data 8 (5s)
Get data 7 (7s)
Get data 2 (6s)
Get data 9 (3s)
Get data 6 (5s)
Get data 1 (10s)
Get data 5 (7s)
Get data 4 (3s)
Get data 3 (3s)
Get data 9 finished
Get data 4 finished
Get data 3 finished
Get data 8 finished
Get data 6 finished
Get data 2 finished
Get data 7 finished
Get data 5 finished
Get data 0 finished
Get data 1 finished
[8, 6, 7, 1, 0, 2, 5, 9, 4, 3]

Les résultats ne sortent pas dans l’ordre mais cela n’a pas d’importance pour moi.

Cette méthode fonctionne parfaitement pour moi mais j’aimerais l’améliorer de 2 manières:

  • Je travaille dans une session interactive (python ou IPython) et je souhaite pouvoir continuer à travailler dans cette session lorsque la fonction get_datas est lancée. J’ai essayé pas mal de chose (loop.call_soon, ThreadPoolExecutor + loop.run_in_executor, …) Est-ce possible? Si oui avait vous des idées?

  • Je souhaite pouvoir récupérer les résultats de get_datas au fur et à mesure qu’ils arrivent en créant par exemple une fonction de traitement des résultats.

Pour cela j’ai essayé de modifié la fonction get_datas pour qu’elle retourne des "futures" des résultats de la fonction get_data pour qu’une fonction de traitement puisse les consommer au fur et à mesure mais je n’ai pas réussi. Avez-vous des idées pour ce problème. Dans l’idéal j’aimerais pouvoir définir plusieurs fonctions de traitement qui puissent être lancé de la même manière que la fonction get_data : sans bloquer la session interactive.

Merci pour votre aide!

Salut,

Pour ta première question, il suffit d’exécuter la boucle d’asyncio elle-même dans un thread séparé.

Pour la seconde, tu peux envelopper l’appel à get_data dans une couroutine qui appelle elle-même get_data puis qui traite les données quand elles sont disponibles.

+0 -0

Salut,

Merci pour ta réponse! En effet, pour que je puisse garder la main sur la session interactive l’exécution de la coroutine dans un thread séparé est la bonne solution.

Par contre, je galère vraiment à l’implémenter. J’ai réussi à avoir un code qui fonctionne que lorsque la fonction get_data ne lance pas de process.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
import threading

import random

loop = asyncio.get_event_loop()

def generate_input(n):
    return range(n)

async def get_data(i):
    sleep_time = random.randint(2,10)
    print("Get data {i} ({sleep_time}s)".format(i=i,sleep_time=sleep_time))
    await asyncio.sleep(sleep_time)
    print("Get data {i} finished".format(i=i))
    return i

async def get_datas(n):
    jobs = []
    for i in generate_input(n):
        job = get_data(i)
        jobs.append(job)
    results = []
    for next_job in asyncio.as_completed(jobs):
        result = await next_job
        results.append(result)
    return results

def run_in_thread(coro):
    loop = asyncio.new_event_loop()
    def _thred_run():
        asyncio.set_event_loop(loop)
        res = loop.run_until_complete(coro)
        print(res)
    thread = threading.Thread(target=_thred_run)
    thread.start()

Lorsque j’appel la fonction suivante tout fonctionne parfaitement :

1
run_in_thread(get_datas(10))

J’ai pas mal chercher d’info sur le lancement d’une boucle d’évènement dans un autre thread. J’ai trouvé qu’il faut bien faire attention à créer une boucle d’évènement dans le thread. Il me semble que je fais les choses correctement dans cette nouvelle version.

Par contre j’ai besoin que ma coroutine exécuté dans le thread lance des process. Voici la version modifiée de ma coroutine get_data qui lance un process :

1
2
3
4
5
6
7
async def get_data(i):
    sleep_time = random.randint(2,10)
    print("Get data {i} ({sleep_time}s)".format(i=i,sleep_time=sleep_time))
    job = await asyncio.create_subprocess_exec("sleep",str(sleep_time))
    res = await job.wait()
    print("Get data {i} finished".format(i=i))
    return i

Dans ce cas la j’obtiens l’erreur suivante :

1
RuntimeError: Cannot add child handler, the child watcher does not have a loop attached

J’ai pas mal chercher d’info sur le lancement de process dans une coroutine exécuté dans un autre thread. J’ai vu qu’il y avait des subtilités dans la gestion des signaux des sous-process qui sont envoyé au thread principale.

La doc https://docs.python.org/3/library/asyncio-subprocess.html#subprocess-and-threads conseil d’appeler avant toute chose la fonction suivante :

1
asyncio.get_child_watcher()

Cela permet d’éviter l’exception "RuntimeError: Cannot add child handler, the child watcher does not have a loop attached" mais le thread ne finit jamais et s’arrète aprés le lancement de tous les process :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Get data 8 (4s)
Get data 4 (3s)
Get data 1 (6s)
Get data 5 (3s)
Get data 0 (9s)
Get data 9 (6s)
Get data 3 (6s)
Get data 7 (9s)
Get data 2 (3s)
Get data 6 (3s)

As-tu déjà eu ce genre de problème? Merci!

+0 -0

Hmm tu es dans un cas assez particulier.

Là comme ça je ne vois que trois solutions :

  • lancer la session interactive elle-même dans un thread séparé et le traitement dans le thread principal,
  • lancer le traitement dans un processus séparé au lieu d’un thread,
  • coder la console interactive dans une coroutine qui tourne dans la même boucle que les traitements.

De base j’aurais une préférence pour la troisième solution.

+0 -0

Salut,

Problème intéressant ! Je suis avec intérêt ce sujet.

Une autre piste possible : si tu travailles avec IPython, tu peux utiliser un backgroundjobs pour lancer un thread. Dans celui-ci tu pourrais utiliser un ProcessPoolExecutor pour lancer tes processus avec subprocess.run et attendre leur résultat avec concurrent.futures.as_completed.

Cette piste n’utilise donc pas asyncio, mais juste un thread et le module concurrent.futures

Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte