Stream::toNative()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 0
dl 0
loc 6
ccs 0
cts 2
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/event-sourcing project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\EventSourcing\EventStore\Stream;
10
11
use Daikon\EventSourcing\Aggregate\AggregateId;
12
use Daikon\EventSourcing\Aggregate\AggregateIdInterface;
13
use Daikon\EventSourcing\Aggregate\AggregateRevision;
14
use Daikon\EventSourcing\Aggregate\Event\DomainEventSequenceInterface;
15
use Daikon\EventSourcing\EventStore\Commit\Commit;
16
use Daikon\EventSourcing\EventStore\Commit\CommitInterface;
17
use Daikon\EventSourcing\EventStore\Commit\CommitSequence;
18
use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface;
19
use Daikon\Interop\Assertion;
20
use Daikon\Metadata\MetadataInterface;
21
use Ds\Vector;
22
23
/**
 * Stream of commits recorded for a single aggregate.
 *
 * Instances are built through the static factories; the constructor is private.
 * appendEvents() and appendCommit() return a modified clone instead of changing
 * the stream they are called on.
 */
final class Stream implements StreamInterface
{
    private AggregateIdInterface $aggregateId;

    private CommitSequenceInterface $commitSequence;

    /** Name of the class whose static make() builds the commits created by appendEvents(). */
    private string $commitImplementor;

    /**
     * Creates a stream without commits for the given aggregate.
     *
     * @param string $commitImplementor Class used to build new commits in appendEvents().
     */
    public static function fromAggregateId(
        AggregateIdInterface $aggregateId,
        string $commitImplementor = Commit::class
    ): self {
        // Forward the implementor; it used to be dropped here, which silently forced Commit::class.
        return new self($aggregateId, null, $commitImplementor);
    }

    /**
     * Rebuilds a stream from the structure produced by toNative().
     *
     * The 'aggregateId' and 'commitSequence' keys are required; 'commitImplementor'
     * is optional and falls back to Commit::class.
     *
     * @param array $state
     */
    public static function fromNative($state): self
    {
        Assertion::keyExists($state, 'aggregateId');
        Assertion::keyExists($state, 'commitSequence');

        return new self(
            AggregateId::fromNative($state['aggregateId']),
            CommitSequence::fromNative($state['commitSequence']),
            $state['commitImplementor'] ?? null
        );
    }

    public function getAggregateId(): AggregateIdInterface
    {
        return $this->aggregateId;
    }

    /**
     * Returns the sequence of the head commit, or the initial sequence when the stream is empty.
     */
    public function getHeadSequence(): Sequence
    {
        if ($this->isEmpty()) {
            return Sequence::makeInitial();
        }

        return $this->getHead()->getSequence();
    }

    /**
     * Returns the head revision of the head commit, or an empty revision when the stream is empty.
     */
    public function getHeadRevision(): AggregateRevision
    {
        if ($this->isEmpty()) {
            return AggregateRevision::makeEmpty();
        }

        return $this->getHead()->getHeadRevision();
    }

    /**
     * Wraps the given events in a new commit, sequenced one step after the current head,
     * and returns a stream that includes it.
     */
    public function appendEvents(DomainEventSequenceInterface $eventLog, MetadataInterface $metadata): self
    {
        $commitClass = $this->commitImplementor;

        return $this->appendCommit(
            $commitClass::make(
                $this->aggregateId,
                $this->getHeadSequence()->increment(),
                $eventLog,
                $metadata
            )
        );
    }

    /**
     * Returns a clone of this stream whose commit sequence has the given commit pushed onto it.
     */
    public function appendCommit(CommitInterface $commit): self
    {
        $stream = clone $this;
        $stream->commitSequence = $this->commitSequence->push($commit);

        return $stream;
    }

    public function getHead(): CommitInterface
    {
        return $this->commitSequence->getHead();
    }

    /**
     * Returns the slice of commits between the two sequences.
     *
     * @param Sequence|null $toRev Upper bound; the current head sequence is used when omitted.
     */
    public function getCommitRange(Sequence $fromRev, ?Sequence $toRev = null): CommitSequenceInterface
    {
        return $this->commitSequence->getSlice($fromRev, $toRev ?? $this->getHeadSequence());
    }

    public function count(): int
    {
        return $this->commitSequence->count();
    }

    public function isEmpty(): bool
    {
        return $this->commitSequence->isEmpty();
    }

    /**
     * Serialises the stream into a plain array that fromNative() accepts.
     */
    public function toNative(): array
    {
        return [
            'commitSequence' => $this->commitSequence->toNative(),
            'aggregateId' => $this->aggregateId->toNative(),
            'commitImplementor' => $this->commitImplementor
        ];
    }

    /**
     * Returns the commits of this stream as a vector.
     */
    public function getIterator(): Vector
    {
        $iterator = $this->commitSequence->getIterator();

        // The commit sequence is only known to return a Traversable, so normalise the
        // result to keep the declared Vector return type from failing at runtime.
        return $iterator instanceof Vector ? $iterator : new Vector($iterator);
    }

    /**
     * Collects clones of every commit whose tail revision is greater than or equal
     * to the given revision.
     */
    public function findCommitsSince(AggregateRevision $incomingRevision): CommitSequenceInterface
    {
        $previousCommits = [];
        /** @var CommitInterface $commit */
        foreach ($this as $commit) {
            if ($commit->getTailRevision()->isGreaterThanOrEqual($incomingRevision)) {
                $previousCommits[] = clone $commit;
            }
        }

        return new CommitSequence($previousCommits);
    }

    /**
     * @param CommitSequenceInterface|null $commitSequence    Defaults to an empty commit sequence.
     * @param string|null                  $commitImplementor Defaults to Commit::class.
     */
    private function __construct(
        AggregateIdInterface $aggregateId,
        ?CommitSequenceInterface $commitSequence = null,
        ?string $commitImplementor = null
    ) {
        $this->aggregateId = $aggregateId;
        $this->commitSequence = $commitSequence ?? CommitSequence::makeEmpty();
        $this->commitImplementor = $commitImplementor ?? Commit::class;
    }
}
147