Completed
Push — master ( 7fe5eb...9bad48 )
by Beñat
08:54 queued 04:27
created

InMemoryEventStore::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Kreta package.
5
 *
6
 * (c) Beñat Espiña <[email protected]>
7
 * (c) Gorka Laucirica <[email protected]>
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12
13
declare(strict_types=1);
14
15
namespace Kreta\SharedKernel\Infrastructure\Persistence\InMemory\EventStore;
16
17
use Kreta\SharedKernel\Domain\Model\AggregateDoesNotExistException;
18
use Kreta\SharedKernel\Domain\Model\DomainEvent;
19
use Kreta\SharedKernel\Domain\Model\DomainEventCollection;
20
use Kreta\SharedKernel\Event\EventStore;
21
use Kreta\SharedKernel\Event\Stream;
22
use Kreta\SharedKernel\Event\StreamName;
23
24
class InMemoryEventStore implements EventStore
25
{
26
    private $store;
27
28
    public function __construct()
29
    {
30
        $this->store = [];
31
    }
32
33
    public function append(Stream $stream) : void
34
    {
35
        foreach ($stream->events() as $event) {
36
            $content = [];
37
            $eventReflection = new \ReflectionClass($event);
38
            foreach ($eventReflection->getProperties() as $property) {
39
                $property->setAccessible(true);
40
                $content[$property->getName()] = $property->getValue($event);
41
            }
42
43
            $this->store[] = [
44
                'stream_name' => $stream->name()->name(),
45
                'type'        => get_class($event),
46
                'content'     => json_encode($content),
47
            ];
48
        }
49
    }
50
51
    public function streamOfName(StreamName $name) : Stream
52
    {
53
        $events = new DomainEventCollection();
54
        foreach ($this->store as $event) {
55
            if ($event['stream_name'] === $name->name()) {
56
                $events->add($this->buildEvent($event));
57
            }
58
        }
59
        if (0 === $events->count()) {
60
            throw new AggregateDoesNotExistException($name->aggregateId()->id());
61
        }
62
63
        return new Stream($name, $events);
64
    }
65
66
    public function eventsSince(?\DateTimeInterface $since, int $offset = 0, int $limit = -1) : array
67
    {
68
        $since = $since instanceof \DateTimeInterface ? $since->getTimestamp() : 0;
69
70
        $events = array_map(function (array $event) use ($since) {
71
            $domainEvent = $this->buildEvent($event);
72
            if ($domainEvent->occurredOn()->getTimestamp() >= $since) {
73
                $evenContent = json_decode($event['content'], true);
74
                $evenContent['occurredOn'] = $domainEvent->occurredOn()->getTimestamp();
75
76
                return [
77
                    'stream_name' => $event['stream_name'],
78
                    'type'        => $event['type'],
79
                    'content'     => $evenContent,
80
                ];
81
            }
82
        }, $this->store);
83
84
        return array_slice($events, $offset);
85
    }
86
87
    private function buildEvent(array $event) : DomainEvent
88
    {
89
        $eventData = json_decode($event['content']);
90
        $eventReflection = new \ReflectionClass($event['type']);
91
        $parameters = $eventReflection->getConstructor()->getParameters();
92
        $arguments = [];
93
        foreach ($parameters as $parameter) {
94
            foreach ($eventData as $key => $data) {
95
                if ($key === $parameter->getName()) {
96
                    $arguments[] = $data;
97
                }
98
            }
99
        }
100
101
        return new $event['type'](...$arguments);
102
    }
103
}
104