SagasOnlyOnceEventDispatcher::dispatchEvent()   B
last analyzed

Complexity

Conditions 6
Paths 10

Size

Total Lines 35
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 35
ccs 22
cts 22
cp 1
rs 8.439
c 0
b 0
f 0
cc 6
eloc 23
nc 10
nop 1
crap 6
1
<?php
2
/**
3
 * Copyright (c) 2017 Constantin Galbenu <[email protected]>
4
 */
5
6
namespace Gica\Cqrs\Saga;
7
8
9
use Gica\Cqrs\Event\EventDispatcher;
10
use Gica\Cqrs\Event\EventSubscriber;
11
use Gica\Cqrs\Event\EventWithMetaData;
12
use Gica\Cqrs\EventProcessing\ConcurentEventProcessingException;
13
use Psr\Log\LoggerInterface;
14
15
/**
 * Event dispatcher that forwards an event to saga listeners while consulting a
 * SagaEventTrackerRepository, so that a saga is not invoked for an event whose
 * processing has already been started.
 *
 * Only listeners supplied as arrays are considered; the first array element is
 * taken as the saga object and its class name is used as the saga identifier.
 * Listeners of any other shape are ignored by this dispatcher.
 */
class SagasOnlyOnceEventDispatcher implements EventDispatcher
{
    /**
     * @var SagaEventTrackerRepository
     */
    private $trackerRepository;

    /** @var EventSubscriber */
    private $eventSubscriber;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param SagaEventTrackerRepository $trackerRepository records which events each saga has started/ended processing
     * @param EventSubscriber            $eventSubscriber   provides the listeners registered for an event
     * @param LoggerInterface            $logger            receives an error entry when a listener throws
     */
    public function __construct(
        SagaEventTrackerRepository $trackerRepository,
        EventSubscriber $eventSubscriber,
        LoggerInterface $logger
    ) {
        $this->trackerRepository = $trackerRepository;
        $this->eventSubscriber = $eventSubscriber;
        $this->logger = $logger;
    }

    /**
     * Invokes every array-style listener registered for the event, unless the
     * tracker reports that the listener's saga already started processing it.
     *
     * A ConcurentEventProcessingException raised while processing makes the
     * dispatcher move on to the next listener silently. Any other Throwable is
     * logged as an error and not rethrown; in that case endProcessingEvent()
     * is not called for the failed listener.
     *
     * @param EventWithMetaData $eventWithMetadata the event together with its metadata
     */
    public function dispatchEvent(EventWithMetaData $eventWithMetadata)
    {
        $subscribedListeners = $this->eventSubscriber->getListenersForEvent($eventWithMetadata->getEvent());

        foreach ($subscribedListeners as $callback) {
            $meta = $eventWithMetadata->getMetaData();

            // Only [object, method]-style listeners are treated as sagas.
            if (!is_array($callback)) {
                continue;
            }

            $sagaInstance = $callback[0];
            $sagaName = get_class($sagaInstance);

            if ($this->trackerRepository->isEventProcessingAlreadyStarted($sagaName, $meta->getEventId())) {
                continue;
            }

            try {
                $this->trackerRepository->startProcessingEvent($sagaName, $meta->getEventId());
                call_user_func($callback, $eventWithMetadata->getEvent(), $meta);
                $this->trackerRepository->endProcessingEvent($sagaName, $meta->getEventId());
            } catch (ConcurentEventProcessingException $concurrencyFailure) {
                // Nothing to report; proceed with the next listener.
                continue;
            } catch (\Throwable $failure) {
                $this->logger->error(
                    $failure->getMessage(),
                    $this->describeFailure($sagaInstance, $eventWithMetadata, $meta, $failure)
                );
            }
        }
    }

    /**
     * Builds the log context attached to the error entry for a failed listener.
     *
     * @param object            $sagaInstance      the saga object whose listener threw
     * @param EventWithMetaData $eventWithMetadata the event that was being dispatched
     * @param mixed             $meta              the metadata object obtained from the event
     * @param \Throwable        $failure           the error thrown while processing
     *
     * @return array
     */
    private function describeFailure($sagaInstance, EventWithMetaData $eventWithMetadata, $meta, \Throwable $failure)
    {
        return [
            'saga'  => get_class($sagaInstance),
            'event' => [
                'class'    => get_class($eventWithMetadata->getEvent()),
                'sequence' => $meta->getSequence(),
                'index'    => $meta->getIndex(),
            ],
            'file'  => $failure->getFile(),
            'line'  => $failure->getLine(),
        ];
    }
}