Aller au contenu principal

Symfony Messenger : exécution asynchrone du code PHP

Le problème d'exécution synchrone du code PHP

En PHP traditionnel, le code s'exécute de manière synchrone : chaque instruction attend la fin de la précédente. Problématique quand une opération prend plusieurs secondes :

<?php 

public function sendNotification(User $user)
{
    $this->emailService->sendWelcomeEmail($user); // 4 secondes
    $this->smsService->sendConfirmation($user);   // 2 seconde
    $this->analytics->trackAction($user);         // 1 seconde
    
    // Total : 7 secondes d'attente pour l'utilisateur !
    return $this->render('confirmation.html.twig');
}

Résultat : L'utilisateur attend devant un écran qui charge.
Pour résoudre ce problème on peut utiliser "Symfony Messenger" afin de décaler l’exécution des scripts lourd.

 

Comment fonctionne Symfony Messenger ?

Symfony Messenger permet de différer l'exécution de tâches lourdes pour ne pas bloquer l'utilisateur. Le traitement se fait dans un processus séparé (le worker), de manière réellement asynchrone.

Pour comprendre, imaginons que plusieurs utilisateurs souhaitent importer des fichiers avec des centaines de milliers de lignes. Sans Messenger, le traitement se ferait pendant la requête HTTP : le serveur risquerait de planter et aucun utilisateur ne pourrait importer son fichier.

Pour corriger cela, on applique un système de file d'attente :

  1. Quand un utilisateur (le Dispatcher) demande un import, on ne le démarre pas directement. On crée un message contenant les informations nécessaires (par exemple : ID de l'import/message, offset, taille du lot ...).
  2. Ce message est stocké dans une file d'attente (par exemple dans la table messenger_messages).
  3. L'utilisateur est immédiatement informé que sa demande est prise en compte et sera traitée dans quelques minutes. La requête HTTP se termine.
  4. Un Worker (processus indépendant) interroge la file d'attente de manière périodique (par exemple chaque seconde ). Dès qu'un message est disponible, il le traite via son Handler qui exécute la logique métier (envoit de mails, lire un fichier CSV, importer les contacts par lots de 100 ... ).

Pour plus d'information n’hésitez par à regarder la documentation officielle

Dans le suite de ce tutoriel nous allons mettre en place le code nécessaire pour la résolution du problème d'import de fichier.

 

1- Le Dispatcher

Permet de creer le message. Par exemple :

<?php

namespace App\Controller;

use App\Entity\Import;
use App\Message\ImportContactsMessage;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Attribute\Route;

final class ImportController extends AbstractController
{
    public function __construct(
        // Le bus de messagerie : c'est lui qui achemine les messages
        private readonly MessageBusInterface $bus,
    ) {
    }

    #[Route('/import/{id}/launch', name: 'app_import_launch', methods: ['POST'])]
    public function launch(Import $import): Response
    {
        // ─── LE DISPATCHER ───
        // On crée un message (le ticket) qui contient :
        //   - L'ID de l'import à traiter
        //   - L'offset de départ (0 = début du fichier)
        //   - La taille d'un lot (100 lignes par batch)
        $message = new ImportContactsMessage(
            importId: $import->getId(),
            offset: 0,
            batchSize: 100,
        );

        // On place le message dans la file d'attente
        $this->bus->dispatch($message);

        // L'utilisateur reçoit une réponse immédiate
        $this->addFlash('success', 'Import placé dans la file d\'attente. Il sera traité prochainement.');

        return $this->redirectToRoute('app_import_show', [
            'id' => $import->getId(),
        ]);
    }
}

 

2 - Le message

Le message doit être le plus simple possible, car l'objet va être sérialiser pour être stocker en BD.

<?php

namespace App\Message;

/**
 * Message transportant les informations nécessaires au traitement d'un lot d'import.
 * 
 * C'est le "ticket" placé dans la file d'attente par le Dispatcher.
 * Le Worker le récupérera et le passera au Handler.
 */
final class ImportContactsMessage
{
    public function __construct(
        /** ID de l'import à traiter */
        private readonly int $importId,
        /** Ligne de départ dans le fichier CSV (0 = début) */
        private readonly int $offset = 0,
        /** Nombre de lignes à traiter dans ce lot */
        private readonly int $batchSize = 100,
    ) {
    }

    public function getImportId(): int
    {
        return $this->importId;
    }

    public function getOffset(): int
    {
        return $this->offset;
    }

    public function getBatchSize(): int
    {
        return $this->batchSize;
    }
}

 

3 - Le Handler

il contient la logique métier qui va être exécute.

<?php

namespace App\MessageHandler;

use App\Entity\Import;
use App\Message\ImportContactsMessage;
use App\Service\ContactImportService;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

/**
 * Le Handler : c'est lui qui exécute la logique métier quand le Worker
 * récupère un message dans la file d'attente.
 */
#[AsMessageHandler]
final class ImportContactsHandler
{
    // Pause de 5 secondes entre chaque lot
    private const DELAY_MS = 5000;

    public function __construct(
        private readonly ContactImportService $importService,
        private readonly EntityManagerInterface $entityManager,
        private readonly MessageBusInterface $bus,
    ) {
    }

    public function __invoke(ImportContactsMessage $message): void
    {
        // 1. Traiter un lot de 100 lignes
        $nextOffset = $this->importService->processBatch(
            importId: $message->getImportId(),
            offset: $message->getOffset(),
            batchSize: $message->getBatchSize(),
        );

        // 2. Nettoyer la mémoire après le traitement
        $this->entityManager->flush();
        $this->entityManager->clear();

        // 3. S'il reste des lignes à traiter, on relance un nouveau message
        //    avec un délai de 5 secondes pour laisser le serveur respirer
        if ($nextOffset !== null) {
            $this->bus->dispatch(
                new ImportContactsMessage(
                    $message->getImportId(),
                    $nextOffset,
                    $message->getBatchSize(),
                ),
                [new DelayStamp(self::DELAY_MS)]
            );
        }
    }
}

Dans le Handler, nous avons  "new DelayStamp(self::DELAY_MS)" qui ajoute un délais avant l'ajout dans la fille d'attente. 

Sans DelayStamp :

Worker : traite le lot 0 (100 lignes)
Worker : flush → clear → dispatch lot 100
Worker : le message est IMMÉDIATEMENT disponible
Worker : récupère le lot 100 → le traite
Worker : flush → clear → dispatch lot 200
Worker : récupère le lot 200 → le traite

Le handler monopolise le CPU et la base de données sans pause. Les autres tâches (autres handler, workers, cron, requêtes HTTP, ...) sont ralenties.

 

4 - Le worker

À ce stade, nous avons déjà mit en place toutes la logique métier. 
NB: si vous exécutez votre code, il s’exécute de manière direct. Le moyen de transport est direct (TGV) :).

