Passed
Push — master ( 95a082...8dd2b6 )
by Thorsten
02:00
created

UnitOfWork::prepareHistory()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 0
cts 13
cp 0
rs 9.2
c 0
b 0
f 0
cc 4
eloc 10
nc 3
nop 2
crap 20
1
<?php
2
/**
3
 * This file is part of the daikon-cqrs/cqrs project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
declare(strict_types=1);
10
11
namespace Daikon\EventSourcing\EventStore;
12
13
use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
14
use Daikon\EventSourcing\Aggregate\AggregateRevision;
15
use Daikon\EventSourcing\Aggregate\AggregateRootInterface;
16
use Daikon\EventSourcing\Aggregate\Event\DomainEventInterface;
17
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequence;
18
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequenceInterface;
19
use Daikon\EventSourcing\EventStore\Commit\CommitInterface;
20
use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface;
21
use Daikon\EventSourcing\EventStore\Storage\StorageError;
22
use Daikon\EventSourcing\EventStore\Storage\StreamStorageInterface;
23
use Daikon\EventSourcing\EventStore\Stream\Stream;
24
use Daikon\EventSourcing\EventStore\Stream\StreamId;
25
use Daikon\EventSourcing\EventStore\Stream\StreamInterface;
26
use Daikon\EventSourcing\EventStore\Stream\StreamMap;
27
use Daikon\EventSourcing\EventStore\Stream\StreamProcessorInterface;
28
use Daikon\MessageBus\Metadata\Metadata;
29
30
/**
 * Loads aggregate roots of a single type from stream storage and commits
 * their tracked events back to it.
 *
 * Streams obtained through checkout() (or created on the fly for aggregates
 * whose tracked events start at the initial revision) are remembered in a
 * StreamMap until the corresponding aggregate root has been committed.
 */
final class UnitOfWork implements UnitOfWorkInterface
{
    /** Default for the number of commit retries allowed after a storage error. */
    private const MAX_RACE_ATTEMPTS = 5;

    /** @var string Class name exposing a static reconstituteFromHistory() method. */
    private $aggregateRootType;

    /** @var StreamStorageInterface */
    private $streamStorage;

    /** @var ?StreamProcessorInterface Optional processor applied to a stream before its history is built. */
    private $streamProcessor;

    /** @var string Class name exposing a static fromStreamId() method. */
    private $streamImplementor;

    /** @var StreamMap Streams that were checked out or created and are not yet committed. */
    private $trackedCommitStreams;

    /** @var int */
    private $maxRaceAttempts;

    /**
     * @param string $aggregateRootType Class used to reconstitute aggregate roots on checkout.
     * @param StreamStorageInterface $streamStorage Storage that streams are checked out from and committed to.
     * @param StreamProcessorInterface|null $streamProcessor Optional processor run on a checked out stream.
     * @param string $streamImplementor Class used to create streams for not yet tracked aggregates.
     * @param int $maxRaceAttempts Number of retries commit() performs before giving up.
     */
    public function __construct(
        string $aggregateRootType,
        StreamStorageInterface $streamStorage,
        ?StreamProcessorInterface $streamProcessor = null,
        string $streamImplementor = Stream::class,
        int $maxRaceAttempts = self::MAX_RACE_ATTEMPTS
    ) {
        $this->aggregateRootType = $aggregateRootType;
        $this->streamStorage = $streamStorage;
        $this->streamProcessor = $streamProcessor;
        $this->streamImplementor = $streamImplementor;
        // NOTE(review): static analysis reported makeEmpty() as returning `self`, which it
        // considered incompatible with the StreamMap type declared for this property.
        $this->trackedCommitStreams = StreamMap::makeEmpty();
        $this->maxRaceAttempts = $maxRaceAttempts;
    }

    /**
     * Appends the aggregate root's tracked events to its tracked stream and commits the result.
     *
     * Whenever storage answers with a StorageError the stream is checked out again, the tracked
     * events are tested for conflicts against it, and the commit is retried on top of that stream.
     * Up to $maxRaceAttempts retries are made after the initial attempt.
     *
     * @return CommitSequenceInterface The commit range between the revision of the stream that was
     *                                 appended to and the revision of the committed stream.
     *
     * @throws ConcurrencyRaceLost When storage still reports an error after all retries are used up.
     * @throws UnresolvableConflict When a tracked event conflicts with the re-checked-out stream.
     * @throws \Exception When the aggregate root is neither tracked nor at its initial revision.
     */
    public function commit(AggregateRootInterface $aggregateRoot, Metadata $metadata): CommitSequenceInterface
    {
        $prevStream = $this->getTrackedStream($aggregateRoot);
        $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
        $result = $this->streamStorage->commit($updatedStream, $prevStream->getStreamRevision());

        $raceCount = 0;
        while ($result instanceof StorageError) {
            if (++$raceCount > $this->maxRaceAttempts) {
                throw new ConcurrencyRaceLost($prevStream->getStreamId(), $aggregateRoot->getTrackedEvents());
            }

            // Re-read the stream and make sure none of our events conflict with it before retrying.
            $prevStream = $this->streamStorage->checkout($updatedStream->getStreamId());
            $conflictingEvents = $this->getConflicts($aggregateRoot, $prevStream);
            if (!$conflictingEvents->isEmpty()) {
                throw new UnresolvableConflict($prevStream->getStreamId(), $conflictingEvents);
            }

            $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
            $result = $this->streamStorage->commit($updatedStream, $prevStream->getStreamRevision());
        }

        // The stream is only untracked after a successful commit; the throwing paths above leave it registered.
        $this->trackedCommitStreams = $this->trackedCommitStreams->unregister($prevStream->getStreamId());

        return $updatedStream->getCommitRange($prevStream->getStreamRevision(), $updatedStream->getStreamRevision());
    }

    /**
     * Checks out the stream for the given aggregate id and reconstitutes the aggregate root from it.
     *
     * The checked out stream (not the processed one) is registered as tracked so that a later
     * commit() can append to it.
     */
    public function checkout(AggregateIdInterface $aggregateId, AggregateRevision $revision): AggregateRootInterface
    {
        /** @var StreamId $streamId */
        $streamId = StreamId::fromNative($aggregateId->toNative());
        $stream = $this->streamStorage->checkout($streamId, $revision);

        $historySource = $this->streamProcessor ? $this->streamProcessor->process($stream) : $stream;
        $history = $this->prepareHistory($historySource, $revision);

        $aggregateRootType = $this->aggregateRootType;
        $aggregateRoot = $aggregateRootType::reconstituteFromHistory($aggregateId, $history);

        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);

        return $aggregateRoot;
    }

    /**
     * Returns the stream that the aggregate root's tracked events are to be appended to.
     *
     * An already tracked stream is reused. Otherwise a new stream is created and registered, but
     * only if the tail revision of the tracked events reports isInitial().
     *
     * @throws \Exception When the stream is not tracked and the tail revision is not initial.
     */
    private function getTrackedStream(AggregateRootInterface $aggregateRoot): StreamInterface
    {
        $streamId = StreamId::fromNative((string)$aggregateRoot->getIdentifier());
        $streamKey = (string)$streamId;
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();

        if ($this->trackedCommitStreams->has($streamKey)) {
            return $this->trackedCommitStreams->get($streamKey);
        }

        if (!$tailRevision->isInitial()) {
            throw new \Exception('AggregateRoot must be checked out before it may be committed.');
        }

        $streamImplementor = $this->streamImplementor;
        $stream = $streamImplementor::fromStreamId($streamId);
        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);

        return $stream;
    }

    /**
     * Concatenates the event logs of the stream's commits, in iteration order, into one sequence.
     *
     * With a non-empty target revision, iteration stops after the first commit whose aggregate
     * revision is greater than the target. The check runs after the append, so that commit's
     * events are part of the returned history.
     * NOTE(review): confirm that including the commit which exceeds the target is intended.
     */
    private function prepareHistory(
        StreamInterface $stream,
        AggregateRevision $targetRevision
    ): DomainEventSequenceInterface {
        $history = DomainEventSequence::makeEmpty();
        /** @var CommitInterface $commit */
        foreach ($stream as $commit) {
            $history = $history->append($commit->getEventLog());
            if (!$targetRevision->isEmpty() && $commit->getAggregateRevision()->isGreaterThan($targetRevision)) {
                break;
            }
        }

        return $history;
    }

    /**
     * Collects the aggregate root's tracked events that conflict with events committed to the
     * given stream since the aggregate root's revision.
     *
     * A tracked event is pushed once for every previous event it conflicts with, so the same
     * event can occur more than once in the result.
     */
    private function getConflicts(
        AggregateRootInterface $aggregateRoot,
        StreamInterface $stream
    ): DomainEventSequenceInterface {
        $conflictingEvents = DomainEventSequence::makeEmpty();
        $prevCommits = $stream->findCommitsSince($aggregateRoot->getRevision());
        /** @var CommitInterface $previousCommit */
        foreach ($prevCommits as $previousCommit) {
            /** @var DomainEventInterface $previousEvent */
            foreach ($previousCommit->getEventLog() as $previousEvent) {
                /** @var DomainEventInterface $newEvent */
                foreach ($aggregateRoot->getTrackedEvents() as $newEvent) {
                    if ($newEvent->conflictsWith($previousEvent)) {
                        $conflictingEvents = $conflictingEvents->push($newEvent);
                    }
                }
            }
        }

        return $conflictingEvents;
    }
}
157