MIT
The Requires
- php >=5.3.2
by David Tee
queue message queue
Message Queue Manager for symfony2.
Allows Symfony developers to create background jobs as easily as:

    $worker->later()->process(1, 2, 3);
See changes
Upgrading from 5.0: see UPGRADING-6.0.md
This bundle provides a way to easily create and manage queued background jobs.
Basic Features:
Job-specific Features:
- Auto-retry on failure, exception
    * If a job exits with a failure code, it can auto-retry
    * Same for Exception if desired
- Priority
    * Jobs can have levels of priority so that higher priority jobs can get processed first even if they were added to the queue later.
- Future Jobs (ODM / ORM / Redis)
    * Jobs can be scheduled to run at some time in the future
- Batch
    * Jobs can be "batched" so that only one job runs, even if multiple are queued of the same type
- Expires
    * Jobs can have an "expires" time so that they won't run after a certain point (useful if the queue gets backed up and a job is worthless after a certain time)
- Stalls (ODM / ORM)
    * Jobs that crash the interpreter, or get terminated for some other reason, can be detected
    * These can be re-queued to run in the future.
See /Resources/doc/symfony2-3.md
See /Resources/doc/symfony4-5.md
See /Resources/doc/troubleshooting.md
Create a worker class that will work on the background job.
Example:
* __src/Worker/Fibonacci.php:__ (symfony 4/5)
* __src/AppBundle/Worker/Fibonacci.php:__ (symfony 2/3)

    <?php
    namespace App\Worker; // for symfony 2/3, the namespace would typically be AppBundle\Worker

    class Fibonacci extends \Dtc\QueueBundle\Model\Worker
    {
        private $filename;

        public function __construct()
        {
            $this->filename = '/tmp/fib-result.txt';
        }

        public function fibonacciFile($n)
        {
            $fib = $this->fibonacci($n);
            file_put_contents($this->filename, "{$n}: {$fib}");
        }

        public function fibonacci($n)
        {
            if ($n == 0)
                return 0; // F0
            elseif ($n == 1)
                return 1; // F1
            else
                return $this->fibonacci($n - 1) + $this->fibonacci($n - 2);
        }

        public function getName()
        {
            return 'fibonacci';
        }

        public function getFilename()
        {
            return $this->filename;
        }
    }

Create a DI service for the job, and tag it as a background worker.
Symfony 5, 4 and 3.3, 3.4:

    services:
        # for symfony 3 the class name would likely be AppBundle\Worker\Fibonacci
        App\Worker\Fibonacci:
            # public: false is possible if you completely use DependencyInjection for access to the service
            public: true
            tags:
                - { name: "dtc_queue.worker" }

Symfony 2, and 3.0, 3.1, 3.2:

    services:
        app.worker.fibonacci:
            class: AppBundle\Worker\Fibonacci
            tags:
                - { name: "dtc_queue.worker" }

    <services>
        <!-- ... -->
        <service id="fibonacci" class="Fibonacci">
            <tag name="dtc_queue.worker" />
        </service>
        <!-- ... -->
    </services>

Simple examples:

    bin/console dtc:queue:create_job <worker> <method> <arg>
    bin/console dtc:queue:create_job fibonacci fibonacci 3
    bin/console dtc:queue:create_job fibonacci fibonacciFile 8

    // Dependency inject the worker or fetch it from the container
    $fibonacci = $container->get('App\Worker\Fibonacci');

    // For Symfony 3.3, 3.4
    //     $fibonacci = $container->get('AppBundle\Worker\Fibonacci');
    //
    // For Symfony 2, 3.0, 3.1, 3.2:
    //     $fibonacci = $container->get('app.worker.fibonacci');

    // Basic Examples
    $fibonacci->later()->fibonacci(20);
    $fibonacci->later()->fibonacciFile(20);

    // Batch Example
    $fibonacci->batchLater()->fibonacci(20); // Batch up runs into a single run

    // Timed Example
    $fibonacci->later(90)->fibonacci(20); // Run 90 seconds later

    // Priority
    //     Note: whether 1 == High or Low priority is configurable, but by default it is High
    $fibonacci->later(0, 1)->fibonacci(20); // As soon as possible, High priority
    $fibonacci->later(0, 125)->fibonacci(20); // Medium priority
    $fibonacci->later(0, 255)->fibonacci(20); // Low priority

    // Advanced Usage Example:
    //     (If the job is not processed by $expireTime, then don't execute it ever...)
    $expireTime = time() + 3600;
    $fibonacci->later()->setExpiresAt(new \DateTime("@$expireTime"))->fibonacci(20); // Must be run within the hour or not at all

For further instructions on creating jobs, including how to create a job from the command line using json-encoded arguments, see:
/Resources/doc/create-job.md
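
The `dtc:queue:create_job` command shown in the simple examples lends itself to scripting. The following is a minimal sketch, assuming the `fibonacci` worker from the example is registered and that `bin/console` is reachable from the current directory. The `enqueue_fibonacci` function and the `CONSOLE` variable are illustrative names, not part of the bundle.

```shell
#!/bin/sh
# Path to the Symfony console; override CONSOLE if yours lives elsewhere (assumption).
CONSOLE="${CONSOLE:-bin/console}"

# Queue one "fibonacci" job per argument via dtc:queue:create_job.
# Stops at the first failing command and returns non-zero.
enqueue_fibonacci() {
    for n in "$@"; do
        $CONSOLE dtc:queue:create_job fibonacci fibonacci "$n" || return 1
    done
}
```

Calling `enqueue_fibonacci 3 8 20` would queue three jobs, one per argument.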
It's recommended that you background the following console commands

    bin/console dtc:queue:run -d 120

    # the -d parameter is the number of seconds during which to keep executing jobs before ending.
    # For example you could put the above command into cron or a cron-like system to run every 2 minutes
    #
    # There are a number of other parameters that could be passed to dtc:queue:run; run this for a full list:
    bin/console dtc:queue:run --help

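
One way to wire this into cron is a small wrapper function. This is a sketch under assumptions: the `run_queue` function and the `CONSOLE` and `DURATION` variables are hypothetical names chosen for illustration, and only the `-d` option documented above is used.

```shell
#!/bin/sh
# Path to the Symfony console; override CONSOLE if yours lives elsewhere (assumption).
CONSOLE="${CONSOLE:-bin/console}"
# Seconds to keep processing jobs before exiting; 120 matches a 2-minute cron interval.
DURATION="${DURATION:-120}"

# Process jobs for DURATION seconds, then exit so the next cron run starts a fresh worker.
run_queue() {
    $CONSOLE dtc:queue:run -d "$DURATION"
}
```

A script that sources this and calls `run_queue` could then be scheduled with a crontab entry such as `*/2 * * * * cd /path/to/project && ./run-queue.sh`, where both the project path and the script name are placeholders.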
For ODM and ORM based stores, the archive tables and the regular job table (queue) can require periodic pruning.
The regular job table is for waiting and running jobs. If a job throws an exception that can't be caught, the process segfaults, the machine crashes, etc., then jobs which never finished can remain in the job queue in the "Running" state.
The archive table is where finished and errored jobs end up after execution; this table can grow indefinitely.
For Mongo in production, it may be prudent to use a capped collection or TTL indexes.
For MySQL you could create an event to delete data periodically.
There are also several commands that do something similar (and could be put into a periodic cron job as well):

    bin/console dtc:queue:prune old --older 1m
    # (deletes jobs older than one month from the Archive table)

    # Clear out stalled jobs from the regular job table:
    bin/console dtc:queue:prune stalled

    # If you're recording runs...this is recommended:
    bin/console dtc:queue:prune stalled_runs

    # If you're recording runs...another recommendation
    bin/console dtc:queue:prune old_runs --older 1m

    # If you're recording timings
    bin/console dtc:queue:prune old_job_timings --older 1m

    # You can tune 1m to a smaller interval such as 10d (10 days) or even 1800s (1/2 hour)
    # if you have too many jobs flowing through the system.

    bin/console dtc:queue:prune --help # lists other prune commands

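
The prune commands above can be grouped into one function for a periodic cron job. This is a sketch only: `prune_queue` and `CONSOLE` are illustrative names, and the run and timing prunes are only useful if you are recording runs and timings.

```shell
#!/bin/sh
# Path to the Symfony console; override CONSOLE if yours lives elsewhere (assumption).
CONSOLE="${CONSOLE:-bin/console}"

# Run the documented prune commands in sequence, stopping at the first failure.
# $1 is the age passed to --older (e.g. 1m, 10d, 1800s); defaults to 1m.
prune_queue() {
    older="${1:-1m}"
    $CONSOLE dtc:queue:prune stalled &&
    $CONSOLE dtc:queue:prune stalled_runs &&
    $CONSOLE dtc:queue:prune old --older "$older" &&
    $CONSOLE dtc:queue:prune old_runs --older "$older" &&
    $CONSOLE dtc:queue:prune old_job_timings --older "$older"
}
```

For example, `prune_queue 10d` would clear stalled jobs and runs, then delete archived jobs, runs, and timings older than ten days.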
These commands may help with debugging issues with the queue:

    bin/console dtc:queue:count # some status about the queue if available (ODM/ORM only)
    bin/console dtc:queue:reset # resets errored and/or stalled jobs

    # This is really only good for ORM/ODM based stores.
    bin/console dtc:queue:run --id={jobId}
    # (jobId could be obtained from mongodb / or your database, if using an ORM / ODM solution)

Each job run can be tracked in a table in an ORM / ODM backed datastore.
Ways to configure:
__app/config/config.yml:__ (symfony 2/3)
__config/packages/dtc_queue.yaml:__ (symfony 4/5)

    dtc_queue:
        manager:
            # run defaults to whatever job is set to (which defaults to "odm", i.e. mongodb)
            #   If you set the job to rabbit_mq, or beanstalkd or something else, you need to set run
            #   to an ORM / ODM run_manager (or a custom such one) in order to get the runs to save
            #
            run: orm # other possible option is "odm" (i.e. mongodb)
            #
            # (optionally define your own run manager with id: dtc_queue.manager.run.{some_name} and put {some_name} under run:)

Change the document manager
__app/config/config.yml:__ (symfony 2/3)
__config/packages/dtc_queue.yaml:__ (symfony 4/5)

    dtc_queue:
        odm:
            document_manager: {something} # default is "default"

__app/config/config.yml:__ (symfony 2/3)
__config/packages/dtc_queue.yaml:__ (symfony 4/5)

    dtc_queue:
        manager:
            job: orm

Change the EntityManager:

    dtc_queue:
        orm:
            entity_manager: {something} # default is "default"

NOTE: You may need to add DtcQueueBundle to your mappings section in config.yml if auto_mapping is not enabled

    doctrine:
        #...
        orm:
            #...
            mappings:
                DtcQueueBundle: ~

If you plan on using ODM or Redis or another configuration, but you have Doctrine ORM enabled elsewhere, it's recommended that you use the schema_filter configuration parameter so that schema dumps and/or migration diffs don't pick up those tables (see issue #77).
E.g.

    doctrine:
        # ...
        dbal:
            # ...
            schema_filter: ~^(?!dtc_)~

(If you already have a schema_filter, you can just add the "dtc_" prefix to it.)
__app/config/config.yml:__ (symfony 2/3)
__config/packages/dtc_queue.yaml:__ (symfony 4/5)

    dtc_queue:
        beanstalkd:
            host: beanstalkd
            tube: some-tube-name [optional]
        manager:
            job: beanstalkd

__app/config/config.yml:__ (symfony 2/3)
__config/packages/dtc_queue.yaml:__ (symfony 4/5)

    dtc_queue:
        manager:
            job: rabbit_mq
        rabbit_mq:
            host: rabbitmq
            port: 5672
            user: guest
            password: guest
            vhost: "/" [optional defaults to "/"]
            ssl: [optional defaults to false - toggles to use AMQPSSLConnection]
            options: [optional options to pass to AMQPStreamConnection or AMQPSSLConnection]
            ssl_options: [optional extra ssl options to pass to AMQPSSLConnection]
            queue_args: [optional]
                queue: [optional queue name]
                passive: [optional defaults to false]
                durable: [optional defaults to true]
                exclusive: [optional defaults to false]
                auto_delete: [optional defaults to false]
            exchange_args: [optional]
                exchange: [optional exchange name]
                type: [optional defaults to "direct"]
                passive: [optional defaults to false]
                durable: [optional defaults to true]
                auto_delete: [optional defaults to false]

__app/config/config.yml:__ (symfony 2/3)
__config/packages/dtc_queue.yaml:__ (symfony 4/5)

    dtc_queue:
        manager:
            job: redis
        redis:
            # choose one of the below snc_redis, predis, or phpredis
            snc_redis:
                type: predis
                alias: default
            predis:
                # choose one of dsn or connection_parameters
                dsn: redis://localhost
                connection_parameters:
                    scheme: tcp
                    host: localhost
                    port: 6379
                    path: ~
                    database: ~
                    password: ~
                    async: false
                    persistent: false
                    timeout: 5.0
                    read_write_timeout: ~
                    alias: ~
                    weight: ~
                    iterable_multibulk: false
                    throw_errors: true
            phpredis:
                # minimum fill host and port if needed
                host: localhost
                port: 6379
                timeout: 0
                retry_interval: ~
                read_timeout: 0
                auth: ~

__app/config/config.yml:__ (symfony 2/3)
__config/packages/dtc_queue.yaml:__ (symfony 4/5)

    dtc_queue:
        class_job: Some\Job\ClassName [optional]
        manager:
            job: some_name [optional]
            # (create your own manager service and name or alias it:
            #   dtc_queue.manager.job.<some_name> and put
            #   <some_name> in the manager: job field above)

1) Extend the following:

    Dtc\QueueBundle\Document\Job
    Dtc\QueueBundle\Document\JobArchive

or

    Dtc\QueueBundle\Entity\Job
    Dtc\QueueBundle\Entity\JobArchive

(Depending on whether you're using MongoDB or an ORM)
2) Change the parameters on the class appropriately

    <?php
    namespace App\Entity; // Or whatever

    use Dtc\QueueBundle\Entity\Job as BaseJob;
    use Doctrine\ORM\Mapping as ORM;

    #[ORM\Entity]
    #[ORM\Table(name: 'job_some_other_name')]
    #[ORM\Index(columns: ['crc_hash', 'status'], name: 'job_crc_hash_idx')]
    #[ORM\Index(columns: ['priority', 'whenAt'], name: 'job_priority_idx')]
    #[ORM\Index(columns: ['whenAt'], name: 'job_when_idx')]
    #[ORM\Index(columns: ['status', 'whenAt'], name: 'job_status_idx')]
    class Job extends BaseJob
    {
    }

    // ... similarly for Entity\JobArchive if necessary

    <?php
    namespace App\Document;

    use Doctrine\ODM\MongoDB\Mapping\Annotations as ODM;
    use Dtc\QueueBundle\Document\Job as BaseJob;

    #[ODM\Document(db: 'my_db', collection: 'my_job_collection')]
    class Job extends BaseJob
    {
    }

    // ... similarly for Document\JobArchive if necessary

3) Add the new class(es) to config.yml

    # config.yml
    # ...
    dtc_queue:
        class_job: App\Entity\Job
        class_job_archive: App\Entity\JobArchive

It's useful to listen for job events in a long-running script, for example to clear the Doctrine manager or to send an email about the status of a job. To add a job event subscriber, create a new service with the tag dtc_queue.event_subscriber:

    services:
        voices.queue.listener.clear_manager:
            class: ClearManagerSubscriber
            arguments:
                - '@service_container'
            tags:
                - { name: dtc_queue.event_subscriber, connection: default }

ClearManagerSubscriber.php

    <?php
    use Dtc\QueueBundle\EventDispatcher\Event;
    use Dtc\QueueBundle\EventDispatcher\EventSubscriberInterface;
    use Symfony\Component\DependencyInjection\ContainerInterface;

    class ClearManagerSubscriber implements EventSubscriberInterface
    {
        private $container;

        public function __construct(ContainerInterface $container)
        {
            $this->container = $container;
        }

        public function onPostJob(Event $event)
        {
            $managerIds = [
                'doctrine.odm.mongodb.document_manager',
                'doctrine.orm.default_entity_manager',
                'doctrine.orm.content_entity_manager'
            ];

            foreach ($managerIds as $id) {
                $manager = $this->container->get($id);
                $manager->clear();
            }
        }

        public static function getSubscribedEvents()
        {
            return array(
                Event::POST_JOB => 'onPostJob',
            );
        }
    }

    # /etc/init/queue.conf
    author "David Tee"
    description "Queue worker service, run 20 jobs at a time, process timeout of 3600"

    respawn
    start on startup

    script
        /{path to}/console dtc:queue:run --max-count 20 -v -t 3600>> /var/logs/queue.log 2>&1
    end script

NOTE: ORM and ODM (MongoDB) require mmucklo/grid-bundle in order to view the jobs/runs admin page.
You can register admin routes to see queue status. In your routing.yml file, add the following:

    dtc_queue:
        resource: '@DtcQueueBundle/Resources/config/routing.yml'

You can run the unit tests by typing bin/phpunit in the source folder. If you want to run integration tests against MongoDB, you need to set up a MongoDB server on localhost and run:

    bin/phpunit Tests/Document/JobManagerTest.php

If you want to run Beanstalkd integration testing, you need to run a local, empty instance of beanstalkd for testing.

    sudo service beanstalkd restart; BEANSTALKD_HOST=localhost bin/phpunit Tests/BeanStalkd/JobManagerTest.php

See /Resources/doc/full-configuration.md
This bundle is under the MIT license.
Originally written by @dtee
Enhanced and maintained by @mmucklo