Completed
Push — v3.2 ( 72f2ef...bca77e )
by Masiukevich
03:54
created

SqlEventStreamStore::append()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 33
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.0491

Importance

Changes 0
Metric Value
cc 2
eloc 13
nc 1
nop 1
dl 0
loc 33
ccs 10
cts 13
cp 0.7692
crap 2.0491
rs 9.8333
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Storage\Sql\deleteQuery;
18
use function ServiceBus\Storage\Sql\equalsCriteria;
19
use function ServiceBus\Storage\Sql\fetchAll;
20
use function ServiceBus\Storage\Sql\fetchOne;
21
use function ServiceBus\Storage\Sql\insertQuery;
22
use function ServiceBus\Storage\Sql\selectQuery;
23
use function ServiceBus\Storage\Sql\updateQuery;
24
use Amp\Promise;
25
use ServiceBus\EventSourcing\Aggregate;
26
use ServiceBus\EventSourcing\AggregateId;
27
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
29
use ServiceBus\Storage\Common\BinaryDataDecoder;
30
use ServiceBus\Storage\Common\DatabaseAdapter;
31
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
32
use ServiceBus\Storage\Common\QueryExecutor;
33
34
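/**
 * SQL-backed implementation of EventStreamStore: stream rows and their event rows are written and read
 * through a DatabaseAdapter.
 */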
37
final class SqlEventStreamStore implements EventStreamStore
38
{
39
    private const STREAMS_TABLE = 'event_store_stream';
40
41
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
42
43
    /**
44
     * @var DatabaseAdapter
45
     */
46
    private $adapter;
47
48
    /**
     * Bind the store to the database adapter used for every query and transaction it issues.
     *
     * @param DatabaseAdapter $adapter
     */
    public function __construct(DatabaseAdapter $adapter)
    {
        $this->adapter = $adapter;
    }
55
56
    /**
     * Write the stream row and all of its events inside one transaction.
     *
     * On any failure the transaction is rolled back and the original error is re-thrown.
     *
     * {@inheritdoc}
     */
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $databaseAdapter = $this->adapter;

        return call(
            static function() use ($databaseAdapter, $aggregateEventStream): \Generator
            {
                /**
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
                 *
                 * @var \ServiceBus\Storage\Common\Transaction $tx
                 */
                $tx = yield $databaseAdapter->transaction();

                try
                {
                    yield from self::doSaveStream($tx, $aggregateEventStream);
                    yield from self::doSaveEvents($tx, $aggregateEventStream);

                    yield $tx->commit();
                }
                catch(\Throwable $failure)
                {
                    yield $tx->rollback();

                    throw $failure;
                }
                finally
                {
                    /** Drop the local reference to the transaction whatever the outcome */
                    unset($tx);
                }
            }
        );
    }
95
96
    /**
     * Write the events of the given stream inside one transaction; the stream row itself is not touched.
     *
     * On any failure the transaction is rolled back and the original error is re-thrown.
     *
     * {@inheritdoc}
     */
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $databaseAdapter = $this->adapter;

        return call(
            static function() use ($databaseAdapter, $aggregateEventStream): \Generator
            {
                /**
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
                 *
                 * @var \ServiceBus\Storage\Common\Transaction $tx
                 */
                $tx = yield $databaseAdapter->transaction();

                try
                {
                    yield from self::doSaveEvents($tx, $aggregateEventStream);

                    yield $tx->commit();
                }
                catch(\Throwable $failure)
                {
                    yield $tx->rollback();

                    throw $failure;
                }
                finally
                {
                    /** Drop the local reference to the transaction whatever the outcome */
                    unset($tx);
                }
            }
        );
    }
134
135
    /**
     * Load the stream row for the given identifier and, when it exists, its non-canceled events in the
     * requested version range. The promise resolves to null when no stream row is found.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * {@inheritdoc}
     */
    public function load(AggregateId $id, int $fromVersion = Aggregate::START_PLAYHEAD_INDEX, ?int $toVersion = null): Promise
    {
        $databaseAdapter = $this->adapter;

        return call(
            static function() use ($databaseAdapter, $id, $fromVersion, $toVersion): \Generator
            {
                /** @var array<string, string>|null $streamRow */
                $streamRow = yield from self::doLoadStream($databaseAdapter, $id);

                if(null === $streamRow)
                {
                    return null;
                }

                /** @var array<int, array>|null $eventRows */
                $eventRows = yield from self::doLoadStreamEvents(
                    $databaseAdapter,
                    (string) $streamRow['id'],
                    $fromVersion,
                    $toVersion
                );

                return self::restoreEventStream($databaseAdapter, $streamRow, $eventRows);
            }
        );
    }
173
174
    /**
     * Set closed_at to the current date('Y-m-d H:i:s') value on the stream row matching the identifier
     * and its class.
     *
     * {@inheritdoc}
     */
    public function close(AggregateId $id): Promise
    {
        $databaseAdapter = $this->adapter;

        return call(
            static function() use ($databaseAdapter, $id): \Generator
            {
                $closedAt = \date('Y-m-d H:i:s');

                $query = updateQuery(self::STREAMS_TABLE, ['closed_at' => $closedAt])
                    ->where(equalsCriteria('id', $id->toString()))
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
                    ->compile();

                /**
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
                 * @psalm-suppress MixedTypeCoercion Invalid params() docblock
                 */
                yield $databaseAdapter->execute($query->sql(), $query->params());
            }
        );
    }
200
201
    /**
     * Roll the stream back to the given version.
     *
     * Events after $toVersion are either deleted ($force = true) or marked as canceled ($force = false);
     * afterwards the cancel mark is cleared on every event up to and including $toVersion. All changes run
     * in one transaction. A unique constraint violation is reported as EventStreamIntegrityCheckFailed;
     * any other error is re-thrown unchanged after the rollback.
     *
     * {@inheritdoc}
     */
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
    {
        $databaseAdapter = $this->adapter;

        return call(
            static function() use ($databaseAdapter, $id, $toVersion, $force): \Generator
            {
                /** @var array<string, string>|null $streamRow */
                $streamRow = yield from self::doLoadStream($databaseAdapter, $id);

                if(null === $streamRow)
                {
                    throw EventStreamDoesNotExist::create($id);
                }

                /** @var string $streamId */
                $streamId = $streamRow['id'];

                /**
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
                 *
                 * @var \ServiceBus\Storage\Common\Transaction $tx
                 */
                $tx = yield $databaseAdapter->transaction();

                try
                {
                    if(true === $force)
                    {
                        yield from self::doDeleteTailEvents($tx, $streamId, $toVersion);
                    }
                    else
                    {
                        yield from self::doSkipEvents($tx, $streamId, $toVersion);
                    }

                    /** restore soft deleted events */
                    yield from self::doRestoreEvents($tx, $streamId, $toVersion);

                    yield $tx->commit();
                }
                catch(UniqueConstraintViolationCheckFailed $violation)
                {
                    yield $tx->rollback();

                    $message = \sprintf(
                        'Error verifying the integrity of the events stream with ID "%s"',
                        $id->toString()
                    );

                    throw new EventStreamIntegrityCheckFailed($message, (int) $violation->getCode(), $violation);
                }
                catch(\Throwable $failure)
                {
                    yield $tx->rollback();

                    throw $failure;
                }
                finally
                {
                    /** Drop the local reference to the transaction whatever the outcome */
                    unset($tx);
                }
            }
        );
    }
267
268
    /**
     * Insert the stream row (id, identifier class, aggregate class, created/closed timestamps).
     *
     * @param QueryExecutor              $queryExecutor
     * @param StoredAggregateEventStream $eventsStream
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $streamRow = [
            'id'               => $eventsStream->aggregateId,
            'identifier_class' => $eventsStream->aggregateIdClass,
            'aggregate_class'  => $eventsStream->aggregateClass,
            'created_at'       => $eventsStream->createdAt,
            'closed_at'        => $eventsStream->closedAt,
        ];

        $query = insertQuery(self::STREAMS_TABLE, $streamRow)->compile();

        /**
         * The result set of the insert is not needed, only the completion of the statement.
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $queryExecutor->execute($query->sql(), $query->params());
    }
302
303
    /**
     * Insert every event of the stream with a single multi-row INSERT; nothing is executed when the
     * stream carries no events.
     *
     * @param QueryExecutor              $queryExecutor
     * @param StoredAggregateEventStream $eventsStream
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $total = \count($eventsStream->storedAggregateEvents);

        if(0 === $total)
        {
            return;
        }

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $queryExecutor->execute(
            self::createSaveEventQueryString($total),
            self::collectSaveEventQueryParameters($eventsStream)
        );
    }
332
333
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Fetch the stream row matching the identifier value and the identifier class.
     * The generator returns whatever fetchOne() yields for the result set (annotated as array or null).
     *
     * @param QueryExecutor $queryExecutor
     * @param AggregateId   $id
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
    {
        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @var \Latitude\QueryBuilder\Query $query
         */
        $query = selectQuery(self::STREAMS_TABLE)
            ->where(equalsCriteria('id', $id->toString()))
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
            ->compile();

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $rows
         */
        $rows = yield $queryExecutor->execute($query->sql(), $query->params());

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-var      array<string, string>|null $streamRow
         *
         * @var array $streamRow
         */
        $streamRow = yield fetchOne($rows);

        return $streamRow;
    }
376
377
    /**
     * Fetch the non-canceled events of a stream starting at $fromVersion.
     *
     * The upper bound is applied only when $toVersion is given AND strictly greater than $fromVersion;
     * otherwise every event from $fromVersion onwards is selected. The query carries no ORDER BY clause.
     *
     * @param QueryExecutor $queryExecutor
     * @param string        $streamId
     * @param int           $fromVersion
     * @param int|null      $toVersion
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doLoadStreamEvents(
        QueryExecutor $queryExecutor,
        string $streamId,
        int $fromVersion,
        ?int $toVersion
    ): \Generator
    {
        $streamCriteria   = field('stream_id')->eq($streamId);
        $lowerBound       = field('playhead')->gte($fromVersion);
        $notCanceledOnly  = field('canceled_at')->isNull();

        /** @var \Latitude\QueryBuilder\Query\SelectQuery $eventsQuery */
        $eventsQuery = selectQuery(self::STREAM_EVENTS_TABLE)
            ->where($streamCriteria)
            ->andWhere($lowerBound)
            ->andWhere($notCanceledOnly);

        if(null !== $toVersion && $fromVersion < $toVersion)
        {
            $eventsQuery->andWhere(field('playhead')->lte($toVersion));
        }

        /** @var \Latitude\QueryBuilder\Query $compiled */
        $compiled = $eventsQuery->compile();

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $rows
         */
        $rows = yield $queryExecutor->execute($compiled->sql(), $compiled->params());

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-var      array<int, array>|null $eventRows
         *
         * @var array $eventRows
         */
        $eventRows = yield fetchAll($rows);

        return $eventRows;
    }
430
431
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Hard delete: remove every event of the stream whose playhead is greater than $toVersion.
     *
     * @param QueryExecutor $executor
     * @param string        $streamId
     * @param int           $toVersion
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        /** @noinspection PhpUnhandledExceptionInspection */
        $query = deleteQuery(self::STREAM_EVENTS_TABLE)
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * The result set of the delete is not needed, only the completion of the statement.
         *
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($query->sql(), $query->params());
    }
466
467
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Soft delete: stamp canceled_at with the current date('Y-m-d H:i:s') value on every event of the
     * stream whose playhead is greater than $toVersion.
     *
     * @param QueryExecutor $executor
     * @param string        $streamId
     * @param int           $toVersion
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $canceledAt = \date('Y-m-d H:i:s');

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @var \Latitude\QueryBuilder\Query $query
         */
        $query = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => $canceledAt])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * The result set of the update is not needed, only the completion of the statement.
         *
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($query->sql(), $query->params());
    }
503
504
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Clear the canceled_at mark on every event of the stream whose playhead is less than or equal to
     * $toVersion.
     *
     * @param QueryExecutor $executor
     * @param string        $streamId
     * @param int           $toVersion
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        /** @noinspection PhpUnhandledExceptionInspection */
        $query = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->lte($toVersion))
            ->compile();

        /**
         * The result set of the update is not needed, only the completion of the statement.
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($query->sql(), $query->params());
    }
537
538
    /**
     * Build the stored stream representation from a stream row and its event rows.
     *
     * @psalm-param array<string, string>  $streamData
     * @psalm-param array<int, array>|null $streamEventsData
     *
     * @param DatabaseAdapter $adapter          Passed on to restoreEvents() as the binary payload decoder
     * @param array           $streamData       Stream row (id, identifier_class, aggregate_class, created_at, closed_at)
     * @param array|null      $streamEventsData Event rows; null is treated as "no events"
     *
     * @throws \LogicException
     *
     * @return StoredAggregateEventStream
     */
    private static function restoreEventStream(
        DatabaseAdapter $adapter,
        array $streamData,
        ?array $streamEventsData
    ): StoredAggregateEventStream
    {
        /** @psalm-var array{
         *     id:string,
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
         *     created_at:string,
         *     closed_at:string|null
         * } $streamData
         */

        /** Values are read in the same order in which they are handed to the factory */
        $streamId        = $streamData['id'];
        $identifierClass = $streamData['identifier_class'];
        $aggregateClass  = $streamData['aggregate_class'];
        $events          = self::restoreEvents($adapter, $streamEventsData);
        $createdAt       = $streamData['created_at'];
        $closedAt        = $streamData['closed_at'];

        return StoredAggregateEventStream::create(
            $streamId,
            $identifierClass,
            $aggregateClass,
            $events,
            $createdAt,
            $closedAt
        );
    }
576
577
    /**
     * Restore events from rows.
     *
     * Each row payload is unescaped through the decoder and then base64-decoded in strict mode, so a
     * payload containing characters outside the base64 alphabet is reported instead of being silently
     * decoded into garbage. The result is keyed by playhead and sorted by it in ascending order.
     *
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
     *
     * @param BinaryDataDecoder $decoder
     * @param array|null        $eventsData Event rows; null or an empty array produces an empty result
     *
     * @throws \LogicException The payload of an event row is not valid base64
     *
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
     */
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
    {
        if(null === $eventsData)
        {
            return [];
        }

        /** @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> $events */
        $events = [];

        /**
         * @psalm-var array{
         *   id:string,
         *   playhead:string,
         *   payload:string,
         *   event_class:class-string,
         *   occured_at:string,
         *   recorded_at:string
         * } $eventRow
         */
        foreach($eventsData as $eventRow)
        {
            $playhead = (int) $eventRow['playhead'];

            /**
             * Strict mode is required: without it base64_decode() discards invalid characters and still
             * returns a string, which made the failure branch below unreachable.
             */
            $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']), true);

            if(false === $payload)
            {
                throw new \LogicException(
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
                );
            }

            $events[$playhead] = StoredAggregateEvent::restore(
                $eventRow['id'],
                $playhead,
                $payload,
                $eventRow['event_class'],
                $eventRow['occured_at'],
                $eventRow['recorded_at']
            );
        }

        /** The select query has no ORDER BY clause, so the playhead order is enforced here */
        \ksort($events);

        return $events;
    }
635
636
    /**
     * Build the multi-row INSERT statement for the events table: one "(?, ?, ?, ?, ?, ?, ?)" placeholder
     * group per event, joined with ", ".
     *
     * @param int $eventsCount Number of placeholder groups to generate
     *
     * @return string
     */
    private static function createSaveEventQueryString(int $eventsCount): string
    {
        $placeholderGroups = \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)');
        $valuesClause      = \implode(', ', $placeholderGroups);

        return \sprintf(
        /** @lang text */
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
            self::STREAM_EVENTS_TABLE,
            $valuesClause
        );
    }
655
656
    /**
     * Flatten the prepared event rows into one sequential list of bind parameters, row after row, in the
     * order the placeholders appear in the INSERT statement.
     *
     * @param StoredAggregateEventStream $eventsStream
     *
     * @return array
     */
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
    {
        $flattened = [];

        /** @psalm-var array<int, string|int|float|null> $row */
        foreach(self::prepareEventRows($eventsStream) as $row)
        {
            foreach($row as $value)
            {
                /** Appending keeps the keys sequential from zero */
                $flattened[] = $value;
            }
        }

        return $flattened;
    }
681
682
    /**
     * Turn every stored event of the stream into a positional row for the events INSERT:
     * id, stream id, playhead, event class, base64-encoded payload, occurred-at, recorded-at.
     * The recorded-at value is taken from date('Y-m-d H:i:s') at the moment each row is built.
     *
     * @psalm-return array<int, array<int, string|int>>
     *
     * @param StoredAggregateEventStream $eventsStream
     *
     * @return array
     */
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
    {
        $streamId = $eventsStream->aggregateId;
        $rows     = [];

        /** @var StoredAggregateEvent $event */
        foreach($eventsStream->storedAggregateEvents as $event)
        {
            $rows[] = [
                $event->eventId,
                $streamId,
                $event->playheadPosition,
                $event->eventClass,
                \base64_encode($event->eventData),
                $event->occuredAt,
                \date('Y-m-d H:i:s'),
            ];
        }

        return $rows;
    }
713
}
714