WQ-AMQP (mle86/wq-amqp)
This package contains the PHP class mle86\WQ\WorkServerAdapter\AMQPWorkServer.
It supplements the mle86/wq package by implementing its WorkServerAdapter interface.
It connects to an AMQP server such as RabbitMQ using the php-amqplib/php-amqplib package.
It creates durable queues in the default exchange (“”) for immediate-delivery jobs and the durable “_phpwq._delayed” queue in the custom “_phpwq._delay_exchange” exchange for delayed jobs.
Jobs stored with storeJob() are always durable as well.
Empty queues and exchanges are not deleted automatically.
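The two delivery paths described above can be sketched as follows. This is a minimal illustration, not taken from the package itself: the SendMailJob class and its recipient property are hypothetical, the example assumes an AMQP server on localhost with the default guest credentials, and the delay argument is assumed to be in seconds.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use mle86\WQ\Job\AbstractJob;
use mle86\WQ\WorkServerAdapter\AMQPWorkServer;

// Hypothetical job class, defined here only for illustration.
// It extends the AbstractJob base class from mle86/wq.
class SendMailJob extends AbstractJob
{
    public function __construct(protected string $recipient)
    {
    }

    public function getRecipient(): string
    {
        return $this->recipient;
    }
}

// Assumes a local AMQP server reachable with the connect() defaults.
$workServer = AMQPWorkServer::connect('localhost');

// Immediate-delivery job: goes to the durable "mail" queue.
$workServer->storeJob('mail', new SendMailJob('alice@example.org'));

// Delayed job: passes a non-zero $delay (assumed to be seconds),
// so it takes the delayed-job path described above.
$workServer->storeJob('mail', new SendMailJob('bob@example.org'), 60);
```

Both calls use the storeJob() signature listed in the class reference; only the third argument differs.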
Version and Compatibility
This is version 1.4.0 of mle86/wq-amqp.
It was developed for version 1.0.0 of mle86/wq and should be compatible with all of its future 1.x versions as well.
Installation and Dependencies
$ composer require mle86/wq-amqp
It requires PHP 8.0+.
It depends on mle86/wq and php-amqplib/php-amqplib, which in turn requires the mbstring, sockets, and bcmath PHP extensions.
Class reference
class mle86\WQ\WorkServerAdapter\AMQPWorkServer implements WorkServerAdapter
- public function __construct (AMQPStreamConnection $connection)
  Constructor.
  Takes an already-configured AMQPStreamConnection instance to work with.
  Does not attempt to establish a connection itself; use the connect() factory method for that instead.
- public static function connect ($host = 'localhost', $port = 5672, $user = 'guest', $password = 'guest', $vhost = '/', $insist = false, $login_method = 'AMQPLAIN', $login_response = null, $locale = 'en_US', $connection_timeout = 3.0, $read_write_timeout = 3.0, $context = null, $keepalive = false, $heartbeat = 0)
  Factory method.
  See AMQPStreamConnection::__construct for the parameter descriptions.
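As a short sketch of the difference between the two entry points: the constructor wraps a connection you have built yourself, while connect() builds one from the given parameters. The host and credentials below are placeholder assumptions (the connect() defaults), not values required by the package.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use mle86\WQ\WorkServerAdapter\AMQPWorkServer;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Variant 1: build the php-amqplib connection yourself
// and hand it to the constructor.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$workServer = new AMQPWorkServer($connection);

// Variant 2: let the factory method create the connection.
// The arguments are passed in the same order as above.
$otherWorkServer = AMQPWorkServer::connect('localhost', 5672, 'guest', 'guest', '/');
```

The first variant is useful when the application already manages an AMQPStreamConnection elsewhere and wants to reuse it.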
Interface methods which are documented in the WorkServerAdapter interface:
public function storeJob (string $workQueue, Job $job, int $delay = 0)
public function getNextQueueEntry ($workQueue, int $timeout = DEFAULT_TIMEOUT): ?QueueEntry
public function buryEntry (QueueEntry $entry)
public function requeueEntry (QueueEntry $entry, int $delay, string $workQueue = null)
public function deleteEntry (QueueEntry $entry)
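These methods are normally driven by WQ's WorkProcessor, but they can also be called directly. The following is a rough sketch under several assumptions: that QueueEntry exposes the job through a getJob() accessor, that the timeout and delay values are in seconds, and that an AMQP server is running on localhost. Check the WorkServerAdapter interface documentation before relying on any of these.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use mle86\WQ\WorkServerAdapter\AMQPWorkServer;

$workServer = AMQPWorkServer::connect('localhost');

// Wait up to 5 (assumed: seconds) for an entry in the "mail" queue.
$entry = $workServer->getNextQueueEntry('mail', 5);

if ($entry === null) {
    // The return type is ?QueueEntry, so null means nothing was available.
    echo "No job available.\n";
    exit(0);
}

try {
    // Assumption: QueueEntry::getJob() returns the stored Job instance.
    $job = $entry->getJob();

    // ... handle $job here ...

    // Success: remove the entry from the work server.
    $workServer->deleteEntry($entry);
} catch (\Throwable $e) {
    // Failure: put the entry back with a delay of 30 (assumed: seconds).
    $workServer->requeueEntry($entry, 30);
    error_log('Job failed, requeued: ' . $e->getMessage());
}
```

For most applications the WorkProcessor loop in the usage example is the simpler choice, since it handles this bookkeeping itself.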
Usage example
<?php
use mle86\WQ\WorkServerAdapter\AMQPWorkServer;
use mle86\WQ\WorkProcessor;
use mle86\WQ\Job\Job;

$processor = new WorkProcessor( AMQPWorkServer::connect("localhost") );

while (true) {
    $processor->processNextJob("mail", function(Job $job) {
        $job->...;
    });
}
This executes all jobs available in the local AMQP server's “mail” queue, forever.
It will, however, abort if one of the jobs throws an exception.
You might want to add a logging try-catch block around the processNextJob() call, as shown in WQ's “Quick Start” example.
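One possible shape for such a logging try-catch block is sketched below. It is not a copy of the WQ “Quick Start” example: logging through error_log() is an arbitrary choice made here, and the job handling inside the callback is left as a placeholder.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use mle86\WQ\Job\Job;
use mle86\WQ\WorkProcessor;
use mle86\WQ\WorkServerAdapter\AMQPWorkServer;

$processor = new WorkProcessor(AMQPWorkServer::connect('localhost'));

while (true) {
    try {
        $processor->processNextJob('mail', function (Job $job) {
            // ... execute the job here ...
        });
    } catch (\Throwable $e) {
        // Log the failure and continue with the next job
        // instead of letting the worker script die.
        error_log(sprintf(
            'Job in queue "mail" failed: %s (%s)',
            $e->getMessage(),
            get_class($e)
        ));
    }
}
```

Catching \Throwable rather than \Exception also covers PHP Error instances such as TypeError, which would otherwise still terminate the loop.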