GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 9d9a89...73bba6 )
by Simon
08:00
created

LaravelEventStore::deleteStream()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
nc 1
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace SmoothPhp\LaravelAdapter\EventStore;
4
5
use Illuminate\Database\Connection;
6
use Illuminate\Database\DatabaseManager;
7
use Illuminate\Database\QueryException;
8
use SmoothPhp\Contracts\Domain\DomainEventStream;
9
use SmoothPhp\Contracts\Domain\DomainMessage;
10
use SmoothPhp\Contracts\EventStore\DomainEventStreamInterface;
11
use SmoothPhp\Contracts\EventStore\EventStore;
12
use SmoothPhp\Contracts\Serialization\Serializer;
13
use SmoothPhp\Domain\DateTime;
14
use SmoothPhp\EventStore\DuplicateAggregatePlayhead;
15
use SmoothPhp\EventStore\EventStreamNotFound;
16
17
/**
 * Event store backed by a Laravel database connection.
 *
 * Every domain message is persisted as a single row in the configured table;
 * its metadata and payload are run through the injected serializer and then
 * JSON-encoded.
 *
 * @package SmoothPhp\LaravelAdapter\EventStore
 * @author Simon Bennett
 */
final class LaravelEventStore implements EventStore
{
    /** @var Serializer Converts metadata/payload objects to and from their array form. */
    private $serializer;

    /** @var string Name of the table that holds the event rows. */
    private $eventStoreTableName;

    /** @var Connection Connection resolved from the database manager at construction time. */
    private $db;

    /**
     * @param DatabaseManager $databaseManager          Used once, to resolve the connection.
     * @param Serializer      $serializer               Serializer for metadata and payload.
     * @param string          $eventStoreConnectionName Name of the connection to resolve.
     * @param string          $eventStoreTableName      Table that holds the event rows.
     */
    public function __construct(
        DatabaseManager $databaseManager,
        Serializer $serializer,
        $eventStoreConnectionName,
        $eventStoreTableName
    ) {
        $this->db = $databaseManager->connection($eventStoreConnectionName);
        $this->serializer = $serializer;
        $this->eventStoreTableName = $eventStoreTableName;
    }

    /**
     * Load all events recorded for one aggregate, ordered by ascending playhead.
     *
     * @param string $id Aggregate identifier (matched against the `uuid` column).
     * @return DomainEventStream
     * @throws EventStreamNotFound When no row exists for the given id.
     */
    public function load(string $id) : DomainEventStream
    {
        $rows = $this->db->table($this->eventStoreTableName)
                         ->select(['uuid', 'playhead', 'metadata', 'payload', 'recorded_on'])
                         ->where('uuid', $id)
                         ->orderBy('playhead', 'asc')
                         ->get();

        $events = [];
        foreach ($rows as $row) {
            $events[] = $this->deserializeEvent($row);
        }

        if (empty($events)) {
            throw new EventStreamNotFound(sprintf('EventStream not found for aggregate with id %s', $id));
        }

        return new \SmoothPhp\Domain\DomainEventStream($events);
    }

    /**
     * Persist every message of the stream inside a single transaction.
     *
     * The connection is re-established before the transaction starts.
     * NOTE(review): presumably this guards against stale connections in
     * long-running processes — confirm before removing.
     *
     * @param string $id Aggregate identifier. Not read here; each message supplies its own id.
     * @param DomainEventStream $eventStream Messages to store, in iteration order.
     * @throws \PDOException
     * @throws \SmoothPhp\EventStore\DuplicateAggregatePlayhead
     * @throws \Illuminate\Database\QueryException
     */
    public function append(string $id, DomainEventStream $eventStream) : void
    {
        $this->db->reconnect();
        $this->db->beginTransaction();

        try {
            foreach ($eventStream as $domainMessage) {
                $this->insertEvent($this->domainMessageToArray($domainMessage));
            }

            $this->db->commit();
        } catch (\Throwable $ex) {
            // insertEvent() turns integrity violations into DuplicateAggregatePlayhead,
            // which is not a QueryException. Rolling back on every failure keeps the
            // transaction from being left open; the original exception is rethrown
            // unchanged so callers still see the same types.
            $this->db->rollBack();

            throw $ex;
        }
    }

