Passed
Push — master ( a6915d...4f6218 )
by Mr
02:05
created

Stream::isEmpty()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
/**
3
 * This file is part of the daikon-cqrs/cqrs project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
declare(strict_types=1);
10
11
namespace Daikon\EventSourcing\EventStore\Stream;
12
13
use Daikon\EventSourcing\Aggregate\AggregateRevision;
14
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequenceInterface;
15
use Daikon\EventSourcing\EventStore\Commit\Commit;
16
use Daikon\EventSourcing\EventStore\Commit\CommitInterface;
17
use Daikon\EventSourcing\EventStore\Commit\CommitSequence;
18
use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface;
19
use Daikon\MessageBus\Metadata\Metadata;
20
21
/**
 * A stream of commits identified by a stream id.
 *
 * The stream wraps a commit sequence; the append methods return a modified
 * clone and leave the instance they were called on untouched.
 */
final class Stream implements StreamInterface
{
    /** @var StreamIdInterface */
    private $streamId;

    /** @var CommitSequenceInterface */
    private $commitSequence;

    /** @var string Class name whose static make() method builds new commits. */
    private $commitImplementor;

    /**
     * Creates an empty stream for the given stream id.
     *
     * @param StreamIdInterface $streamId
     * @param string $commitImplementor Class name used to build commits in appendEvents().
     * @return StreamInterface
     */
    public static function fromStreamId(
        StreamIdInterface $streamId,
        string $commitImplementor = Commit::class
    ): StreamInterface {
        // Forward the implementor; previously it was accepted but dropped,
        // so the stream always fell back to the default Commit class.
        return new self($streamId, null, $commitImplementor);
    }

    /**
     * Rebuilds a stream from its array representation.
     *
     * Accepts the keys emitted by toNative() ('streamId', 'commitSequence')
     * as well as the legacy keys 'commitStreamId' and 'commitStreamSequence'.
     * A missing 'commitImplementor' falls back to the default Commit class.
     *
     * @param array $streamState
     * @return Stream
     * @throws \InvalidArgumentException When the stream id or the commits are missing.
     */
    public static function fromArray(array $streamState): Stream
    {
        $nativeStreamId = $streamState['streamId'] ?? $streamState['commitStreamId'] ?? null;
        $nativeCommits = $streamState['commitSequence'] ?? $streamState['commitStreamSequence'] ?? null;

        if ($nativeStreamId === null || $nativeCommits === null) {
            throw new \InvalidArgumentException(
                'Stream state must provide "streamId" and "commitSequence".'
            );
        }

        return new self(
            StreamId::fromNative($nativeStreamId),
            CommitSequence::fromArray($nativeCommits),
            $streamState['commitImplementor'] ?? Commit::class
        );
    }

    /**
     * @param StreamIdInterface $streamId
     * @param CommitSequenceInterface|null $commitSequence Defaults to a new, empty CommitSequence.
     * @param string $commitImplementor Class name used to build commits in appendEvents().
     */
    public function __construct(
        StreamIdInterface $streamId,
        ?CommitSequenceInterface $commitSequence = null,
        string $commitImplementor = Commit::class
    ) {
        $this->streamId = $streamId;
        $this->commitSequence = $commitSequence ?? new CommitSequence();
        $this->commitImplementor = $commitImplementor;
    }

    /**
     * @return StreamIdInterface
     */
    public function getStreamId(): StreamIdInterface
    {
        return $this->streamId;
    }

    /**
     * Returns the stream revision, derived from the length of the commit sequence.
     *
     * @return StreamRevision
     */
    public function getStreamRevision(): StreamRevision
    {
        return StreamRevision::fromNative($this->commitSequence->getLength());
    }

    /**
     * Returns the aggregate revision of the head commit, or an empty
     * revision when the stream holds no commits.
     *
     * @return AggregateRevision
     */
    public function getAggregateRevision(): AggregateRevision
    {
        // Go through the guarded getHead() so an empty sequence is never
        // asked for its head directly.
        $head = $this->getHead();

        return $head !== null ? $head->getAggregateRevision() : AggregateRevision::makeEmpty();
    }

    /**
     * Builds a commit for the given events at the next stream revision and
     * appends it.
     *
     * @param DomainEventSequenceInterface $eventLog
     * @param Metadata $metadata
     * @return StreamInterface A new stream containing the additional commit.
     */
    public function appendEvents(DomainEventSequenceInterface $eventLog, Metadata $metadata): StreamInterface
    {
        // Static call on the configured class name (replaces call_user_func).
        $commitClass = $this->commitImplementor;

        return $this->appendCommit(
            $commitClass::make(
                $this->streamId,
                $this->getStreamRevision()->increment(),
                $eventLog,
                $metadata
            )
        );
    }

    /**
     * Returns a clone of this stream with the given commit pushed onto its
     * commit sequence.
     *
     * @param CommitInterface $commit
     * @return StreamInterface
     */
    public function appendCommit(CommitInterface $commit): StreamInterface
    {
        $stream = clone $this;
        $stream->commitSequence = $this->commitSequence->push($commit);

        return $stream;
    }

    /**
     * Returns the head commit, or null when the stream is empty.
     *
     * @return CommitInterface|null
     */
    public function getHead(): ?CommitInterface
    {
        return $this->commitSequence->isEmpty() ? null : $this->commitSequence->getHead();
    }

    /**
     * Returns the slice of commits between two stream revisions.
     *
     * @param StreamRevision $fromRev
     * @param StreamRevision|null $toRev Defaults to the current stream revision.
     * @return CommitSequenceInterface
     */
    public function getCommitRange(StreamRevision $fromRev, ?StreamRevision $toRev = null): CommitSequenceInterface
    {
        return $this->commitSequence->getSlice($fromRev, $toRev ?? $this->getStreamRevision());
    }

    /**
     * @return int Number of commits as reported by the commit sequence.
     */
    public function count(): int
    {
        return $this->commitSequence->count();
    }

    /**
     * @return bool True when the stream contains no commits.
     */
    public function isEmpty(): bool
    {
        return $this->count() === 0;
    }

    /**
     * Returns the array representation of the stream; fromArray() accepts
     * this shape.
     *
     * @return array
     */
    public function toNative(): array
    {
        return [
            'commitSequence' => $this->commitSequence->toNative(),
            'streamId' => $this->streamId->toNative(),
            'commitImplementor' => $this->commitImplementor
        ];
    }

    /**
     * @return \Traversable Iterator of the underlying commit sequence.
     */
    public function getIterator(): \Traversable
    {
        return $this->commitSequence->getIterator();
    }

    /**
     * Collects the commits whose aggregate revision is greater than the
     * given revision, walking backwards from the head commit.
     *
     * @param AggregateRevision $incomingRevision
     * @return CommitSequenceInterface Collected commits, in reverse of the order they were visited.
     */
    public function findCommitsSince(AggregateRevision $incomingRevision): CommitSequenceInterface
    {
        $previousCommits = [];
        $prevCommit = $this->getHead();
        // NOTE(review): the loop relies on commitSequence->get() yielding a
        // falsy value once the revision precedes the oldest commit - confirm.
        while ($prevCommit && $incomingRevision->isLessThan($prevCommit->getAggregateRevision())) {
            $previousCommits[] = $prevCommit;
            $prevCommit = $this->commitSequence->get($prevCommit->getStreamRevision()->decrement());
        }

        // Commits were gathered head-first; reverse them before wrapping.
        return new CommitSequence(array_reverse($previousCommits));
    }
}
139