Completed
Pull Request — master (#366)
by Beñat
06:07
created

RedisEventStore   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 68
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 0
Metric Value
wmc 9
lcom 1
cbo 6
dl 0
loc 68
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A streamOfName() 0 17 3
A append() 0 17 2
A eventsSince() 0 15 2
A countStoredEventsOfStream() 0 4 1
1
<?php
2
3
/*
4
 * This file is part of the Kreta package.
5
 *
6
 * (c) Beñat Espiña <[email protected]>
7
 * (c) Gorka Laucirica <[email protected]>
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12
13
declare(strict_types=1);
14
15
namespace Kreta\SharedKernel\Infrastructure\Persistence\Redis\EventStore;
16
17
use Kreta\SharedKernel\Domain\Model\AggregateDoesNotExistException;
18
use Kreta\SharedKernel\Domain\Model\DomainEventCollection;
19
use Kreta\SharedKernel\Event\EventStore;
20
use Kreta\SharedKernel\Event\StoredEvent;
21
use Kreta\SharedKernel\Event\Stream;
22
use Kreta\SharedKernel\Event\StreamName;
23
use Kreta\SharedKernel\Serialization\Serializer;
24
use Predis\Client;
25
26
/**
 * Event store backed by Redis through Predis.
 *
 * Each stream is kept in a Redis list whose key is the stream name; every
 * list item is one serialized StoredEvent, pushed to the tail of the list.
 */
final class RedisEventStore implements EventStore
{
    /** @var Client */
    private $predis;

    /** @var Serializer */
    private $serializer;

    /**
     * @param Client     $predis     Redis client used for every list operation
     * @param Serializer $serializer Converts stored events to and from their persisted form
     */
    public function __construct(Client $predis, Serializer $serializer)
    {
        $this->predis = $predis;
        $this->serializer = $serializer;
    }

    /**
     * Appends every event of the given stream to the tail of the stream's list.
     *
     * Each event is wrapped in a StoredEvent whose order continues from the
     * number of events already stored, so consecutive events get consecutive
     * order values.
     *
     * NOTE(review): the count and the pushes are separate Redis commands, so
     * two concurrent writers on the same stream could compute overlapping
     * order values — confirm whether callers serialize writes per stream.
     *
     * @param Stream $stream The stream whose events are persisted
     */
    public function append(Stream $stream) : void
    {
        $key = $stream->name()->name();
        $order = $this->countStoredEventsOfStream($stream);

        foreach ($stream->events() as $event) {
            // Pre-increment so the first new event gets "stored count + 1"
            // and every following event gets the next value.
            $storedEvent = new StoredEvent(++$order, $stream->name(), $event);

            $this->predis->rpush($key, $this->serializer->serialize($storedEvent));
        }
    }

    /**
     * Loads the full stream stored under the given name.
     *
     * @param StreamName $name Name of the stream to load
     *
     * @return Stream The stream rebuilt from every deserialized list item
     *
     * @throws AggregateDoesNotExistException When no key exists for the stream name
     */
    public function streamOfName(StreamName $name) : Stream
    {
        if (!$this->predis->exists($name->name())) {
            throw new AggregateDoesNotExistException($name->aggregateId()->id());
        }

        $serializedEvents = $this->predis->lrange($name->name(), 0, -1);

        $events = new DomainEventCollection();
        foreach ($serializedEvents as $serializedEvent) {
            $events->add(
                $this->serializer->deserialize($serializedEvent)
            );
        }

        return new Stream($name, $events);
    }

    /**
     * Returns the JSON-decoded events of every key whose "occurred_on" value
     * is greater than or equal to the given moment.
     *
     * The comparison is made against the timestamp of $since, so
     * "occurred_on" is presumably a Unix timestamp — confirm against the
     * serializer output.
     *
     * NOTE(review): KEYS * walks the whole keyspace and every key is read
     * with LRANGE, so this assumes the database holds only event-stream
     * lists; consider SCAN and a key prefix if that does not hold.
     *
     * @param \DateTimeInterface|null $since  Lower bound (inclusive); null means no lower bound
     * @param int                     $offset Number of matching events to skip
     * @param int                     $limit  Maximum number of events to return; a negative
     *                                        value (the default) means no limit
     *
     * @return array List of events as associative arrays, re-indexed from 0
     */
    public function eventsSince(?\DateTimeInterface $since, int $offset = 0, int $limit = -1) : array
    {
        $since = null === $since ? 0 : $since->getTimestamp();
        $keys = $this->predis->keys('*');

        $eventsPerStream = array_map(function ($key) {
            return array_map(function (string $serializedEvent) {
                return json_decode($serializedEvent, true);
            }, $this->predis->lrange($key, 0, -1));
        }, $keys);

        // The leading [] keeps array_merge valid when there are no keys at all
        // (a zero-argument array_merge() is an error before PHP 7.4).
        $events = array_filter(
            array_merge([], ...array_values($eventsPerStream)),
            function (array $event) use ($since) {
                return $event['occurred_on'] >= $since;
            }
        );

        // array_slice treats a negative length as "stop N items before the
        // end", which would drop the last event for the default -1; null
        // means "up to the end".
        return array_slice($events, $offset, $limit < 0 ? null : $limit);
    }

    /**
     * Counts the events already stored in the list of the given stream.
     *
     * Uses LLEN so the list items are not transferred just to be counted.
     */
    private function countStoredEventsOfStream(Stream $stream) : int
    {
        return (int) $this->predis->llen($stream->name()->name());
    }
}
94