Passed
Push — v3.0 ( 2d96b5...8256f3 )
by Masiukevich
02:53
created

SqlEventStreamStore::revert()   A

Complexity

Conditions 5
Paths 1

Size

Total Lines 59
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 5.0342

Importance

Changes 0
Metric Value
cc 5
eloc 27
nc 1
nop 3
dl 0
loc 59
ccs 24
cts 27
cp 0.8889
crap 5.0342
rs 9.1768
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Storage\Sql\deleteQuery;
18
use function ServiceBus\Storage\Sql\equalsCriteria;
19
use function ServiceBus\Storage\Sql\fetchAll;
20
use function ServiceBus\Storage\Sql\fetchOne;
21
use function ServiceBus\Storage\Sql\insertQuery;
22
use function ServiceBus\Storage\Sql\selectQuery;
23
use function ServiceBus\Storage\Sql\updateQuery;
24
use Amp\Promise;
25
use ServiceBus\EventSourcing\Aggregate;
26
use ServiceBus\EventSourcing\AggregateId;
27
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
29
use ServiceBus\Storage\Common\BinaryDataDecoder;
30
use ServiceBus\Storage\Common\DatabaseAdapter;
31
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
32
use ServiceBus\Storage\Common\QueryExecutor;
33
34
/**
35
 *
36
 */
37
final class SqlEventStreamStore implements EventStreamStore
38
{
39
    private const STREAMS_TABLE = 'event_store_stream';
40
41
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
42
43
    /**
44
     * @var DatabaseAdapter
45
     */
46
    private $adapter;
47
48
    /**
49
     * @param DatabaseAdapter $adapter
50
     */
51 8
    public function __construct(DatabaseAdapter $adapter)
52
    {
53 8
        $this->adapter = $adapter;
54 8
    }
55
56
    /**
57
     * {@inheritdoc}
58
     */
59 7
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
60
    {
61
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
62 7
        return call(
63
            function(StoredAggregateEventStream $aggregateEventStream): \Generator
64
            {
65
                /**
66
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
67
                 *
68
                 * @var \ServiceBus\Storage\Common\Transaction $transaction
69
                 */
70 7
                $transaction = yield $this->adapter->transaction();
71
72
                try
73
                {
74 7
                    yield from self::doSaveStream($transaction, $aggregateEventStream);
75 7
                    yield from self::doSaveEvents($transaction, $aggregateEventStream);
76
77 7
                    yield $transaction->commit();
78
                }
79 1
                catch(\Throwable $throwable)
80
                {
81 1
                    yield $transaction->rollback();
82
83 1
                    throw $throwable;
84
                }
85
                finally
86 7
                {
87 7
                    unset($transaction);
88
                }
89 7
            },
90 7
            $aggregateEventStream
91
        );
92
    }
93
94
    /**
95
     * {@inheritdoc}
96
     */
97 5
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
98
    {
99
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
100 5
        return call(
101
            function(StoredAggregateEventStream $aggregateEventStream): \Generator
102
            {
103
                /**
104
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
105
                 *
106
                 * @var \ServiceBus\Storage\Common\Transaction $transaction
107
                 */
108 5
                $transaction = yield $this->adapter->transaction();
109
110
                try
111
                {
112 5
                    yield from self::doSaveEvents($transaction, $aggregateEventStream);
113
114 5
                    yield $transaction->commit();
115
                }
116
                catch(\Throwable $throwable)
117
                {
118
                    yield $transaction->rollback();
119
120
                    throw $throwable;
121
                }
122
                finally
123 5
                {
124 5
                    unset($transaction);
125
                }
126 5
            },
127 5
            $aggregateEventStream
128
        );
129
    }
130
131
    /**
132
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
133
     *
134
     * {@inheritdoc}
135
     */
136 5
    public function load(AggregateId $id, int $fromVersion = Aggregate::START_PLAYHEAD_INDEX, ?int $toVersion = null): Promise
137
    {
138
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
139 5
        return call(
140
            function(AggregateId $id, int $fromVersion, ?int $toVersion): \Generator
141
            {
142 5
                $aggregateEventStream = null;
143
144
                /** @var array<string, string>|null $streamData */
145 5
                $streamData = yield from self::doLoadStream($this->adapter, $id);
146
147 5
                if(null !== $streamData)
148
                {
149
                    /** @var array<int, array>|null $streamEventsData */
150 5
                    $streamEventsData = yield from self::doLoadStreamEvents(
151 5
                        $this->adapter,
152 5
                        (string) $streamData['id'],
153 5
                        $fromVersion,
154 5
                        $toVersion
155
                    );
156
157 5
                    $aggregateEventStream = self::restoreEventStream($this->adapter, $streamData, $streamEventsData);
158
                }
159
160 5
                return $aggregateEventStream;
161 5
            },
162 5
            $id,
163 5
            $fromVersion,
164 5
            $toVersion
165
        );
166
    }
167
168
    /**
169
     * {@inheritdoc}
170
     */
171
    public function close(AggregateId $id): Promise
172
    {
173
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
174
        return call(
175
            function(AggregateId $id): \Generator
176
            {
177
                $updateQuery = updateQuery(self::STREAMS_TABLE, ['closed_at' => \date('Y-m-d H:i:s')])
178
                    ->where(equalsCriteria('id', $id))
179
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)));
180
181
                $compiledQuery = $updateQuery->compile();
182
183
                /**
184
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
185
                 * @psalm-suppress MixedTypeCoercion Invalid params() docblock
186
                 *
187
                 * @var \ServiceBus\Storage\Common\ResultSet $resultSet
188
                 */
189
                $resultSet = yield $this->adapter->execute($compiledQuery->sql(), $compiledQuery->params());
190
191
                unset($updateQuery, $compiledQuery, $resultSet);
192
            },
