Passed
Push — master ( 7f7df8...6d2aba )
by Thorsten
01:53
created

UnitOfWork::getTrackedStream()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3.072

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 8
cts 10
cp 0.8
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 11
nc 3
nop 1
crap 3.072
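The CRAP score in the table is consistent with the commonly cited formula complexity² × (1 − coverage)³ + complexity, assuming `cc` is the cyclomatic complexity and `cp` the covered ratio (`ccs / cts` = 8 / 10). A minimal sketch of that arithmetic, where `crapScore()` is a hypothetical helper and not part of the report tooling:

```php
<?php
declare(strict_types=1);

/**
 * CRAP score: complexity^2 * (1 - coverage)^3 + complexity.
 * $coverage is a ratio in [0, 1], not a percentage.
 * Hypothetical helper for illustration only.
 */
function crapScore(int $complexity, float $coverage): float
{
    return ($complexity ** 2) * ((1.0 - $coverage) ** 3) + $complexity;
}

// Values taken from the metric table: cc = 3, cp = 0.8.
// 9 * 0.008 + 3 = 3.072
echo round(crapScore(3, 0.8), 3), PHP_EOL;
```

With full coverage the second factor vanishes and the score falls back to the plain complexity of 3.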
<?php
/**
 * This file is part of the daikon-cqrs/cqrs project.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

declare(strict_types=1);

namespace Daikon\EventSourcing\EventStore;

use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
use Daikon\EventSourcing\Aggregate\AggregateRevision;
use Daikon\EventSourcing\Aggregate\AggregateRootInterface;
use Daikon\EventSourcing\Aggregate\DomainEventSequence;
use Daikon\MessageBus\Metadata\Metadata;

final class UnitOfWork implements UnitOfWorkInterface
{
    private const MAX_RESOLUTION_ATTEMPTS = 5;

    /** @var string */
    private $aggregateRootType;

    /** @var StreamStoreInterface */
    private $streamStore;

    /** @var StreamProcessorInterface */
    private $streamProcessor;

    /** @var string */
    private $streamImplementor;

    /** @var StreamMap */
    private $trackedCommitStreams;

    public function __construct(
        string $aggregateRootType,
        StreamStoreInterface $streamStore,
        StreamProcessorInterface $streamProcessor = null,
        string $streamImplementor = Stream::class
    ) {
        $this->aggregateRootType = $aggregateRootType;
        $this->streamStore = $streamStore;
        $this->streamProcessor = $streamProcessor;
        $this->streamImplementor = $streamImplementor;
        $this->trackedCommitStreams = StreamMap::makeEmpty();
Issue (Documentation Bug):
It seems like \Daikon\EventSourcing\Ev...\StreamMap::makeEmpty() of type object<self> is incompatible with the declared type object<Daikon\EventSourcing\EventStore\StreamMap> of property $trackedCommitStreams.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.
    }

    public function commit(AggregateRootInterface $aggregateRoot, Metadata $metadata): CommitSequence
    {
        $prevStream = $this->getTrackedStream($aggregateRoot);
        $updatedStream = $prevStream->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
        $result = $this->streamStore->commit($updatedStream, $prevStream->getStreamRevision());
        $resolutionAttempts = 0;
        while ($result instanceof PossibleConflict) {
            $conflictingStream = $this->streamStore->checkout($updatedStream->getStreamId());
            $conflictingEvents = $this->detectConflictingEvents($aggregateRoot, $conflictingStream);
            if (!$conflictingEvents->isEmpty()) {
Issue (Bug):
The method isEmpty cannot be called on $conflictingEvents (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.
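One way to address this finding, given that detectConflictingEvents() is declared to return an array, is to test emptiness with a function instead of a method. The sketch below uses a hypothetical helper, `hasConflicts()`, that does not exist in the reviewed class; it only illustrates the check:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: true when at least one conflict pair was collected.
 */
function hasConflicts(array $conflictingEvents): bool
{
    // Arrays have no methods, so ->isEmpty() is unavailable; count() works on any array.
    return count($conflictingEvents) > 0;
}

var_dump(hasConflicts([]));                      // no pairs collected
var_dump(hasConflicts([['previous', 'new']]));   // one conflicting pair
```

The alternative would be to have the method return a collection object that offers isEmpty(); which of the two the authors intended is not clear from the listing.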
                throw new UnresolvableConflict($conflictingStream->getStreamId(), $conflictingEvents);
            }
            $resolvedStream = $conflictingStream->appendEvents($aggregateRoot->getTrackedEvents());
Issue (Bug):
The call to appendEvents() misses a required argument $metadata.

This check looks for function calls that miss required arguments.
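To show why this matters at runtime: on PHP 7.1 and later, calling a user-defined method with too few arguments throws an ArgumentCountError. The sketch below uses a hypothetical, heavily simplified `SketchStream` class (not the Daikon Stream implementation) whose appendEvents() requires both arguments, mirroring the first call in commit():

```php
<?php
declare(strict_types=1);

/** Hypothetical, simplified stream; not the Daikon Stream class. */
final class SketchStream
{
    private $events;
    private $metadata;

    public function __construct(array $events = [], array $metadata = [])
    {
        $this->events = $events;
        $this->metadata = $metadata;
    }

    /** Returns a new stream with the events and metadata appended. */
    public function appendEvents(array $events, array $metadata): self
    {
        return new self(
            array_merge($this->events, $events),
            array_merge($this->metadata, $metadata)
        );
    }

    public function count(): int
    {
        return count($this->events);
    }
}

$stream = new SketchStream();
$caught = false;
try {
    $stream->appendEvents(['created']); // one argument short
} catch (ArgumentCountError $error) {
    $caught = true;
}

// Passing the metadata through, as the first appendEvents() call in commit() does.
$resolved = $stream->appendEvents(['created'], ['actor' => 'tester']);
echo $resolved->count(), PHP_EOL;
```

In the reviewed method the $metadata parameter of commit() is in scope inside the loop, so forwarding it appears to be the likely fix.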
            $result = $this->streamStore->commit($resolvedStream, $conflictingStream->getStreamRevision());
            if (++$resolutionAttempts >= self::MAX_RESOLUTION_ATTEMPTS) {
                throw new ConcurrencyRaceLost($conflictingStream->getStreamId(), $aggregateRoot->getTrackedEvents());
            }
        }
        $this->trackedCommitStreams = $this->trackedCommitStreams->unregister($prevStream);
        return $updatedStream->getCommitRange($prevStream->getStreamRevision(), $updatedStream->getStreamRevision());
    }

    public function checkout(AggregateIdInterface $aggregateId, AggregateRevision $revision): AggregateRootInterface
    {
        $streamId = StreamId::fromNative($aggregateId->toNative());
        $stream = $this->streamStore->checkout($streamId, $revision);
Issue (Compatibility):
$streamId of type object<Daikon\Entity\Val...t\ValueObjectInterface> is not a sub-type of object<Daikon\EventSourcing\EventStore\StreamId>. It seems like you assume a concrete implementation of the interface Daikon\Entity\ValueObject\ValueObjectInterface to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e. an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.
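The instanceof check suggested above could look roughly like the following. All names here (`SketchValueObject`, `SketchStreamId`, `checkoutStream()`) are hypothetical stand-ins; the factory is deliberately declared to return the interface so that the caller has to narrow the type:

```php
<?php
declare(strict_types=1);

interface SketchValueObject
{
    public function toNative(): string;
}

final class SketchStreamId implements SketchValueObject
{
    private $id;

    private function __construct(string $id)
    {
        $this->id = $id;
    }

    // Declared to return the interface, like the factory flagged by the check.
    public static function fromNative(string $id): SketchValueObject
    {
        return new self($id);
    }

    public function toNative(): string
    {
        return $this->id;
    }
}

/** Accepts only the concrete id type, like a store's checkout method might. */
function checkoutStream(SketchStreamId $streamId): string
{
    return 'stream:' . $streamId->toNative();
}

$streamId = SketchStreamId::fromNative('user-42');
if (!$streamId instanceof SketchStreamId) {
    throw new InvalidArgumentException('Expected a SketchStreamId.');
}
echo checkoutStream($streamId), PHP_EOL;
```

Narrowing the factory's declared return type would remove the need for the guard; the guard is the less invasive option when the factory signature cannot be changed.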
        $aggregateRoot = call_user_func(
            [ $this->aggregateRootType, 'reconstituteFromHistory' ],
            $aggregateId,
            $this->buildEventHistory($stream)
Issue (Bug):
The call to buildEventHistory() misses a required argument $to.

This check looks for function calls that miss required arguments.
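The $to argument bounds how far the history is replayed. A simplified sketch of that bounded replay follows, under loud assumptions: commits are modelled as a plain array keyed by integer revision, and revision 0 stands in for an "empty" revision meaning no upper bound. Neither convention is taken from the Daikon API:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical simplification of a revision-bounded history build.
 * $commits maps revision => list of events; $to = 0 means "no upper bound".
 */
function buildEventHistory(array $commits, int $to): array
{
    $history = [];
    foreach ($commits as $revision => $events) {
        if ($to !== 0 && $revision > $to) {
            break; // stop once the requested revision has been passed
        }
        foreach ($events as $event) {
            $history[] = $event;
        }
    }
    return $history;
}

$commits = [1 => ['created'], 2 => ['renamed'], 3 => ['archived']];
print_r(buildEventHistory($commits, 2)); // events up to and including revision 2
print_r(buildEventHistory($commits, 0)); // unbounded: all events
```

In checkout() the $revision parameter is already available, so passing it as the second argument seems the natural candidate for the missing $to.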
        );
        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
        return $aggregateRoot;
    }

    private function getTrackedStream(AggregateRootInterface $aggregateRoot): StreamInterface
    {
        $streamId = StreamId::fromNative((string)$aggregateRoot->getIdentifier());
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();
        if ($this->trackedCommitStreams->has((string)$streamId)) {
            $stream = $this->trackedCommitStreams->get((string)$streamId);
        } elseif ($tailRevision->isInitial()) {
            $stream = call_user_func([ $this->streamImplementor, 'fromStreamId' ], $streamId);
            $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
        } else {
            throw new \Exception("Existing aggregate-roots must be checked out before they may be committed.");
        }
        return $stream;
    }

    private function buildEventHistory(StreamInterface $stream, AggregateRevision $to): DomainEventSequence
    {
        $history = DomainEventSequence::makeEmpty();
        if ($this->streamProcessor) {
            $stream = $this->streamProcessor->process($stream);
        }
        foreach ($stream as $commit) {
            if (!$to->isEmpty() && $commit->getAggregateRevision()->isGreaterThan($to)) {
                break;
            }
            foreach ($commit->getEventLog() as $event) {
                $history = $history->push($event);
            }
        }
        return $history;
    }

    private function detectConflictingEvents(AggregateRootInterface $aggregateRoot, StreamInterface $stream): array
    {
        $conflictingEvents = [];
        foreach ($newEvents as $newEvent) {
Issue (Bug):
The variable $newEvents does not exist. Did you forget to declare it?

This check marks access to variables or properties that have not been declared yet. While PHP has no explicit notion of declaring a variable, accessing it before a value is assigned to it is most likely a bug.
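A plausible reading is that $newEvents was meant to hold the aggregate root's tracked events, since commit() uses getTrackedEvents() for the same purpose. The sketch below assigns the variable before the loop; `SketchEvent` and `SketchAggregateRoot` are hypothetical stand-ins, and the conflict rule (same field touched) is invented purely for illustration:

```php
<?php
declare(strict_types=1);

/** Hypothetical event: two events conflict when they touch the same field. */
final class SketchEvent
{
    private $field;

    public function __construct(string $field)
    {
        $this->field = $field;
    }

    public function conflictsWith(SketchEvent $other): bool
    {
        return $this->field === $other->field;
    }
}

/** Hypothetical aggregate root exposing its uncommitted events as an array. */
final class SketchAggregateRoot
{
    private $trackedEvents;

    public function __construct(array $trackedEvents)
    {
        $this->trackedEvents = $trackedEvents;
    }

    public function getTrackedEvents(): array
    {
        return $this->trackedEvents;
    }
}

function detectConflictingEvents(SketchAggregateRoot $aggregateRoot, array $previousEvents): array
{
    // Assign $newEvents before iterating, which is the step missing in the reviewed code.
    $newEvents = $aggregateRoot->getTrackedEvents();
    $conflictingEvents = [];
    foreach ($newEvents as $newEvent) {
        foreach ($previousEvents as $previousEvent) {
            if ($newEvent->conflictsWith($previousEvent)) {
                $conflictingEvents[] = [$previousEvent, $newEvent];
            }
        }
    }
    return $conflictingEvents;
}

$root = new SketchAggregateRoot([new SketchEvent('title')]);
$conflicts = detectConflictingEvents($root, [new SketchEvent('title'), new SketchEvent('body')]);
echo count($conflicts), PHP_EOL;
```

Only the 'title' events collide here, so a single pair is collected.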
            foreach ($stream->findCommitsSince($aggregateRoot->getRevision()) as $previousCommit) {
                foreach ($previousCommit->getEventLog() as $previousEvent) {
                    if ($newEvent->conflictsWith($previousEvent)) {
                        $conflictingEvents[] = [ $previousEvent, $newEvent ];
                    }
                }
            }
        }
        return $conflictingEvents;
    }
}