2017 © Pedro Peláez
 

symfony-bundle cqrs-bundle

Symfony Bundle that integrates Prooph CQRS/ES library

image

nixilla/cqrs-bundle

Symfony Bundle that integrates Prooph CQRS/ES library

  • Wednesday, March 1, 2017
  • by nixilla
  • Repository
  • 1 Watchers
  • 2 Stars
  • 109 Installations
  • PHP
  • 0 Dependents
  • 0 Suggesters
  • 1 Forks
  • 0 Open issues
  • 1 Versions
  • 6 % Grown

The README.md

CQRS/ES Bundle for Symfony which integrates Prooph library

This Symfony Bundle wires Prooph CQRS/ES library into Symfony project., (*1)

Build Status Coverage Status, (*2)

Installation

Install with composer:, (*3)

composer require nixilla/cqrs-bundle

Add budle to AppKernel:, (*4)

<?php

// app/AppKernel.php

class AppKernel extends Kernel
{
    public function registerBundles()
    {
        $bundles = [
            // other bundles here,
            new Nixilla\CqrsBundle\NixillaCqrsBundle()
        ];

        return $bundles;
    }
}

Configure store adapter. Here is an example of setting MongoDB adapter:, (*5)

# app/config/services.yml

parameters:

    mongodb_server: mongodb://localhost:27017
    mongodb_dbname: my_database

services:
    mongo.client:
        class: MongoClient
        arguments: [ "%mongodb_server%" ]

    mongo.event.store.adapter:
        class: Prooph\EventStore\Adapter\MongoDb\MongoDbEventStoreAdapter
        arguments: [ "@prooph.message.factory", "@prooph.message.converter", "@mongo.client", "%mongodb_dbname%" ]
# app/config/config.yml

nixilla_cqrs:
    event_store:
        adapter: mongo.event.store.adapter

Usage

CQRS/ES is quite simple and easy in development, but on the other hand quite time consuming. For each event you want to record in the system, you need to create several classes. Below I'll show you how to create simple newsletter signup event with this bundle., (*6)

Here is the list of CQRS/ES artifacts we will create in order, (*7)

  • CreateContact - command
  • CreateContactHandler - command handler
  • ContactRepository - class that interacts with data store - similar concept as Doctrine repository
  • ContactCreated - this is the event class that gets persisted to data store
  • MarketingNotificationListener - a sample listener which send Hipchat notification
  • CampaignMonitorContactCreatedProjector - a sample projector, which updates Campaign Monitor

Let's start with non-cqrs stuff, and that is Symfony form and controller. I've omitted some code (like constructors and use statements) to make it shorter., (*8)

<?php

namespace AppBundle\Form;

class BasicContactType extends AbstractType
{
    public function buildForm(FormBuilderInterface $builder, array $options)
    {
        $builder
            ->add('emailAddress', EmailType::class, [
                'constraints' => [ new NotBlank(), new Email() ]
            ])
        ;
    }
}
<?php

namespace AppBundle\Controller;

class ContactCreateController
{
    /** @var Prooph\ServiceBus\CommandBus */
    private $commandBus;

    /** @var Symfony\Component\Form\FormInterface */
    private $form;

    public function createAction(Request $request)
    {
        $this->form->submit(json_decode($request->getContent(), true));

        if($this->form->isValid())
        {
            $this->commandBus->dispatch(new CreateContact($this->form->getData()));
            return new Response('', 201);
        }

        return new Response($this->form->getErrors(true)->__toString(), 422);
    }
}

The commandBus service is provided by this bundle, but CreateContact command is something we will created now. This is the first CQRS artifact., (*9)

<?php

namespace Newsletter\Domain\Command;

class CreateContact extends Prooph\Common\Messaging\Command
{
    private $payload;

    public function __construct(array $payload)
    {
        $this->init();
        $this->setPayload($payload);
    }

    public function payload() { return $this->payload; }

    protected function setPayload(array $payload) { $this->payload = $payload; }
}

Now if configure routing like this:, (*10)

# src/AppBundle/Resources/config/routing.yml

contact_create:
    path: /contact
    defaults: { _controller: controller.contact.create:createAction }
    methods: [ POST ]

then you should be able to call it with curl like this:, (*11)

curl -v \
    -H "Content-Type: application/json" \
    -X POST \
    --data '{"emailAddress":"john@smith.local"}' \
    http://localhost/contact

When you actually run this command now, you'll get exception Message dispatch failed during locate-handler phase. Error: You have requested a non-existent service "handler.create_contact"., (*12)

The handler.create_contact is the service which we will create next. The only purpose for this service is to handle CreateContact command. In other words, each command has only one command handler, and each command handler can only handle one command., (*13)

<?php

namespace Newsletter\Domain\CommandHandler;

class CreateContactHandler
{
    /** @var Newsletter\Domain\Repository\ContactRepositoryInterface */
    private $repository;

    public function __invoke(CreateContact $command)
    {
        $this->repository->add(Contact::fromPayload($command->payload()));
    }
}

This handler require 2 additional CQRS artifacts., (*14)

First one is ContactRepository which usually has 2 methods: get and add., (*15)

<?php

namespace Newsletter\Domain\Repository;

interface ContactRepositoryInterface
{
    public function add(Newsletter\Domain\Aggregate\Contact $contact);
    public function get($id);
}

and implementation, (*16)

<?php

namespace AppBundle\Cqrs\Repository;

class ContactRepository implements Newsletter\Domain\Repository\ContactRepositoryInterface
{
    /** @var Prooph\EventStore\Aggregate\AggregateRepository */
    private $repository;

    public function add(Contact $contact) { $this->repository->addAggregateRoot($contact); }

    public function get($id) { return $this->repository->getAggregateRoot($id); }
}

