Symfony Messenger : Gestion des Messages en file d’attente

Messenger, apparu au printemps 2018 avec la version 4.1 de Symfony est devenu aujourd’hui un composant majeur du framework français. Et pour cause, Messenger répond parfaitement aux problématiques de performance que nos applications peuvent rencontrer. Comment ? Et bien c’est ce que nous allons découvrir à travers cet article qui couvrira les principales spécificités du composant Messenger.

Symfony Messenger, kézako ?

Comme le définit la documentation Symfony, Messenger aide les applications à échanger des messages avec d’autres applications (ou composants d’une même application).

Ces messages peuvent être transmis de manière synchrone sans sortir de l’application ou de manière asynchrone via des systèmes de queue.

Ok, super ! Mais concrètement qu’est ce que cela signifie ?

Imaginez que vous ayez une chaîne sur une plateforme de streaming. En pleine période de confinement liée au Coronavirus vous vient l’idée, géniale et pleine d’altruisme, de proposer à vos abonnés un tutoriel sur la meilleure manière de confectionner des masques en tissu. Vous êtes tellement généreux !

Une fois votre tuto réalisé, vous allez télécharger votre vidéo sur la plateforme. 1Go c’est lourd, certes, mais vous êtes dans les clous du “maximum upload size”.

Par contre du côté de l'application, ce n’est pas le même son de cloche. En effet le serveur, afin que vos futurs spectateurs n’aient pas à télécharger 1Go pour visionner votre vidéo, va devoir compresser et ré-encoder votre fichier. Et tant que le traitement n’est pas terminé votre page continuera de charger. Autant vous dire que cela peut être très long.

C’est donc là que Messenger intervient. Au lieu de lancer directement le traitement de la vidéo, Messenger va simplement le reporter à plus tard.

Dans un premier temps le serveur, une fois la vidéo téléchargée, va vous retourner la réponse suivante :

J’ai bien reçu votre vidéo et vous serez notifié une fois celle-ci publiée.

Vous pouvez alors passer à autre chose.

En parallèle le serveur, lui, va déposer dans un bus de messages une instruction (appelée message) destinée au service d’encodage de la vidéo :

Encode et compresse la vidéo suivante : id 542368

En arrière-plan de l’application, le bus de message remet l’instruction à un système de queue (transport) qui gère une file d’attente. Cette file d’attente cumule toutes les instructions en attente d’être traitées.

Dès que le service d’encodage est disponible, le message est remis dans le bus de message pour lui être enfin délivré. Ce dernier se chargera alors d’encoder et compresser la vidéo et les autres événements comme la publication de la vidéo et la notification de l’utilisateur pourront s'exécuter.

De votre côté, tout aura été transparent. Vous avez pu profiter pleinement de l’application sans noter aucun ralentissement de la plateforme.

Architecture de Messenger

Voyons dorénavant de plus près comment s’articule le fonctionnement de Messenger.

Avant toute chose, il faut savoir que Messenger traite, par défaut, les messages dès qu’ils sont envoyés. Les messages sont alors consommés de manière synchrone et ne quittent jamais l’application.

Messenger, Synchrone

NB: Plusieurs messages peuvent être envoyés simultanément et traités par différents handlers.

1- Un publisher ou producer (exemple un controller, un service, une commande....) émet un message ( ex: “envoyer un email de bienvenue au nouvel inscrit” ) dans le bus de message.

2- Une fois réceptionné, le bus transmet le-dit message à son destinataire, le handler.

3- Dans la foulée le handler exécute l’instruction et un email est envoyé au nouvel inscrit.

Mais l'intérêt principal de Messenger, comme nous l’avons vu précédemment, réside dans sa faculté à traiter de manière asynchrone les messages. Avec la configuration de ce que l’on appelle un transport vous pourrez repousser le moment où devront s'exécuter les tâches et ensuite les effectuer en arrière-plan de votre application.

Messenger, Asynchrone

NB: Vous pouvez configurer plusieurs transports et files d’attente et disposez d’autant de workers que vous le souhaitez.

1- Un publisher ou producer (exemple un controller, un service, une commande....) émet un message ( ex: “envoyer un email de bienvenue au nouvel inscrit” ) dans le bus de message.

2- Une fois réceptionné par le bus, le message est envoyé, via un transport, vers une file d’attente. Le transport n’est autre qu’un système de queue tels que RabbitMQ, Doctrine, vous permettant ainsi de traiter les messages de manière asynchrone.

3- En parallèle un worker va chercher en temps réel les messages depuis le système de queue.

4- Le worker diffuse alors le message en direction du bus  afin que ce dernier l’envoie à son destinataire final.

