Passed
Push — master ( 56fedc...5e064c )
by Constantin
02:52
created

SagaRunner::processEvent()   B

Complexity

Conditions 5
Paths 9

Size

Total Lines 26
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 5

Importance

Changes 0
Metric Value
dl 0
loc 26
ccs 15
cts 15
cp 1
rs 8.439
c 0
b 0
f 0
cc 5
eloc 16
nc 9
nop 3
crap 5
1
<?php
2
/**
3
 * Copyright (c) 2017 Constantin Galbenu <[email protected]>
4
 */
5
6
namespace Gica\Cqrs\Saga;
7
8
9
use Gica\CodeAnalysis\MethodListenerDiscovery;
10
use Gica\CodeAnalysis\MethodListenerDiscovery\ListenerClassValidator\AnyPhpClassIsAccepted;
11
use Gica\CodeAnalysis\MethodListenerDiscovery\ListenerMethod;
12
use Gica\CodeAnalysis\Shared\ClassSorter\ByConstructorDependencySorter;
13
use Gica\Cqrs\Command\CodeAnalysis\WriteSideEventHandlerDetector;
14
use Gica\Cqrs\Event\EventWithMetaData;
15
use Gica\Cqrs\EventProcessing\ConcurentEventProcessingException;
16
use Gica\Cqrs\EventStore;
17
use Gica\Cqrs\ProgressReporting\TaskProgressCalculator;
18
use Gica\Cqrs\ProgressReporting\TaskProgressReporter;
19
use Gica\Cqrs\Saga\SagaRunner\EventProcessingHasStalled;
20
use Psr\Log\LoggerInterface;
21
22
/**
 * Feeds a saga with the events stored in the event store; it can be run in
 * the background.
 *
 * The saga's listener methods are discovered with MethodListenerDiscovery,
 * the events of the classes they listen to are loaded from the event store
 * and every event is dispatched to the saga. The tracker repository is asked,
 * per saga class and event id, whether processing was already started/ended,
 * so an event that was already handled is not dispatched again.
 */
class SagaRunner
{
    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var SagaEventTrackerRepository
     */
    private $sagaRepository;

    /**
     * @var TaskProgressReporter|null
     */
    private $taskProgressReporter;

    /**
     * @param EventStore $eventStore source of the events to replay
     * @param LoggerInterface $logger receives informational messages about the run
     * @param SagaEventTrackerRepository $sagaRepository tracks started/ended events per saga
     */
    public function __construct(
        EventStore $eventStore,
        LoggerInterface $logger,
        SagaEventTrackerRepository $sagaRepository
    )
    {
        $this->eventStore = $eventStore;
        $this->logger = $logger;
        $this->sagaRepository = $sagaRepository;
    }

    /**
     * Sets (or, with null, removes) the reporter that is notified after each processed commit.
     *
     * @param TaskProgressReporter|null $taskProgressReporter
     */
    public function setTaskProgressReporter(?TaskProgressReporter $taskProgressReporter)
    {
        $this->taskProgressReporter = $taskProgressReporter;
    }

