SagaRunner   A
last analyzed

Complexity

Total Complexity 18

Size/Duplication

Total Lines 138
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 13

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 18
lcom 1
cbo 13
dl 0
loc 138
ccs 57
cts 57
cp 1
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 10 1
A setTaskProgressReporter() 0 4 1
B feedSagaWithEvents() 0 42 6
A getEventClassesFromMethods() 0 9 2
A findMethodsByEventClass() 0 12 3
B processEvent() 0 26 5
1
<?php
2
/**
3
 * Copyright (c) 2017 Constantin Galbenu <[email protected]>
4
 */
5
6
namespace Gica\Cqrs\Saga;
7
8
9
use Gica\CodeAnalysis\MethodListenerDiscovery;
10
use Gica\CodeAnalysis\MethodListenerDiscovery\ListenerClassValidator\AnyPhpClassIsAccepted;
11
use Gica\CodeAnalysis\MethodListenerDiscovery\ListenerMethod;
12
use Gica\Cqrs\Command\CodeAnalysis\WriteSideEventHandlerDetector;
13
use Gica\Cqrs\Event\EventWithMetaData;
14
use Gica\Cqrs\EventProcessing\ConcurentEventProcessingException;
15
use Gica\Cqrs\EventStore;
16
use Gica\Cqrs\ProgressReporting\TaskProgressCalculator;
17
use Gica\Cqrs\ProgressReporting\TaskProgressReporter;
18
use Gica\Cqrs\Saga\SagaRunner\EventProcessingHasStalled;
19
use Psr\Log\LoggerInterface;
20
21
/**
22
 * This class can be run in background to feed a saga with events
23
 */
24
class SagaRunner
25
{
26
27
    /**
28
     * @var EventStore
29
     */
30
    private $eventStore;
31
    /**
32
     * @var LoggerInterface
33
     */
34
    private $logger;
35
    /**
36
     * @var SagaEventTrackerRepository
37
     */
38
    private $sagaRepository;
39
    /**
40
     * @var TaskProgressReporter|null
41
     */
42
    private $taskProgressReporter;
43
44 4
    public function __construct(
45
        EventStore $eventStore,
46
        LoggerInterface $logger,
47
        SagaEventTrackerRepository $sagaRepository
48
    )
49
    {
50 4
        $this->eventStore = $eventStore;
51 4
        $this->logger = $logger;
52 4
        $this->sagaRepository = $sagaRepository;
53 4
    }
54
55 1
    public function setTaskProgressReporter(?TaskProgressReporter $taskProgressReporter)
56
    {
57 1
        $this->taskProgressReporter = $taskProgressReporter;
58 1
    }
59
60 4
    public function feedSagaWithEvents($saga, ?int $afterSequence = null)
61
    {
62 4
        $discoverer = new MethodListenerDiscovery(
63 4
            new WriteSideEventHandlerDetector(),
64 4
            new AnyPhpClassIsAccepted()
65
        );
66
67 4
        $allMethods = $discoverer->findListenerMethodsInClass(get_class($saga));
68
69 4
        $eventClasses = $this->getEventClassesFromMethods($allMethods);
70
71 4
        $this->logger->info(print_r($eventClasses, true));
72 4
        $this->logger->info("loading events...");
73
74 4
        $allEvents = $this->eventStore->loadEventsByClassNames($eventClasses);
75
76 4
        if (null !== $afterSequence) {
77 1
            $allEvents->afterSequence($afterSequence);
78
        }
79
80 4
        $this->logger->info("processing events...");
81
82 4
        $taskProgress = null;
83
84 4
        if ($this->taskProgressReporter) {
85 1
            $taskProgress = new TaskProgressCalculator($allEvents->countCommits());
86
        }
87
88 4
        foreach ($allEvents->fetchCommits() as $eventsCommit) {
89
90 3
            $eventsCommit = $eventsCommit->filterEventsByClass($eventClasses);
91
92 3
            foreach ($eventsCommit->getEventsWithMetadata() as $eventWithMetadata) {
93 3
                $this->processEvent($saga, $eventWithMetadata, $allMethods);
94
            }
95
96 2
            if ($this->taskProgressReporter) {
97 1
                $taskProgress->increment();
98 2
                $this->taskProgressReporter->reportProgressUpdate($taskProgress->getStep(), $taskProgress->getTotalSteps(), $taskProgress->calculateSpeed(), $taskProgress->calculateEta());
99
            }
100
        }
101 3
    }
102
103
    /**
104
     * @param ListenerMethod[] $methods
105
     * @return array
106
     */
107 4
    private function getEventClassesFromMethods($methods)
108
    {
109 4
        $eventClasses = [];
110 4
        foreach ($methods as $listenerMethod) {
111 4
            $eventClasses[] = $listenerMethod->getEventClassName();
112
        }
113
114 4
        return $eventClasses;
115
    }
116
117
    /**
118
     * @param string $eventClass
119
     * @param ListenerMethod[] $allMethods
120
     * @return ListenerMethod[]
121
     */
122 3
    private function findMethodsByEventClass(string $eventClass, $allMethods)
123
    {
124 3
        $result = [];
125
126 3
        foreach ($allMethods as $listenerMethod) {
127 3
            if ($listenerMethod->getEventClassName() == $eventClass) {
128 3
                $result[] = $listenerMethod;
129
            }
130
        }
131
132 3
        return $result;
133
    }
134
135 3
    private function processEvent($saga, EventWithMetaData $eventWithMetadata, $allMethods): void
136
    {
137
        /** @var EventWithMetaData $eventWithMetadata */
138 3
        $methods = $this->findMethodsByEventClass(get_class($eventWithMetadata->getEvent()), $allMethods);
139 3
        $metaData = $eventWithMetadata->getMetaData();
140
141 3
        $sagaId = get_class($saga);
142
143 3
        foreach ($methods as $method) {
144
145
            try {
146 3
                if ($this->sagaRepository->isEventProcessingAlreadyStarted($sagaId, $metaData->getEventId())) {
147 2
                    if (!$this->sagaRepository->isEventProcessingAlreadyEnded($sagaId, $metaData->getEventId())) {
148 2
                        throw new EventProcessingHasStalled($eventWithMetadata);
149
                    }
150
                } else {
151 2
                    $this->sagaRepository->startProcessingEvent($sagaId, $metaData->getEventId());
152 1
                    call_user_func([$saga, $method->getMethodName()], $eventWithMetadata->getEvent(), $eventWithMetadata->getMetaData());
153 1
                    $this->sagaRepository->endProcessingEvent($sagaId, $metaData->getEventId());
154
                }
155 2
            } catch (ConcurentEventProcessingException $exception) {
156 1
                $this->logger->info("concurent event processing exception, skipping...");
157 2
                continue;
158
            }
159
        }
160
    }
161
}