5- Une fois le message émis au bon handler celui-ci traite la commande, ici l’envoi d’un email au nouvel inscrit.

Un worker est un processus dont le travail consiste à surveiller la file d’attente et à envoyer les messages au bus de messages dès qu’un nouveau message est ajouté à la pile.

Cette gestion des messages en file d’attente vous garantit :

  • un traitement asynchrone

L’expéditeur (exemple un controller) et le récepteur du message (exemple un service) ne sont pas contraints de s'attendre l'un et l'autre. L'expéditeur n’a pas à attendre que le récepteur commence à traiter son message, il poste son information et peut passer à autre chose.

  • aucune perte de données

Les messages étant stockés le temps que le traitement se fasse, une interruption du serveur n’entraîne donc pas la perte d’un traitement en cours.

  • un lissage de la charge

S’il y a un pic de message, les messages s’accumulent dans la pile de message et seront consommés plus tard.

  • une réexpédition des messages & un failure transport

Si jamais un message échoue l’appel n’est pas perdu et sera rejoué automatiquement plus tard. Une fois qu’il atteint son montant maximum d’expédition, le message est par défaut rejeté. Pour éviter cela Messenger vous accorde la possibilité de mettre en place un transport, mode échec, enregistrant tous les messages ayant été rejetés. Vous ne perdez alors aucun message et pouvez même réexpédier à nouveau les messages via ce transport.

Vous pouvez paramétrer le nombre maximum d’expéditions du transport. Par défaut ce nombre est limité à 3.

Mise en application de Messenger

Après la théorie, place à la pratique.

Supposons l'implémentation d’un système de notifications dans notre application. Dans notre exemple chaque utilisateur sera notifié par email de son inscription à l’application.

Installation

Pour ce faire, commençons d’abord par installer dans notre application Symfony Flex, le composant Messenger :

composer require symfony/messenger

Créer un Message & son Handler

Comme nous l’avons vu dans la section précédente, Messenger tourne autour de 2 concepts principaux, le message et le handler. Le message contient l’information et le handler, lui, lit et exécute les instructions du message.

Concernant le message ce n’est ni plus ni moins qu’un objet PHP sérialisable :

<?php

namespace App\Message;

class SendNotification
{
    private $message;

    private $email;

    public function __construct(string $message, string $email)
    {
        $this->message = $message;

        $this->email = $email;
    }

    public function getMessage()
    {
        return $this->message;
    }

    public function getEmail()
    {
        return $this->email;
    }
}

Maintenant que nous disposons de notre message, nous devons le traiter avec un handler. Symfony recommande d’implémenter l’interface MessageHandlerInterface et la fonction __invoke() avec comme paramètre notre message :

<?php

namespace App\MessageHandler;

use App\Message\SendNotification;

class SendNotificationHandler implements MessageHandlerInterface

{
    private $mailer;

    public function __construct(\Swift_Mailer $mailer)
    {
        $this->mailer = $mailer;
    }

    public function __invoke(SendNotification $notification)
    {

       $this->mailer->send(
               (new \Swift_Message('[Demo] Notification'))
                   ->setFrom('hello@example.com')
                   ->setTo($notification->getEmail())
                   ->setBody(                '<h1>Notification</h1><p>'.$notification->getMessage().'</p>',
                   'text/html'
                )
            );

    }
}

Personnellement je préfère implémenter l’interface MessageSubscriberInterface avec la fonction getHandledMessages(). Cela se rapproche du EventSubscriberInterface.

Grâce à l’autoconfiguration et notre paramètre typé SendNotification, le bus de message sera en mesure d’expédier un message SendNotification vers ce gestionnaire.

Pour tester si votre handler est bien configuré, saisissez la commande suivante :

php bin/console debug:messenger

Vous devriez obtenir le résultat suivant :

Messenger
=========

messenger.bus.default
---------------------

 The following messages can be dispatched:
 ------------------------------------------------------------------------------------------------- 
  App\Message\SendNotification                                                               
      handled by App\MessageHandler\SendNotificationHandler
-------------------------------------------------------------------------------------------------

Ok on récapitule, jusqu’ici nous avons créé :

1- notre message

2- son handler

3- nous avons vérifié que le handler était bien configuré

Super ! Il ne nous reste plus qu’à poster le message et pour ceci rien de plus simple, puisque c’est le bus de message qui s’occupe de tout. Fantastisch !

<?php

namespace App\Controller;

use App\Message\SendNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class SignUpController extends AbstractController
{
    /**
     * @Route("/signup", name="signup")
     */
    public function index(Request $request, MessageBusInterface $bus)
    {
        // ...

        // SendNotificationHandler sera automatiquement appelé
        $bus->dispatch(new SendNotification('Salut le nouveau !',$request->get('email')));

        // vous pouvez utiliser le shortcut suivant
        $this->dispatchMessage(new SendNotification('Salut le nouveau !',$request->get('email')));

        // ...
    }
}

