pmg/queue-tactician
This is a middleware for Tactician that integrates it with pmg/queue.
Installation and Usage
Install with Composer.
composer require pmg/queue-tactician
To use it, add the middleware to your middleware chain somewhere before the
default command handler middleware.
use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\Producer;
use PMG\Queue\Tactician\QueueingMiddleware;
/** @var Producer $producer */
$producer = createAQueueProducerSomehow();
$bus = new CommandBus([
    new QueueingMiddleware($producer),
    new CommandHandlerMiddleware(/*...*/),
]);
Enqueueing Commands
Any command that implements PMG\Queue\Message will be put into the queue via
the producer, and no further middlewares will be called.
use PMG\Queue\Message;
final class DoLongRunningStuff implements Message
{
    /**
     * {@inheritdoc}
     */
    public function getName()
    {
        return 'LongRunningStuff';
    }
}
// goes right into the queue
$bus->handle(new DoLongRunningStuff());
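The short-circuit behaviour described above can be illustrated with a small, dependency-free sketch. It is not the library's actual implementation: SketchMessage, RecordingProducer, and SketchQueueingMiddleware are hypothetical stand-ins for PMG\Queue\Message, the producer, and QueueingMiddleware, and it only models the enqueue side.

```php
<?php
// Hypothetical stand-in for PMG\Queue\Message (assumption, not the real interface).
interface SketchMessage {}

// Hypothetical producer that just records what it was asked to send.
final class RecordingProducer
{
    /** @var object[] */
    public array $sent = [];

    public function send(object $message): void
    {
        $this->sent[] = $message;
    }
}

// Hypothetical middleware: queue messages, pass everything else down the chain.
final class SketchQueueingMiddleware
{
    private RecordingProducer $producer;

    public function __construct(RecordingProducer $producer)
    {
        $this->producer = $producer;
    }

    public function execute(object $command, callable $next)
    {
        if ($command instanceof SketchMessage) {
            $this->producer->send($command);
            return null; // short-circuit: $next is never called
        }

        return $next($command);
    }
}

final class SendEmail implements SketchMessage {}
final class RenameUser {}

$producer = new RecordingProducer();
$middleware = new SketchQueueingMiddleware($producer);

// Tracks which commands reached the rest of the chain.
$handledInline = [];
$next = function (object $command) use (&$handledInline) {
    $handledInline[] = get_class($command);
    return 'handled';
};

$queuedResult = $middleware->execute(new SendEmail(), $next);  // goes to the producer
$inlineResult = $middleware->execute(new RenameUser(), $next); // handled in-process
```

In this sketch only SendEmail reaches the producer, while RenameUser continues to the next callable. That mirrors why the queueing middleware has to sit before the command handler middleware in the chain.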
Dequeueing (Consuming) Commands
To use Tactician to process the messages via the consumer, use
PMG\Queue\Handler\TacticianHandler.
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TacticianHandler;
/** @var League\Tactician\CommandBus $bus */
$handler = new TacticianHandler($bus);
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
$consumer->run();
The above assumes that the CommandBus instance still has the
QueueingMiddleware installed. If not, you'll need to use your own handler that
invokes the command bus, perhaps via CallableHandler.
use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Handler\CallableHandler;
// no QueueingMiddleware!
$differentBus = new CommandBus([
    new CommandHandlerMiddleware(/*...*/),
]);
$handler = new CallableHandler([$differentBus, 'handle']);
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
$consumer->run();
Beware of Wrapping This Handler with PcntlForkingHandler
The shared instance of the command bus means that it's very likely that things
like open database connections will cause issues if/when a child process is
forked to handle messages.
Instead, a better bet is to create a new command bus for each message.
CreatingTacticianHandler can do that for you.
use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Handler\CallableHandler;
use PMG\Queue\Tactician\QueuedCommand;
use PMG\Queue\Tactician\QueueingMiddleware;
use PMG\Queue\Handler\CreatingTacticianHandler;
$handler = new CreatingTacticianHandler(function () {
    // this is invoked for every message
    return new CommandBus([
        new QueueingMiddleware(createAProducerSomehow()),
        new CommandHandlerMiddleware(/* ... */)
    ]);
});
/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);
$consumer->run();
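The "new command bus for each message" idea can be sketched without any dependencies. This is only an illustration of the factory pattern, not the real CreatingTacticianHandler: SketchCreatingHandler and SketchBus are hypothetical names, and the real handler's method signatures and return values may differ.

```php
<?php
// Hypothetical handler that asks a factory for a fresh bus on every message.
final class SketchCreatingHandler
{
    /** @var callable */
    private $factory;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    public function handle(object $message)
    {
        // A new bus per message, so nothing (for example an open database
        // connection) is shared between messages.
        $bus = ($this->factory)();

        return $bus->handle($message);
    }
}

// Hypothetical bus that only reports which instance handled the message.
final class SketchBus
{
    public static int $created = 0;

    public int $id;

    public function __construct()
    {
        $this->id = ++self::$created;
    }

    public function handle(object $message): int
    {
        return $this->id;
    }
}

$handler = new SketchCreatingHandler(function () {
    return new SketchBus();
});

$first = $handler->handle(new stdClass());
$second = $handler->handle(new stdClass());
```

Each call to handle() builds its own bus, so the two messages are served by different instances. When the handler is wrapped in a forking handler, that would mean the bus and its resources are created inside the child process rather than inherited from the parent.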