Bonjour à tous,
C’est mon premier message ici et je suis ravi de franchir enfin le pas :). Je débute tout juste en programmation asynchrone et j’éprouve quelques difficultés à gérer cette nouvelle notion.
Voici ce que je voudrais faire de sorte à optimiser mon programme :
1) Télécharger des fichiers volumineux à partir d’une liste prédéfinie : 2 + 3 threads Je m’explique, je télécharge avec deux scripts externes dont l’un supporte 2 téléchargements simultanés (a), et l’autre 3 (b). Cette limite est liée aux serveurs et non aux scripts.
2) À chaque fois qu’un fichier est prêt, réaliser des traitements dessus : 5 threads (c). Cette partie est réalisée via un script externe.
La première partie est donc un cas d’école de programmation concurrente (I/O bound), la seconde de programmation parallèle (CPU bound). Pour l’instant je suis parti sur le module concurrent.futures : ThreadPoolExecutor pour 1) et ProcessPoolExecutor pour 2).
Voici mes difficultés :
-
comment faire en sorte d’avoir la même queue pour le thread de téléchargement (a) et (b) de sorte à ce qu’il y ait toujours 5 fichiers en téléchargement mais que les deux scripts ne téléchargent pas deux fois le même fichier.
-
comment faire communiquer la partie téléchargement et traitement de sorte à ce qu’il n’y ait aucune attente et que tous les threads soient "pleins".
Est ce que je suis sur la bonne voie, sinon quelle approche me conseillez-vous ?
Un exemple concret. (Les commandes seront à remplacer par mes scripts de téléchargement). Comment faire en sorte que chaque fonction ne traite pas le dossier déjà traité par l’autre fonction ?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | import concurrent.futures from subprocess import Popen dossiers = ["dossier1", "dossier2", "dossier3", ...] def liste_sans_détails(dossier): command = 'ls {0}'.format(dossier) process = Popen(command.split(), stdout=PIPE) return process.communicate()[0] def liste_avec_détails(dossier): command = 'ls -a {0}'.format(dossier) process = Popen(command.split(), stdout=PIPE) return process.communicate()[0] with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: future_to_liste1 = {executor.submit(liste_sans_détails, dossier): dossier for dossier in dossiers} future_to_liste2 = {executor.submit(liste_avec_détails, dossier): dossier for dossier in dossiers} for future in concurrent.futures.as_completed(future_to_liste1): print(future.result()) for future in concurrent.futures.as_completed(future_to_liste2): print(future.result()) |