RabbitMQ async en PHP: como Cohete maneja mensajes sin bloquear


17 de marzo de 2026

PHP y mensajeria asincrona no suelen ir en la misma frase. Pero con ReactPHP y bunny 0.6, funciona. Y funciona bien.

Este post explica como hemos integrado RabbitMQ en el skeleton de Cohete para que HTTP y AMQP convivan en el mismo proceso, en el mismo event loop, sin bloquear nada.

El problema

Tienes un servidor HTTP async con ReactPHP. Cuando un usuario crea un recurso, quieres publicar un domain event (por ejemplo, TodoCreated). Ese evento puede disparar notificaciones, indexar en busqueda, actualizar caches, lo que sea.

Si usas un bus in-memory, los eventos se pierden cuando el proceso muere. Si usas RabbitMQ, tienes persistencia y multiples consumers. Pero como conectas un cliente AMQP a un event loop que ya esta sirviendo HTTP?

La interfaz: un bus es un bus

Lo primero es que el dominio no sepa que bus usa. Cohete define una interfaz simple:

interface MessageBus
{
    public function publish(Message $message): void;
    public function subscribe(string $messageName, callable $listener): void;
}

Dos implementaciones:

El switch se hace en bootstrap por variable de entorno:

$useRabbit = (bool)getenv('RABBITMQ_HOST');

if ($useRabbit) {
    $definitions[MessageBus::class] = static fn () => new BunnieMessageBus([
        'host'     => getenv('RABBITMQ_HOST'),
        'user'     => getenv('RABBITMQ_USER') ?: 'guest',
        'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
    ]);
}
// Sin RABBITMQ_HOST: el framework registra ReactMessageBus automaticamente

Sin .env: bus in-memory. Con RABBITMQ_HOST: RabbitMQ. El dominio no cambia.

Bunny 0.6: parece sincrono, es async

Aqui esta la clave. Bunny 0.5 tenia Bunny\Async\Client, un cliente separado que devolvia Promises. En bunny 0.6 lo quitaron. Ahora solo hay Bunny\Client y la magia async la hace ReactPHP por debajo con Fibers.

$client = new Client($options);
$client->connect();              // parece bloqueante...
$channel = $client->channel();   // parece bloqueante...
$channel->exchangeDeclare('cohete_events', 'topic', false, true);

Esto parece PHP clasico. Pero por dentro, Connection usa React\Socket\ConnectionInterface. Cuando llamas a connect():

  1. Abre un socket TCP via ReactPHP (no-bloqueante)
  2. React\Async\await() suspende la Fiber actual
  3. El event loop procesa el handshake AMQP
  4. La Fiber se resume y connect() retorna

Tu codigo no ve nada de esto. Escribes como si fuera sincrono, pero el event loop sigue vivo procesando HTTP requests mientras tanto.

La implementacion: 48 lineas

class BunnieMessageBus implements MessageBus
{
    private const EXCHANGE = 'cohete_events';
    private Channel $channel;

    public function __construct(array $options = [])
    {
        $client = new Client($options);
        $client->connect();
        $this->channel = $client->channel();
        $this->channel->exchangeDeclare(self::EXCHANGE, 'topic', false, true);
    }

    public function publish(Message $message): void
    {
        $payload = json_encode([
            'name' => $message->name,
            'payload' => $message->payload,
        ], JSON_THROW_ON_ERROR);

        $this->channel->publish($payload, [], self::EXCHANGE, $message->name);
    }

    public function subscribe(string $messageName, callable $listener): void
    {
        $ok = $this->channel->queueDeclare('', false, false, true);
        $this->channel->queueBind(self::EXCHANGE, $ok->queue, $messageName);

        $this->channel->consume(
            function (BunnyMessage $msg, Channel $ch) use ($listener) {
                $data = json_decode($msg->content, true, 512, JSON_THROW_ON_ERROR);
                $listener($data['payload']);
                $ch->ack($msg);
            },
            $ok->queue,
        );
    }
}

Vamos por partes.

Constructor: conectar y declarar

