Message Bus - Symfony Event Dispatcher Infrastructure
Symfony Event Dispatcher infrastructure for Message Bus, (*1)
Installation
composer require webit/message-bus-sf-event-dispatcher=^1.0.0
Usage
Publisher integration
To publish Message via Symfony Event Dispatcher use EventDispatcherPublisher, (*2)
MessageBusEventFactory
You need to tell the EventDispatcherPublisher how to translate your Message
into the event name and event object of Symfony Event Dispatcher.
Implement and configure MessageBusEventFactory., (*3)
MessageBusEventFactory: Example
Let's say you're going to publish messages of two types: type-1 and type-2
and you want to map then to two different Events of Symfony Event Dispatcher Event1 and Event2., (*4)
use Symfony\Component\EventDispatcher\Event;
class Event1 extends Event
{
private $x;
public function __construct($x) {
$this->x = $x;
}
public function x()
{
return $this->x;
}
}
class Event2 extends Event
{
private $y;
private $z;
public function __construct($y, $z) {
$this->y = $y;
$this->z = $z;
}
public function y()
{
return $this->y;
}
public function z()
{
return $this->z;
}
}
Option 1: implement MessageBusEventFactory
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\MessageBusEventFactory;
class MessageBusEvent1Factory implements MessageBusEventFactory
{
public function create(Message $message): MessageBusEventFactory
{
$arContent = json_decode($message->content(), true);
return new MessageBusEvent(
$message->type(),
new Event1(isset($arContent['x']) ? $arContent['x'] : '')
);
}
}
class MessageBusEvent2Factory implements MessageBusEventFactory
{
public function create(Message $message): MessageBusEventFactory
{
$arContent = json_decode($message->content(), true);
return new MessageBusEvent(
$message->type(),
new Event2(
isset($arContent['y']) ? $arContent['y'] : '',
isset($arContent['z']) ? $arContent['z'] : '',
)
);
}
}
Then combine both factories together, (*5)
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\ByMessageTypeMessageBusEventFactory;
$messageBusEventFactory = new ByMessageTypeMessageBusEventFactory([
'type-1' => new MessageBusEvent1Factory(),
'type-2' => new MessageBusEvent2Factory()
]);
Option 2: Use GenericMessageBusEventFactory and implement its dependencies
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\Symfony\CallbackSymfonyEventFactory;
$eventFactory1 = new CallbackSymfonyEventFactory(
function (Message $message) {
$arContent = json_decode($message->content(), true);
return new MessageBusEvent(
$message->type(),
new Event1(isset($arContent['x']) ? $arContent['x'] : '')
);
}
);
$eventFactory2 = new CallbackSymfonyEventFactory(
function (Message $message) {
$arContent = json_decode($message->content(), true);
return new MessageBusEvent(
$message->type(),
new Event2(
isset($arContent['y']) ? $arContent['y'] : '',
isset($arContent['z']) ? $arContent['z'] : '',
)
);
}
);
then, (*6)
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\GenericMessageBusEventFactory;
$messageBusEventFactory1 = new GenericMessageBusEventFactory(
$eventFactory1,
new FromMessageTypeEventNameResolver() // optional, used be default, you can provide a different implemenation
);
$messageBusEventFactory2 = new GenericMessageBusEventFactory(
$eventFactory2
);
// combine both factories together
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\Event\ByMessageTypeMessageBusEventFactory;
$messageBusEventFactory = new ByMessageTypeMessageBusEventFactory([
'type-1' => $messageBusEventFactory1,
'type-2' => $messageBusEventFactory2
]);
Option 3: Implement your own strategy
As EventDispatcherPublisher expects an interface MessageBusEventFactory as a dependency,
you can provide your own implementation for it. Also you can provide and combine
inner interfaces used by GenericMessageBusEventFactory: SymfonyEventFactory and EventNameResolver., (*7)
If you like JMSSerializer to produce Symfony Event object, use JMSSerializerSymfonyEventFactory., (*8)
Putting the stuff together
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Publisher\EventDispatcherPublisher;
use Webit\MessageBus\Message;
use Symfony\Component\EventDispatcher\EventDispatcher;
$eventDispatcher = new EventDispatcher();
$publisher = new EventDispatcherPublisher(
$eventDispatcher,
$messageBusEventFactory
);
$message = new Message('type-1', '{"x":"some-x"}');
$publisher->publish($message); // will be dispatched as "event-1" and event of "Event1" class
$message = new Message('type-2', '{"y":"some-y","z":"some-z"}');
$publisher->publish($message); // will be dispatched as "event-1" and event of "Event1" class
Event consumption
Why to consume events at all?
-
Dispatches public events to the Message Bus
If you want some events to be public and other applications be able to listen to them,
use PublishingConsumer to publish them using different infrastructure (AMQP for example)., (*9)
-
Asynchronous events processing
If you want some events to be processed asynchronously,
use PublishingConsumer to publish them using different infrastructure (AMQP for example),
then listen for them., (*10)
To consume Message created from Event of Symfony Event Dispatcher, use EventConsumingListener.
It requires MessageFromEventFactory and Consumer to be provided., (*11)
GenericMessageFromEventFactory
It requires EventSerialiser and MessageTypeResolver (by default uses event name), (*12)
Option 1: Implement own EventSerialiser
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\Content\EventSerialiser;
use Symfony\Component\EventDispatcher\Event;
class Event1Serializer implements EventSerialiser
{
public function serialise(MessageBusEvent $event): string
{
$symfonyEvent = $event->event();
if ($symfonyEvent instanceof Event1) {
return json_encode(['x' => $symfonyEvent->x()]);
}
throw new \InvalidArgumentException('Event must be an instance of Event1.');
}
}
Option 2: Use JMSSerializer to Serialise Event
use JMS\Serializer\SerializerBuilder;
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\Content\JmsEventSerialiser;
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\Content\EventOnlySerialisationDataProvider;
$serializerBuilder = SerializerBuilder::create();
// configure Serializer
$serializer = $serializerBuilder->build();
$jsmEventSerialiser = new JmsEventSerialiser(
$serializer,
new EventOnlySerialisationDataProvider(), // used by default, provides data to be passed to the JMSSerializer,
JmsEventSerialiser::FORMAT_JSON // JSON by default, can be JmsEventSerialiser::FORMAT_XML as well
);
Use FromMessageAwareEventMessageFromEventFactory
Your event can optionally implements MessageAwareEvent interface., (*13)
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\MessageAwareEvent;
use Symfony\Component\EventDispatcher\Event;
class EventX extends Event implements MessageAwareEvent
{
public function createMessage(string $eventName): Message
{
return new Message($eventName, json_decode(['some'=>'stuff']));
}
}
Then you can use FromMessageAwareEventMessageFromEventFactory to produce an event, (*14)
Putting all together
Configure MessageFromEventFactory, (*15)
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\Message\ByEventNameMessageFromEventFactory;
$messageFactory = new ByEventNameMessageFromEventFactory([
'type-1' => new GenericMessageFromEventFactory(
new Event1Serializer()
),
'type-2' => new GenericMessageFromEventFactory($jsmEventSerialiser)
]);
Create a listener, (*16)
use Webit\MessageBus\Infrastructure\Symfony\EventDispatcher\Listener\EventConsumingListener;
$listener = new EventConsumingListener(
new VoidConsumer(),
$messageFactory
);
Register the listener on Symfony Event Dispatcher for all required events, (*17)
$eventDispatcher->addListener('type-1', $listener);
$eventDispatcher->addListener('type-2', $listener);
// will produce new Message('type-1', '{"x":"xxx"}') and pass to the consumer
$eventDispatcher->dispatch('type-1', new Event1('xxx'));
Running tests
Install dependencies with composer, (*18)
docker-compose run --rm composer
docker-compose run --rm spec