Le problème d'exécution synchrone du code PHP
En PHP traditionnel, le code s'exécute de manière synchrone : chaque instruction attend la fin de la précédente. Problématique quand une opération prend plusieurs secondes :
<?php
public function sendNotification(User $user)
{
$this->emailService->sendWelcomeEmail($user); // 4 secondes
$this->smsService->sendConfirmation($user); // 2 seconde
$this->analytics->trackAction($user); // 1 seconde
// Total : 7 secondes d'attente pour l'utilisateur !
return $this->render('confirmation.html.twig');
}Résultat : L'utilisateur attend devant un écran qui charge.
Pour résoudre ce problème on peut utiliser "Symfony Messenger" afin de décaler l’exécution des scripts lourd.
Comment fonctionne Symfony Messenger ?
Symfony Messenger permet de différer l'exécution de tâches lourdes pour ne pas bloquer l'utilisateur. Le traitement se fait dans un processus séparé (le worker), de manière réellement asynchrone.
Pour comprendre, imaginons que plusieurs utilisateurs souhaitent importer des fichiers avec des centaines de milliers de lignes. Sans Messenger, le traitement se ferait pendant la requête HTTP : le serveur risquerait de planter et aucun utilisateur ne pourrait importer son fichier.
Pour corriger cela, on applique un système de file d'attente :
- Quand un utilisateur (le Dispatcher) demande un import, on ne le démarre pas directement. On crée un message contenant les informations nécessaires (par exemple : ID de l'import/message, offset, taille du lot ...).
- Ce message est stocké dans une file d'attente (par exemple dans la table
messenger_messages). - L'utilisateur est immédiatement informé que sa demande est prise en compte et sera traitée dans quelques minutes. La requête HTTP se termine.
- Un Worker (processus indépendant) interroge la file d'attente de manière périodique (par exemple chaque seconde ). Dès qu'un message est disponible, il le traite via son Handler qui exécute la logique métier (envoit de mails, lire un fichier CSV, importer les contacts par lots de 100 ... ).
Pour plus d'information n’hésitez par à regarder la documentation officielle.
Dans le suite de ce tutoriel nous allons mettre en place le code nécessaire pour la résolution du problème d'import de fichier.
1- Le Dispatcher
Permet de creer le message. Par exemple :
<?php
namespace App\Controller;
use App\Entity\Import;
use App\Message\ImportContactsMessage;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Attribute\Route;
final class ImportController extends AbstractController
{
public function __construct(
// Le bus de messagerie : c'est lui qui achemine les messages
private readonly MessageBusInterface $bus,
) {
}
#[Route('/import/{id}/launch', name: 'app_import_launch', methods: ['POST'])]
public function launch(Import $import): Response
{
// ─── LE DISPATCHER ───
// On crée un message (le ticket) qui contient :
// - L'ID de l'import à traiter
// - L'offset de départ (0 = début du fichier)
// - La taille d'un lot (100 lignes par batch)
$message = new ImportContactsMessage(
importId: $import->getId(),
offset: 0,
batchSize: 100,
);
// On place le message dans la file d'attente
$this->bus->dispatch($message);
// L'utilisateur reçoit une réponse immédiate
$this->addFlash('success', 'Import placé dans la file d\'attente. Il sera traité prochainement.');
return $this->redirectToRoute('app_import_show', [
'id' => $import->getId(),
]);
}
}
2 - Le message
Le message doit être le plus simple possible, car l'objet va être sérialiser pour être stocker en BD.
<?php
namespace App\Message;
/**
* Message transportant les informations nécessaires au traitement d'un lot d'import.
*
* C'est le "ticket" placé dans la file d'attente par le Dispatcher.
* Le Worker le récupérera et le passera au Handler.
*/
final class ImportContactsMessage
{
public function __construct(
/** ID de l'import à traiter */
private readonly int $importId,
/** Ligne de départ dans le fichier CSV (0 = début) */
private readonly int $offset = 0,
/** Nombre de lignes à traiter dans ce lot */
private readonly int $batchSize = 100,
) {
}
public function getImportId(): int
{
return $this->importId;
}
public function getOffset(): int
{
return $this->offset;
}
public function getBatchSize(): int
{
return $this->batchSize;
}
}
3 - Le Handler
il contient la logique métier qui va être exécute.
<?php
namespace App\MessageHandler;
use App\Entity\Import;
use App\Message\ImportContactsMessage;
use App\Service\ContactImportService;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
/**
* Le Handler : c'est lui qui exécute la logique métier quand le Worker
* récupère un message dans la file d'attente.
*/
#[AsMessageHandler]
final class ImportContactsHandler
{
// Pause de 5 secondes entre chaque lot
private const DELAY_MS = 5000;
public function __construct(
private readonly ContactImportService $importService,
private readonly EntityManagerInterface $entityManager,
private readonly MessageBusInterface $bus,
) {
}
public function __invoke(ImportContactsMessage $message): void
{
// 1. Traiter un lot de 100 lignes
$nextOffset = $this->importService->processBatch(
importId: $message->getImportId(),
offset: $message->getOffset(),
batchSize: $message->getBatchSize(),
);
// 2. Nettoyer la mémoire après le traitement
$this->entityManager->flush();
$this->entityManager->clear();
// 3. S'il reste des lignes à traiter, on relance un nouveau message
// avec un délai de 5 secondes pour laisser le serveur respirer
if ($nextOffset !== null) {
$this->bus->dispatch(
new ImportContactsMessage(
$message->getImportId(),
$nextOffset,
$message->getBatchSize(),
),
[new DelayStamp(self::DELAY_MS)]
);
}
}
}Dans le Handler, nous avons "new DelayStamp(self::DELAY_MS)" qui ajoute un délais avant l'ajout dans la fille d'attente.
Sans DelayStamp :
Worker : traite le lot 0 (100 lignes)
Worker : flush → clear → dispatch lot 100
Worker : le message est IMMÉDIATEMENT disponible
Worker : récupère le lot 100 → le traite
Worker : flush → clear → dispatch lot 200
Worker : récupère le lot 200 → le traiteLe handler monopolise le CPU et la base de données sans pause. Les autres tâches (autres handler, workers, cron, requêtes HTTP, ...) sont ralenties.
4 - Le worker
À ce stade, nous avons déjà mit en place toutes la logique métier.
NB: si vous exécutez votre code, il s’exécute de manière direct. Le moyen de transport est direct (TGV) :).
Nous devons choisir un moyen de transport qui prend son temps. Symfony propose plusieurs moyen de transport de message.
Nous optons pour "Doctrine Transport" car il est simple en mettre en place, mais pour des meilleures performance utiliser "Redis" ou "AMQP (RabbitMQ)".
Configuration Doctrine Transport :
Dans le fichier .env, dé-commenter la ligne suivante ou ajouter la si elle n'existe pas :
###> symfony/messenger ###
MESSENGER_TRANSPORT_DSN=doctrine://default?auto_setup=0
###< symfony/messenger ###://default => connexion par défaut à la base de données. ( Vous pouvez utiliser une autre base de données ).
auto_setup=0 => Ne crée PAS automatiquement la table. Très important pour la production, si la table "messenger_messages" n'existe pas effectuer une migration.
Ajuster le fichier /config/packages/messenger.yaml :
framework:
messenger:
failure_transport: failed
# https://symfony.com/doc/current/messenger.html#transport-configuration
transports:
# 1: doctrine transport
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 3
multiplier: 2
# 2: exemple "Redis transport"
#fast_priority:
# dsn: 'redis://localhost:6379/messages_fast'
# retry_strategy:
# max_retries: 2
# 3: doctrine transport
heavy_processing:
dsn: 'doctrine://default?queue_name=heavy'
retry_strategy:
max_retries: 1 # Si un très gros fichier plante, on ne réessaie qu'une fois.
# 4: Contient les messages qui ont echoue.
failed: 'doctrine://default?queue_name=failed'
default_bus: messenger.bus.default
buses:
messenger.bus.default: []
routing:
Symfony\Component\Mailer\Messenger\SendEmailMessage: async
Symfony\Component\Notifier\Message\ChatMessage: async
Symfony\Component\Notifier\Message\SmsMessage: async
# Route your messages to the transports
App\Message\ImportContactsMessage: async
# exemple d'utilisation du transport "heavy_processing"
# App\Message\ImportBigFileMessage: heavy_processing
La configuration de notre moyens de transport doctrine est en place. passons à l’exécution.
Le Worker est un processus fourni par Symfony via la commande bin/console messenger:consume async
En développement
Vous pouvez exécuter les moyens de transport "doctrine transport" en ligne de commande :
php bin/console messenger:consume async -vvEn production
Vous devez créer un worker fichier /etc/systemd/system/symfony-messenger.service :
[Unit]
Description=Symfony Messenger Worker
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
Restart=always
RestartSec=30
User=www-data
Group=www-data
WorkingDirectory=/var/www/app
ExecStart=/usr/bin/php bin/console messenger:consume async --time-limit=3600 --memory-limit=128M
# Environnement (si besoin de variables)
Environment=APP_ENV=prod
Environment=APP_DEBUG=0
# Limites de ressources
MemoryLimit=256M
CPUQuota=50%
# Sécurité
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/www/app/var
Commande pour gérer le woker:
# Démarrer
sudo systemctl start symfony-messenger
# Statut
sudo systemctl status symfony-messenger
# Voir les logs
sudo journalctl -u symfony-messenger -f
# Activer au démarrage du serveur
sudo systemctl enable symfony-messenger