Completed
Push — master ( f318b1...b9f34b )
by Freek
05:03 queued 43s
created

Projectionist::replayEvents()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 26
rs 9.504
c 0
b 0
f 0
cc 2
nc 1
nop 3
1
<?php
2
3
namespace Spatie\EventProjector;
4
5
use Exception;
6
use Illuminate\Support\Collection;
7
use Spatie\EventProjector\EventHandlers\EventHandler;
8
use Spatie\EventProjector\EventHandlers\EventHandlerCollection;
9
use Spatie\EventProjector\Events\EventHandlerFailedHandlingEvent;
10
use Spatie\EventProjector\Events\FinishedEventReplay;
11
use Spatie\EventProjector\Events\ProjectorDidNotHandlePriorEvents;
12
use Spatie\EventProjector\Events\StartingEventReplay;
13
use Spatie\EventProjector\Models\StoredEvent;
14
use Spatie\EventProjector\Projectors\Projector;
15
use Illuminate\Contracts\Container\Container;
16
use Spatie\EventProjector\Exceptions\InvalidEventHandler;
17
18
/**
 * Keeps track of the registered projectors and reactors, passes stored
 * events to them, and can replay stored events through a set of projectors.
 */
class Projectionist
{
    /**
     * Settings used by this class. The keys read here are
     * 'stored_event_model', 'queue', 'catch_exceptions' and 'replay_chunk_size'.
     *
     * @var array
     */
    protected $config;

    /** @var \Spatie\EventProjector\EventHandlers\EventHandlerCollection */
    protected $projectors;

    /** @var \Spatie\EventProjector\EventHandlers\EventHandlerCollection */
    protected $reactors;

    /**
     * True only while replayEvents() is running.
     *
     * @var bool
     */
    protected $isReplayingEvents = false;

    /**
     * Start with empty projector and reactor collections.
     *
     * @param array $config
     */
    public function __construct(array $config)
    {
        $this->projectors = new EventHandlerCollection();

        $this->reactors = new EventHandlerCollection();

        $this->config = $config;
    }

    /**
     * Register a projector.
     *
     * @param string|\Spatie\EventProjector\Projectors\Projector $projector
     *        An instance, or a string that is resolved through app().
     *
     * @return \Spatie\EventProjector\Projectionist
     *
     * @throws \Spatie\EventProjector\Exceptions\InvalidEventHandler
     *         When the (resolved) value is not a Projector.
     */
    public function addProjector($projector): Projectionist
    {
        if (is_string($projector)) {
            $projector = app($projector);
        }

        if (! $projector instanceof Projector) {
            throw InvalidEventHandler::notAProjector($projector);
        }

        $this->projectors->add($projector);

        return $this;
    }

    /**
     * Register several projectors; each entry goes through addProjector().
     *
     * @param array $projectors
     *
     * @return \Spatie\EventProjector\Projectionist
     */
    public function addProjectors(array $projectors): Projectionist
    {
        foreach ($projectors as $projector) {
            $this->addProjector($projector);
        }

        return $this;
    }

    /**
     * All registered projectors.
     *
     * @return \Illuminate\Support\Collection
     */
    public function getProjectors(): Collection
    {
        return $this->projectors->all();
    }

    /**
     * Find the first registered projector whose getName() equals $name.
     *
     * @param string $name
     *
     * @return \Spatie\EventProjector\Projectors\Projector|null
     */
    public function getProjector(string $name): ?Projector
    {
        return $this->projectors->all()->first(function (Projector $projector) use ($name) {
            return $projector->getName() === $name;
        });
    }

    /**
     * Register a reactor.
     *
     * @param string|\Spatie\EventProjector\EventHandlers\EventHandler $reactor
     *        An instance, or a string that is resolved through app().
     *
     * @return \Spatie\EventProjector\Projectionist
     *
     * @throws \Spatie\EventProjector\Exceptions\InvalidEventHandler
     *         When the (resolved) value is not an EventHandler.
     */
    public function addReactor($reactor): Projectionist
    {
        if (is_string($reactor)) {
            $reactor = app($reactor);
        }

        if (! $reactor instanceof EventHandler) {
            throw InvalidEventHandler::notAnEventHandler($reactor);
        }

        $this->reactors->add($reactor);

        return $this;
    }

    /**
     * Register several reactors; each entry goes through addReactor().
     *
     * @param array $reactors
     *
     * @return \Spatie\EventProjector\Projectionist
     */
    public function addReactors(array $reactors): Projectionist
    {
        foreach ($reactors as $reactor) {
            $this->addReactor($reactor);
        }

        return $this;
    }

    /**
     * All registered reactors.
     *
     * @return \Illuminate\Support\Collection
     */
    public function getReactors(): Collection
    {
        return $this->reactors->all();
    }

    /**
     * Store an event using the configured stored event model, pass it to the
     * projectors that want to be called immediately, and then dispatch a
     * HandleStoredEventJob on the configured queue.
     *
     * @param \Spatie\EventProjector\ShouldBeStored $event
     */
    public function storeEvent(ShouldBeStored $event)
    {
        $storedEvent = $this->config['stored_event_model']::createForEvent($event);

        $this->handleImmediately($storedEvent);

        dispatch(new HandleStoredEventJob($storedEvent))
            ->onQueue($this->config['queue']);
    }

    /**
     * Pass a stored event to the projectors, and then to the reactors, that
     * the respective collection returns for that event.
     *
     * @param \Spatie\EventProjector\Models\StoredEvent $storedEvent
     */
    public function handle(StoredEvent $storedEvent)
    {
        $this->applyStoredEventToProjectors(
            $storedEvent,
            $this->projectors->forEvent($storedEvent)
        );

        $this->applyStoredEventToReactors(
            $storedEvent,
            $this->reactors->forEvent($storedEvent)
        );
    }

