Passed
Push — v4.2 ( 50993c )
by Masiukevich
07:39
created

SqlEventStreamStore::restoreEvents()   A

Complexity

Conditions 5
Paths 3

Size

Total Lines 45
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 5.0488

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 5
eloc 17
c 2
b 0
f 0
nc 3
nop 2
dl 0
loc 45
ccs 14
cts 16
cp 0.875
crap 5.0488
rs 9.3888
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Common\now;
18
use function ServiceBus\Storage\Sql\deleteQuery;
19
use function ServiceBus\Storage\Sql\equalsCriteria;
20
use function ServiceBus\Storage\Sql\fetchAll;
21
use function ServiceBus\Storage\Sql\fetchOne;
22
use function ServiceBus\Storage\Sql\insertQuery;
23
use function ServiceBus\Storage\Sql\selectQuery;
24
use function ServiceBus\Storage\Sql\updateQuery;
25
use Amp\Promise;
26
use ServiceBus\EventSourcing\Aggregate;
27
use ServiceBus\EventSourcing\AggregateId;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
29
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
30
use ServiceBus\Storage\Common\BinaryDataDecoder;
31
use ServiceBus\Storage\Common\DatabaseAdapter;
32
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
33
use ServiceBus\Storage\Common\QueryExecutor;
34
35
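/**
 * SQL-backed implementation of the event stream store.
 *
 * Stream headers are kept in the `event_store_stream` table and their events in the
 * `event_store_stream_events` table; every query is issued through the injected DatabaseAdapter.
 */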
38
final class SqlEventStreamStore implements EventStreamStore
39
{
40
    private const STREAMS_TABLE = 'event_store_stream';
41
42
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
43
44
    /** @var DatabaseAdapter */
45
    private $adapter;
46
47 8
    /**
     * @param DatabaseAdapter $adapter Adapter through which every query of this store is executed
     */
    public function __construct(DatabaseAdapter $adapter)
    {
        $this->adapter = $adapter;
    }
51
52
    /**
     * {@inheritdoc}
     *
     * The stream header and its events are written inside one adapter transaction:
     * first the stream row, then the event rows.
     */
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistStreamWithEvents = static function(QueryExecutor $executor) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveStream($executor, $aggregateEventStream);
            yield from self::doSaveEvents($executor, $aggregateEventStream);
        };

        return $this->adapter->transactional($persistStreamWithEvents);
    }
65
66
    /**
     * {@inheritdoc}
     *
     * Only the event rows are written (inside an adapter transaction); the stream header is not touched.
     */
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistEvents = static function(QueryExecutor $executor) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveEvents($executor, $aggregateEventStream);
        };

        return $this->adapter->transactional($persistEvents);
    }
78
79
    /**
     * {@inheritdoc}
     *
     * The promise resolves to null when no stream row exists for the identifier; otherwise to the
     * stream restored from its header row and the event rows in the requested version range.
     */
    public function load(
        AggregateId $id,
        int $fromVersion = Aggregate::START_PLAYHEAD_INDEX,
        ?int $toVersion = null
    ): Promise {
        return call(
            function() use ($id, $fromVersion, $toVersion): \Generator
            {
                /** @var array<string, string>|null $streamRow */
                $streamRow = yield from self::doLoadStream($this->adapter, $id);

                // Unknown stream: nothing to restore.
                if (null === $streamRow)
                {
                    return null;
                }

                /** @var array<int, array>|null $eventRows */
                $eventRows = yield from self::doLoadStreamEvents(
                    $this->adapter,
                    (string) $streamRow['id'],
                    $fromVersion,
                    $toVersion
                );

                return self::restoreEventStream($this->adapter, $streamRow, $eventRows);
            }
        );
    }
112
113
    /**
     * {@inheritdoc}
     *
     * Sets `closed_at` of the stream row matching the identifier value and identifier class
     * to the current date/time as formatted by date().
     */
    public function close(AggregateId $id): Promise
    {
        return call(
            function() use ($id): \Generator
            {
                $closedAt = \date('Y-m-d H:i:s');

                $query = updateQuery(self::STREAMS_TABLE, ['closed_at' => $closedAt])
                    ->where(equalsCriteria('id', $id->toString()))
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
                    ->compile();

                /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
                yield $this->adapter->execute($query->sql(), $query->params());
            }
        );
    }
132
133
    /**
     * {@inheritdoc}
     *
     * Inside one transaction, events after `$toVersion` are either deleted (`$force` is true) or
     * soft-deleted, and events up to `$toVersion` have their cancellation mark cleared.
     *
     * Fails with EventStreamDoesNotExist when the stream is unknown, and with
     * EventStreamIntegrityCheckFailed when the storage reports a unique constraint violation.
     */
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
    {
        return call(
            function() use ($id, $toVersion, $force): \Generator
            {
                /** @var array<string, string>|null $streamRow */
                $streamRow = yield from self::doLoadStream($this->adapter, $id);

                if (null === $streamRow)
                {
                    throw EventStreamDoesNotExist::create($id);
                }

                /** @var string $streamId */
                $streamId = $streamRow['id'];

                $rewind = static function(QueryExecutor $executor) use ($force, $streamId, $toVersion): \Generator
                {
                    if (true === $force)
                    {
                        yield from self::doDeleteTailEvents($executor, $streamId, $toVersion);
                    }
                    else
                    {
                        yield from self::doSkipEvents($executor, $streamId, $toVersion);
                    }

                    /** restore soft deleted events */
                    yield from self::doRestoreEvents($executor, $streamId, $toVersion);
                };

                try
                {
                    yield $this->adapter->transactional($rewind);
                }
                catch (UniqueConstraintViolationCheckFailed $violation)
                {
                    $message = \sprintf(
                        'Error verifying the integrity of the events stream with ID "%s"',
                        $id->toString()
                    );

                    throw new EventStreamIntegrityCheckFailed($message, (int) $violation->getCode(), $violation);
                }
            }
        );
    }
177
178
    /**
     * Insert the stream header row (id, identifier class, aggregate class, created/closed dates).
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $streamRow = [
            'id'               => $eventsStream->aggregateId,
            'identifier_class' => $eventsStream->aggregateIdClass,
            'aggregate_class'  => $eventsStream->aggregateClass,
            'created_at'       => $eventsStream->createdAt,
            'closed_at'        => $eventsStream->closedAt,
        ];

        $query = insertQuery(self::STREAMS_TABLE, $streamRow)->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $queryExecutor->execute($query->sql(), $query->params());
    }
206
207
    /**
     * Insert all events of the stream with a single multi-row INSERT.
     *
     * No query is executed when the stream carries no events.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $total = \count($eventsStream->storedAggregateEvents);

        // Without events the generated statement would have an empty VALUES list.
        if (0 === $total)
        {
            return;
        }

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $queryExecutor->execute(
            self::createSaveEventQueryString($total),
            self::collectSaveEventQueryParameters($eventsStream)
        );
    }
228
229
    /**
     * Fetch the stream header row for an aggregate identifier.
     *
     * The row is matched on the identifier value together with the identifier class name.
     * The generator returns whatever fetchOne() yields for the result set (annotated as a row or null).
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
    {
        /** @var \Latitude\QueryBuilder\Query $query */
        $query = selectQuery(self::STREAMS_TABLE)
            ->where(equalsCriteria('id', $id->toString()))
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
            ->compile();

        /**
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $rows
         */
        $rows = yield $queryExecutor->execute($query->sql(), $query->params());

        /**
         * @psalm-var      array<string, string>|null $streamRow
         *
         * @var array $streamRow
         */
        $streamRow = yield fetchOne($rows);

        return $streamRow;
    }
260
261
    /**
     * Load the active (not canceled) event rows of a stream within a playhead range.
     *
     * Rows with `playhead >= $fromVersion` and `canceled_at IS NULL` are selected. When `$toVersion`
     * is given and is not below `$fromVersion`, the rows are additionally limited to
     * `playhead <= $toVersion`, so both bounds are inclusive and a single-version range
     * (`$fromVersion === $toVersion`) is honoured. A `$toVersion` below `$fromVersion` is ignored.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private static function doLoadStreamEvents(
        QueryExecutor $queryExecutor,
        string $streamId,
        int $fromVersion,
        ?int $toVersion
    ): \Generator {
        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
            ->where(field('stream_id')->eq($streamId))
            ->andWhere(field('playhead')->gte($fromVersion))
            ->andWhere(field('canceled_at')->isNull());

        // `<=` rather than `<`: with equal bounds the upper limit must still apply,
        // otherwise every event from $fromVersion onwards would be returned.
        if (null !== $toVersion && $fromVersion <= $toVersion)
        {
            // Keep the returned builder so the condition survives whether or not andWhere() mutates in place.
            $selectQuery = $selectQuery->andWhere(field('playhead')->lte($toVersion));
        }

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = $selectQuery->compile();

        /**
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());

        /**
         * @psalm-var      array<int, array>|null $result
         *
         * @var array $result
         */
        $result = yield fetchAll($resultSet);

        return $result;
    }
304
305
    /**
     * Physically delete every event of the stream whose playhead is greater than the given version.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $query = deleteQuery(self::STREAM_EVENTS_TABLE)
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $executor->execute($query->sql(), $query->params());
    }
329
330
    /**
     * Soft-delete the events that follow the given version.
     *
     * Sets `canceled_at` to the current date/time (as formatted by date()) on every event of the
     * stream whose playhead is greater than `$toVersion`.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $canceledAt = \date('Y-m-d H:i:s');

        /** @var \Latitude\QueryBuilder\Query $query */
        $query = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => $canceledAt])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $executor->execute($query->sql(), $query->params());
    }
355
356
    /**
     * Clear the soft-delete mark (`canceled_at = NULL`) on every event of the stream whose playhead
     * is less than or equal to the given version.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $query = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->lte($toVersion))
            ->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $executor->execute($query->sql(), $query->params());
    }
379
380
    /**
     * Build the stored stream representation from a stream header row and its event rows.
     *
     * The adapter is handed to restoreEvents() as the binary decoder for event payloads.
     *
     * @psalm-param array<string, string>  $streamData
     * @psalm-param array<int, array>|null $streamEventsData
     *
     * @throws \LogicException
     */
    private static function restoreEventStream(
        DatabaseAdapter $adapter,
        array $streamData,
        ?array $streamEventsData
    ): StoredAggregateEventStream {
        /** @psalm-var array{
         *     id:string,
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
         *     created_at:string,
         *     closed_at:string|null
         * } $streamData
         */
        $storedEvents = self::restoreEvents($adapter, $streamEventsData);

        return new StoredAggregateEventStream(
            $streamData['id'],
            $streamData['identifier_class'],
            $streamData['aggregate_class'],
            $storedEvents,
            $streamData['created_at'],
            $streamData['closed_at']
        );
    }
411
412
    /**
     * Restore stored events from database rows.
     *
     * Each row's payload is passed through the decoder's unescapeBinary() and then base64-decoded.
     * The result is keyed by playhead and sorted by playhead ascending, so the order of the returned
     * events does not depend on the order in which the storage returned the rows.
     *
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
     *
     * @throws \LogicException When an event payload is not valid base64
     *
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
     */
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
    {
        $events = [];

        /**
         * @psalm-var array{
         *   id:string,
         *   playhead:string,
         *   payload:string,
         *   event_class:class-string,
         *   occured_at:string,
         *   recorded_at:string
         * } $eventRow
         */
        foreach ($eventsData ?? [] as $eventRow)
        {
            // Strict mode makes base64_decode() return false on characters outside the base64 alphabet.
            // Without it invalid input is silently skipped and the failure branch below can never run.
            $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']), true);

            if (false === $payload)
            {
                throw new \LogicException(
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
                );
            }

            $playhead = (int) $eventRow['playhead'];

            $events[$playhead] = StoredAggregateEvent::restore(
                $eventRow['id'],
                $playhead,
                $payload,
                $eventRow['event_class'],
                $eventRow['occured_at'],
                $eventRow['recorded_at']
            );
        }

        // The rows come from a query without ORDER BY; sort by playhead so events replay in version order.
        \ksort($events);

        /** @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> $events */
        return $events;
    }
467
468
    /**
     * Build the multi-row INSERT statement used to store events.
     *
     * One `(?, ?, ?, ?, ?, ?, ?)` placeholder group is generated per event, joined with ", ".
     *
     * @param int $eventsCount Number of placeholder groups (rows) to generate
     */
    private static function createSaveEventQueryString(int $eventsCount): string
    {
        $rowPlaceholders = \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)');

        return \sprintf(
            /** @lang text */
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
            self::STREAM_EVENTS_TABLE,
            \implode(', ', $rowPlaceholders)
        );
    }
483
484
    /**
     * Collect the positional parameters for the events INSERT statement.
     *
     * The per-event rows produced by prepareEventRows() are flattened, in order, into one
     * zero-indexed list matching the placeholder groups of the statement.
     */
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
    {
        /** @psalm-var array<int, array<int, string|int|float|null>> $rows */
        $rows = self::prepareEventRows($eventsStream);

        // The leading empty array keeps the call valid when there are no rows to unpack.
        return \array_merge([], ...$rows);
    }
505
506
    /**
     * Turn the events of a stream into rows for insertion.
     *
     * Each row lists, in column order: event id, stream (aggregate) id, playhead, event class,
     * base64-encoded payload, occurrence time and the recording time taken from now().
     *
     * @psalm-return array<int, array<int, string|int>>
     */
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
    {
        $streamId = $eventsStream->aggregateId;
        $rows     = [];

        /** @var StoredAggregateEvent $event */
        foreach ($eventsStream->storedAggregateEvents as $event)
        {
            $rows[] = [
                $event->eventId,
                $streamId,
                $event->playheadPosition,
                $event->eventClass,
                \base64_encode($event->eventData),
                $event->occuredAt,
                now()->format('Y-m-d H:i:s.u'),
            ];
        }

        return $rows;
    }
533
}
534