Projectionist::project()   A
last analyzed

Complexity

Conditions 5
Paths 8

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 25
c 0
b 0
f 0
rs 9.2088
cc 5
nc 8
nop 2
1
<?php
2
3
/**
4
 * This file is part of slick/cqrs-tools
5
 *
6
 * For the full copyright and license information, please view the LICENSE
7
 * file that was distributed with this source code.
8
 */
9
10
namespace Slick\CQRSTools\Application\Projection;
11
12
use DateTimeImmutable;
13
use Exception;
14
use Slick\CQRSTools\Domain\Event\EventStore;
15
use Slick\CQRSTools\Domain\Event\EventStream;
16
use Slick\CQRSTools\Domain\Event\Stream;
17
use Slick\CQRSTools\Domain\Projection\ProjectorState;
18
use Slick\CQRSTools\Domain\Projection\ProjectorStateLedger;
19
20
/**
 * Projectionist
 *
 * Drives projectors over the events held in the event store: it looks up each
 * projector's state in the ledger, selects the event stream that state calls
 * for, hands every event to the handling strategy and writes the updated
 * state back to the ledger.
 *
 * @package Slick\CQRSTools\Application\Projection
 */
final class Projectionist implements ProgressStateProvider
{

    use ProgressStateProviderMethods;

    /**
     * @var EventStore Source of the stored events that get projected
     */
    private $eventStore;

    /**
     * @var ProjectorStateLedger Keeps the state of every projector
     */
    private $ledger;

    /**
     * @var EventHandlingStrategy Decides how an event is applied to a projector
     */
    private $strategy;

    /**
     * Creates a Projectionist
     *
     * @param EventStore            $eventStore
     * @param ProjectorStateLedger  $ledger
     * @param EventHandlingStrategy $strategy
     */
    public function __construct(
        EventStore $eventStore,
        ProjectorStateLedger $ledger,
        EventHandlingStrategy $strategy
    ) {
        $this->eventStore = $eventStore;
        $this->ledger = $ledger;
        $this->strategy = $strategy;
    }

    /**
     * Plays provided list of projectors
     *
     * Projectors that are halted, currently projecting, or configured as
     * Projector::RUN_ONCE are skipped.
     *
     * @param Projector[] $projectors
     * @throws Exception
     */
    public function play(array $projectors): void
    {
        foreach ($projectors as $projector) {
            $projectorState = $this->ledger->get($projector);

            // Play only when projector is not halted or being processed by other agent
            if ($projectorState->isHalted() || $projectorState->isProjecting()) {
                continue;
            }

            // A run-once projector that is not halted has already ran.
            if ($projectorState->projectorRunsFrom() === Projector::RUN_ONCE) {
                continue;
            }

            $this->project($projector, $projectorState);
        }
    }

    /**
     * Boots provided list of projectors
     *
     * Only projectors whose state is halted, and that are not currently
     * projecting, are projected; every other projector is skipped.
     *
     * @param Projector[] $projectors
     * @throws Exception
     */
    public function boot(array $projectors): void
    {
        foreach ($projectors as $projector) {
            $projectorState = $this->ledger->get($projector);

            // Boot only halted projectors that are not being processed by other agent
            if (!$projectorState->isHalted() || $projectorState->isProjecting()) {
                continue;
            }

            $this->project($projector, $projectorState);
        }
    }

    /**
     * Retires provided list of projectors
     *
     * Each projector's state is halted, the projector itself is retired and
     * the state is then released and written back to the ledger.
     *
     * @param Projector[] $projectors
     */
    public function retire(array $projectors): void
    {
        foreach ($projectors as $projector) {
            $projectorState = $this->ledger->get($projector);
            $projectorState->halt("Projector was retired.");
            $projector->retire();
            $this->releaseProjector($projectorState);
        }
    }

    /**
     * Project stream into provided projector
     *
     * Does nothing when the selected stream is empty. Otherwise the projector
     * is secured, every event is passed to the handling strategy and the
     * projector is released again. An Exception raised while handling an event
     * halts the projector state (recording the message and the event id) and
     * stops the loop.
     *
     * @param Projector      $projector
     * @param ProjectorState $projectorState
     *
     * @throws Exception
     */
    private function project(Projector $projector, ProjectorState $projectorState): void
    {
        $eventStream = $this->eventStream($projectorState);
        if ($eventStream->isEmpty()) {
            return;
        }

        $this->secureProjector($projectorState);

        // Whatever escapes from here on (an Error thrown by a handler, a failing
        // progress notification, ...) must not leave the projector flagged as
        // projecting, otherwise play() and boot() would skip it from then on.
        try {
            $this->notifyMaxSteps($eventStream->count());

            foreach ($eventStream as $event) {
                try {
                    if ($this->strategy->handle($event, $projector)) {
                        $this->notifyHandledEvent();
                    }
                    $projectorState->lastEventWas($event);
                } catch (Exception $caught) {
                    $projectorState->halt($caught->getMessage().' EventId: '.$event->eventId());
                    break;
                }
                $this->notifyAdvance();
            }

            $this->notifyFinish();
        } finally {
            $this->releaseProjector($projectorState);
        }
    }

    /**
     * Retrieve the event stream for provided projector state
     *
     * - Projector::RUN_FROM_NOW: events since the state's last event, or since
     *   the current date and time when no last event is recorded;
     * - Projector::RUN_FROM_BEGINNING: events since the state's last event, or
     *   all stored events when no last event is recorded;
     * - Projector::RUN_ONCE and any other value: all stored events.
     *
     * @param ProjectorState $projectorState
     *
     * @return Stream
     * @throws Exception
     */
    private function eventStream(ProjectorState $projectorState): Stream
    {
        switch ($projectorState->projectorRunsFrom()) {
            case Projector::RUN_FROM_NOW:
                // NOTE(review): allStoredEventsSince() receives either the last
                // event or a date here - confirm EventStore accepts both.
                $since = $projectorState->lastEvent() ?: new DateTimeImmutable('now');
                return $this->eventStore->allStoredEventsSince($since);

            case Projector::RUN_FROM_BEGINNING:
                $lastEvent = $projectorState->lastEvent();
                return $lastEvent
                    ? $this->eventStore->allStoredEventsSince($lastEvent)
                    : $this->eventStore->allStoredEvents();

            case Projector::RUN_ONCE:
            default:
                return $this->eventStore->allStoredEvents();
        }
    }

    /**
     * Secure projector to avoid duplications
     *
     * Flags the state as projecting and writes it to the ledger.
     *
     * @param ProjectorState $projectorState
     */
    private function secureProjector(ProjectorState $projectorState): void
    {
        $projectorState->startProjecting();
        $this->ledger->update($projectorState);
    }

    /**
     * Releases the projector so that it can be played by other agent
     *
     * Clears the projecting flag on the state and writes it to the ledger.
     *
     * @param ProjectorState $projectorState
     */
    private function releaseProjector(ProjectorState $projectorState): void
    {
        $projectorState->stopProjecting();
        $this->ledger->update($projectorState);
    }
}
207