    /**
     * Loads every event the saga listens to and dispatches them to it, commit by commit.
     *
     * @param object $saga the saga instance; its class name is used as the saga id
     * @param int|null $afterSequence when not null it is passed to afterSequence() on the
     *                                loaded event stream (presumably to skip events up to
     *                                that sequence - confirm against the EventStore implementation)
     * @throws EventProcessingHasStalled when an event is tracked as started but not as ended
     */
    public function feedSagaWithEvents($saga, ?int $afterSequence = null)
    {
        $discoverer = new MethodListenerDiscovery(
            new WriteSideEventHandlerDetector(),
            new AnyPhpClassIsAccepted(),
            new ByConstructorDependencySorter()
        );

        $allMethods = $discoverer->findListenerMethodsInClass(get_class($saga));
        $eventClasses = $this->getEventClassesFromMethods($allMethods);

        $this->logger->info(print_r($eventClasses, true));
        $this->logger->info("loading events...");

        $allEvents = $this->eventStore->loadEventsByClassNames($eventClasses);

        if (null !== $afterSequence) {
            $allEvents->afterSequence($afterSequence);
        }

        $this->logger->info("processing events...");

        // The calculator only exists when a reporter was configured before the run started.
        $taskProgress = null;
        if (null !== $this->taskProgressReporter) {
            $taskProgress = new TaskProgressCalculator($allEvents->countCommits());
        }

        foreach ($allEvents->fetchCommits() as $eventsCommit) {
            $relevantCommit = $eventsCommit->filterEventsByClass($eventClasses);

            foreach ($relevantCommit->getEventsWithMetadata() as $eventWithMetadata) {
                $this->processEvent($saga, $eventWithMetadata, $allMethods);
            }

            // Both must be present: a reporter installed while the loop is running
            // has no calculator to read from, and must not trigger a call on null.
            if (null !== $taskProgress && null !== $this->taskProgressReporter) {
                $taskProgress->increment();
                $this->taskProgressReporter->reportProgressUpdate(
                    $taskProgress->getStep(),
                    $taskProgress->getTotalSteps(),
                    $taskProgress->calculateSpeed(),
                    $taskProgress->calculateEta()
                );
            }
        }
    }

    /**
     * Collects the event class name of every listener method, in order (duplicates are kept).
     *
     * @param ListenerMethod[] $methods
     * @return string[]
     */
    private function getEventClassesFromMethods($methods): array
    {
        $eventClasses = [];

        foreach ($methods as $listenerMethod) {
            $eventClasses[] = $listenerMethod->getEventClassName();
        }

        return $eventClasses;
    }

    /**
     * Returns the listener methods whose event class is exactly $eventClass, in their original order.
     *
     * @param string $eventClass
     * @param ListenerMethod[] $allMethods
     * @return ListenerMethod[]
     */
    private function findMethodsByEventClass(string $eventClass, $allMethods): array
    {
        $result = [];

        foreach ($allMethods as $listenerMethod) {
            // Class names are compared strictly; no type juggling is wanted here.
            if ($listenerMethod->getEventClassName() === $eventClass) {
                $result[] = $listenerMethod;
            }
        }

        return $result;
    }

    /**
     * Dispatches one event to every saga method that listens to its class.
     *
     * The start/end bookkeeping is done once per event, around all the listener
     * calls, because the tracker is keyed by saga class and event id only. Doing
     * it per listener would mark the event as ended after the first listener and
     * skip the remaining ones.
     *
     * Nothing is tracked when the saga has no listener for the event. An event
     * already tracked as started and ended is skipped. A
     * ConcurentEventProcessingException raised while tracking or handling the
     * event is logged and the event is skipped.
     *
     * @param object $saga
     * @param EventWithMetaData $eventWithMetadata
     * @param ListenerMethod[] $allMethods every listener method of the saga
     * @throws EventProcessingHasStalled when the event is tracked as started but not as ended
     */
    private function processEvent($saga, EventWithMetaData $eventWithMetadata, $allMethods): void
    {
        $event = $eventWithMetadata->getEvent();
        $methods = $this->findMethodsByEventClass(get_class($event), $allMethods);

        if ([] === $methods) {
            return;
        }

        $metaData = $eventWithMetadata->getMetaData();
        $eventId = $metaData->getEventId();
        $sagaId = get_class($saga);

        try {
            if ($this->sagaRepository->isEventProcessingAlreadyStarted($sagaId, $eventId)) {
                if (!$this->sagaRepository->isEventProcessingAlreadyEnded($sagaId, $eventId)) {
                    throw new EventProcessingHasStalled($eventWithMetadata);
                }

                // Started and ended: the event was fully handled earlier.
                return;
            }

            $this->sagaRepository->startProcessingEvent($sagaId, $eventId);

            foreach ($methods as $method) {
                $saga->{$method->getMethodName()}($event, $metaData);
            }

            $this->sagaRepository->endProcessingEvent($sagaId, $eventId);
        } catch (ConcurentEventProcessingException $exception) {
            $this->logger->info("concurent event processing exception, skipping...");
        }
    }
}