Ajout dynamique de tâche dans une file de tâches séquentielles

a marqué ce sujet comme résolu.

Bonjour,

Un service externe fait appel à une API de type REST exposée par une application Django REST Framework qui joue le rôle de backend. Voici comment on veut gérer ces appels dans un contexte spécifique.

  • Le service lance parallèlement plusieurs appels au même service Web (c-à-d à une même URL) et se met en attente de la réponse à ces requêtes (appels synchrones donc).
  • Le backend, réceptionnant ces requêtes, doit les classer en groupes de tâches. Dans chaque groupe, les tâches doivent être exécutées les unes à la suite des autres. En revanche, les groupes doivent être lancés simultanément.
  • Comme on ne sait pas à l’avance quelles sont les tâches que va comporter un groupe, il faut pouvoir ajouter des tâches à un groupe déjà existant à l’exécution (ou de manière dynamique).

Après quelques recherches, j’ai choisi d’utiliser Celery pour répondre à ce besoin. Je suis cependant dans une impasse.

  • En regardant la documentation, la primitive chain semblait parfaitement correspondre au concept de groupe de tâches séquentielles. Cependant, je ne vois pas comment je peux ajouter dynamiquement / à l’exécution de tâches à une chaine pré-existante.
  • En outre, je suis parti sur l’idée qu’il y aurait 1 worker par service Web exposé. Du coup, une file de tâches n’est traitée que par 1 seul worker. Le problème, c’est que les files sont groupées selon le service Web mais aussi un identifiant métier supplémentaire. Cela signifie que pour un service Web, il y a plusieurs files correspondant à un groupe métier différent.
  • Je précise que côté Celery, j’ai activé le protocole RPC avec RabbitMQ pour récupérer les résultats des tâches qui doivent être renvoyés au service appelant.

En conséquence, je me pose premièrement la question de savoir si Celery est le bon outil pour répondre à mon besoin. Si oui, comment résoudre les problèmes décrits plus hauts ? Si non, quel outil Python est adapté à ce problème ?

+0 -0

Salut,

Je ne connais plus en détails les options de Celery, mais est-ce qu’il n’est pas possible d’exécuter des tâches avec une forme de verrou (une chaîne de caractère pouvant s’assimiler à un jeton par exemple) qui définirait les groupes, et ainsi tu rajouterais une tâche à un groupe en réutilisant un même jeton.

Je sais que d’autres systèmes de tâches asynchrones mettent en place un mécanisme du genre, donc je me dis que ça doit être possible avec celery.

Pour essayer de clarifier le problème, j’ai réalisé le schéma suivant :

Diagramme illustrant des files, tâches et workers Celery RabbitMQ avec Django

C’est la manière dont fonctionne actuellement la première implémentation que j’ai réalisée pour répondre au problème. Ceci dit, celle-ci reste un contournement et la manière de fonctionner est différente de ce que l’on voulait faire initialement.

C’est pourquoi je vais séparer en deux parties l’exposition des problèmes que je rencontre.

Problèmes au niveau du contournement

Le service externe lance des requêtes REST synchrones simultanément sur toutes les opérations (a, b, c, d) au service Web exposé par Django. Les opérations a, b et d appartiennent au lot 1, l’opération c au lot 2. On crée donc dynamiquement une file pour le lot 1 et le service Web A, et une autre file pour le lot 2 et le même service Web A. Ces files sont attribuées au worker A, qui est créé en avance et auquel on destine toutes les files qui contiennent les tâches du service Web A. Comme son paramètre concurrency est réglé à 1, il ne va traiter qu’une tâche à la fois. Donc les tâches dans les files du service Web seront bien traitées l’une à la suite de l’autre, ce qui est bien ce que l’on veut. Par contre, on voudrait que les tâches a_A, b_A et d_A soient bien réalisées séquentiellement, mais avec c_A qui soit réalisée en parallèle, car l’opération c appartient à un lot différent. C’est ici que se situe l’écart avec ce que l’on voulait initialement, mais il est temporairement acceptable.

En revanche, un gros problème bien plus problématique se situe dans le retour des réponses des tâches à l’application Celery et Django. En effet, il est apparu au cours d’un test que certaines tâches, bien qu’indiquées comme étant réalisés dans les journaux des workers, ne voyaient jamais leurs réponses apparaître dans les journaux du serveur Django. Je me suis rendu compte après avoir fouillé que les réponses étaient comme « bloquées » dans les files RabbitMQ. J’en ai eu la confirmation car en arrêtant le serveur RabbitMQ, le serveur Django a affiché des erreurs sur ces tâches qui n’apparaissaient environ 24 heures auparavant dans les journaux alors que les workers Celery les avaient bien terminées.

Ma question principale ici est donc comment résoudre ce problème des tâches bloquées dans RabbitMQ ? Comment je peux analyser et débloquer ces tâches ? Est-ce un problème de délai d’expiration (je n’en ai indiqué aucun dans la définition des tâches Celery) ? Peut-on les relancer (je n’ai rien trouvé qui permette de le faire) ?. Est-ce parce l’on utilise le serveur runserver de Django et pas un vrai serveur de production comme Gunicorn ? Je ne trouve que peu d’informations et difficilement sur ces sujets sur le Web, si vous avez des liens qui expliquent ces sujets je suis preneur.

La solution répondant au problème initial

Dans un deuxième temps, on vient à la suggestion d’entwanne sur l’utilisation d’un verrou. Tel que je le comprends, cela permet de s’assurer qu’une tâche n’est exécutée qu’une et une seule fois, par un seul worker. Ce souci peut arriver dans mon cas d’usage si on décide d’augmenter la valeur du paramètre concurrency des workers. Par contre, cela ne permettrait pas de s’assurer que les tâches dans une même file soit exécutées séquentiellement. En effet, si le worker A peut exécuter 2 tâches à la fois, il pourrait exécuter les tâches a_A et d_A en même temps, alors que ce n’est pas ce que l’on veut. On veut par contre qu’il puisse réaliser a_A et c_A en même temps par exemple. À moins qu’il n’y ait quelque chose que j’ai mal compris.

+0 -0

Désolé je ne connais pas assez bien Celery pour répondre à ces questions. Est-ce que tu arrives à reproduire le problème avec un code minimal, et si oui est-ce que tu pourrais poster ce code ?

Pour ce qui est du lock, c’était une idée car je sais que certains gestionnaires de tâches permettent ainsi une exécution séquentielle par groupes.
Si tu envoies tes tâches séquentiellement avec un même lock, elles ne sont pas exécutées dans cet ordre ? Tu n’as rien que tu puisses configurer pour avoir ce comportement ?

Au niveau du contournement, j’ai trouvé le problème : il s’agissait du backend utilisé pour stocker le résultat des tâches qui posait problème : RPC (RabbitMQ). Il semblerait que je ne suis pas le seul à avoir recontré ce ce genre de problèmes.

J’ai par hasard essayé avec Redis comme backend des résultats (mais toujours RabbitMQ comme broker), et je n’ai plus eu ce souci.

Pour la solution au problème initial, je n’avais pas vu le verrou sous cet angle. Je n’ai pas encore commencé à le tester ; je reviendrai partager mes résultats ici.

+0 -0
Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte