Test Failed
Push — master ( 91e640...7cd92a )
by Thorsten
02:02
created

UnitOfWork::commit()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 24
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 24
ccs 0
cts 23
cp 0
rs 8.6845
c 0
b 0
f 0
cc 4
eloc 18
nc 4
nop 2
crap 20
1
<?php
2
/**
3
 * This file is part of the daikon-cqrs/cqrs project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
declare(strict_types=1);
10
11
namespace Daikon\EventSourcing\EventStore;
12
13
use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
14
use Daikon\EventSourcing\Aggregate\AggregateRevision;
15
use Daikon\EventSourcing\Aggregate\AggregateRootInterface;
16
use Daikon\EventSourcing\Aggregate\Event\DomainEventInterface;
17
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequence;
18
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequenceInterface;
19
use Daikon\EventSourcing\EventStore\Commit\CommitInterface;
20
use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface;
21
use Daikon\EventSourcing\EventStore\Storage\StorageError;
22
use Daikon\EventSourcing\EventStore\Storage\StreamStorageInterface;
23
use Daikon\EventSourcing\EventStore\Stream\Stream;
24
use Daikon\EventSourcing\EventStore\Stream\StreamId;
25
use Daikon\EventSourcing\EventStore\Stream\StreamInterface;
26
use Daikon\EventSourcing\EventStore\Stream\StreamMap;
27
use Daikon\EventSourcing\EventStore\Stream\StreamProcessorInterface;
28
use Daikon\MessageBus\Metadata\Metadata;
29
30
final class UnitOfWork implements UnitOfWorkInterface
31
{
32
    private const MAX_RACE_ATTEMPTS = 5;
33
34
    /** @var string */
35
    private $aggregateRootType;
36
37
    /** @var StreamStorageInterface */
38
    private $streamStorage;
39
40
    /** @var ?StreamProcessorInterface */
41
    private $streamProcessor;
42
43
    /** @var string */
44
    private $streamImplementor;
45
46
    /** @var StreamMap */
47
    private $trackedCommitStreams;
48
49
    /** @var int */
50
    private $maxRaceAttempts;
51
52
    public function __construct(
53
        string $aggregateRootType,
54
        StreamStorageInterface $streamStorage,
55
        StreamProcessorInterface $streamProcessor = null,
56
        string $streamImplementor = Stream::class,
57
        int $maxRaceAttempts = self::MAX_RACE_ATTEMPTS
58
    ) {
59
        $this->aggregateRootType = $aggregateRootType;
60
        $this->streamStorage = $streamStorage;
61
        $this->streamProcessor = $streamProcessor;
62
        $this->streamImplementor = $streamImplementor;
63
        $this->trackedCommitStreams = StreamMap::makeEmpty();
0 ignored issues
show
Documentation Bug introduced by
It seems like \Daikon\EventSourcing\Ev...\StreamMap::makeEmpty() of type object<self> is incompatible with the declared type object<Daikon\EventSourc...Store\Stream\StreamMap> of property $trackedCommitStreams.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
64
        $this->maxRaceAttempts = $maxRaceAttempts;
65
    }
66
67
    public function commit(AggregateRootInterface $aggregateRoot, Metadata $metadata): CommitSequenceInterface
68
    {
69
        $prevStream = $this->getTrackedStream($aggregateRoot);
70
        $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
71
        $result = $this->streamStorage->append($updatedStream, $prevStream->getStreamRevision());
72
        $raceCount = 0;
73
        while ($result instanceof StorageError) {
74
            if (++$raceCount > $this->maxRaceAttempts) {
75
                throw new ConcurrencyRaceLost($prevStream->getStreamId(), $aggregateRoot->getTrackedEvents());
76
            }
77
            $prevStream = $this->streamStorage->load($updatedStream->getStreamId());
78
            $conflictingEvents = $this->determineConflicts($aggregateRoot, $prevStream);
79
            if (!$conflictingEvents->isEmpty()) {
80
                throw new UnresolvableConflict($prevStream->getStreamId(), $conflictingEvents);
81
            }
82
            $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
83
            $result = $this->streamStorage->append($updatedStream, $prevStream->getStreamRevision());
84
        }
85
        $this->trackedCommitStreams = $this->trackedCommitStreams->unregister($prevStream->getStreamId());
86
        return $updatedStream->getCommitRange(
87
            $prevStream->getStreamRevision()->increment(), 
88
            $updatedStream->getStreamRevision()
89
        );
90
    }
91
92
    public function checkout(AggregateIdInterface $aggregateId, AggregateRevision $revision): AggregateRootInterface
93
    {
94
        /** @var StreamId $streamId */
95
        $streamId = StreamId::fromNative($aggregateId->toNative());
96
        $stream = $this->streamStorage->load($streamId, $revision);
97
        if ($stream->isEmpty()) {
98
            throw new \Exception('Checking out empty streams is not supported.');
99
        }
100
        $aggregateRoot = call_user_func(
101
            [ $this->aggregateRootType, 'reconstituteFromHistory' ],
102
            $aggregateId,
103
            $this->prepareHistory(
104
                $this->streamProcessor ? $this->streamProcessor->process($stream) : $stream,
105
                $revision
106
            )
107
        );
108
        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
109
        return $aggregateRoot;
110
    }
111
112
    private function getTrackedStream(AggregateRootInterface $aggregateRoot): StreamInterface
113
    {
114
        $streamId = StreamId::fromNative((string)$aggregateRoot->getIdentifier());
115
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();
116
        if ($this->trackedCommitStreams->has((string)$streamId)) {
117
            $stream = $this->trackedCommitStreams->get((string)$streamId);
118
        } elseif ($tailRevision->isInitial()) {
119
            $stream = call_user_func([ $this->streamImplementor, 'fromStreamId' ], $streamId);
120
            $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
121
        } else {
122
            throw new \Exception('AggregateRoot must be checked out before it may be committed.');
123
        }
124
        return $stream;
125
    }
126
127
    private function prepareHistory(
128
        StreamInterface $stream,
129
        AggregateRevision $targetRevision
130
    ): DomainEventSequenceInterface {
131
        $history = DomainEventSequence::makeEmpty();
132
        /** @var CommitInterface $commit */
133
        foreach ($stream as $commit) {
134
            $history = $history->append($commit->getEventLog());
135
            if (!$targetRevision->isEmpty() && $commit->getAggregateRevision()->isGreaterThan($targetRevision)) {
136
                break;
137
            }
138
        }
139
        return $history;
140
    }
141
142
    private function determineConflicts(
143
        AggregateRootInterface $aggregateRoot,
144
        StreamInterface $stream
145
    ): DomainEventSequenceInterface {
146
        $conflictingEvents = DomainEventSequence::makeEmpty();
147
        $prevCommits = $stream->findCommitsSince($aggregateRoot->getRevision());
148
        /** @var CommitInterface $previousCommit */
149
        foreach ($prevCommits as $previousCommit) {
150
            /** @var DomainEventInterface $previousEvent */
151
            foreach ($previousCommit->getEventLog() as $previousEvent) {
152
                /** @var DomainEventInterface $newEvent */
153
                foreach ($aggregateRoot->getTrackedEvents() as $newEvent) {
154
                    if ($newEvent->conflictsWith($previousEvent)) {
155
                        $conflictingEvents = $conflictingEvents->push($newEvent);
156
                    }
157
                }
158
            }
159
        }
160
        return $conflictingEvents;
161
    }
162
}
163