    /**
     * Insert one event row, translating integrity-constraint violations.
     *
     * @param array $eventRow Column => value map as built by domainMessageToArray().
     * @throws DuplicateAggregatePlayhead When the database reports SQLSTATE 23000.
     * @throws \PDOException For any other database failure.
     */
    private function insertEvent(array $eventRow) : void
    {
        try {
            $this->db->table($this->eventStoreTableName)->insert($eventRow);
        } catch (\PDOException $ex) {
            // SQLSTATE class 23000 is "integrity constraint violation".
            // NOTE(review): assumes the violated constraint is a unique key on
            // (uuid, playhead) — verify against the table schema.
            if ((string)$ex->getCode() === '23000') {
                // NOTE(review): static analysis reported that
                // DuplicateAggregatePlayhead::__construct() may not declare a third
                // parameter. PHP ignores surplus arguments to userland functions, so
                // passing the original exception is harmless — confirm against the
                // installed version of the exception class.
                throw new DuplicateAggregatePlayhead($eventRow['uuid'], $eventRow['playhead'], $ex);
            }

            throw $ex;
        }
    }

    /**
     * Rebuild a domain message from a database row.
     *
     * @param \stdClass $row Row object exposing uuid, playhead, metadata, payload and recorded_on.
     * @return DomainMessage
     */
    private function deserializeEvent($row) : DomainMessage
    {
        return new \SmoothPhp\Domain\DomainMessage(
            $row->uuid,
            $row->playhead,
            $this->serializer->deserialize(json_decode($row->metadata, true)),
            $this->serializer->deserialize(json_decode($row->payload, true)),
            new DateTime($row->recorded_on)
        );
    }

    /**
     * Count the stored events whose `type` column is one of the given types.
     *
     * @param string[] $eventTypes
     * @return int
     */
    public function getEventCountByTypes(array $eventTypes) : int
    {
        return $this->db->table($this->eventStoreTableName)
                        ->whereIn('type', $eventTypes)
                        ->count();
    }

    /**
     * Lazily iterate over all events of the given types in batches.
     *
     * Batches are fetched in ascending `id` order; each query resumes after the
     * last `id` seen, and each non-empty batch is yielded as one DomainEventStream.
     * Iteration ends as soon as a query returns no rows, so no empty stream is
     * ever yielded.
     *
     * @param string[] $eventTypes
     * @param int $take Maximum number of rows fetched per batch.
     * @return \Generator Yields \SmoothPhp\Domain\DomainEventStream instances.
     */
    public function getEventsByType(array $eventTypes, int $take) : \Generator
    {
        $lastId = 0;

        while (true) {
            // The raw table expression forces the type index; the backtick quoting
            // and FORCE INDEX hint are MySQL syntax.
            $rows = $this->db->table(
                $this->db->raw("`{$this->eventStoreTableName}` FORCE INDEX (eventstore_type_index)")
            )
                             ->select(['id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on'])
                             ->whereIn('type', $eventTypes)
                             ->where('id', '>', $lastId)
                             ->take($take)
                             ->orderBy('id')
                             ->get();

            if (count($rows) === 0) {
                return;
            }

            $events = [];
            foreach ($rows as $row) {
                $events[] = $this->deserializeEvent($row);
                $lastId = $row->id;
            }

            yield new \SmoothPhp\Domain\DomainEventStream($events);
        }
    }

    /**
     * Flatten a domain message into the column => value map used for inserts.
     *
     * @param DomainMessage $domainMessage
     * @return array
     */
    private function domainMessageToArray(DomainMessage $domainMessage) : array
    {
        return [
            'uuid'        => (string)$domainMessage->getId(),
            'playhead'    => $domainMessage->getPlayHead(),
            'metadata'    => json_encode($this->serializer->serialize($domainMessage->getMetadata())),
            'payload'     => json_encode($this->serializer->serialize($domainMessage->getPayload())),
            'recorded_on' => $domainMessage->getRecordedOn()->format('Y-m-d H:i:s'),
            'type'        => $domainMessage->getType(),
        ];
    }

    /**
     * Delete every row whose `uuid` column equals the given stream id.
     *
     * @param string $streamId
     */
    public function deleteStream(string $streamId) : void
    {
        $this->db->table($this->eventStoreTableName)
                 ->where('uuid', $streamId)
                 ->delete();
    }
}