Python: télécharger et traiter des fichiers en approche concurrente et parallèle

structure à adopter

a marqué ce sujet comme résolu.

Bonjour à tous,

C’est mon premier message ici et je suis ravi de franchir enfin le pas :). Je débute tout juste en programmation asynchrone et j’éprouve quelques difficultés à gérer cette nouvelle notion.

Voici ce que je voudrais faire de sorte à optimiser mon programme :

1) Télécharger des fichiers volumineux à partir d’une liste prédéfinie : 2 + 3 threads Je m’explique, je télécharge avec deux scripts externes dont l’un supporte 2 téléchargements simultanés (a), et l’autre 3 (b). Cette limite est liée aux serveurs et non aux scripts.

2) À chaque fois qu’un fichier est prêt, réaliser des traitements dessus : 5 threads (c). Cette partie est réalisée via un script externe.

La première partie est donc un cas d’école de programmation concurrente (I/O bound), la seconde de programmation parallèle (CPU bound). Pour l’instant je suis parti sur le module concurrent.futures : ThreadPoolExecutor pour 1) et ProcessPoolExecutor pour 2).

Voici mes difficultés :

  • comment faire en sorte d’avoir la même queue pour le thread de téléchargement (a) et (b) de sorte à ce qu’il y ait toujours 5 fichiers en téléchargement mais que les deux scripts ne téléchargent pas deux fois le même fichier.

  • comment faire communiquer la partie téléchargement et traitement de sorte à ce qu’il n’y ait aucune attente et que tous les threads soient "pleins".

Est ce que je suis sur la bonne voie, sinon quelle approche me conseillez-vous ?

Un exemple concret. (Les commandes seront à remplacer par mes scripts de téléchargement). Comment faire en sorte que chaque fonction ne traite pas le dossier déjà traité par l’autre fonction ?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import concurrent.futures
from subprocess import Popen

dossiers = ["dossier1", "dossier2", "dossier3", ...]

def liste_sans_détails(dossier):
    command = 'ls {0}'.format(dossier)
    process = Popen(command.split(), stdout=PIPE)
    return process.communicate()[0]

def liste_avec_détails(dossier):
    command = 'ls -a {0}'.format(dossier)
    process = Popen(command.split(), stdout=PIPE)
    return process.communicate()[0]

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_liste1 = {executor.submit(liste_sans_détails, dossier): dossier for dossier in dossiers}
    future_to_liste2 = {executor.submit(liste_avec_détails, dossier): dossier for dossier in dossiers}
    for future in concurrent.futures.as_completed(future_to_liste1):
        print(future.result())
    for future in concurrent.futures.as_completed(future_to_liste2):
        print(future.result())
+0 -0

La première partie est donc un cas d’école de programmation concurrente (I/O bound), la seconde de programmation parallèle (CPU bound)

Pas sûr que la seconde soit forcément CPU bound.

De plus si le traitement d’un fichier est réalisé par un script externe ça n’a juste plus aucune importance : il suffit d’appeler le script dans le même thread que celui qui télécharge le fichier.

J’ai l’impression que tu t’es compliqué la vie pour pas grand chose… Tu pourrais nous montrer du vrai code stp ?

+0 -0

Salut,

J’ai vraiment du mal à comprendre ta/tes question(s). J’ai lu ton sujet hier soir et je pensais que ça irait mieux après la nuit mais je ne saisis toujours pas ce qui te pose problème.

Déjà, comment fonctionnent les scripts (a) et (b) dont tu parles ? Où obtiennent-ils les adresses des fichiers à télécharger, et comment signalent-ils qu’un téléchargement est terminé ?

Merci pour vos réponses. Désolé de ne pas être assez clair, j’ai un peu de mal à organiser tout ça et à manipuler ces nouveaux concepts.

Les 2 scripts de téléchargement ne téléchargent qu’un fichier à la fois chacun. Il faut donc les lancer plusieurs fois en parallèle pour en télécharger plusieurs. Le premier script ne peut être lancé que 2 fois, l’autre 3 fois. Au delà, les serveurs refusent la connexion (et de toute façon ma bande passante serait trop "grignotée").

Le script de traitement (coûteux en ressources et chronophage) n’accepte qu’un fichier à la fois. Comme il est gourmand, je ne veux le lancer que 5 fois en parallèle (sur des cores différents).

Pour résumer, ces scripts ne dépendent pas de moi et la façon dont ils fonctionnent n’est pas très importante. Ce qui m’intéresse vraiment c’est que ma liste de fichiers à télécharger soit consommée au fur et à mesure (au mieux 5 à la fois), et qu’ils soient traités directement derrière mais pas par plus de 5 processus simultanément.

