1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/** |
4
|
|
|
* This file is part of slick/cqrs-tools |
5
|
|
|
* |
6
|
|
|
* For the full copyright and license information, please view the LICENSE |
7
|
|
|
* file that was distributed with this source code. |
8
|
|
|
*/ |
9
|
|
|
|
10
|
|
|
namespace Slick\CQRSTools\Application\Projection; |
11
|
|
|
|
12
|
|
|
use DateTimeImmutable; |
13
|
|
|
use Exception; |
14
|
|
|
use Slick\CQRSTools\Domain\Event\EventStore; |
15
|
|
|
use Slick\CQRSTools\Domain\Event\EventStream; |
16
|
|
|
use Slick\CQRSTools\Domain\Event\Stream; |
17
|
|
|
use Slick\CQRSTools\Domain\Projection\ProjectorState; |
18
|
|
|
use Slick\CQRSTools\Domain\Projection\ProjectorStateLedger; |
19
|
|
|
|
20
|
|
|
/** |
21
|
|
|
* Projectionist |
22
|
|
|
* |
23
|
|
|
* @package Slick\CQRSTools\Application\Projection |
24
|
|
|
*/ |
25
|
|
|
final class Projectionist implements ProgressStateProvider |
26
|
|
|
{ |
27
|
|
|
|
28
|
|
|
use ProgressStateProviderMethods; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* @var EventStore |
32
|
|
|
*/ |
33
|
|
|
private $eventStore; |
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* @var ProjectorStateLedger |
37
|
|
|
*/ |
38
|
|
|
private $ledger; |
39
|
|
|
|
40
|
|
|
/** |
41
|
|
|
* @var EventHandlingStrategy |
42
|
|
|
*/ |
43
|
|
|
private $strategy; |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* Creates a Projectionist |
47
|
|
|
* |
48
|
|
|
* @param EventStore $eventStore |
49
|
|
|
* @param ProjectorStateLedger $ledger |
50
|
|
|
* @param EventHandlingStrategy $strategy |
51
|
|
|
*/ |
52
|
|
|
public function __construct(EventStore $eventStore, ProjectorStateLedger $ledger, EventHandlingStrategy $strategy) |
53
|
|
|
{ |
54
|
|
|
$this->eventStore = $eventStore; |
55
|
|
|
$this->ledger = $ledger; |
56
|
|
|
$this->strategy = $strategy; |
57
|
|
|
} |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* Plays provided list of projectors |
61
|
|
|
* |
62
|
|
|
* @param Projector[] $projectors |
63
|
|
|
* @throws Exception |
64
|
|
|
*/ |
65
|
|
View Code Duplication |
public function play(array $projectors): void |
|
|
|
|
66
|
|
|
{ |
67
|
|
|
foreach ($projectors as $projector) { |
68
|
|
|
$projectorState = $this->ledger->get($projector); |
69
|
|
|
|
70
|
|
|
// Play only when projector is not halted or being processed by other agent |
71
|
|
|
if ($projectorState->isHalted() || $projectorState->isProjecting()) { |
72
|
|
|
continue; |
73
|
|
|
} |
74
|
|
|
|
75
|
|
|
// If it's not halted, it already ran. |
76
|
|
|
if ($projectorState->projectorRunsFrom() === Projector::RUN_ONCE) { |
77
|
|
|
continue; |
78
|
|
|
} |
79
|
|
|
|
80
|
|
|
$this->project($projector, $projectorState); |
81
|
|
|
} |
82
|
|
|
} |
83
|
|
|
|
84
|
|
|
/** |
85
|
|
|
* Boots provided list of projectors |
86
|
|
|
* |
87
|
|
|
* @param Projector[] $projectors |
88
|
|
|
* @throws Exception |
89
|
|
|
*/ |
90
|
|
View Code Duplication |
public function boot(array $projectors): void |
|
|
|
|
91
|
|
|
{ |
92
|
|
|
foreach ($projectors as $projector) { |
93
|
|
|
$projectorState = $this->ledger->get($projector); |
94
|
|
|
|
95
|
|
|
// Play only when projector is not halted or being processed by other agent |
96
|
|
|
if (!$projectorState->isHalted() || $projectorState->isProjecting()) { |
97
|
|
|
continue; |
98
|
|
|
} |
99
|
|
|
|
100
|
|
|
$this->project($projector, $projectorState); |
101
|
|
|
} |
102
|
|
|
} |
103
|
|
|
|
104
|
|
|
/** |
105
|
|
|
* Retires provided list of projectors |
106
|
|
|
* |
107
|
|
|
* @param Projector[] $projectors |
108
|
|
|
*/ |
109
|
|
|
public function retire(array $projectors): void |
110
|
|
|
{ |
111
|
|
|
foreach ($projectors as $projector) { |
112
|
|
|
$projectorState = $this->ledger->get($projector); |
113
|
|
|
$projectorState->halt("Projector was retired."); |
114
|
|
|
$projector->retire(); |
115
|
|
|
$this->releaseProjector($projectorState); |
116
|
|
|
} |
117
|
|
|
} |
118
|
|
|
|
119
|
|
|
/** |
120
|
|
|
* Project stream into provided projector |
121
|
|
|
* |
122
|
|
|
* @param Projector $projector |
123
|
|
|
* @param ProjectorState $projectorState |
124
|
|
|
* |
125
|
|
|
* @throws Exception |
126
|
|
|
*/ |
127
|
|
|
private function project(Projector $projector, ProjectorState $projectorState): void |
128
|
|
|
{ |
129
|
|
|
$eventStream = $this->eventStream($projectorState); |
130
|
|
|
if ($eventStream->isEmpty()) { |
131
|
|
|
return; |
132
|
|
|
} |
133
|
|
|
|
134
|
|
|
$this->secureProjector($projectorState); |
135
|
|
|
$this->notifyMaxSteps($eventStream->count()); |
136
|
|
|
|
137
|
|
|
foreach ($eventStream as $event) { |
138
|
|
|
try { |
139
|
|
|
if ($this->strategy->handle($event, $projector)) { |
140
|
|
|
$this->notifyHandledEvent(); |
141
|
|
|
} |
142
|
|
|
$projectorState->lastEventWas($event); |
143
|
|
|
} catch (Exception $caught) { |
144
|
|
|
$projectorState->halt($caught->getMessage().' EventId: '.$event->eventId()); |
145
|
|
|
break; |
146
|
|
|
} |
147
|
|
|
$this->notifyAdvance(); |
148
|
|
|
} |
149
|
|
|
$this->notifyFinish(); |
150
|
|
|
$this->releaseProjector($projectorState); |
151
|
|
|
} |
152
|
|
|
|
153
|
|
|
/** |
154
|
|
|
* Retrieve the event stream for provided projector state |
155
|
|
|
* |
156
|
|
|
* @param ProjectorState $projectorState |
157
|
|
|
* |
158
|
|
|
* @return EventStream |
159
|
|
|
* @throws Exception |
160
|
|
|
*/ |
161
|
|
|
private function eventStream(ProjectorState $projectorState): Stream |
162
|
|
|
{ |
163
|
|
|
switch ($projectorState->projectorRunsFrom()) { |
164
|
|
View Code Duplication |
case Projector::RUN_FROM_NOW: |
|
|
|
|
165
|
|
|
$date = new DateTimeImmutable('now'); |
166
|
|
|
$eventStream = !$projectorState->lastEvent() |
167
|
|
|
? $this->eventStore->allStoredEventsSince($date) |
168
|
|
|
: $this->eventStore->allStoredEventsSince($projectorState->lastEvent()); |
169
|
|
|
break; |
170
|
|
|
|
171
|
|
View Code Duplication |
case Projector::RUN_FROM_BEGINNING: |
|
|
|
|
172
|
|
|
$eventStream = $projectorState->lastEvent() |
173
|
|
|
? $this->eventStore->allStoredEventsSince($projectorState->lastEvent()) |
174
|
|
|
: $this->eventStore->allStoredEvents(); |
175
|
|
|
break; |
176
|
|
|
|
177
|
|
|
case Projector::RUN_ONCE: |
178
|
|
|
default: |
179
|
|
|
$eventStream = $this->eventStore->allStoredEvents(); |
180
|
|
|
} |
181
|
|
|
|
182
|
|
|
return $eventStream; |
183
|
|
|
} |
184
|
|
|
|
185
|
|
|
/** |
186
|
|
|
* Secure projector to avoid duplications |
187
|
|
|
* |
188
|
|
|
* @param ProjectorState $projectorState |
189
|
|
|
*/ |
190
|
|
|
private function secureProjector(ProjectorState $projectorState): void |
191
|
|
|
{ |
192
|
|
|
$projectorState->startProjecting(); |
193
|
|
|
$this->ledger->update($projectorState); |
194
|
|
|
} |
195
|
|
|
|
196
|
|
|
/** |
197
|
|
|
* Releases the projector so that it can be played by other agent |
198
|
|
|
* |
199
|
|
|
* @param ProjectorState $projectorState |
200
|
|
|
*/ |
201
|
|
|
private function releaseProjector(ProjectorState $projectorState): void |
202
|
|
|
{ |
203
|
|
|
$projectorState->stopProjecting(); |
204
|
|
|
$this->ledger->update($projectorState); |
205
|
|
|
} |
206
|
|
|
} |
207
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.