Passed
Push — master ( 977f82...f4c7d5 )
by Mr
07:24
created

UnitOfWork::buildHistory()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 16
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 8
c 0
b 0
f 0
nc 4
nop 2
dl 0
loc 16
ccs 0
cts 15
cp 0
crap 20
rs 10
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/event-sourcing project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\EventSourcing\EventStore;
10
11
use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
12
use Daikon\EventSourcing\Aggregate\AggregateRevision;
13
use Daikon\EventSourcing\Aggregate\AggregateRootInterface;
14
use Daikon\EventSourcing\Aggregate\Event\DomainEventInterface;
15
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequence;
16
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequenceInterface;
17
use Daikon\EventSourcing\EventStore\Commit\CommitInterface;
18
use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface;
19
use Daikon\EventSourcing\EventStore\Storage\StorageError;
20
use Daikon\EventSourcing\EventStore\Storage\StreamStorageInterface;
21
use Daikon\EventSourcing\EventStore\Stream\Stream;
22
use Daikon\EventSourcing\EventStore\Stream\StreamInterface;
23
use Daikon\EventSourcing\EventStore\Stream\StreamMap;
24
use Daikon\EventSourcing\EventStore\Stream\StreamProcessorInterface;
25
use Daikon\Interop\RuntimeException;
26
use Daikon\Metadata\MetadataInterface;
27
28
final class UnitOfWork implements UnitOfWorkInterface
29
{
30
    private const MAX_RACE_ATTEMPTS = 3;
31
32
    private string $aggregateRootType;
33
34
    private StreamStorageInterface $streamStorage;
35
36
    private ?StreamProcessorInterface $streamProcessor;
37
38
    private string $streamImplementor;
39
40
    private StreamMap $trackedCommitStreams;
41
42
    private int $maxRaceAttempts;
43
44
    public function __construct(
45
        string $aggregateRootType,
46
        StreamStorageInterface $streamStorage,
47
        StreamProcessorInterface $streamProcessor = null,
48
        string $streamImplementor = Stream::class,
49
        int $maxRaceAttempts = self::MAX_RACE_ATTEMPTS
50
    ) {
51
        $this->aggregateRootType = $aggregateRootType;
52
        $this->streamStorage = $streamStorage;
53
        $this->streamProcessor = $streamProcessor;
54
        $this->streamImplementor = $streamImplementor;
55
        $this->trackedCommitStreams = StreamMap::makeEmpty();
56
        $this->maxRaceAttempts = $maxRaceAttempts;
57
    }
58
59
    public function commit(AggregateRootInterface $aggregateRoot, MetadataInterface $metadata): CommitSequenceInterface
60
    {
61
        $raceCount = 0;
62
        $previousStream = $this->getTrackedStream($aggregateRoot);
63
        $trackedEvents = $aggregateRoot->getTrackedEvents();
64
        $updatedStream = $previousStream->appendEvents($trackedEvents, $metadata);
65
        $result = $this->streamStorage->append($updatedStream, $previousStream->getHeadSequence());
66
67
        while ($result instanceof StorageError) {
68
            if (++$raceCount > $this->maxRaceAttempts) {
69
                throw new ConcurrencyRaceLost($previousStream->getAggregateId(), $aggregateRoot->getTrackedEvents());
70
            }
71
            $previousStream = $this->streamStorage->load($updatedStream->getAggregateId());
72
            $conflictingEvents = $this->determineConflicts($aggregateRoot, $previousStream);
73
            if (!$conflictingEvents->isEmpty()) {
74
                throw new UnresolvableConflict($previousStream->getAggregateId(), $conflictingEvents);
75
            }
76
            $resequencedEvents = $trackedEvents->resequence($previousStream->getHeadRevision());
77
            $updatedStream = $previousStream->appendEvents($resequencedEvents, $metadata);
78
            $result = $this->streamStorage->append($updatedStream, $previousStream->getHeadSequence());
79
        }
80
81
        $this->trackedCommitStreams = $this->trackedCommitStreams->unregister($previousStream->getAggregateId());
82
83
        return $updatedStream->getCommitRange(
84
            $previousStream->getHeadSequence()->increment(),
85
            $updatedStream->getHeadSequence()
86
        );
87
    }
88
89
    public function checkout(AggregateIdInterface $aggregateId, AggregateRevision $revision): AggregateRootInterface
90
    {
91
        $stream = $this->streamStorage->load($aggregateId, $revision);
92
        if ($stream->isEmpty()) {
93
            throw new RuntimeException('Checking out empty streams is not supported.');
94
        }
95
        /** @var AggregateRootInterface $aggregateRoot */
96
        $aggregateRoot = ([$this->aggregateRootType, 'reconstituteFromHistory'])(
97
            $aggregateId,
98
            $this->buildHistory(
99
                $this->streamProcessor ? $this->streamProcessor->process($stream) : $stream,
100
                $revision
101
            )
102
        );
103
        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
104
        return $aggregateRoot;
105
    }
106
107
    private function getTrackedStream(AggregateRootInterface $aggregateRoot): StreamInterface
108
    {
109
        $aggregateId = $aggregateRoot->getIdentifier();
110
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();
111
        if ($this->trackedCommitStreams->has((string)$aggregateId)) {
112
            /** @var StreamInterface $stream */
113
            $stream = $this->trackedCommitStreams->get((string)$aggregateId);
114
        } elseif ($tailRevision->isInitial()) {
115
            $stream = ([$this->streamImplementor, 'fromAggregateId'])($aggregateId);
116
            $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
117
        } else {
118
            throw new RuntimeException('AggregateRoot must be checked out before it can be committed.');
119
        }
120
        return $stream;
121
    }
122
123
    private function buildHistory(
124
        StreamInterface $stream,
125
        AggregateRevision $targetRevision
126
    ): DomainEventSequenceInterface {
127
        $history = DomainEventSequence::makeEmpty();
128
        /** @var CommitInterface $commit */
129
        foreach ($stream as $commit) {
130
            $history = $history->append($commit->getEventLog());
131
        }
132
        if (!$targetRevision->isEmpty() && !$history->getHeadRevision()->equals($targetRevision)) {
133
            throw new RuntimeException(sprintf(
134
                'AggregateRoot cannot be reconstituted to revision %s.',
135
                (string)$targetRevision
136
            ));
137
        }
138
        return $history;
139
    }
140
141
    private function determineConflicts(
142
        AggregateRootInterface $aggregateRoot,
143
        StreamInterface $stream
144
    ): DomainEventSequenceInterface {
145
        $conflictingEvents = DomainEventSequence::makeEmpty();
146
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();
147
        $previousCommits = $stream->findCommitsSince($tailRevision);
148
        /** @var CommitInterface $previousCommit */
149
        foreach ($previousCommits as $previousCommit) {
150
            /** @var DomainEventInterface $previousEvent */
151
            foreach ($previousCommit->getEventLog() as $previousEvent) {
152
                /** @var DomainEventInterface $trackedEvent */
153
                foreach ($aggregateRoot->getTrackedEvents() as $trackedEvent) {
154
                    //All events from the first conflict onwards are considered to be in conflict
155
                    if (!$conflictingEvents->isEmpty() || $trackedEvent->conflictsWith($previousEvent)) {
156
                        $conflictingEvents = $conflictingEvents->push($previousEvent);
157
                        break;
158
                    }
159
                }
160
            }
161
        }
162
        return $conflictingEvents;
163
    }
164
}
165