MessageRepository   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 65
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 36
c 1
b 0
f 0
dl 0
loc 65
rs 10
wmc 7

4 Methods

Rating   Name   Duplication   Size   Complexity  
A retrieveAll() 0 11 2
A retrieveAllAfterVersion() 0 12 2
A __construct() 0 7 1
A persist() 0 15 2
1
<?php
2
3
namespace Chocofamily\LaravelEventSauce;
4
5
use EventSauce\EventSourcing\AggregateRootId;
6
use EventSauce\EventSourcing\Header;
7
use EventSauce\EventSourcing\Message;
8
use EventSauce\EventSourcing\MessageRepository as EventSauceMessageRepository;
9
use EventSauce\EventSourcing\Serialization\MessageSerializer;
10
use Generator;
11
use Illuminate\Database\Connection;
12
use Ramsey\Uuid\Uuid;
13
14
class MessageRepository implements EventSauceMessageRepository
15
{
16
    /** @var Connection */
17
    protected $connection;
18
19
    /** @var string */
20
    protected $table;
21
22
    /** @var MessageSerializer */
23
    protected $serializer;
24
25
    public function __construct(Connection $connection, string $table, MessageSerializer $serializer)
26
    {
27
        $this->connection = $connection;
28
29
        $this->table = $table;
30
31
        $this->serializer = $serializer;
32
    }
33
34
    public function persist(Message ...$messages)
35
    {
36
        foreach ($messages as $message) {
37
            $serializedMessage = $this->serializer->serializeMessage($message);
38
            $headers = $serializedMessage['headers'];
39
40
            $this->connection
41
                ->table($this->table)
42
                ->insert([
43
                    'event_id'                  =>  $headers[Header::EVENT_ID] ?? Uuid::uuid4()->toString(),
44
                    'event_type'                =>  $headers[Header::EVENT_TYPE],
45
                    'aggregate_root_id'         =>  $headers[Header::AGGREGATE_ROOT_ID] ?? null,
46
                    'aggregate_root_version'    =>  $headers[Header::AGGREGATE_ROOT_VERSION] ?? null,
47
                    'recorded_at'               =>  $headers[Header::TIME_OF_RECORDING],
48
                    'payload'                   =>  json_encode($serializedMessage),
49
                ]);
50
        }
51
    }
52
53
    public function retrieveAll(AggregateRootId $id): Generator
54
    {
55
        $payloads = $this->connection
56
            ->table($this->table)
57
            ->select('payload')
58
            ->where('aggregate_root_id', $id->toString())
59
            ->orderBy('recorded_at')
60
            ->get();
61
62
        foreach ($payloads as $payload) {
63
            yield from $this->serializer->unserializePayload(json_decode($payload->payload, true));
64
        }
65
    }
66
67
    public function retrieveAllAfterVersion(AggregateRootId $id, int $aggregateRootVersion): Generator
68
    {
69
        $payloads = $this->connection
70
            ->table($this->table)
71
            ->select('payload')
72
            ->where('aggregate_root_id', $id->toString())
73
            ->where('aggregate_root_version', '>', $aggregateRootVersion)
74
            ->orderBy('recorded_at')
75
            ->get();
76
77
        foreach ($payloads as $payload) {
78
            yield from $this->serializer->unserializePayload(json_decode($payload->payload, true));
79
        }
80
    }
81
}
82