Ce que je voudrais c’est quelque chose comme ça :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# longue liste de fichiers volumineux à télécharger
fichiers_a_telecharger = [nom_fichier1, nom_fichier2, nom_fichier3 ....]

# lorsque qu'un processus a téléchargé un fichier, il le rajoute dans cette queue
queue_a_traiter = queue()

# en faire une queue : lorsqu'un nom_fichier est pris par un processus de
# téléchargement, celui-ci sera retiré pour ne pas être téléchargé une seconde fois
queue_a_telecharger = queue(fichiers_a_telecharger)

def download_serverA():
    récupérer nom_fichier de queue_a_telecharger   # nom_fichier est retiré de la queue
    lancer_script_1(nom_fichier)
    mettre nom_fichier dans queue_a_traiter

def download_serverB():
    récupérer nom_fichier de queue_a_telecharger
    lancer_script_2(nom_fichier)
    mettre nom_fichier dans queue_a_traiter

def traiter_fichier():
    récupérer élément dans queue_a_traiter
    traiter élément

tant que queue_a_télécharger n'est pas vide:
    avoir max 2 process de download_serverA
    avoir max 3 process de download_serverB
    avoir max 5 process de traiter_fichier

J’ai du mal à mettre du vrai code, car je ne sais pas trop où aller…

+0 -0

Ce que j’essaye de t’expliquer dans mon premier message, c’est que si tes opérations sont dans un script à part que tu lances avec subprocess (ce que tu sembles décrire), alors tu n’as pas besoin d’un ProcessPoolExecutor : les threads suffisent.

Du coup à ta place je me simplifierais la vie, j’écrirais une fonction qui appelle un (des deux) scripts de download + le script de traitement, soit une fonction qui traite complètement un fichier. Pour ces histoires de limitation, plutôt que de gérer ça aver un pool, j’utiliserais des sémaphores.

Et pour faire bonne mesure, je serais même tenté d’utilisé asyncio pour faire tout ça sans me compliquer la vie.

+0 -0

Me revoilà après de nombreuses lectures et de nombreux tests. J’ai du mal à intégrer une deuxième fonction "download" qui grignoterait également "files". Une question également concernant l’output de download: il m’est impossible d’accéder aux éléments du tupple retourné par la coroutine. Qu’en pensez-vous ?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
from random import randint
from datetime import datetime

files = "ABCDEFGH"

DL_LIMIT = asyncio.Semaphore(3)
PROC_LIMIT = asyncio.Semaphore(2)

async def download(f):
    """
    download DL_LIMIT files simultaneously
    """
    async with DL_LIMIT:
        download_time = randint(1,5)
        # old-school progress bar
        for i in range(1,5):
            print("> {}".format(f),i*("*"))
            await asyncio.sleep(download_time/4)
        print("   > {} downloaded in {} seconds".format(f, download_time))
        # f.set_result("done")
        return f,download_time

async def process(f):
    """
    process PROC_LIMIT files simultaneously
    """
    async with PROC_LIMIT:
        process_time = 3
        # process_time = await f[1]
        await asyncio.sleep(process_time)
        print("      > {} processed in {} seconds".format(await f, process_time))


async def main():
    start_time = datetime.now()
    downloads = []
    for f in files:
        downloads.append(asyncio.ensure_future(download(f)))
    processings = []
    for dl in asyncio.as_completed(downloads):
        processings.append(asyncio.ensure_future(process(dl)))
    await asyncio.gather(*downloads,*processings)
    duration = datetime.now() - start_time

    print ("\ndownloaded and processed {} files in {}".format(len(files),duration))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
+0 -0

il m’est impossible d’accéder aux éléments du tupple retourné par la coroutine

Ça c’est facile : ta liste downloads contient des futures. Ceux-ci ont une méthode future.result() qui te retourne le résultat de la coroutine lorsque celle-ci est terminée.

+0 -0

En essayant d’utiliser la méthode future.result() dans la coroutine process ou main, j’ai des erreurs:

AttributeError: ’generator’ object has no attribute ’result’

1
2
3
4
5
6
async def process(f): # l 28
    """
    process PROC_LIMIT files simultaneously
    """
    async with PROC_LIMIT:
        print(await f.result())

ou

1
2
    for f in files:  # l 41
        downloads.append( asyncio.ensure_future(download(f)).result() )

ou

1
2
    for dl in asyncio.as_completed(downloads):  # l 43
        res = dl.result()

Je ne vois pas trop mon erreur.

+0 -0

l’exemple avec ensure_future est le bon : asyncio.ensure_future retourne un objet Future, seulement tu ne peux appeler la méthode result() que lorsque la coroutine est finie, donc dans ta boucle sur as_completed.

Edit: ah non j’ai probablement dit une bêtise. Je fais un test et je te dis.

+0 -0