193
            $id
194
        );
195
    }
196
197
    /**
198
     * {@inheritdoc}
199
     */
200 4
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
201
    {
202
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
203 4
        return call(
204
            function(AggregateId $id, int $toVersion, bool $force): \Generator
205
            {
206
                /** @var array<string, string>|null $streamData */
207 4
                $streamData = yield from self::doLoadStream($this->adapter, $id);
208
209 4
                if(null === $streamData)
210
                {
211 1
                    throw EventStreamDoesNotExist::create($id);
212
                }
213
214
                /** @var string $streamId */
215 3
                $streamId = $streamData['id'];
216
217
                /**
218
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
219
                 *
220
                 * @var \ServiceBus\Storage\Common\Transaction $transaction
221
                 */
222 3
                $transaction = yield $this->adapter->transaction();
223
224
                try
225
                {
226 3
                    true === $force
227 1
                        ? yield from self::doDeleteTailEvents($transaction, $streamId, $toVersion)
228 2
                        : yield from self::doSkipEvents($transaction, $streamId, $toVersion);
229
230
                    /** restore soft deleted events */
231 3
                    yield from self::doRestoreEvents($transaction, $streamId, $toVersion);
232
233 3
                    yield $transaction->commit();
234
                }
235 1
                catch(UniqueConstraintViolationCheckFailed $exception)
236
                {
237 1
                    yield $transaction->rollback();
238
239 1
                    throw new EventStreamIntegrityCheckFailed(
240 1
                        \sprintf('Error verifying the integrity of the events stream with ID "%s"', $id),
241 1
                        (int) $exception->getCode(),
242 1
                        $exception
243
                    );
244
                }
245
                catch(\Throwable $throwable)
246
                {
247
                    yield $transaction->rollback();
248
249
                    throw $throwable;
250
                }
251
                finally
252 3
                {
253 3
                    unset($transaction);
254
                }
255 4
            },
256 4
            $id,
257 4
            $toVersion,
258 4
            $force
259
        );
260
    }
261
262
    /**
263
     * @param QueryExecutor              $queryExecutor
264
     * @param StoredAggregateEventStream $eventsStream
265
     *
266
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
267
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
268
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
269
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
270
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
271
     *
272
     * @return \Generator
273
     */