Second one is event, which is recorded in data store. All events are in past tense. Good thing is that you don't have to write any code - just extend base class., (*17)

<?php
namespace Newsletter\Domain\Event;

use Prooph\EventSourcing\AggregateChanged;

class ContactCreated extends AggregateChanged
{
}

Now you need to let know Symfony how to construct service:, (*18)

# src/AppBundle/Resources/config/services.yml

parameters:

    form.contact.basic.type.class: AppBundle\Form\BasicContactType

services:

    aggregate.type.contact:
        class: Prooph\EventStore\Aggregate\AggregateType
        factory: [ 'Prooph\EventStore\Aggregate\AggregateType', 'fromAggregateRootClass']
        arguments: [ 'Newsletter\Domain\Aggregate\Contact' ]

    repository.aggregate.contact:
        class: Prooph\EventStore\Aggregate\AggregateRepository
        arguments: [ "@prooph.event.store", "@aggregate.type.contact", "@prooph.aggregate.translator" ]

    repository.document:
        class: AppBundle\Cqrs\Repository\ContactRepository
        arguments: [ "@repository.aggregate.contact" ]

    handler.create_contact:
        class: Newsletter\Domain\CommandHandler\CreateContactHandler
        arguments: [ "@repository.document" ]

    form.contact.basic.type:
        class: "%form.contact.basic.type.class%"
        tags:
            - { name: "form.type" }

    form.contact.basic:
        class: Symfony\Component\Form\FormInterface
        factory: [ "@form.factory", create ]
        arguments: [ "%form.contact.basic.type.class%" ]

    controller.contact.create:
        class: AppBundle\Controller\ContactCreateController
        arguments: [ "@prooph.command.bus", "@form.contact.basic", "@prooph.event.publisher" ]

Please note that the number of params passed to controller is higher than the number it accepts. @todo explain why, (*19)

Now if you run the curl command listed above, you should get HTTP 201 Created and you should see this record in your MongoDB:, (*20)

{
    "_id" : "2d44331b-9d0d-47fa-b5be-01cd247c8a70",
    "version" : 1,
    "event_name" : "Newsletter\\Domain\\Event\\ContactCreated",
    "payload" : {
        "emailAddress" : "john@smith.local"
    },
    "created_at" : "2017-02-28T16:51:27.414000",
    "aggregate_id" : "john@smith.local",
    "aggregate_type" : "Newsletter\\Domain\\Aggregate\\Contact",
    "causation_id" : "21cd3129-0216-4437-86bc-9d7ede0bb08c",
    "causation_name" : "Newsletter\\Domain\\Command\\CreateContact"
}

Because there is a 1-to-1 relation between Command and CommandHandler, this bundle assumes following configuration:, (*21)

  • if command is called \Any\Namespace\SomeCommand then the service id that handles this command should be called handler.some_command
  • namespace is ignored, only class name is used
  • class name CamelCased is converted to snake_case using Symfony own CamelCaseToSnakeCaseNameConverter

Listeners and Projectors

Although Prooph library does not distinguish listeners and projectors, this bundle supports this separation., (*22)

Why separate it? ES - Event Sourcing is a concept of storing data as series of events. At some point you will want to replay all events and construct new read model. This may happen in various situations, for example when someone asks you:, (*23)

  • what is the newsletter monthly subscription frequency?
  • or "Can you send this newsletter to all people why subscribed over the August only?"

The idea is that listeners are run only once, when the event actually occurs (real time). For example, you may want to notify marketing team that they have new newsletter signup. You don't want to execute listeners when you replay events to compile new read model or new report., (*24)

Projectors on the other hand, can be run as many time as you want. The purpose of the projector is update read model. Updating read model could be:, (*25)

  • writing SQL commands to read database
  • writing cache to Varnish
  • writing data directly to ElasticSearch
  • building static HTML files

A listener may look like this:, (*26)

<?php

namespace AppBundle\Cqrs\Listeners;

class MarketingNotificationListener
{
    /**
     * @var HipchatNotifier (from nixilla/hipchat-bundle)
     */
    private $notifier;

    public function __invoke(ContactCreated $event)
    {
        $payload = $event->payload();
        $message = sprintf("New Newsletter subscription, email address '%s'", $payload['emailAddress']);
        $this->notifier->notify('red', $message, 'text', true);
    }
}

and to make it work, you need to let know Symfony how to inject it., (*27)


# src/AppBundle/Resources/config/services.yml services: listener.contact_created: class: AppBundle\Cqrs\Listeners\MarketingNotificationListener arguments: [ "@hipchat.notifier" ] tags: - { name: cqrs.event.listener, event: Newsletter\Domain\Event\ContactCreated }

A single event can have many event listeners and many projectors. So in order to configure listener, you need to tag service with { name: cqrs.event.listener, event: Newsletter\Domain\Event\ContactCreated } This will add listener to array of listeners for given event class, and when that even occurs, all listeners will be executed., (*28)

A sample projector may look like this:, (*29)

<?php

namespace AppBundle\Cqrs\Projectors;

class CampaignMonitorContactCreatedProjector
{
    private $campaignMonitor;

    public function __invoke(ContactCreated $event)
    {
        $payload = $event->payload();
        $this->campaignMonitor->subscribe($payload['emailAddress'], $listId = 'my list id');
    }
}

wiring is similar to what you see in the listeners section above, but it is important to tag it with cqrs.event.projector., (*30)


# src/AppBundle/Resources/config/services.yml services: projector.campaign_monitor.contact_created: class: AppBundle\Cqrs\Projectors\CampaignMonitorContactCreatedProjector arguments: [ "@campaign.monitor" ] tags: - { name: cqrs.event.projector, event: Newsletter\Domain\Event\ContactCreated }

The Versions