Passed
Push — master ( 8d33c9...a5787e )
by Thorsten
01:46
created

UnitOfWork   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 122
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 39.34%

Importance

Changes 0
Metric Value
wmc 20
lcom 1
cbo 11
dl 0
loc 122
ccs 24
cts 61
cp 0.3934
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 14 1
A commit() 0 21 4
A checkout() 0 13 1
A getTrackedStream() 0 14 3
B buildEventHistory() 0 16 6
B detectConflictingEvents() 0 16 5
1
<?php
2
/**
3
 * This file is part of the daikon-cqrs/cqrs project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
declare(strict_types=1);
10
11
namespace Daikon\EventSourcing\EventStore;
12
13
use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
14
use Daikon\EventSourcing\Aggregate\AggregateRevision;
15
use Daikon\EventSourcing\Aggregate\AggregateRootInterface;
16
use Daikon\EventSourcing\Aggregate\DomainEventSequence;
17
use Daikon\MessageBus\Metadata\Metadata;
18
19
final class UnitOfWork implements UnitOfWorkInterface
20
{
21
    private const MAX_RESOLUTION_ATTEMPTS = 5;
22
23
    /** @var string */
24
    private $aggregateRootType;
25
26
    /** @var StreamStoreInterface */
27
    private $streamStore;
28
29
    /** @var StreamProcessorInterface */
30
    private $streamProcessor;
31
32
    /** @var string */
33
    private $streamImplementor;
34
35
    /** @var StreamMap */
36
    private $trackedCommitStreams;
37
38
    /** @var int */
39
    private $maxResolutionAttempts;
40
41 2
    public function __construct(
42
        string $aggregateRootType,
43
        StreamStoreInterface $streamStore,
44
        StreamProcessorInterface $streamProcessor = null,
45
        string $streamImplementor = Stream::class,
46
        int $maxResolutionAttempts = self::MAX_RESOLUTION_ATTEMPTS
47
    ) {
48 2
        $this->aggregateRootType = $aggregateRootType;
49 2
        $this->streamStore = $streamStore;
50 2
        $this->streamProcessor = $streamProcessor;
51 2
        $this->streamImplementor = $streamImplementor;
52 2
        $this->trackedCommitStreams = StreamMap::makeEmpty();
0 ignored issues
show
Documentation Bug introduced by
It seems like \Daikon\EventSourcing\Ev...\StreamMap::makeEmpty() of type object<self> is incompatible with the declared type object<Daikon\EventSourcing\EventStore\StreamMap> of property $trackedCommitStreams.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
53 2
        $this->maxResolutionAttempts = $maxResolutionAttempts;
54 2
    }
55
56 2
    public function commit(AggregateRootInterface $aggregateRoot, Metadata $metadata): CommitSequence
57
    {
58 2
        $prevStream = $this->getTrackedStream($aggregateRoot);
59 2
        $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
60 2
        $result = $this->streamStore->commit($updatedStream, $prevStream->getStreamRevision());
61 2
        $resolutionAttempts = 0;
62 2
        while ($result instanceof PossibleConflict) {
63
            $conflictingStream = $this->streamStore->checkout($updatedStream->getStreamId());
64
            $conflictingEvents = $this->detectConflictingEvents($aggregateRoot, $conflictingStream);
65
            if (!$conflictingEvents->isEmpty()) {
66
                throw new UnresolvableConflict($conflictingStream->getStreamId(), $conflictingEvents);
67
            }
68
            $resolvedStream = $conflictingStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
69
            $result = $this->streamStore->commit($resolvedStream, $conflictingStream->getStreamRevision());
70
            if (++$resolutionAttempts >= $this->maxResolutionAttempts) {
71
                throw new ConcurrencyRaceLost($conflictingStream->getStreamId(), $aggregateRoot->getTrackedEvents());
72
            }
73
        }
74 2
        $this->trackedCommitStreams = $this->trackedCommitStreams->unregister($prevStream);
75 2
        return $updatedStream->getCommitRange($prevStream->getStreamRevision(), $updatedStream->getStreamRevision());
76
    }
77
78
    public function checkout(AggregateIdInterface $aggregateId, AggregateRevision $revision): AggregateRootInterface
79
    {
80
        /** @var $streamId StreamId */
81
        $streamId = StreamId::fromNative($aggregateId->toNative());
82
        $stream = $this->streamStore->checkout($streamId, $revision);
83
        $aggregateRoot = call_user_func(
84
            [ $this->aggregateRootType, 'reconstituteFromHistory' ],
85
            $aggregateId,
86
            $this->buildEventHistory($stream, $revision)
87
        );
88
        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
89
        return $aggregateRoot;
90
    }
91
92 2
    private function getTrackedStream(AggregateRootInterface $aggregateRoot): StreamInterface
93
    {
94 2
        $streamId = StreamId::fromNative((string)$aggregateRoot->getIdentifier());
95 2
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();
96 2
        if ($this->trackedCommitStreams->has((string)$streamId)) {
97
            $stream = $this->trackedCommitStreams->get((string)$streamId);
98 2
        } elseif ($tailRevision->isInitial()) {
99 2
            $stream = call_user_func([ $this->streamImplementor, 'fromStreamId' ], $streamId);
100 2
            $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
101
        } else {
102
            throw new \Exception("AggregateRoots must be checked out before they may be committed.");
103
        }
104 2
        return $stream;
105
    }
106
107
    private function buildEventHistory(StreamInterface $stream, AggregateRevision $to): DomainEventSequence
108
    {
109
        $history = DomainEventSequence::makeEmpty();
110
        if ($this->streamProcessor) {
111
            $stream = $this->streamProcessor->process($stream);
112
        }
113
        foreach ($stream as $commit) {
114
            if (!$to->isEmpty() && $commit->getAggregateRevision()->isGreaterThan($to)) {
115
                break;
116
            }
117
            foreach ($commit->getEventLog() as $event) {
118
                $history = $history->push($event);
119
            }
120
        }
121
        return $history;
122
    }
123
124
    private function detectConflictingEvents(
125
        AggregateRootInterface $aggregateRoot,
126
        StreamInterface $stream
127
    ): DomainEventSequence {
128
        $conflictingEvents = DomainEventSequence::makeEmpty();
129
        foreach ($aggregateRoot->getTrackedEvents() as $newEvent) {
130
            foreach ($stream->findCommitsSince($aggregateRoot->getRevision()) as $previousCommit) {
131
                foreach ($previousCommit->getEventLog() as $previousEvent) {
132
                    if ($newEvent->conflictsWith($previousEvent)) {
133
                        $conflictingEvents = $conflictingEvents->push($newEvent);
134
                    }
135
                }
136
            }
137
        }
138
        return $conflictingEvents;
139
    }
140
}
141