    /**
     * Pass a stored event only to those projectors for the event whose
     * shouldBeCalledImmediately() returns true. Reactors are not called.
     *
     * @param \Spatie\EventProjector\Models\StoredEvent $storedEvent
     */
    public function handleImmediately(StoredEvent $storedEvent)
    {
        $projectors = $this->projectors
            ->forEvent($storedEvent)
            ->filter(function (Projector $projector) {
                return $projector->shouldBeCalledImmediately();
            });

        $this->applyStoredEventToProjectors($storedEvent, $projectors);
    }

    /**
     * Let each of the given projectors handle the stored event.
     *
     * A projector is skipped when it already received the event. When it has
     * not received all prior events, ProjectorDidNotHandlePriorEvents is
     * fired, the projector is marked as not up to date, and it is skipped.
     * The event is only remembered as received when handling succeeded.
     *
     * @param \Spatie\EventProjector\Models\StoredEvent $storedEvent
     * @param \Illuminate\Support\Collection $projectors
     */
    protected function applyStoredEventToProjectors(StoredEvent $storedEvent, Collection $projectors)
    {
        foreach ($projectors as $projector) {
            if ($projector->hasAlreadyReceivedEvent($storedEvent)) {
                continue;
            }

            if (! $projector->hasReceivedAllPriorEvents($storedEvent)) {
                event(new ProjectorDidNotHandlePriorEvents($projector, $storedEvent));

                $projector->markAsNotUpToDate($storedEvent);

                continue;
            }

            if (! $this->callEventHandler($projector, $storedEvent)) {
                continue;
            }

            $projector->rememberReceivedEvent($storedEvent);
        }
    }

    /**
     * Let each of the given reactors handle the stored event. The result of
     * the individual calls is ignored.
     *
     * @param \Spatie\EventProjector\Models\StoredEvent $storedEvent
     * @param \Illuminate\Support\Collection $reactors
     */
    protected function applyStoredEventToReactors(StoredEvent $storedEvent, Collection $reactors)
    {
        foreach ($reactors as $reactor) {
            $this->callEventHandler($reactor, $storedEvent);
        }
    }

    /**
     * Call handle() on an event handler.
     *
     * @param \Spatie\EventProjector\EventHandlers\EventHandler $eventHandler
     * @param \Spatie\EventProjector\Models\StoredEvent $storedEvent
     *
     * @return bool True when the handler finished without an exception;
     *              false when an exception was caught and reported.
     *
     * @throws \Exception Rethrown when the 'catch_exceptions' setting is falsy.
     */
    protected function callEventHandler(EventHandler $eventHandler, StoredEvent $storedEvent): bool
    {
        try {
            $eventHandler->handle($storedEvent);
        } catch (Exception $exception) {
            if (! $this->config['catch_exceptions']) {
                throw $exception;
            }

            // Give the handler a chance to react before announcing the failure.
            $eventHandler->handleException($exception);

            event(new EventHandlerFailedHandlingEvent($eventHandler, $storedEvent, $exception));

            return false;
        }

        return true;
    }

    /**
     * Whether replayEvents() is currently running.
     *
     * @return bool
     */
    public function isReplayingEvents(): bool
    {
        return $this->isReplayingEvents;
    }

    /**
     * Replay stored events through the given projectors.
     *
     * Fires StartingEventReplay, calls 'onStartingEventReplay' on the given
     * projectors, feeds them every stored event selected by
     * after($afterStoredEventId) in chunks of 'replay_chunk_size', then fires
     * FinishedEventReplay and calls 'onFinishedEventReplay'.
     *
     * Only the projectors passed in take part: both the lifecycle hooks and
     * the events are limited to them, matching the projectors announced in
     * StartingEventReplay.
     *
     * @param \Illuminate\Support\Collection $projectors
     *        Projectors to replay to. NOTE(review): entries are assumed to be
     *        Projector instances, as accepted by EventHandlerCollection::add().
     * @param int $afterStoredEventId Passed to the after() query scope.
     * @param callable|null $onEventReplayed
     *        Called with each StoredEvent after it was applied.
     */
    public function replayEvents(Collection $projectors, int $afterStoredEventId = 0, ?callable $onEventReplayed = null)
    {
        // Wrap the requested projectors so the lifecycle hooks go through the
        // same call() mechanism as before, but reach only these projectors.
        $replayedProjectors = new EventHandlerCollection();

        foreach ($projectors as $projector) {
            $replayedProjectors->add($projector);
        }

        $this->isReplayingEvents = true;

        try {
            event(new StartingEventReplay($projectors));

            $replayedProjectors->call('onStartingEventReplay');

            StoredEvent::query()
                ->after($afterStoredEventId)
                ->chunk($this->config['replay_chunk_size'], function (Collection $storedEvents) use ($projectors, $onEventReplayed) {
                    $storedEvents->each(function (StoredEvent $storedEvent) use ($projectors, $onEventReplayed) {
                        $this->applyStoredEventToProjectors($storedEvent, $projectors);

                        if ($onEventReplayed) {
                            $onEventReplayed($storedEvent);
                        }
                    });
                });
        } finally {
            // Never leave the flag set when the replay is aborted by an exception.
            $this->isReplayingEvents = false;
        }

        event(new FinishedEventReplay());

        $replayedProjectors->call('onFinishedEventReplay');
    }
}
221