Passed
Push — master ( 947947...758fbd )
by Thorsten
01:57
created

UnitOfWork::prepareHistory()   B

Complexity

Conditions 5
Paths 6

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 0
cts 8
cp 0
rs 8.8571
c 0
b 0
f 0
cc 5
eloc 8
nc 6
nop 2
crap 30
1
<?php
2
/**
3
 * This file is part of the daikon-cqrs/cqrs project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
declare(strict_types=1);
10
11
namespace Daikon\EventSourcing\EventStore;
12
13
use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
14
use Daikon\EventSourcing\Aggregate\AggregateRevision;
15
use Daikon\EventSourcing\Aggregate\AggregateRootInterface;
16
use Daikon\EventSourcing\Aggregate\DomainEventSequence;
17
use Daikon\MessageBus\Metadata\Metadata;
18
19
final class UnitOfWork implements UnitOfWorkInterface
20
{
21
    private const MAX_RESOLUTION_ATTEMPTS = 5;
22
23
    /** @var string */
24
    private $aggregateRootType;
25
26
    /** @var StreamStoreInterface */
27
    private $streamStore;
28
29
    /** @var StreamProcessorInterface */
30
    private $streamProcessor;
31
32
    /** @var string */
33
    private $streamImplementor;
34
35
    /** @var StreamMap */
36
    private $trackedCommitStreams;
37
38
    /** @var int */
39
    private $maxResolutionAttempts;
40
41 2
    public function __construct(
42
        string $aggregateRootType,
43
        StreamStoreInterface $streamStore,
44
        StreamProcessorInterface $streamProcessor = null,
45
        string $streamImplementor = Stream::class,
46
        int $maxResolutionAttempts = self::MAX_RESOLUTION_ATTEMPTS
47
    ) {
48 2
        $this->aggregateRootType = $aggregateRootType;
49 2
        $this->streamStore = $streamStore;
50 2
        $this->streamProcessor = $streamProcessor;
51 2
        $this->streamImplementor = $streamImplementor;
52 2
        $this->trackedCommitStreams = StreamMap::makeEmpty();
0 ignored issues
show
Documentation Bug introduced by
It seems like \Daikon\EventSourcing\Ev...\StreamMap::makeEmpty() of type object<self> is incompatible with the declared type object<Daikon\EventSourcing\EventStore\StreamMap> of property $trackedCommitStreams.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
53 2
        $this->maxResolutionAttempts = $maxResolutionAttempts;
54 2
    }
55
56 2
    public function commit(AggregateRootInterface $aggregateRoot, Metadata $metadata): CommitSequence
57
    {
58 2
        $prevStream = $this->getTrackedStream($aggregateRoot);
59 2
        $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
60 2
        $result = $this->streamStore->commit($updatedStream, $prevStream->getStreamRevision());
61 2
        $resolutionAttempts = 0;
62 2
        while ($result instanceof ConcurrencyError) {
63
            if (++$resolutionAttempts > $this->maxResolutionAttempts) {
64
                throw new ConcurrencyRaceLost($prevStream->getStreamId(), $aggregateRoot->getTrackedEvents());
65
            }
66
            $prevStream = $this->streamStore->checkout($updatedStream->getStreamId());
67
            $conflictingEvents = $this->getConflicts($aggregateRoot, $prevStream);
68
            if (!$conflictingEvents->isEmpty()) {
69
                throw new UnresolvableConflict($prevStream->getStreamId(), $conflictingEvents);
70
            }
71
            $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
72
            $result = $this->streamStore->commit($updatedStream, $prevStream->getStreamRevision());
73
        }
74 2
        $this->trackedCommitStreams = $this->trackedCommitStreams->unregister($prevStream->getStreamId());
75 2
        return $updatedStream->getCommitRange($prevStream->getStreamRevision(), $updatedStream->getStreamRevision());
76
    }
77
78
    public function checkout(AggregateIdInterface $aggregateId, AggregateRevision $revision): AggregateRootInterface
79
    {
80
        /** @var $streamId StreamId */
81
        $streamId = StreamId::fromNative($aggregateId->toNative());
82
        $stream = $this->streamStore->checkout($streamId, $revision);
83
        $aggregateRoot = call_user_func(
84
            [ $this->aggregateRootType, 'reconstituteFromHistory' ],
85
            $aggregateId,
86
            $this->prepareHistory($stream, $revision)
87
        );
88
        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
89
        return $aggregateRoot;
90
    }
91
92 2
    private function getTrackedStream(AggregateRootInterface $aggregateRoot): StreamInterface
93
    {
94 2
        $streamId = StreamId::fromNative((string)$aggregateRoot->getIdentifier());
95 2
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();
96 2
        if ($this->trackedCommitStreams->has((string)$streamId)) {
97
            $stream = $this->trackedCommitStreams->get((string)$streamId);
98 2
        } elseif ($tailRevision->isInitial()) {
99 2
            $stream = call_user_func([ $this->streamImplementor, 'fromStreamId' ], $streamId);
100 2
            $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
101
        } else {
102
            throw new \Exception("AggregateRoots must be checked out before they may be committed.");
103
        }
104 2
        return $stream;
105
    }
106
107
    private function prepareHistory(StreamInterface $stream, AggregateRevision $targetRevision): DomainEventSequence
108
    {
109
        $stream = $this->streamProcessor ? $this->streamProcessor->process($stream) : $stream;
110
        $history = DomainEventSequence::makeEmpty();
111
        foreach ($stream as $commit) {
112
            if (!$targetRevision->isEmpty() && $commit->getAggregateRevision()->isGreaterThan($targetRevision)) {
113
                break;
114
            }
115
            $history = $history->append($commit->getEventLog());
116
        }
117
        return $history;
118
    }
119
120
    private function getConflicts(AggregateRootInterface $aggregateRoot, StreamInterface $stream): DomainEventSequence
121
    {
122
        $conflictingEvents = DomainEventSequence::makeEmpty();
123
        $prevCommits = $stream->findCommitsSince($aggregateRoot->getRevision());
124
        foreach ($prevCommits as $previousCommit) {
125
            foreach ($previousCommit->getEventLog() as $previousEvent) {
126
                foreach ($aggregateRoot->getTrackedEvents() as $newEvent) {
127
                    if ($newEvent->conflictsWith($previousEvent)) {
128
                        $conflictingEvents = $conflictingEvents->push($newEvent);
129
                    }
130
                }
131
            }
132
        }
133
        return $conflictingEvents;
134
    }
135
}
136