$client = new Client($options);  // host, port, user, password, vhost
$client->connect();
$this->channel = $client->channel();
$this->channel->exchangeDeclare(self::EXCHANGE, 'topic', false, true);

Creamos una conexion, abrimos un canal, declaramos un exchange topic (durable). El exchange cohete_events es el punto central donde se publican todos los domain events. Cada subscriber se ata a los routing keys que le interesan.

Publish: fire and forget

$this->channel->publish($payload, [], self::EXCHANGE, $message->name);

Serializa el mensaje como JSON, lo manda al exchange con el nombre del evento como routing key (domain_event.todo_created). No espera confirmacion. El event loop manda los bytes cuando toque.

Subscribe: callback en el event loop

$ok = $this->channel->queueDeclare('', false, false, true);
$this->channel->queueBind(self::EXCHANGE, $ok->queue, $messageName);
$this->channel->consume(callback, $ok->queue);
  1. queueDeclare(''): crea una queue anonima, exclusiva, auto-delete. RabbitMQ le asigna nombre (amq.gen-xxxx).
  2. queueBind: ata la queue al exchange con el routing key del evento que nos interesa.
  3. consume: registra un callback. Cada vez que llega un mensaje por el socket, el event loop lo lee, bunny lo parsea, ejecuta tu callback. No hay polling. El mismo loop que sirve HTTP sirve AMQP.

El flujo completo

POST /todos
    -> CreateTodoController
    -> Todo::create()
       graba TodoCreated event en el aggregate
    -> Repository::save()
       pullDomainEvents() -> MessageBus::publish(TodoCreated)

    Con ReactMessageBus (in-memory):
       EventEmitter::emit() en el proximo futureTick
       -> TodoCreatedSubscriber recibe el evento
       -> Logger: "Todo created"

    Con BunnieMessageBus (RabbitMQ):
       channel->publish() al exchange "cohete_events"
       routing key: "domain_event.todo_created"
       -> RabbitMQ rutea al queue del subscriber
       -> consume() callback -> TodoCreatedSubscriber
       -> Logger: "Todo created"

Mismo resultado. Diferente transporte. El dominio no sabe cual se usa.

Cuando usar cada uno

ReactMessageBus (in-memory)

BunnieMessageBus (RabbitMQ)

Gotcha: queueBind en bunny 0.6

La firma es queueBind($exchange, $queue, $routingKey) – exchange primero. En la mayoria de clientes AMQP es al reves (queue, exchange, routingKey). Si los inviertes, RabbitMQ te dice:

NOT_FOUND - no exchange 'amq.gen-xxxxx' in vhost '/'

Ese amq.gen-xxxxx es el nombre de tu queue anonima que RabbitMQ intenta buscar como exchange. Error confuso si no sabes que los parametros estan invertidos.

Otro gotcha: Bunny ya no existe

Si vienes de bunny 0.5, tenias Bunny\Async\Client con API de Promises. En bunny 0.6 lo eliminaron. Todo el async se maneja internamente con Fibers. Si tu codigo viejo importa Bunny\Async\Client, te dara Class not found. La solucion: usar Bunny\Client directamente, que parece sincrono pero es async por debajo.

Configuracion completa

.env.example:

# MySQL (persistent storage)
MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_USER=cohete
MYSQL_PASSWORD=cohete
MYSQL_DATABASE=cohete_skeleton

# RabbitMQ (distributed message bus)
# RABBITMQ_HOST=127.0.0.1
# RABBITMQ_PORT=5672
# RABBITMQ_USER=guest
# RABBITMQ_PASSWORD=guest

docker-compose.yml incluye MySQL + RabbitMQ con health checks. Un cp .env.example .env && docker compose up -d y tienes todo corriendo.

El codigo

Todo esta en cohete/skeleton:

48 lineas de implementacion. Infraestructura intercambiable. Dominio intacto. Asi deberia ser siempre.

Comparte este post:

Es tu post

Estas seguro? Esto no se puede deshacer.

Comentarios (0)

Sin comentarios todavia. Se el primero!

Deja un comentario