, (*1)
Installation
composer require xervice/rabbitmq
Configuration
First, you have to define your exchanges. To provide an exchange to RabbitMQ, create a class that implements ExchangeInterface; it receives an ExchangeProvider, which it uses to declare the exchange., (*2)
<?php
namespace App\MyModule\Business\Model\Exchange;
use DataProvider\RabbitMqExchangeDataProvider;
use Xervice\RabbitMQ\Business\Dependency\Exchange\ExchangeInterface;
use Xervice\RabbitMQ\Business\Model\Core\ExchangeProviderInterface;
/**
 * Example exchange definition: declares a "direct" exchange named
 * "MyExchangeName" that is not auto-deleted.
 */
class MyExchange implements ExchangeInterface
{
    /**
     * Builds the exchange definition and declares it through the given provider.
     *
     * @param \Xervice\RabbitMQ\Business\Model\Core\ExchangeProviderInterface $exchangeProvider
     *
     * @return void
     */
    public function declareExchange(ExchangeProviderInterface $exchangeProvider): void
    {
        $myExchange = new RabbitMqExchangeDataProvider();
        $myExchange
            ->setName('MyExchangeName')
            ->setType('direct')
            ->setAutoDelete(false);

        $exchangeProvider->declare($myExchange);
    }
}
You also have to define and provide queues to RabbitMQ. A queue has to be bound to an exchange.
Defining and binding a queue is possible with a QueueProvider., (*3)
<?php
namespace App\MyModule\Business\Model\Queue;
use DataProvider\RabbitMqExchangeDataProvider;
use DataProvider\RabbitMqQueueBindDataProvider;
use DataProvider\RabbitMqQueueDataProvider;
use Xervice\RabbitMQ\Business\Dependency\Queue\QueueInterface;
use Xervice\RabbitMQ\Business\Model\Core\QueueProviderInterface;
/**
 * Example queue definition: declares the queue "MyQueueName" and binds it
 * to the exchange "MyExchangeName".
 */
class MyQueue implements QueueInterface
{
    /**
     * Declares the queue through the given provider and then binds it to the exchange.
     *
     * @param \Xervice\RabbitMQ\Business\Model\Core\QueueProviderInterface $queueProvider
     */
    public function declareQueue(QueueProviderInterface $queueProvider)
    {
        // The exchange is only referenced by name here.
        $targetExchange = new RabbitMqExchangeDataProvider();
        $targetExchange->setName('MyExchangeName');

        $queueDefinition = new RabbitMqQueueDataProvider();
        $queueDefinition
            ->setName('MyQueueName')
            ->setAutoDelete(false)
            ->setArgument([]);

        $binding = new RabbitMqQueueBindDataProvider();
        $binding
            ->setExchange($targetExchange)
            ->setQueue($queueDefinition);

        // Declare first, bind afterwards.
        $queueProvider->declare($queueDefinition);
        $queueProvider->bind($binding);
    }
}
If you start a worker, it will loop over all defined listeners and pass the queue messages to them., (*4)
Listener example, (*5)
<?php
namespace App\MyModule\Business\Model\Listener;
use DataProvider\RabbitMqMessageCollectionDataProvider;
use PhpAmqpLib\Channel\AMQPChannel;
use Xervice\RabbitMQ\Business\Model\Worker\Listener\AbstractListener;
/**
 * Example listener for the queue "MyQueueName": prints every received
 * message and acknowledges it.
 */
class MyListener extends AbstractListener
{
    /**
     * Handles one collection of messages received on the given channel.
     *
     * @param \DataProvider\RabbitMqMessageCollectionDataProvider $collectionDataProvider
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel
     *
     * @return void
     */
    public function handleMessage(
        RabbitMqMessageCollectionDataProvider $collectionDataProvider,
        AMQPChannel $channel
    ): void {
        foreach ($collectionDataProvider->getMessages() as $queueMessage) {
            // The outer message wraps the payload object that carries the actual text.
            $payload = $queueMessage->getMessage();
            echo $payload->getMessage();

            $this->sendAck($channel, $queueMessage);
        }
    }

    /**
     * Number of messages requested per chunk.
     *
     * @return int
     */
    public function getChunkSize(): int
    {
        return 500;
    }

    /**
     * Name of the queue this listener consumes from.
     *
     * @return string
     */
    public function getQueueName(): string
    {
        return 'MyQueueName';
    }
}
The chunk size defines how many messages the worker wants to get from RabbitMQ in one worker instance., (*6)
To define the exchange, queue and listener, you can register them in the RabbitMQDependencyProvider:, (*7)
<?php
namespace App\RabbitMQ;
use Xervice\RabbitMQ\RabbitMQDependencyProvider as XerviceRabbitMQDependencyProvider;
use XerviceTest\RabbitMQ\Exchange\TestExchange;
use XerviceTest\RabbitMQ\Listener\TestListener;
use XerviceTest\RabbitMQ\Queue\TestQueue;
/**
 * Project-level dependency provider that registers the example listener,
 * queue and exchange with the RabbitMQ module.
 *
 * The example classes are referenced by their fully-qualified names because
 * they live in other namespaces than this provider.
 */
class RabbitMQDependencyProvider extends XerviceRabbitMQDependencyProvider
{
    /**
     * @return \Xervice\RabbitMQ\Business\Dependency\Worker\Listener\ListenerInterface[]
     */
    protected function getListener(): array
    {
        return [
            new \App\MyModule\Business\Model\Listener\MyListener(),
        ];
    }

    /**
     * @return \Xervice\RabbitMQ\Business\Dependency\Queue\QueueInterface[]
     */
    protected function getQueues(): array
    {
        return [
            new \App\MyModule\Business\Model\Queue\MyQueue(),
        ];
    }

    /**
     * @return \Xervice\RabbitMQ\Business\Dependency\Exchange\ExchangeInterface[]
     */
    protected function getExchanges(): array
    {
        return [
            new \App\MyModule\Business\Model\Exchange\MyExchange(),
        ];
    }
}
To use RabbitMQ in your application, you can add the \Xervice\RabbitMQ\Communication\Plugin\RabbitMqService plugin to your kernel stack., (*8)
Usage
Initialize RabbitMQ, (*9)
// Initialize RabbitMQ through the facade.
// NOTE(review): $rabbitMQFacade is not defined in this snippet; it is assumed to be an already available facade instance.
$rabbitMQFacade->init();
Sending a message to RabbitMQ, (*10)
// Target exchange, referenced by its name.
$exchange = new RabbitMqExchangeDataProvider();
$exchange->setName('UnitTest');

// Payload object carrying the message text.
$simpleMessage = new SimpleMessageDataProvider();
$simpleMessage->setMessage('TestMessage');

// Message object that ties the payload to the target exchange.
$testMessage = new RabbitMqMessageDataProvider();
$testMessage
    ->setExchange($exchange)
    ->setMessage($simpleMessage);

// Send the message that was built above.
$rabbitMQFacade->sendMessage($testMessage);
Run the worker, (*11)
// Start the worker through the facade.
// NOTE(review): $rabbitMQFacade is not defined in this snippet; it is assumed to be an already available facade instance.
$rabbitMQFacade->runWorker();