Passed
Push — master ( 33b14d...e7440c )
by Thorsten
01:45
created

UnitOfWork::buildEventHistory()   B

Complexity

Conditions 6
Paths 4

Size

Total Lines 16
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
dl 0
loc 16
c 0
b 0
f 0
ccs 0
cts 10
cp 0
rs 8.8571
cc 6
eloc 10
nc 4
nop 2
crap 42
1
<?php
2
/**
3
 * This file is part of the daikon-cqrs/cqrs project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
declare(strict_types=1);
10
11
namespace Daikon\EventSourcing\EventStore;
12
13
use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
14
use Daikon\EventSourcing\Aggregate\AggregateRevision;
15
use Daikon\EventSourcing\Aggregate\AggregateRootInterface;
16
use Daikon\EventSourcing\Aggregate\DomainEventSequence;
17
use Daikon\MessageBus\Metadata\Metadata;
18
19
final class UnitOfWork implements UnitOfWorkInterface
20
{
21
    /** @var string */
22
    private $aggregateRootType;
23
24
    /** @var StreamStoreInterface */
25
    private $streamStore;
26
27
    /** @var StreamProcessorInterface */
28
    private $streamProcessor;
29
30
    /** @var string */
31
    private $streamImplementor;
32
33
    /** @var StreamMap */
34
    private $trackedCommitStreams;
35
36 2
    public function __construct(
37
        string $aggregateRootType,
38
        StreamStoreInterface $streamStore,
39
        StreamProcessorInterface $streamProcessor = null,
40
        string $streamImplementor = Stream::class
41
    ) {
42 2
        $this->aggregateRootType = $aggregateRootType;
43 2
        $this->streamStore = $streamStore;
44 2
        $this->streamProcessor = $streamProcessor;
45 2
        $this->streamImplementor = $streamImplementor;
46 2
        $this->trackedCommitStreams = StreamMap::makeEmpty();
0 ignored issues
show
Documentation Bug introduced by
It seems like \Daikon\EventSourcing\Ev...\StreamMap::makeEmpty() of type object<self> is incompatible with the declared type object<Daikon\EventSourcing\EventStore\StreamMap> of property $trackedCommitStreams.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
47 2
    }
48
49 2
    public function commit(AggregateRootInterface $aggregateRoot, Metadata $metadata): CommitSequence
50
    {
51 2
        $stream = $this->getStream($aggregateRoot)->appendEvents($aggregateRoot->getTrackedEvents(), $metadata);
52 2
        $knownHead = $stream->getStreamRevision();
53 2
        $result = $this->streamStore->commit($stream, $knownHead);
54 2
        if ($result instanceof StoreError) {
55
            $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
56
            throw new \Exception("Failed to store commit-stream with stream-id: ".$stream->getStreamId());
57
        }
58 2
        return $stream->getCommitRange($knownHead, $stream->getStreamRevision());
59
    }
60
61
    public function checkout(AggregateIdInterface $aggregateId, AggregateRevision $revision): AggregateRootInterface
62
    {
63
        $streamId = StreamId::fromNative($aggregateId->toNative());
64
        $stream = $this->streamStore->checkout($streamId, $revision);
0 ignored issues
show
Compatibility introduced by
$streamId of type object<Daikon\Entity\Val...t\ValueObjectInterface> is not a sub-type of object<Daikon\EventSourcing\EventStore\StreamId>. It seems like you assume a concrete implementation of the interface Daikon\Entity\ValueObject\ValueObjectInterface to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
65
        $aggregateRoot = call_user_func(
66
            [ $this->aggregateRootType, 'reconstituteFromHistory' ],
67
            $aggregateId,
68
            $this->buildEventHistory($stream)
0 ignored issues
show
Bug introduced by
The call to buildEventHistory() misses a required argument $to.

This check looks for function calls that miss required arguments.

Loading history...
69
        );
70
        $this->trackedCommitStreams = $this->trackedCommitStreams->register($stream);
71
        return $aggregateRoot;
72
    }
73
74 2
    private function getStream(AggregateRootInterface $aggregateRoot): StreamInterface
75
    {
76 2
        $streamId = StreamId::fromNative((string)$aggregateRoot->getIdentifier());
77 2
        $tailRevision = $aggregateRoot->getTrackedEvents()->getTailRevision();
78 2
        if ($this->trackedCommitStreams->has((string)$streamId)) {
79
            $stream = $this->trackedCommitStreams->get((string)$streamId);
80
            $this->trackedCommitStreams = $this->trackedCommitStreams->unregister($stream);
81 2
        } elseif ($tailRevision->isInitial()) {
82 2
            $stream = call_user_func([ $this->streamImplementor, 'fromStreamId' ], $streamId);
83
        } else {
84
            throw new \Exception("Existing aggregate-roots must be checked out before they may be comitted.");
85
        }
86 2
        return $stream;
87
    }
88
89
    private function buildEventHistory(StreamInterface $stream, AggregateRevision $to): DomainEventSequence
90
    {
91
        $history = DomainEventSequence::makeEmpty();
92
        if ($this->streamProcessor) {
93
            $stream = $this->streamProcessor->process($stream);
94
        }
95
        foreach ($stream as $commit) {
96
            if (!$to->isEmpty() && $commit->getAggregateRevision()->isGreaterThan($to)) {
97
                break;
98
            }
99
            foreach ($commit->getEventLog() as $event) {
100
                $history = $history->push($event);
101
            }
102
        }
103
        return $history;
104
    }
105
}
106