RabbitMQ async en PHP: como Cohete maneja mensajes sin bloquear
PHP y mensajeria asincrona no suelen ir en la misma frase. Pero con ReactPHP y bunny 0.6, funciona. Y funciona bien.
Este post explica como hemos integrado RabbitMQ en el skeleton de Cohete para que HTTP y AMQP convivan en el mismo proceso, en el mismo event loop, sin bloquear nada.
El problema
Tienes un servidor HTTP async con ReactPHP. Cuando un usuario crea un
recurso, quieres publicar un domain event (por ejemplo, TodoCreated). Ese evento puede disparar
notificaciones, indexar en busqueda, actualizar caches, lo que sea.
Si usas un bus in-memory, los eventos se pierden cuando el proceso muere. Si usas RabbitMQ, tienes persistencia y multiples consumers. Pero como conectas un cliente AMQP a un event loop que ya esta sirviendo HTTP?
La interfaz: un bus es un bus
Lo primero es que el dominio no sepa que bus usa. Cohete define una interfaz simple:
interface MessageBus
{
public function publish(Message $message): void;
public function subscribe(string $messageName, callable $listener): void;
}Dos implementaciones:
ReactMessageBus(framework): EventEmitter in-memory. Los eventos viajan dentro del mismo proceso viafutureTick(). Zero deps.BunnieMessageBus(skeleton): RabbitMQ via bunny/bunny. Los eventos viajan por AMQP. Persistentes, distribuidos.
El switch se hace en bootstrap por variable de entorno:
$useRabbit = (bool)getenv('RABBITMQ_HOST');
if ($useRabbit) {
$definitions[MessageBus::class] = static fn () => new BunnieMessageBus([
'host' => getenv('RABBITMQ_HOST'),
'user' => getenv('RABBITMQ_USER') ?: 'guest',
'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
]);
}
// Sin RABBITMQ_HOST: el framework registra ReactMessageBus automaticamenteSin .env: bus in-memory. Con RABBITMQ_HOST: RabbitMQ. El dominio no
cambia.
Bunny 0.6: parece sincrono, es async
Aqui esta la clave. Bunny 0.5 tenia Bunny\Async\Client, un cliente separado que
devolvia Promises. En bunny 0.6 lo quitaron. Ahora solo hay Bunny\Client y la magia async la hace ReactPHP
por debajo con Fibers.
$client = new Client($options);
$client->connect(); // parece bloqueante...
$channel = $client->channel(); // parece bloqueante...
$channel->exchangeDeclare('cohete_events', 'topic', false, true);Esto parece PHP clasico. Pero por dentro, Connection usa React\Socket\ConnectionInterface. Cuando llamas
a connect():
- Abre un socket TCP via ReactPHP (no-bloqueante)
React\Async\await()suspende la Fiber actual- El event loop procesa el handshake AMQP
- La Fiber se resume y
connect()retorna
Tu codigo no ve nada de esto. Escribes como si fuera sincrono, pero el event loop sigue vivo procesando HTTP requests mientras tanto.
La implementacion: 48 lineas
class BunnieMessageBus implements MessageBus
{
private const EXCHANGE = 'cohete_events';
private Channel $channel;
public function __construct(array $options = [])
{
$client = new Client($options);
$client->connect();
$this->channel = $client->channel();
$this->channel->exchangeDeclare(self::EXCHANGE, 'topic', false, true);
}
public function publish(Message $message): void
{
$payload = json_encode([
'name' => $message->name,
'payload' => $message->payload,
], JSON_THROW_ON_ERROR);
$this->channel->publish($payload, [], self::EXCHANGE, $message->name);
}
public function subscribe(string $messageName, callable $listener): void
{
$ok = $this->channel->queueDeclare('', false, false, true);
$this->channel->queueBind(self::EXCHANGE, $ok->queue, $messageName);
$this->channel->consume(
function (BunnyMessage $msg, Channel $ch) use ($listener) {
$data = json_decode($msg->content, true, 512, JSON_THROW_ON_ERROR);
$listener($data['payload']);
$ch->ack($msg);
},
$ok->queue,
);
}
}Vamos por partes.
Constructor: conectar y declarar
$client = new Client($options); // host, port, user, password, vhost
$client->connect();
$this->channel = $client->channel();
$this->channel->exchangeDeclare(self::EXCHANGE, 'topic', false, true);Creamos una conexion, abrimos un canal, declaramos un exchange topic (durable). El exchange cohete_events es el punto central donde se
publican todos los domain events. Cada subscriber se ata a los routing
keys que le interesan.
Publish: fire and forget
$this->channel->publish($payload, [], self::EXCHANGE, $message->name);Serializa el mensaje como JSON, lo manda al exchange con el nombre
del evento como routing key (domain_event.todo_created). No espera
confirmacion. El event loop manda los bytes cuando toque.
Subscribe: callback en el event loop
$ok = $this->channel->queueDeclare('', false, false, true);
$this->channel->queueBind(self::EXCHANGE, $ok->queue, $messageName);
$this->channel->consume(callback, $ok->queue);queueDeclare(''): crea una queue anonima, exclusiva, auto-delete. RabbitMQ le asigna nombre (amq.gen-xxxx).queueBind: ata la queue al exchange con el routing key del evento que nos interesa.consume: registra un callback. Cada vez que llega un mensaje por el socket, el event loop lo lee, bunny lo parsea, ejecuta tu callback. No hay polling. El mismo loop que sirve HTTP sirve AMQP.
El flujo completo
POST /todos
-> CreateTodoController
-> Todo::create()
graba TodoCreated event en el aggregate
-> Repository::save()
pullDomainEvents() -> MessageBus::publish(TodoCreated)
Con ReactMessageBus (in-memory):
EventEmitter::emit() en el proximo futureTick
-> TodoCreatedSubscriber recibe el evento
-> Logger: "Todo created"
Con BunnieMessageBus (RabbitMQ):
channel->publish() al exchange "cohete_events"
routing key: "domain_event.todo_created"
-> RabbitMQ rutea al queue del subscriber
-> consume() callback -> TodoCreatedSubscriber
-> Logger: "Todo created"
Mismo resultado. Diferente transporte. El dominio no sabe cual se usa.
Cuando usar cada uno
ReactMessageBus (in-memory)
- Desarrollo local
- Apps simples donde un solo proceso basta
- Prototipado rapido (zero deps)
- Cuando perder eventos no es critico
BunnieMessageBus (RabbitMQ)
- Produccion con multiples consumers
- Eventos que no puedes perder
- Procesamiento distribuido (N workers escuchando la misma cola)
- Cuando necesitas desacoplar servicios
Gotcha: queueBind en bunny 0.6
La firma es queueBind($exchange, $queue, $routingKey) –
exchange primero. En la mayoria de clientes AMQP es al reves (queue, exchange, routingKey). Si los inviertes,
RabbitMQ te dice:
NOT_FOUND - no exchange 'amq.gen-xxxxx' in vhost '/'
Ese amq.gen-xxxxx es el nombre de tu
queue anonima que RabbitMQ intenta buscar como exchange. Error confuso
si no sabes que los parametros estan invertidos.
Otro gotcha: Bunny ya no existe
Si vienes de bunny 0.5, tenias Bunny\Async\Client con API de Promises. En bunny
0.6 lo eliminaron. Todo el async se maneja internamente con Fibers. Si
tu codigo viejo importa Bunny\Async\Client, te dara Class not found. La solucion: usar Bunny\Client directamente, que parece sincrono
pero es async por debajo.
Configuracion completa
.env.example:
# MySQL (persistent storage)
MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_USER=cohete
MYSQL_PASSWORD=cohete
MYSQL_DATABASE=cohete_skeleton
# RabbitMQ (distributed message bus)
# RABBITMQ_HOST=127.0.0.1
# RABBITMQ_PORT=5672
# RABBITMQ_USER=guest
# RABBITMQ_PASSWORD=guestdocker-compose.yml incluye MySQL +
RabbitMQ con health checks. Un cp .env.example .env && docker compose up -d
y tienes todo corriendo.
El codigo
Todo esta en cohete/skeleton:
src/Bus/BunnieMessageBus.php- la implementacion RabbitMQsrc/bootstrap.php- el switch por env varssrc/Subscriber/TodoCreatedSubscriber.php- ejemplo de subscriber
48 lineas de implementacion. Infraestructura intercambiable. Dominio intacto. Asi deberia ser siempre.
Comentarios (0)
Sin comentarios todavia. Se el primero!
Deja un comentario