Vous êtes dorénavant en mesure d’envoyer un email à chaque nouvelle inscription, ce qui est bien mais pas top puisque la page signup continuera de charger tant que l’email n’est pas envoyé. Et franchement vous ne voulez pas ça ! Ce que vous souhaitez en revanche c’est que votre utilisateur obtienne aussi rapidement que possible une confirmation de son inscription et qu’une notification email lui soit expédiée mais plus tard.

Et si vous vous souvenez bien, c’est avec un système de queue que nous allons pouvoir réaliser ce traitement asynchrone.

Messenger & Doctrine

Pour aborder le sujet du traitement asynchrone je vous propose d’utiliser Doctrine. C’est le transport le plus simple à mettre en place et il vous permettra en un rien de temps de comprendre la mécanique du système de queue.

Configuration

Un transport est enregistré à l'aide d'un DSN (data source name). Grâce à la recette Flex de Messenger, votre fichier .env contient déjà quelques exemples :

# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Veuillez décommenter le transport Doctrine (ou le rajouter dans votre fichier .env.local).

Ensuite, dans config/packages/messenger.yaml, définissons un transport appelé async qui utilise cette configuration :

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

Routage

Dorénavant pour faire en sorte que notre message puisse être expédié dans le transport, il faut le router vers celui-ci :

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

        routing:
            App\Message\SendNotification:  async

Dorénavant App\Message\SendNotification sera envoyé à Doctrine et son handler ne sera pas appelé immédiatement. Par contre tous les messages n’ayant aucune route seront traités directement.

Vous pouvez envoyer vos messages vers différents transports.

Donc si je récapitule nous avons jusqu’ici :

1- un message

2- son handler

3- son transport

Il nous manque plus qu’à consommer le message.

Workers

Pour consommer les messages, Messenger dispose d’une ligne de commande qui scrute les nouveaux messages dans le transport et les diffuse dans le bus de messages. Cette ligne de commande est appelé worker :

php bin/console messenger:consume async -vv

Votre worker s'exécute ainsi en arrière-plan de votre application et se met en ordre de marche dès lors qu’un message est ajouté à la file d’attente.

Bon à savoir, en environnement de production Symfony recommande l’installation de Supervisor. Supervisor est un excellent outil pour garantir que vos processus soient toujours opérationnels même s'ils se ferment en raison d'un échec, du dépassement d'une limite de messages ou grâce à la commande messenger: stop-workers.

Dans le cadre de notre exemple vous pouvez lancer la commande et soumettre en parallèle une nouvelle inscription. Vous devriez obtenir le résultat suivant :

11:43:04 INFO [messenger] Received message App\Message\SendNotification ["message" => App\Message\SendNotification^ { ...},"class" => "App\Message\SendNotification"]

11:43:19 INFO [messenger] Message App\Message\SendNotification handled by App\MessageHandler\SendNotificationHandler::__invoke ["message" => App\Services\Message\SendNotification^ { ...},"class" => "App\Message\SendNotification","handler" => "App\MessageHandler\SendNotificationHandler::__invoke"]

11:43:19 INFO [messenger] App\Message\SendNotification was handled successfully (acknowledging to transport). ["message" => App\Message\SendNotification^ { ...},"class" => "App\Message\SendNotification"]

1- Les logs indiquent que votre message a bien été réceptionné par le bus de messages

2- Le message a bien été envoyé au handler après avoir passé un peu de temps dans la file d’attente, ici 15 secondes.

3- Le message a été traité avec succès, il est retiré de la queue

Veuillez noter également, que Messenger, lors de la première utilisation du transport Doctrine, crée automatiquement une nouvelle table dans votre base de données nommée messenger_messages. Elle correspond tout simplement à la file d’attente du transport Doctrine.

Symfony Messenger : conclusion

Vous l’aurez compris Messenger est un Must Have dans votre application. En plus de répondre efficacement aux contraintes de performance, Messenger accorde un grand niveau de personnalisation qui répondra sans nul doute à tous vos besoins.

Pour ceux qui souhaitent approfondir leur connaissance du sujet et découvrir les autres fonctionnalités dont dispose Messenger (messenger events,envelopes & stamps…) je vous invite à jeter un œil sur la documentation Symfony.

Quant à moi, je vous retrouve très vite pour un nouvel article sur Messenger et le transport AMQP. See you soon !

Commentaires

Il n'y a actuellement aucun commentaire. Soyez le premier !

Articles liés