Arrêter l'exécution de coroutines via une websocket

quel événement !

a marqué ce sujet comme résolu.

Bonjour,

cela fait pas mal de temps que je galère sur un problème et j'ai besoin d'un regard neuf si ce n'est d'une solution à mon problème.

Côté techno j'utilise :

  • python 3
  • tornado
  • asyncio

Je ne suis même pas sûr de bien concevoir la chose, alors si vous pensez qu'il faut que je revoie ma manière de faire, je le ferai

J'ai un projet dans ma boîte, qui me demande d'envoyer un certain nombre de documents, soit par mail soit selon d'autres méthodes (mais pour l'instant gardons le mail, c'est plus simple) et d'observer les réponses des serveur qui recevront les documents.

Comme l'observation des réponses doit se faire "en temps réel", j'ai donc choisi cette architecture :

  1. affichage du portail de configuration des envois + ouverture d'une websocket
  2. appui sur le bouton de soumission du formulaire -> envoie d'une requête POST en ajax
  3. à chaque réponse du serveur (rejet du mail, acceptation mise en quarantaine) on affiche dynamiquement cette réponse

Mon "architecture" marche très bien quand je connais à l'avance le nombre de documents/mails que je dois envoyer.

Par contre on me demande de pouvoir faire un "envoi à l'infini jusqu'à ce qu'on appui sur "stop" ou que la connexion soit perdue".

Je fais donc ceci :

le handler du post

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class ChargeSMTPHandler(RequestHandler):
    env = Environment(loader=FileSystemLoader(join(dirname(__file__), "templates")))
    nb_recipients = 0
    nb_recipients_min = 0
    nb_recipients_max = 0

    @asynchronous
    def post(self, *args, **kwargs):
        """lauch email charge

        :param args:
        :param kwargs:
        :return:
        """
        context = {"pending": True}
        data_json = json_decode(self.request.body)
        server, port = data_json.get('server'), data_json.get('port')
        from_addr, to_addr = data_json.get('from'), data_json.get('to').split(',')
        nb_threads = data_json.get('nb_threads')

        self.finish()

        if data_json.get("type_nb_rec") == "nb":
            self.nb_recipients = min(1, data_json.get("nb_rec"))
        else:
            self.nb_recipients_min = min(1, data_json.get("nb_rec_1"))
            self.nb_recipients_max = max(self.nb_recipients_min, data_json.get("nb_rec_2"))

        if data_json.get("mode_param") == MODE_RAND_INFINITE:

            ChargeSMTPHandler.charge_randomly_infinite(data_json["corpus"], server, port,
                                                       from_addr, to_addr, nb_threads,
                                                       nb_recipients=self.nb_recipients,
                                                       nb_recipients_min=self.nb_recipients_min,
                                                       nb_recipients_max=self.nb_recipients_max,
                                                       mail_from=data_json["from"],
                                                       rcpt_to=data_json["to"],
                                                       helo=data_json["helo"])

    @staticmethod
    def charge_randomly_infinite(data, server, port, from_addr, to_addrs, nb_threads, **kwargs):
        corpus = [join(SAMPLE_DIR, mail_file) for mail_file in data]
        try:
            asyncio.get_event_loop().run_until_complete(send_corpus_mails_infinite(server, port, from_addr, to_addrs, handle_mail, nb_threads, kwargs, *corpus))
        except RuntimeError:
            pass

Puis, dans ma websocket je mets :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ChargeSMTPSocket(WebSocketHandler):

    def open(self, *args, **kwargs):
        #init
        pass

    def on_close(self):
        # todo: close as on_message("stop")
        pass

    def on_message(self, message):
        try:
            if message == "stop":
                loop = asyncio.get_event_loop()
                if loop.is_running():
                    loop.stop()
                    loop.close()
                else:

                    self.logger.info("No mail was being sent.")
            else:
                self.logger.warning("Unknown message recieved : " + message)
        except Exception as e:
            self.logger.error(str(e))
    def write_message(self, message, binary=False):
        ## render
        pass

Seulement deux problèmes s'offrent à moi :

  • dans la version que je viens de donner, la boucle ne s'arrête pas (dans une ancienne, elle s'arrêtait mais impossible de la redémarrer ou alors le serveur s'éteignait)
  • python me dit qu'on ne peut pas "close a running event loop"

En fait, il faudrait que je génère un événement qui dise à la loop "arrête ce que tu es en train de faire maintenant" mais je n'arrive pas à le faire.

Salut,

Je vois deux solutions à ton problème. La première est d'annuler la coroutine démarrée dans la loop asyncio à la place de réaliser un loop.stop. Il faut garder la task quelquepart d'accessible pour ton ChargeSMTPSocket puis faire un task.cancel(). Attention à attraper l'exception CancelledException qui risque d'être lancée au niveau de ton loop.run_until_complete(...).

L'autre solution, que je trouve plus propre, est d'avoir une variable must_stop à False tant que ta boucle infinie tourne. Ton ChargeSMTPSocket pourrait alors la faire passer à True, la boucle s'arrêterait alors d'elle même.

Ton post est ancien, ça m'intéresse de savoir comment tu as fait finalement ?

+2 -0

Clairement y'a un problème de compréhension dans ton post. Si tu arrêtes une event-loop, au mieux il t'engueule (et il a raison), au pire il cesse complètement de fonctionner.

Pour prendre un parallèle un peu foireux : c'est comme si, pour tuer un processus en particulier, tu disais au processeur : "éteins ton ordonnanceur".

La solution du "flag" positionné par ailleurs est souvent celle que j'utilise pour résoudre ce genre de soucis.

+0 -0
Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte