Completed
Pull Request — master (#366)
by Beñat
05:36
created

RedisEventStore   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 50
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 6
lcom 1
cbo 5
dl 0
loc 50
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A appendTo() 0 13 2
A __construct() 0 6 1
A streamOfName() 0 22 3
1
<?php
2
3
/*
4
 * This file is part of the Kreta package.
5
 *
6
 * (c) Beñat Espiña <[email protected]>
7
 * (c) Gorka Laucirica <[email protected]>
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12
13
declare(strict_types=1);
14
15
namespace Kreta\SharedKernel\Infrastructure\Persistence\Redis\EventStore;
16
17
use Kreta\SharedKernel\Domain\Model\AggregateDoesNotExistException;
18
use Kreta\SharedKernel\Domain\Model\DomainEventCollection;
19
use Kreta\SharedKernel\Event\EventStore;
20
use Kreta\SharedKernel\Event\Stream;
21
use Kreta\SharedKernel\Event\StreamName;
22
use Kreta\SharedKernel\Serialization\Serializer;
23
use Predis\Client;
24
25
/**
 * Event store that keeps each stream in a Redis list keyed by the stream name.
 *
 * Every list entry is a serialized envelope with two keys: `type` (the class
 * name of the event) and `data` (the serialized event itself).
 */
final class RedisEventStore implements EventStore
{
    /** @var Client */
    private $predis;

    /** @var Serializer */
    private $serializer;

    /**
     * Stored by the constructor; not read by any method of this class.
     *
     * @var string|null
     */
    private $eventType;

    /**
     * @param Client      $predis     Redis connection used to read and write the lists
     * @param Serializer  $serializer Serializer used for both the envelope and the events
     * @param string|null $eventType  Optional event type; only stored, see the property note
     */
    public function __construct(Client $predis, Serializer $serializer, ?string $eventType = null)
    {
        $this->predis = $predis;
        $this->serializer = $serializer;
        $this->eventType = $eventType;
    }

    /**
     * Appends all the events of the given stream to the tail of its Redis list.
     *
     * @param Stream $stream The stream whose events are persisted under its name
     */
    public function appendTo(Stream $stream) : void
    {
        $serializedEvents = [];
        foreach ($stream->events() as $event) {
            $serializedEvents[] = $this->serializer->serialize(
                [
                    'type' => get_class($event),
                    'data' => $this->serializer->serialize($event),
                ]
            );
        }

        // RPUSH rejects an empty value list, and there is nothing to store anyway.
        if ([] === $serializedEvents) {
            return;
        }

        // A single variadic RPUSH appends every event atomically, so a failure
        // cannot leave the stream with only part of the events written.
        $this->predis->rpush($stream->name()->name(), $serializedEvents);
    }

    /**
     * Rebuilds the stream stored under the given name.
     *
     * @param StreamName $name The name of the stream to load
     *
     * @return Stream The stream containing the deserialized events in stored order
     *
     * @throws AggregateDoesNotExistException When no events are stored under the name
     */
    public function streamOfName(StreamName $name) : Stream
    {
        $serializedEvents = $this->predis->lrange($name->name(), 0, -1);

        // Redis removes a list once it is empty, so an empty range means the key
        // does not exist. Checking the result instead of calling EXISTS first
        // avoids a race between both commands and saves one round trip.
        if (empty($serializedEvents)) {
            throw new AggregateDoesNotExistException($name->aggregateId()->id());
        }

        $events = new DomainEventCollection();
        foreach ($serializedEvents as $serializedEvent) {
            $eventData = $this->serializer->deserialize($serializedEvent, 'array');

            $events->add(
                $this->serializer->deserialize(
                    $eventData['data'],
                    $eventData['type']
                )
            );
        }

        return new Stream($name, $events);
    }
}
75