Bonjour à tous!
J’utilise depuis longtemps Python pour le traitement de données. D’habitude je charge les données depuis des fichiers sur le disque dur pour ensuite les traiter. Pour ça je n’ai pas de soucis.
Par contre, aujourd’hui, je fais face à un problème un peu différent : je récupère mes données en lançant un grand nombre de commande (appel à un programme extérieure).
Pour faire ça de manière séquentielle et synchrone je n’ai aucun soucis mais comme je dois lancer un grand nombre de commande qui dure chacune assez longtemps (plusieurs minutes), j’essaye de faire tourner tout ça de manière concurrente et asynchrone.
Pour le moment je suis arrivé à cette version qui fonctionne parfaitement mais qui présente à mes yeux encore des défauts :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import asyncio import random def generate_input(n): return range(n) async def get_data(i): sleep_time = random.randint(2,10) print("Get data {i} ({sleep_time}s)".format(i=i,sleep_time=sleep_time)) job = await asyncio.create_subprocess_exec("sleep",str(sleep_time)) res = await job.wait() print("Get data {i} finished".format(i=i)) return i async def get_datas(n): jobs = [] for i in generate_input(n): job = get_data(i) jobs.append(job) results,_ = await asyncio.wait(jobs) return [result.result() for result in results] |
Je lance alors la récupération des données de la manière suivante :
1 2 3 | loop = asyncio.get_event_loop() results = loop.run_until_complete(get_datas(10)) print(results) |
et qui me donne la sortie suivante :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | Get data 0 (9s) Get data 8 (5s) Get data 7 (7s) Get data 2 (6s) Get data 9 (3s) Get data 6 (5s) Get data 1 (10s) Get data 5 (7s) Get data 4 (3s) Get data 3 (3s) Get data 9 finished Get data 4 finished Get data 3 finished Get data 8 finished Get data 6 finished Get data 2 finished Get data 7 finished Get data 5 finished Get data 0 finished Get data 1 finished [8, 6, 7, 1, 0, 2, 5, 9, 4, 3] |
Les résultats ne sortent pas dans l’ordre mais cela n’a pas d’importance pour moi.
Cette méthode fonctionne parfaitement pour moi mais j’aimerais l’améliorer de 2 manières:
-
Je travaille dans une session interactive (python ou IPython) et je souhaite pouvoir continuer à travailler dans cette session lorsque la fonction get_datas est lancée. J’ai essayé pas mal de chose (loop.call_soon, ThreadPoolExecutor + loop.run_in_executor, …) Est-ce possible? Si oui avait vous des idées?
-
Je souhaite pouvoir récupérer les résultats de get_datas au fur et à mesure qu’ils arrivent en créant par exemple une fonction de traitement des résultats.
Pour cela j’ai essayé de modifié la fonction get_datas pour qu’elle retourne des "futures" des résultats de la fonction get_data pour qu’une fonction de traitement puisse les consommer au fur et à mesure mais je n’ai pas réussi. Avez-vous des idées pour ce problème. Dans l’idéal j’aimerais pouvoir définir plusieurs fonctions de traitement qui puissent être lancé de la même manière que la fonction get_data : sans bloquer la session interactive.
Merci pour votre aide!