274 7
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
275
    {
276 7
        $insertQuery = insertQuery(self::STREAMS_TABLE, [
277 7
            'id'               => $eventsStream->aggregateId,
278 7
            'identifier_class' => $eventsStream->aggregateIdClass,
279 7
            'aggregate_class'  => $eventsStream->aggregateClass,
280 7
            'created_at'       => $eventsStream->createdAt,
281 7
            'closed_at'        => $eventsStream->closedAt,
282
        ]);
283
284 7
        $compiledQuery = $insertQuery->compile();
285
286
        /**
287
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
288
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
289
         *
290
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
291
         */
292 7
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
293
294 7
        unset($insertQuery, $compiledQuery, $resultSet);
295 7
    }
296
297
    /**
298
     * Saving events in stream.
299
     *
300
     * @param QueryExecutor              $queryExecutor
301
     * @param StoredAggregateEventStream $eventsStream
302
     *
303
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
304
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
305
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
306
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
307
     *
308
     * @return \Generator
309
     */
310 7
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
311
    {
312 7
        $eventsCount = \count($eventsStream->storedAggregateEvents);
313
314 7
        if(0 !== $eventsCount)
315
        {
316
            /**
317
             * @psalm-suppress TooManyTemplateParams Wrong Promise template
318
             * @psalm-suppress MixedTypeCoercion Invalid params() docblock
319
             */
320 7
            yield $queryExecutor->execute(
321 7
                self::createSaveEventQueryString($eventsCount),
322 7
                self::collectSaveEventQueryParameters($eventsStream)
323
            );
324
        }
325 7
    }
326
327
    /**
328
     * @noinspection PhpDocMissingThrowsInspection
329
     *
330
     * @param QueryExecutor $queryExecutor
331
     * @param AggregateId   $id
332
     *
333
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
334
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
335
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
336
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
337
     *
338
     * @return \Generator
339
     */
340 6
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
341
    {
342
        /** @noinspection PhpUnhandledExceptionInspection */
343 6
        $selectQuery = selectQuery(self::STREAMS_TABLE)
344 6
            ->where(equalsCriteria('id', $id))
345 6
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)));
346
347
        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
348 6
        $compiledQuery = $selectQuery->compile();
349
350
        /**
351
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
352
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
353
         *
354
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
355
         */
356
        $resultSet = /** @noinspection PhpUnhandledExceptionInspection */
357 6
            yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
358
359
        /**
360
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
361
         * @psalm-var      array<string, string>|null $data
362
         *
363
         * @var array $data
364
         */
365
        $data = /** @noinspection PhpUnhandledExceptionInspection */
366 6
            yield fetchOne($resultSet);
367
368 6
        return $data;
369
    }
370
371
    /**
372
     * @param QueryExecutor $queryExecutor
373
     * @param string        $streamId
374
     * @param int           $fromVersion
375
     * @param int|null      $toVersion
376
     *
377
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
378
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
379
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
380
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
381
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
382
     *
383
     * @return \Generator
384
     */
385 5
    private static function doLoadStreamEvents(
386
        QueryExecutor $queryExecutor,
387
        string $streamId,
388
        int $fromVersion,
389
        ?int $toVersion
390
    ): \Generator
391
    {
392
        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
393 5
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
394 5
            ->where(field('stream_id')->eq($streamId))
395 5
            ->andWhere(field('playhead')->gte($fromVersion))
396 5
            ->andWhere(field('canceled_at')->isNull());
397
398 5
        if(null !== $toVersion && $fromVersion < $toVersion)
399
        {
400
            $selectQuery->andWhere(field('playhead')->lte($toVersion));
401
        }
402
403
        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
404 5
        $compiledQuery = $selectQuery->compile();
405
406
        /**
407
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
408
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
409
         *
410
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
411
         */
412 5
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
413
414
        /**
415
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
416
         * @psalm-var      array<int, array>|null $result
417
         *
418
         * @var array $result
419
         */
420 5
        $result = yield fetchAll($resultSet);
421
422 5
        return $result;
423
    }