Bon, alors en fait tu pourrais arriver à tes fins en créant explicitement des objets Future que tu passes à tes coroutines (la doc officielle montre ce genre d’exemple), mais dans ton cas tu as plus simple :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import random

FILES = "ABCDEFGHIJ"
SITES = {
    "site1.com": 3,
    "site2.com": 2
}
DOWNLOAD_SEM = {k: asyncio.Semaphore(v) for k, v in SITES.items()}
PROCESS_SEM = asyncio.Semaphore(5)

async def download(filename, site):
    async with DOWNLOAD_SEM[site]:
        print("> download {} from {}".format(filename, site))
        await asyncio.sleep(random.randrange(10)/10)
        print("< {} ({})".format(filename, site))

async def process(filename):
    async with PROCESS_SEM:
        print("> processing", filename)
        await asyncio.sleep(1)
        print("< done:", filename)

async def process_file(filename, site):
    await download(filename, site)
    await process(filename)

async def main():
    weighted_sites = [s for s, w in SITES.items() for _ in range(w)]
    jobs = [
        asyncio.ensure_future(process_file(f, random.choice(weighted_sites)))
        for f in FILES
    ]
    await asyncio.gather(*jobs)

loop = asyncio.get_event_loop()
try:
        loop.run_until_complete(main())
finally:
        loop.close()

Je répartis le download entre les deux sites de façon pas très élégante en utilisant un choix random avec des poids. Il y a sûrement moyen de faire la même chose plus joliment mais celle-ci a le mérite de fonctionner et d’être simple.

+0 -0

Bonsoir à tous,

Je me permets de reposter dans ce sujet. En expérimentant avec une seule fonction de téléchargement + 1 process, l’approche suivante fonctionne.

Est-ce que ça se fait d’utiliser des threads dans d’autres threads, ou c’est déconseillé ?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

FILES = "ABCDEFG"

def download(filename):
    print("> downloading {}".format(filename))
    time.sleep(2)
    print("  < downloaded {}".format(filename))
    return filename

def process(filename):
    print("    > processing {}".format(filename))
    time.sleep(3)
    print("        < {} done".format(filename))


with ThreadPoolExecutor(max_workers=3) as executor1:
    future_to_filename1 = {executor1.submit(download, filename): filename for filename in FILES}
    with ThreadPoolExecutor(max_workers=3) as executor2:
        future_to_filename2 = {executor2.submit(process, future.result()): future for future in as_completed(future_to_filename1)}

Je me suis renseigné sur parallel, mais je n’ai pas l’impression que ça puisse m’aider à gérer aussi finement ma file d’exécution qu’asyncio… En tout cas c’est intéressant pour faire de la parallélisation plus rapidement sans se casser la tête.

Je me suis aperçu que cette solution ne marchait pas à tous les coups. Il arrive qu’un téléchargement saute et qu’il n’y en ait que 4 en simultané au lieu de 5. Ça a sans doute à voir avec l’odre dans lequel les sites sont choisis. Et si en plus, on rajoute des conditions après que le mot soit choisi, ça augmente encore plus les chances qu’il saute.

Je n’ai pas encore réussi à faire fonctionner mes essais avec des queues mais j’y travaille…

Bon, alors en fait tu pourrais arriver à tes fins en créant explicitement des objets Future que tu passes à tes coroutines (la doc officielle montre ce genre d’exemple), mais dans ton cas tu as plus simple :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import random

FILES = "ABCDEFGHIJ"
SITES = {
    "site1.com": 3,
    "site2.com": 2
}
DOWNLOAD_SEM = {k: asyncio.Semaphore(v) for k, v in SITES.items()}
PROCESS_SEM = asyncio.Semaphore(5)

async def download(filename, site):
    async with DOWNLOAD_SEM[site]:
        print("> download {} from {}".format(filename, site))
        await asyncio.sleep(random.randrange(10)/10)
        print("< {} ({})".format(filename, site))

async def process(filename):
    async with PROCESS_SEM:
        print("> processing", filename)
        await asyncio.sleep(1)
        print("< done:", filename)

async def process_file(filename, site):
    await download(filename, site)
    await process(filename)

async def main():
    weighted_sites = [s for s, w in SITES.items() for _ in range(w)]
    jobs = [
        asyncio.ensure_future(process_file(f, random.choice(weighted_sites)))
        for f in FILES
    ]
    await asyncio.gather(*jobs)

loop = asyncio.get_event_loop()
try:
        loop.run_until_complete(main())
finally:
        loop.close()

Je répartis le download entre les deux sites de façon pas très élégante en utilisant un choix random avec des poids. Il y a sûrement moyen de faire la même chose plus joliment mais celle-ci a le mérite de fonctionner et d’être simple.

nohar
+0 -0
Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte