Completed
Push — master ( 119f00...f45d38 )
by Filipe
04:26
created

Projectionist::eventStream()   B

Complexity

Conditions 6
Paths 6

Size

Total Lines 23

Duplication

Lines 11
Ratio 47.83 %

Importance

Changes 0
Metric Value
dl 11
loc 23
c 0
b 0
f 0
rs 8.9297
cc 6
nc 6
nop 1
1
<?php
2
3
/**
4
 * This file is part of slick/cqrs-tools
5
 *
6
 * For the full copyright and license information, please view the LICENSE
7
 * file that was distributed with this source code.
8
 */
9
10
namespace Slick\CQRSTools\Application\Projection;
11
12
use DateTimeImmutable;
13
use Exception;
14
use Slick\CQRSTools\Domain\Event\EventStore;
15
use Slick\CQRSTools\Domain\Event\EventStream;
16
use Slick\CQRSTools\Domain\Projection\ProjectorState;
17
use Slick\CQRSTools\Domain\Projection\ProjectorStateLedger;
18
19
/**
20
 * Projectionist
21
 *
22
 * @package Slick\CQRSTools\Application\Projection
23
*/
24
final class Projectionist
25
{
26
    /**
27
     * @var EventStore
28
     */
29
    private $eventStore;
30
31
    /**
32
     * @var ProjectorStateLedger
33
     */
34
    private $ledger;
35
36
    /**
37
     * @var EventHandlingStrategy
38
     */
39
    private $strategy;
40
41
    /**
42
     * Creates a Projectionist
43
     *
44
     * @param EventStore            $eventStore
45
     * @param ProjectorStateLedger  $ledger
46
     * @param EventHandlingStrategy $strategy
47
     */
48
    public function __construct(EventStore $eventStore, ProjectorStateLedger $ledger, EventHandlingStrategy $strategy)
49
    {
50
        $this->eventStore = $eventStore;
51
        $this->ledger = $ledger;
52
        $this->strategy = $strategy;
53
    }
54
55
    /**
56
     * Plays provided list of projectors
57
     *
58
     * @param Projector[] $projectors
59
     * @throws Exception
60
     */
61 View Code Duplication
    public function play(array $projectors): void
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
62
    {
63
        foreach ($projectors as $projector) {
64
            $projectorState = $this->ledger->get($projector);
65
66
            // Play only when projector is not halted or being processed by other agent
67
            if ($projectorState->isHalted() || $projectorState->isProjecting()) {
68
                continue;
69
            }
70
71
            // If it's not halted, it already ran.
72
            if ($projectorState->projectorRunsFrom() === Projector::RUN_ONCE) {
73
                continue;
74
            }
75
76
            $this->project($projector, $projectorState);
77
        }
78
    }
79
80
    /**
81
     * Boots provided list of projectors
82
     *
83
     * @param Projector[] $projectors
84
     * @throws Exception
85
     */
86 View Code Duplication
    public function boot(array $projectors): void
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
87
    {
88
        foreach ($projectors as $projector) {
89
            $projectorState = $this->ledger->get($projector);
90
91
            // Play only when projector is not halted or being processed by other agent
92
            if (!$projectorState->isHalted() || $projectorState->isProjecting()) {
93
                continue;
94
            }
95
96
            $this->project($projector, $projectorState);
97
        }
98
    }
99
100
    /**
101
     * Retires provided list of projectors
102
     *
103
     * @param Projector[] $projectors
104
     */
105
    public function retire(array $projectors): void
106
    {
107
        foreach ($projectors as $projector) {
108
            $projectorState = $this->ledger->get($projector);
109
            $projectorState->halt("Projector was retired.");
110
            $projector->retire();
111
            $this->releaseProjector($projectorState);
112
        }
113
    }
114
115
    /**
116
     * Project stream into provided projector
117
     *
118
     * @param Projector      $projector
119
     * @param ProjectorState $projectorState
120
     *
121
     * @throws Exception
122
     */
123
    private function project(Projector $projector, ProjectorState $projectorState): void
124
    {
125
        $eventStream = $this->eventStream($projectorState);
126
        if ($eventStream->isEmpty()) {
127
            return;
128
        }
129
130
        $this->secureProjector($projectorState);
131
        foreach ($eventStream as $event) {
132
            try {
133
                $this->strategy->handle($event, $projector);
134
                $projectorState->lastEventWas($event);
135
            } catch (Exception $caught) {
136
                $projectorState->halt($caught->getMessage().' EventId: '.$event->eventId());
137
                break;
138
            }
139
        }
140
        $this->releaseProjector($projectorState);
141
    }
142
143
    /**
144
     * Retrieve the event stream for provided projector state
145
     *
146
     * @param ProjectorState $projectorState
147
     *
148
     * @return EventStream
149
     * @throws Exception
150
     */
151
    private function eventStream(ProjectorState $projectorState): EventStream
152
    {
153
        switch ($projectorState->projectorRunsFrom()) {
154 View Code Duplication
            case Projector::RUN_FROM_NOW:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
155
                $date = new DateTimeImmutable('now');
156
                $eventStream = !$projectorState->lastEvent()
157
                    ? $this->eventStore->allStoredEventsSince($date)
158
                    : $this->eventStore->allStoredEventsSince($projectorState->lastEvent());
159
                break;
160
161 View Code Duplication
            case Projector::RUN_FROM_BEGINNING:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
162
                $eventStream = $projectorState->lastEvent()
163
                    ? $this->eventStore->allStoredEventsSince($projectorState->lastEvent())
164
                    : $this->eventStore->allStoredEvents();
165
                break;
166
167
            case Projector::RUN_ONCE:
168
            default:
169
                $eventStream = $this->eventStore->allStoredEvents();
170
        }
171
172
        return $eventStream;
173
    }
174
175
    /**
176
     * Secure projector to avoid duplications
177
     *
178
     * @param ProjectorState $projectorState
179
     */
180
    private function secureProjector(ProjectorState $projectorState): void
181
    {
182
        $projectorState->startProjecting();
183
        $this->ledger->update($projectorState);
184
    }
185
186
    /**
187
     * Releases the projector so that it can be played by other agent
188
     *
189
     * @param ProjectorState $projectorState
190
     */
191
    private function releaseProjector(ProjectorState $projectorState): void
192
    {
193
        $projectorState->stopProjecting();
194
        $this->ledger->update($projectorState);
195
    }
196
}
197