ReactAMQP
Basic AMQP bindings for React PHP.
Install
This library requires PHP >= 7.1 and the PECL AMQP extension. The recommended way to install it is through Composer.
For PHP 5.x, use the 1.x version instead.
composer require gos/react-amqp
Usage
This library provides two classes, an AMQP Consumer and an AMQP Producer. Both run on a periodic timer whose interval, in seconds, you pass as an argument to the constructor.
Consumer
The Consumer class receives messages from an AMQP broker and emits a 'consume' event for each one, to which you bind a callback. You can also cap the number of messages consumed per timer tick, so that your event loop isn't perpetually stuck consuming messages from a busy broker. The callback must accept an AMQPEnvelope as its first argument and, optionally, an AMQPQueue as its second.
<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();
// Create a channel
$ch = new AMQPChannel($cnn);
// Create a new queue
$queue = new AMQPQueue($ch);
$queue->setName('queue1');
// declareQueue() replaces the deprecated declare() alias in the PECL AMQP extension
$queue->declareQueue();
// Create an event loop
$loop = React\EventLoop\Factory::create();
// Create a consumer that will check for messages every half a second and consume up to 10 at a time.
$consumer = new Gos\Component\ReactAMQP\Consumer($queue, $loop, 0.5, 10);
$consumer->on('consume', function (AMQPEnvelope $envelope, AMQPQueue $queue) {
    // Process the message here
});
$loop->run();
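The per-tick limit is easier to reason about in isolation. The following is a minimal sketch of the batching idea only, not the library's actual implementation: `consumeBatch()`, `$fetch` and `$onConsume` are hypothetical names, and a plain array stands in for the broker so the snippet runs without the AMQP extension.

```php
<?php
// Hypothetical stand-in for the batching behaviour described above.
// $fetch plays the role of a non-blocking "get next message" call and
// returns null when nothing is waiting; it is not part of ReactAMQP.
function consumeBatch(callable $fetch, callable $onConsume, ?int $max = null): int
{
    $handled = 0;
    // A null $max means "no limit": keep going until the source is empty.
    while ($max === null || $handled < $max) {
        $message = $fetch();
        if ($message === null) {
            break; // nothing left, hand control back to the event loop
        }
        $onConsume($message);
        $handled++;
    }
    return $handled;
}

// Simulate a broker holding 25 messages.
$pending = range(1, 25);
$fetch = function () use (&$pending) {
    return array_shift($pending); // null once the array is empty
};

$received = [];
$onConsume = function ($message) use (&$received) {
    $received[] = $message;
};

// Three simulated timer ticks, each limited to 10 messages.
$perTick = [];
for ($tick = 0; $tick < 3; $tick++) {
    $perTick[] = consumeBatch($fetch, $onConsume, 10);
}

echo implode(',', $perTick), "\n"; // 10,10,5
```

With a limit of 10, the 25 simulated messages are spread over three ticks, which leaves room between ticks for other timers and I/O on the same loop.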
Producer
The Producer class sends messages to an AMQP exchange. Its publish method has exactly the same signature as AMQPExchange's publish method. Messages are queued inside the producer and sent on the timer interval passed to the constructor. When the timer fires, the producer sends each queued message through the AMQPExchange object's publish method. That method is blocking, which may be a performance concern for your application. When a message is sent successfully, a 'produce' event is emitted; callbacks bound to it receive an array containing all of the message parameters that were sent. If an AMQPExchangeException is thrown, meaning the message could not be sent, an 'error' event is emitted; callbacks bound to it receive the AMQPExchangeException object to handle.
<?php
// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();
// Create a channel
$ch = new AMQPChannel($cnn);
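// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('exchange1');
// An exchange needs a type before it can be declared; direct is used here as an example
$ex->setType(AMQP_EX_TYPE_DIRECT);
// declareExchange() replaces the deprecated declare() alias in the PECL AMQP extension
$ex->declareExchange();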
// Create an event loop
$loop = React\EventLoop\Factory::create();
// Create a producer that will send any waiting messages every half a second.
$producer = new Gos\Component\ReactAMQP\Producer($ex, $loop, 0.5);
// Add a callback that's called every time a message is successfully sent.
$producer->on('produce', function (array $message) {
    // $message is an array containing keys 'message', 'routingKey', 'flags' and 'attributes'
});
$producer->on('error', function (AMQPExchangeException $e) {
    // Handle any exceptions here.
});
$i = 0;
$loop->addPeriodicTimer(1, function () use (&$i, $producer) {
    $i++;
    echo "Sending $i\n";
    // AMQPExchange::publish() expects the message body as a string
    $producer->publish((string) $i, 'routing.key');
});
$loop->run();
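The queue-then-flush behaviour described above can be sketched without a broker. This is an illustration under stated assumptions rather than the library's code: `BufferedPublisher`, `flush()` and `pending()` are hypothetical names, a closure stands in for the exchange, and keeping a failed message in the buffer for the next tick is an assumption of this sketch.

```php
<?php
// Hypothetical stand-in for the buffering behaviour described above; not ReactAMQP code.
final class BufferedPublisher
{
    private $send;
    private $onProduce;
    private $onError;
    private $buffer = [];

    public function __construct(callable $send, callable $onProduce, callable $onError)
    {
        $this->send = $send;           // stands in for the blocking exchange publish call
        $this->onProduce = $onProduce; // called after each successful send
        $this->onError = $onError;     // called with the exception when a send fails
    }

    // Queue a message; nothing is sent until flush() runs.
    public function publish(string $message, string $routingKey): void
    {
        $this->buffer[] = ['message' => $message, 'routingKey' => $routingKey];
    }

    // What a timer tick would do: try to send everything that is waiting.
    public function flush(): void
    {
        foreach ($this->buffer as $key => $item) {
            try {
                ($this->send)($item['message'], $item['routingKey']);
                unset($this->buffer[$key]);
                ($this->onProduce)($item);
            } catch (RuntimeException $e) {
                // Assumption: the message stays buffered and is retried on the next flush.
                ($this->onError)($e);
            }
        }
    }

    public function pending(): int
    {
        return count($this->buffer);
    }
}

// Simulate an exchange that is unreachable during the first tick.
$down = true;
$sent = [];
$errors = 0;

$publisher = new BufferedPublisher(
    function (string $message, string $routingKey) use (&$down) {
        if ($down) {
            throw new RuntimeException('exchange unavailable');
        }
    },
    function (array $item) use (&$sent) {
        $sent[] = $item['message'];
    },
    function (RuntimeException $e) use (&$errors) {
        $errors++;
    }
);

$publisher->publish('first', 'routing.key');
$publisher->publish('second', 'routing.key');

$publisher->flush();                // both sends fail
$pendingAfterFirstTick = $publisher->pending();

$down = false;
$publisher->flush();                // both sends succeed
$pendingAfterSecondTick = $publisher->pending();

echo "$errors errors, ", count($sent), " sent, $pendingAfterSecondTick pending\n";
```

Because each send blocks, the time a flush takes grows with the number of buffered messages, so a shorter timer interval keeps individual ticks small.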