424
425
    /**
426
     * @noinspection PhpDocMissingThrowsInspection
427
     *
428
     * Complete removal of the "tail" events from the database
429
     *
430
     * @param QueryExecutor $executor
431
     * @param string        $streamId
432
     * @param int           $toVersion
433
     *
434
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
435
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
436
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
437
     *
438
     * @return \Generator
439
     */
440 1
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
441
    {
442
        /** @noinspection PhpUnhandledExceptionInspection */
443 1
        $deleteQuery = deleteQuery(self::STREAM_EVENTS_TABLE)
444 1
            ->where(equalsCriteria('stream_id', $streamId))
445 1
            ->andWhere(field('playhead')->gt($toVersion));
446
447 1
        $compiledQuery = $deleteQuery->compile();
448
449
        /**
450
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
451
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
452
         *
453
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
454
         */
455
        $resultSet = /** @noinspection PhpUnhandledExceptionInspection */
456 1
            yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
457
458 1
        unset($deleteQuery, $compiledQuery, $resultSet);
459 1
    }
460
461
    /**
462
     * @noinspection PhpDocMissingThrowsInspection
463
     *
464
     * Soft deletion of events following the specified version
465
     *
466
     * @param QueryExecutor $executor
467
     * @param string        $streamId
468
     * @param int           $toVersion
469
     *
470
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
471
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
472
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
473
     *
474
     * @return \Generator
475
     */
476 2
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
477
    {
478
        /** @noinspection PhpUnhandledExceptionInspection */
479 2
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => \date('Y-m-d H:i:s')])
480 2
            ->where(equalsCriteria('stream_id', $streamId))
481 2
            ->andWhere(field('playhead')->gt($toVersion));
482
483 2
        $compiledQuery = $updateQuery->compile();
484
485
        /**
486
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
487
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
488
         *
489
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
490
         */
491
        $resultSet = /** @noinspection PhpUnhandledExceptionInspection */
492 2
            yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
493
494 2
        unset($updateQuery, $compiledQuery, $resultSet);
495 2
    }
496
497
    /**
498
     * @noinspection PhpDocMissingThrowsInspection
499
     *
500
     * @param QueryExecutor $executor
501
     * @param string        $streamId
502
     * @param int           $toVersion
503
     *
504
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
505
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
506
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
507
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
508
     *
509
     * @return \Generator
510
     */
511 3
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
512
    {
513
        /** @noinspection PhpUnhandledExceptionInspection */
514 3
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
515 3
            ->where(equalsCriteria('stream_id', $streamId))
516 3
            ->andWhere(field('playhead')->lte($toVersion));
517
518 3
        $compiledQuery = $updateQuery->compile();
519
520
        /**
521
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
522
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
523
         *
524
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
525
         */
526 3
        $resultSet = yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
527
528 3
        unset($updateQuery, $compiledQuery, $resultSet);
529 3
    }
530
531
    /**
532
     * Transform events stream array data to stored representation.
533
     *
534
     * @psalm-param array<string, string>  $streamData
535
     * @psalm-param array<int, array>|null $streamEventsData
536
     *
537
     * @param DatabaseAdapter $adapter
538
     * @param array           $streamData
539
     * @param array           $streamEventsData
540
     *
541
     * @throws \LogicException
542
     *
543
     * @return StoredAggregateEventStream
544
     */
545 5
    private static function restoreEventStream(
546
        DatabaseAdapter $adapter,
547
        array $streamData,
548
        ?array $streamEventsData
549
    ): StoredAggregateEventStream
550
    {
551
        /** @psalm-var array{
552
         *     id:string,
553
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
554
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
555
         *     created_at:string,
556
         *     closed_at:string|null
557
         * } $streamData
558
         */
559
560 5
        return StoredAggregateEventStream::create(
561 5
            $streamData['id'],
562 5
            $streamData['identifier_class'],
563 5
            $streamData['aggregate_class'],
564 5
            self::restoreEvents($adapter, $streamEventsData),
565 5
            $streamData['created_at'],
566 5
            $streamData['closed_at']
567
        );
568
    }
