ORO Message Queue Component
Note: This article is published in the Oro documentation library.
The component incorporates a message queue into your application via different transports. It contains several layers.
The lowest layer is called Transport and provides an abstraction of the transport protocol. The Consumption layer provides tools to consume messages, such as a CLI command, signal handling, logging, and extensions. It works on top of the Transport layer.
The Client layer provides the ability to start producing and consuming messages with as little configuration as possible.
Message queues provide an asynchronous communications protocol, meaning that the sender and the receiver of the message do not need to interact with the message queue at the same time. Messages placed onto the queue are stored until the recipient retrieves them. A message does not have information about previous and next messages.
OroMessageQueue uses publish/subscribe messaging. It means that the sending application publishes (sends) a message with a specific topic, and a consumer finds the subscriber or subscribers for the topic. Publish/subscribe messaging allows decoupling of the information provider from the consumers of that information. The sending application and the receiving application do not need to know anything about each other for the information to be sent and received.
There are five message priorities: MessagePriority::VERY_LOW, MessagePriority::LOW, MessagePriority::NORMAL, MessagePriority::HIGH, and MessagePriority::VERY_HIGH. Recognizing priority is simple: there are five queues, one queue per priority. Consumers process messages from the VERY_HIGH queue. If there are no messages in the VERY_HIGH queue, consumers process messages from the HIGH queue, and so on. Consequently, only if all other queues are empty does a consumer process messages from the VERY_LOW queue.
Message Processors are classes that process queue messages. They implement MessageProcessorInterface. In addition, they usually subscribe to specific topics and implement TopicSubscriberInterface.
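The priority rule above can be illustrated with a minimal sketch. It does not use the component itself: the in-memory queues, the string priority keys, and the receiveNext function are assumptions made for illustration only.

```php
<?php
// Hypothetical sketch: one in-memory queue per priority, highest priority first.
// The string keys are illustrative and are not the component's constants.
const PRIORITY_ORDER = ['very_high', 'high', 'normal', 'low', 'very_low'];

/**
 * Returns the next message, always draining higher-priority queues first.
 *
 * @param array<string, SplQueue> $queues
 */
function receiveNext(array $queues): ?string
{
    foreach (PRIORITY_ORDER as $priority) {
        if (!$queues[$priority]->isEmpty()) {
            return $queues[$priority]->dequeue();
        }
    }

    return null; // all five queues are empty
}

$queues = [];
foreach (PRIORITY_ORDER as $priority) {
    $queues[$priority] = new SplQueue();
}

$queues['very_low']->enqueue('cleanup');
$queues['high']->enqueue('send email');
$queues['normal']->enqueue('reindex');

// Collect messages in the order a single consumer would receive them.
$order = [];
while (($message = receiveNext($queues)) !== null) {
    $order[] = $message;
}
```

In this sketch the VERY_LOW message is received last even though it was enqueued first.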
The process(MessageInterface $message, SessionInterface $session) method describes the actions that should be performed when a message is received. It can perform the actions directly or create a job. It can also produce a new message to run another processor asynchronously.
The received message can be processed, rejected, and re-queued. An exception can also be thrown.
Message Processor will return self::ACK when the message was processed successfully, for example, when the job it ran returned true. It means that the message was processed successfully and is removed from the queue.
Message Processor will return self::REJECT when the message cannot be processed, for example, when the job it ran returned false. It means that the message was not processed and is removed from the queue because it is unprocessable and will never become processable (e.g. a required parameter is missing or another permanent error appears).
There could be two options:
If a message cannot be processed temporarily, for example, in case of a connection timeout due to server overload, the process method should return self::REQUEUE. The message will be returned to the queue and will be processed later. This will also happen if an exception is thrown during processing or job running.
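The three outcomes can be summarized in a minimal, self-contained sketch. The ACK, REJECT and REQUEUE constants here are local stand-ins for the component's constants, and TemporaryFailure and decideStatus are hypothetical names introduced only for this illustration.

```php
<?php
// Stand-ins for the component's result constants; the string values are illustrative.
const ACK = 'ack';
const REJECT = 'reject';
const REQUEUE = 'requeue';

/** Hypothetical exception type marking a temporary failure such as a timeout. */
class TemporaryFailure extends RuntimeException
{
}

/**
 * Decides the processing status for a decoded message body.
 * $handler is a hypothetical callable that does the actual work.
 */
function decideStatus(array $body, callable $handler): string
{
    if (!isset($body['id'])) {
        return REJECT; // permanent error: the message will never become processable
    }

    try {
        $handler($body['id']);
    } catch (TemporaryFailure $e) {
        return REQUEUE; // temporary error: the message should be processed later
    }

    return ACK; // processed successfully
}

$ok = decideStatus(['id' => 7], function (int $id): void {
});
$broken = decideStatus([], function (int $id): void {
});
$busy = decideStatus(['id' => 7], function (int $id): void {
    throw new TemporaryFailure('connection timeout');
});
```

The key design choice is to separate permanent errors (reject) from temporary ones (requeue), so that unprocessable messages do not circulate in the queue forever.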
The workflow of re-queuing messages (the processor returns self::REQUEUE) is the following:
- The message is passed to the message processor (the process method of the message processor).
- The process method returns self::REQUEUE.
- The message is returned to the queue with the redelivery flag set to true.
- DelayRedeliveredMessageExtension works and sets a delay for the requeued message.
The workflow of re-queuing messages when an exception is thrown inside a message processor is slightly different:
- The message is passed to the message processor (the process method of the message processor).
- An exception is thrown inside the process method.
- The message is returned to the queue with the redelivery flag set to true. Then the consumer fails with the exception.
- DelayRedeliveredMessageExtension works and sets a delay for the failing message.
A processor receives a message with the entity id. It finds the entity and changes its status without creating any job.
    /**
     * {@inheritdoc}
     */
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $body = JSON::decode($message->getBody());

        // A message without an id can never be processed, so it is rejected.
        if (! isset($body['id'])) {
            $this->logger->critical(
                sprintf('Got invalid message, id is empty: "%s"', $message->getBody()),
                ['message' => $message]
            );

            return self::REJECT;
        }

        $em = $this->getEntityManager();
        $repository = $em->getRepository(SomeEntity::class);
        $entity = $repository->find($body['id']);

        // A missing entity is also treated as a permanent error.
        if (! $entity) {
            $this->logger->error(
                sprintf('Cannot find an entity with id: "%s"', $body['id']),
                ['message' => $message]
            );

            return self::REJECT;
        }

        $entity->setStatus('success');
        $em->persist($entity);
        $em->flush();

        return self::ACK;
    }
Overall, there can be three cases:
A message processor can be implemented with or without creating jobs.
There is no ideal criterion to help decide whether a job should be created or not. A developer should decide in each case which approach is better.
Here are a few recommendations:
Jobs are usually run with JobRunner.
JobRunner creates and runs a job. Usually one of the following methods is used:
public function runUnique($ownerId, $name, \Closure $runCallback)
Runs the $runCallback. It does not allow another job with the same name to be run at the same time.
public function createDelayed($name, \Closure $startCallback)
A sub-job which runs asynchronously (sending its own message). It can only run inside another job.
public function runDelayed($jobId, \Closure $runCallback)
This method is used inside a processor for a message which was sent with createDelayed.
The $runCallback closure usually returns true or false, and the job status depends on the returned value. See the Jobs statuses section for the details.
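How a callback result could translate into a job status can be sketched as follows. This is not the component's JobRunner: the runJob function, the array-based job record, and the status strings are assumptions made for illustration, and the sketch swallows exceptions for brevity.

```php
<?php
// Illustrative status names; the component defines its own Job::STATUS_* constants.
const STATUS_NEW = 'new';
const STATUS_RUNNING = 'running';
const STATUS_SUCCESS = 'success';
const STATUS_FAILED = 'failed';

/**
 * Runs a job closure and records the resulting status on the job record.
 * The array-based job record is a hypothetical simplification.
 */
function runJob(array &$job, callable $runCallback): bool
{
    $job['status'] = STATUS_RUNNING;
    $job['startedAt'] = new DateTimeImmutable();

    try {
        $result = (bool) $runCallback();
    } catch (Throwable $e) {
        $result = false; // in this sketch an exception is treated like a false result
    }

    $job['status'] = $result ? STATUS_SUCCESS : STATUS_FAILED;
    $job['stoppedAt'] = new DateTimeImmutable();

    return $result;
}

$job = ['status' => STATUS_NEW];
$succeeded = runJob($job, function () {
    return true;
});

$failedJob = ['status' => STATUS_NEW];
$failed = runJob($failedJob, function () {
    throw new RuntimeException('boom');
});
```

A processor would typically map the boolean result to its own return value, for example ACK for true and REJECT for false.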
To reuse existing processor logic in the scope of a job, the processor may be decorated with DelayedJobRunnerDecoratingProcessor, which executes runDelayed, passes control to the given processor, and then converts the result into the format applicable for runDelayed.
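The decoration idea can be sketched without the component. Everything below is a hypothetical stand-in: runDelayedSketch imitates only the boolean contract of runDelayed, and decorateWithDelayedJob is not the real DelayedJobRunnerDecoratingProcessor.

```php
<?php
// Stand-ins for the processor result constants; the string values are illustrative.
const ACK = 'ack';
const REJECT = 'reject';

/**
 * Hypothetical stand-in for runDelayed: runs the callback for a job id
 * and returns its boolean result.
 */
function runDelayedSketch(int $jobId, callable $runCallback): bool
{
    return (bool) $runCallback();
}

/**
 * Wraps an existing processor callable so that it runs inside a delayed job.
 */
function decorateWithDelayedJob(callable $processor): callable
{
    return function (array $body) use ($processor): string {
        $result = runDelayedSketch($body['jobId'], function () use ($processor, $body): bool {
            // Translate the processor status into the boolean a job closure returns.
            return $processor($body) === ACK;
        });

        // Translate the job result back into a processor status.
        return $result ? ACK : REJECT;
    };
}

// An existing processor that knows nothing about jobs.
$inner = function (array $body): string {
    return isset($body['entityId']) ? ACK : REJECT;
};

$decorated = decorateWithDelayedJob($inner);
$status = $decorated(['jobId' => 1, 'entityId' => 5]);
$statusBad = $decorated(['jobId' => 2]);
```

The benefit of this shape is that the inner processor stays unaware of jobs, so the same logic can be used both with and without a job.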
Use a dependent job when your job flow has several steps and you want to send a new message only when all steps are finished.
In the example below, a root job is created. As soon as its work is completed, it sends two messages with the 'topic1' and 'topic2' topics to the queue.
    class MessageProcessor implements MessageProcessorInterface
    {
        /**
         * @var JobRunner
         */
        private $jobRunner;

        /**
         * @var DependentJobService
         */
        private $dependentJob;

        public function process(MessageInterface $message, SessionInterface $session)
        {
            $data = JSON::decode($message->getBody());

            $result = $this->jobRunner->runUnique(
                $message->getMessageId(),
                'oro:index:reindex',
                function (JobRunner $runner, Job $job) use ($data) {
                    // register two dependent jobs
                    // next messages will be sent to queue when that job and all children are finished
                    $context = $this->dependentJob->createDependentJobContext($job->getRootJob());
                    $context->addDependentJob('topic1', 'message1');
                    $context->addDependentJob('topic2', 'message2', MessagePriority::VERY_HIGH);

                    $this->dependentJob->saveDependentJob($context);

                    // some work to do

                    return true; // if you want to ACK message or false to REJECT
                }
            );

            return $result ? self::ACK : self::REJECT;
        }
    }
Dependent jobs can be added only to root jobs (i.e. to the jobs created with runUnique, not runDelayed).
A two-level job hierarchy is created for the process. A root job cannot have a root job of its own.
If we use just runUnique, then a parent and a child job with the same name are created.
If we use runUnique and createDelayed inside it, then a parent and a child job for runUnique are created. Then each run of createDelayed adds another child for the runUnique parent.
Job statuses when runUnique is called without creating any child jobs:
- When the closure starts to run, the job gets the Job::STATUS_RUNNING status, and the job startedAt field is set to the current time.
- If the closure returns true, the job status is changed to Job::STATUS_SUCCESS, and the job stoppedAt field is changed to the current time.
- If the closure returns false or throws an exception, the job status is changed to Job::STATUS_FAILED, and the job stoppedAt field is changed to the current time.
- If the job is interrupted, it gets the Job::STATUS_CANCELLED status, and the job stoppedAt field is changed to the current time.
Job statuses when runUnique is called and creates child jobs with createDelayed:
- When the closure starts to run, the root job gets the Job::STATUS_RUNNING status, and the job startedAt field is set to the current time.
- When createDelayed is called, the child jobs are created and get the Job::STATUS_NEW statuses. The messages for the jobs are sent to the message queue.
- When runDelayed is called, the closure runs and the child jobs get the Job::STATUS_RUNNING status.
- If the closure returns true, the child job status is changed to Job::STATUS_SUCCESS, and the job stoppedAt field is changed to the current time.
- If the closure returns false or throws an exception, the child job status is changed to Job::STATUS_FAILED, and the job stoppedAt field is changed to the current time.
- If a job is interrupted, it gets the Job::STATUS_CANCELLED status, and the job stoppedAt field is changed to the current time.
- Child jobs can likewise end up with Job::STATUS_CANCELLED statuses.
If a job closure returns true, the process method which runs this job should return self::ACK. If a job closure returns false, the process method which runs this job should return self::REJECT.
It is not possible to create two unique jobs with the same name. That is why, if one unique job is not able to finish its work, it can block another job. To handle such a situation, you can use the Stale Jobs functionality.
By default, JobProcessor uses NullJobConfigurationProvider, so a unique job will never be "stale". If you want to change that behaviour, you need to create your own provider that implements JobConfigurationProviderInterface.
The method JobConfigurationProvider::getTimeBeforeStaleForJobName($jobName) should return the number of seconds after which the job will be considered "stale". If you do not want the job to become stale, return null or -1.
In the example below, all jobs will be treated as "stale" after one hour.
    <?php

    use Oro\Component\MessageQueue\Provider\JobConfigurationProviderInterface;

    class JobConfigurationProvider implements JobConfigurationProviderInterface
    {
        /**
         * {@inheritdoc}
         */
        public function getTimeBeforeStaleForJobName($jobName)
        {
            // One hour, in seconds, for every job name.
            return 3600;
        }
    }

    $jobProcessor = new JobProcessor(/* arguments */);
    $jobProcessor->setJobConfigurationProvider(new JobConfigurationProvider());
In that case, if a second unique job with the same name is created, and the previous job has not been updated for more than one hour and has not started a child job, the previous job gets the Job::STATUS_STALE status and a new job is created.
Additionally, if a processor tries to finish a "stale" job, the job will be removed.
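The staleness rule can be expressed as a small check. This is a sketch under assumptions: isStale is a hypothetical helper, not part of the component, and it models only the time comparison (not the check for started child jobs).

```php
<?php
/**
 * Hypothetical helper: decides whether a job counts as stale.
 * $timeBeforeStale mirrors the provider's return value:
 * a number of seconds, or null / -1 for "never stale".
 */
function isStale(DateTimeImmutable $lastUpdate, ?int $timeBeforeStale, DateTimeImmutable $now): bool
{
    if ($timeBeforeStale === null || $timeBeforeStale === -1) {
        return false;
    }

    // Stale only when the job has not been updated for MORE than the limit.
    return ($now->getTimestamp() - $lastUpdate->getTimestamp()) > $timeBeforeStale;
}

$now = new DateTimeImmutable('2024-01-01 12:00:00');

$stale = isStale($now->modify('-2 hours'), 3600, $now);
$fresh = isStale($now->modify('-10 minutes'), 3600, $now);
$never = isStale($now->modify('-2 days'), null, $now);
```

Passing the current time as an argument keeps the check deterministic, which makes it easy to test.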
Usually the message flow looks the following way:
[Figure: message flow]
However, if more than one processor is subscribed to the same topic, the flow becomes more complicated. The client's message producer sends a message to a router message processor. The router takes the message and searches for the real recipients that are interested in such a message. Then it sends a copy of the message to each of them. Each target message processor takes its copy of the message and processes it.
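The routing step can be illustrated with a minimal in-memory sketch. The subscription map, the topic and processor names, and the route function are all hypothetical and only model the idea of sending one copy per subscriber.

```php
<?php
// Hypothetical subscription map: topic name => names of subscribed processors.
$subscriptions = [
    'user.registered' => ['send_welcome_email', 'update_statistics'],
    'order.placed' => ['reserve_stock'],
];

/**
 * Returns one copy of the message per processor subscribed to the topic.
 */
function route(array $subscriptions, string $topic, string $body): array
{
    $copies = [];
    foreach ($subscriptions[$topic] ?? [] as $processorName) {
        $copies[] = [
            'processor' => $processorName,
            'topic' => $topic,
            'body' => $body,
        ];
    }

    return $copies;
}

$copies = route($subscriptions, 'user.registered', '{"id": 42}');
$none = route($subscriptions, 'unknown.topic', '{}');
```

Each copy can then be processed independently, so a failure in one target processor does not affect the others.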
Let us imagine that we want to run two processes in parallel. In this case, we can create Processor B with the first process and Processor C with the second process. We can then create Processor A, inject Message Producer into it, and send messages to Processor B and Processor C. The messages are put into the queue and, when their turn comes, the consumers run processes B and C. That can be done in parallel.
Code example:
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $data = JSON::decode($message->getBody());

        // Reject messages that lack any of the fields used below.
        if (! isset($data['ids'], $data['targetClass'], $data['targetId'])) {
            $this->logger->critical(
                sprintf('Got invalid message: "%s"', $message->getBody()),
                ['message' => $message]
            );

            return self::REJECT;
        }

        // Send one message per entity id.
        foreach ($data['ids'] as $id) {
            $this->producer->send(Topics::DO_SOMETHING_WITH_ENTITY, [
                'id' => $id,
                'targetClass' => $data['targetClass'],
                'targetId' => $data['targetId'],
            ]);
        }

        $this->logger->info(sprintf(
            'Sent "%s" messages',
            count($data['ids'])
        ));

        return self::ACK;
    }
The processor in the example accepts an array of entity ids and sends a Topics::DO_SOMETHING_WITH_ENTITY message for each id. The messages are put into the message queue and will be processed when their turn comes. This can be done in parallel if several consumers are running.
The approach is simple and works perfectly well, although it has a few flaws.
Running parallel jobs via JobRunner is more appropriate than the previous approach, although it is slightly more complicated. It is, however, the preferred way to implement parallel processes.
The task is the same as the previous one. We want to run two processes in parallel. We also create processors A, B and C, but they are slightly different.
We inject JobRunner into Processor A. Inside the process method, it calls the runUnique method. In the closure of runUnique, it calls the createDelayed method for Processor B and for Processor C, passing the jobId param to its closure. Inside the closures of createDelayed, the messages for Processor B and Processor C are created and sent. We should also add the jobId params to the message bodies in addition to the required params.
Processors B and C are also slightly different. Their process methods call the runDelayed method, passing the received jobId param.
The benefits are the following:
- Since the runUnique method is used in Processor A, a new instance of it cannot run until the previous instance completes all the jobs.
The processor subscribes to Topics::DO_BIG_JOB and runs a unique big job (the name of the job is Topics::DO_BIG_JOB, the same as the topic name, so it is not possible to run another big job at the same time). The processor creates a set of delayed jobs, each of which sends a Topics::DO_SMALL_JOB message.
    /**
     * {@inheritdoc}
     */
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $bigJobParts = JSON::decode($message->getBody());

        $result = $this->jobRunner->runUnique( // a root job is created here
            $message->getMessageId(),
            Topics::DO_BIG_JOB,
            function (JobRunner $jobRunner) use ($bigJobParts) {
                foreach ($bigJobParts as $smallJob) {
                    $jobRunner->createDelayed( // child jobs are created here and get the new status
                        sprintf('%s:%s', Topics::DO_SMALL_JOB, $smallJob),
                        function (JobRunner $jobRunner, Job $child) use ($smallJob) {
                            $this->producer->send(Topics::DO_SMALL_JOB, [ // messages for child jobs are sent here
                                'smallJob' => $smallJob,
                                'jobId' => $child->getId(), // the created child job ids are passed as message body params
                            ]);
                        }
                    );
                }

                return true;
            }
        );

        return $result ? self::ACK : self::REJECT;
    }
The processor subscribes to Topics::DO_SMALL_JOB and runs the created delayed job.
    /**
     * {@inheritdoc}
     */
    public function process(MessageInterface $message, SessionInterface $session)
    {
        $payload = JSON::decode($message->getBody());

        $result = $this->jobRunner->runDelayed($payload['jobId'], function (JobRunner $jobRunner) use ($payload) {
            // the child job status with the id $payload['jobId'] is changed from new to running

            $smallJobData = $payload['smallJob'];

            if (! $this->checkDataValidity($smallJobData)) {
                $this->logger->error(
                    sprintf('Invalid data received: "%s"', $smallJobData),
                    ['message' => $payload]
                );

                return false; // the child job status with the id $payload['jobId'] is changed from running to failed
            }

            return true; // the child job status with the id $payload['jobId'] is changed from running to success
        });

        return $result ? self::ACK : self::REJECT;
    }
A root job is created for the big job and a set of its child jobs is created for the small jobs.
    class MessageProcessor implements MessageProcessorInterface
    {
        /**
         * @var JobRunner
         */
        private $jobRunner;

        public function process(MessageInterface $message, SessionInterface $session)
        {
            $data = JSON::decode($message->getBody());

            $result = $this->jobRunner->runUnique(
                $message->getMessageId(),
                'oro:index:reindex',
                function (JobRunner $runner, Job $job) use ($data) {
                    // do your job

                    return true; // if you want to ACK message or false to REJECT
                }
            );

            return $result ? self::ACK : self::REJECT;
        }
    }
    class Step1MessageProcessor implements MessageProcessorInterface
    {
        /**
         * @var JobRunner
         */
        private $jobRunner;

        /**
         * @var MessageProducerInterface
         */
        private $producer;

        public function process(MessageInterface $message, SessionInterface $session)
        {
            $data = JSON::decode($message->getBody());

            $result = $this->jobRunner->runUnique(
                $message->getMessageId(),
                'oro:index:reindex',
                function (JobRunner $runner, Job $job) use ($data) {
                    // placeholder: load the entities to index based on $data
                    $entities = [];

                    // for example first step generates tasks for step two
                    foreach ($entities as $entity) {
                        // every job name must be unique
                        $jobName = 'oro:index:index-single-entity:' . $entity->getId();
                        $runner->createDelayed(
                            $jobName,
                            function (JobRunner $runner, Job $childJob) use ($entity) {
                                $this->producer->send('oro:index:index-single-entity', [
                                    'entityId' => $entity->getId(),
                                    'jobId' => $childJob->getId(),
                                ]);
                            }
                        );
                    }

                    return true; // if you want to ACK message or false to REJECT
                }
            );

            return $result ? self::ACK : self::REJECT;
        }
    }

    class Step2MessageProcessor implements MessageProcessorInterface
    {
        /**
         * @var JobRunner
         */
        private $jobRunner;

        public function process(MessageInterface $message, SessionInterface $session)
        {
            $data = JSON::decode($message->getBody());

            $result = $this->jobRunner->runDelayed(
                $data['jobId'],
                function (JobRunner $runner, Job $job) use ($data) {
                    // do your job

                    return true; // if you want to ACK message or false to REJECT
                }
            );

            return $result ? self::ACK : self::REJECT;
        }
    }
The following is an example of producing a message using only the transport layer:
    <?php

    use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection;
    use Doctrine\DBAL\Configuration;
    use Doctrine\DBAL\DriverManager;

    $doctrineConnection = DriverManager::getConnection(
        ['url' => 'mysql://user:secret@localhost/mydb'],
        new Configuration()
    );

    $connection = new DbalConnection($doctrineConnection, 'oro_message_queue');

    $session = $connection->createSession();

    $queue = $session->createQueue('aQueue');
    $message = $session->createMessage('Something has happened');

    $session->createProducer()->send($queue, $message);

    $session->close();
    $connection->close();
The following is an example of consuming a message using only the transport layer:
    <?php

    use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection;
    use Doctrine\DBAL\Configuration;
    use Doctrine\DBAL\DriverManager;

    $doctrineConnection = DriverManager::getConnection(
        ['url' => 'mysql://user:secret@localhost/mydb'],
        new Configuration()
    );

    $connection = new DbalConnection($doctrineConnection, 'oro_message_queue');

    $session = $connection->createSession();

    $queue = $session->createQueue('aQueue');
    $consumer = $session->createConsumer($queue);

    // Poll the queue and acknowledge every received message.
    while (true) {
        if ($message = $consumer->receive()) {
            echo $message->getBody();

            $consumer->acknowledge($message);
        }
    }

    $session->close();
    $connection->close();
The following is an example of consuming a message using the consumption layer:
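    <?php

    use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface;
    use Oro\Component\MessageQueue\Transport\MessageInterface;
    use Oro\Component\MessageQueue\Transport\SessionInterface;

    class FooMessageProcessor implements MessageProcessorInterface
    {
        public function process(MessageInterface $message, SessionInterface $session)
        {
            echo $message->getBody();

            return self::ACK;
        }
    }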
    <?php

    use Doctrine\DBAL\Configuration;
    use Doctrine\DBAL\DriverManager;
    use Oro\Component\MessageQueue\Consumption\ChainExtension;
    use Oro\Component\MessageQueue\Consumption\QueueConsumer;
    use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection;

    $doctrineConnection = DriverManager::getConnection(
        ['url' => 'mysql://user:secret@localhost/mydb'],
        new Configuration()
    );

    $connection = new DbalConnection($doctrineConnection, 'oro_message_queue');

    $queueConsumer = new QueueConsumer($connection, new ChainExtension([]));
    $queueConsumer->bind('aQueue', new FooMessageProcessor());

    try {
        $queueConsumer->consume();
    } finally {
        // Always release the connection, even if consumption fails.
        $queueConsumer->getConnection()->close();
    }