Nous devons choisir un moyen de transport qui prend son temps. Symfony propose plusieurs moyen de transport de message.

Nous optons pour "Doctrine Transport" car il est simple en mettre en place, mais pour des meilleures performance utiliser "Redis" ou "AMQP (RabbitMQ)".

Configuration Doctrine Transport :

Dans le fichier .env, dé-commenter la ligne suivante  ou ajouter la si elle n'existe pas : 

###> symfony/messenger ###
MESSENGER_TRANSPORT_DSN=doctrine://default?auto_setup=0
###< symfony/messenger ###

://default => connexion par défaut à la base de données. ( Vous pouvez utiliser une autre base de données )

auto_setup=0 => Ne crée PAS automatiquement la table. Très important pour la production, si la table "messenger_messages" n'existe pas effectuer une migration.

Ajuster le fichier /config/packages/messenger.yaml :

framework:
    messenger:
        failure_transport: failed

        # https://symfony.com/doc/current/messenger.html#transport-configuration
        transports:
            # 1: doctrine transport        
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
                    multiplier: 2
                    
            # 2: exemple "Redis transport"
            #fast_priority:
            #    dsn: 'redis://localhost:6379/messages_fast'
            #    retry_strategy:
            #        max_retries: 2
            
            # 3: doctrine transport
            heavy_processing:
                dsn: 'doctrine://default?queue_name=heavy'
                retry_strategy:
                    max_retries: 1 # Si un très gros fichier plante, on ne réessaie qu'une fois.
                    
            # 4: Contient les messages qui ont echoue.
            failed: 'doctrine://default?queue_name=failed'

        default_bus: messenger.bus.default

        buses:
            messenger.bus.default: []

        routing:
            Symfony\Component\Mailer\Messenger\SendEmailMessage: async
            Symfony\Component\Notifier\Message\ChatMessage: async
            Symfony\Component\Notifier\Message\SmsMessage: async

            # Route your messages to the transports
            App\Message\ImportContactsMessage: async
            # exemple d'utilisation du transport "heavy_processing"
            # App\Message\ImportBigFileMessage: heavy_processing

La configuration de notre moyens de transport doctrine est en place. passons à l’exécution.

Le Worker est un processus fourni par Symfony via la commande bin/console messenger:consume async

En développement 

Vous pouvez exécuter les moyens de transport "doctrine transport" en ligne de commande :

 php bin/console messenger:consume async -vv

En production 

Vous devez créer un worker fichier /etc/systemd/system/symfony-messenger.service :

[Unit]
Description=Symfony Messenger Worker
After=network.target
StartLimitIntervalSec=0

[Service]
Type=simple
Restart=always
RestartSec=30
User=www-data
Group=www-data
WorkingDirectory=/var/www/app
ExecStart=/usr/bin/php bin/console messenger:consume async --time-limit=3600 --memory-limit=128M

# Environnement (si besoin de variables)
Environment=APP_ENV=prod
Environment=APP_DEBUG=0

# Limites de ressources
MemoryLimit=256M
CPUQuota=50%

# Sécurité
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/www/app/var

Commande pour gérer le woker: 

# Démarrer
sudo systemctl start symfony-messenger

# Statut
sudo systemctl status symfony-messenger

# Voir les logs
sudo journalctl -u symfony-messenger -f

# Activer au démarrage du serveur
sudo systemctl enable symfony-messenger

 

Profile picture for user admin Stephane K

Écrit le

Il y'a 6 mois
Modifié
Il y'a 42 minutes
Loading ...
Need personalized advice?
Envoyer