569
570
    /**
571
     * Restore events from rows.
572
     *
573
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
574
     *
575
     * @param BinaryDataDecoder $decoder
576
     * @param array|null        $eventsData
577
     *
578
     * @throws \LogicException
579
     *
580
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
581
     */
582 5
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
583
    {
584 5
        $events = [];
585
586 5
        if(true === \is_array($eventsData) && 0 !== \count($eventsData))
587
        {
588
            /**
589
             * @psalm-var array{
590
             *   id:string,
591
             *   playhead:string,
592
             *   payload:string,
593
             *   event_class:class-string,
594
             *   occured_at:string,
595
             *   recorded_at:string
596
             * } $eventRow
597
             */
598 4
            foreach($eventsData as $eventRow)
599
            {
600 4
                $playhead = (int) $eventRow['playhead'];
601
602 4
                $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']));
603
604 4
                if(true === \is_string($payload))
605
                {
606 4
                    $events[$playhead] = StoredAggregateEvent::restore(
607 4
                        $eventRow['id'],
608 4
                        $playhead,
609 4
                        $payload,
610 4
                        $eventRow['event_class'],
611 4
                        $eventRow['occured_at'],
612 4
                        $eventRow['recorded_at']
613
                    );
614
615 4
                    continue;
616
                }
617
618
                throw new \LogicException(
619
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
620
                );
621
            }
622
        }
623
624
        /** @psal-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> */
625
626 5
        return $events;
627
    }
628
629
    /**
630
     * Create a sql query to store events.
631
     *
632
     * @param int $eventsCount
633
     *
634
     * @return string
635
     */
636 7
    private static function createSaveEventQueryString(int $eventsCount): string
637
    {
638 7
        return \sprintf(
639
        /** @lang text */
640 7
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
641 7
            self::STREAM_EVENTS_TABLE,
642 7
            \implode(
643 7
                ', ',
644 7
                \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)')
645
            )
646
        );
647
    }
648
649
    /**
650
     * Gathering parameters for sending to a request to save events.
651
     *
652
     * @param StoredAggregateEventStream $eventsStream
653
     *
654
     * @return array
655
     */
656 7
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
657
    {
658 7
        $queryParameters = [];
659 7
        $rowSetIndex     = 0;
660
661
        /** @psalm-var array<int, string|int|float|null> $parameters */
662 7
        foreach(self::prepareEventRows($eventsStream) as $parameters)
663
        {
664 7
            foreach($parameters as $parameter)
665
            {
666 7
                $queryParameters[$rowSetIndex] = $parameter;
667
668 7
                $rowSetIndex++;
669
            }
670
        }
671
672 7
        return $queryParameters;
673
    }
674
675
    /**
676
     * Prepare events to insert.
677
     *
678
     * @psalm-return array<int, array<int, string|int>>
679
     *
680
     * @param StoredAggregateEventStream $eventsStream
681
     *
682
     * @return array
683
     */
684 7
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
685
    {
686 7
        $eventsRows = [];
687
688 7
        foreach($eventsStream->storedAggregateEvents as $storedAggregateEvent)
689
        {
690
            /** @var StoredAggregateEvent $storedAggregateEvent */
691
            $row = [
692 7
                $storedAggregateEvent->eventId,
693 7
                $eventsStream->aggregateId,
694 7
                $storedAggregateEvent->playheadPosition,
695 7
                $storedAggregateEvent->eventClass,
696 7
                \base64_encode($storedAggregateEvent->eventData),
697 7
                $storedAggregateEvent->occuredAt,
698 7
                \date('Y-m-d H:i:s'),
699
            ];
700
701 7
            $eventsRows[] = $row;
702
        }
703
704 7
        return $eventsRows;
705
    }
706
}
707