Completed
Push — v3.0 ( 928391...a7a97e )
by Masiukevich
03:12
created

SqlEventStreamStore::createSaveEventQueryString()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nc 1
nop 1
dl 0
loc 8
ccs 6
cts 6
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Event Sourcing implementation
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use function Latitude\QueryBuilder\field;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
21
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
22
use ServiceBus\Storage\Common\BinaryDataDecoder;
23
use ServiceBus\Storage\Common\DatabaseAdapter;
24
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
25
use ServiceBus\Storage\Common\QueryExecutor;
26
use function ServiceBus\Storage\Sql\deleteQuery;
27
use function ServiceBus\Storage\Sql\equalsCriteria;
28
use function ServiceBus\Storage\Sql\fetchAll;
29
use function ServiceBus\Storage\Sql\fetchOne;
30
use function ServiceBus\Storage\Sql\insertQuery;
31
use function ServiceBus\Storage\Sql\selectQuery;
32
use function ServiceBus\Storage\Sql\updateQuery;
33
34
/**
35
 *
36
 */
37
final class SqlEventStreamStore implements EventStreamStore
38
{
39
    private const STREAMS_TABLE       = 'event_store_stream';
40
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
41
42
    /**
43
     * @var DatabaseAdapter
44
     */
45
    private $adapter;
46
47
    /**
48
     * @param DatabaseAdapter $adapter
49
     */
50 8
    public function __construct(DatabaseAdapter $adapter)
51
    {
52 8
        $this->adapter = $adapter;
53 8
    }
54
55
    /**
56
     * @inheritDoc
57
     */
58 7
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
59
    {
60
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
61 7
        return call(
62
            function(StoredAggregateEventStream $aggregateEventStream): \Generator
63
            {
64
                /**
65
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
66
                 * @var \ServiceBus\Storage\Common\Transaction $transaction
67
                 */
68 7
                $transaction = yield $this->adapter->transaction();
69
70
                try
71
                {
72 7
                    yield from self::doSaveStream($transaction, $aggregateEventStream);
73 7
                    yield from self::doSaveEvents($transaction, $aggregateEventStream);
74
75 7
                    yield $transaction->commit();
76
                }
77 1
                catch(\Throwable $throwable)
78
                {
79 1
                    yield $transaction->rollback();
80
81 1
                    throw $throwable;
82
                }
83
                finally
84 7
                {
85 7
                    unset($transaction);
86
                }
87 7
            },
88 7
            $aggregateEventStream
89
        );
90
    }
91
92
    /**
93
     * @inheritDoc
94
     */
95 5
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
96
    {
97
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
98 5
        return call(
99
            function(StoredAggregateEventStream $aggregateEventStream): \Generator
100
            {
101
                /**
102
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
103
                 * @var \ServiceBus\Storage\Common\Transaction $transaction
104
                 */
105 5
                $transaction = yield $this->adapter->transaction();
106
107
                try
108
                {
109 5
                    yield from self::doSaveEvents($transaction, $aggregateEventStream);
110
111 5
                    yield $transaction->commit();
112
                }
113
                catch(\Throwable $throwable)
114
                {
115
                    yield $transaction->rollback();
116
117
                    throw $throwable;
118
                }
119
                finally
120 5
                {
121 5
                    unset($transaction);
122
                }
123 5
            },
124 5
            $aggregateEventStream
125
        );
126
    }
127
128
    /**
129
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
130
     *
131
     * @inheritDoc
132
     */
133 5
    public function load(AggregateId $id, int $fromVersion = Aggregate::START_PLAYHEAD_INDEX, ?int $toVersion = null): Promise
134
    {
135
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
136 5
        return call(
137
            function(AggregateId $id, int $fromVersion, ?int $toVersion): \Generator
138
            {
139 5
                $aggregateEventStream = null;
140
141
                /** @var array<string, string>|null $streamData */
142 5
                $streamData = yield from self::doLoadStream($this->adapter, $id);
143
144 5
                if(null !== $streamData)
145
                {
146
                    /** @var array<int, array>|null $streamEventsData */
147 5
                    $streamEventsData = yield from self::doLoadStreamEvents(
148 5
                        $this->adapter,
149 5
                        (string) $streamData['id'],
150 5
                        $fromVersion,
151 5
                        $toVersion
152
                    );
153
154 5
                    $aggregateEventStream = self::restoreEventStream($this->adapter, $streamData, $streamEventsData);
155
                }
156
157 5
                return $aggregateEventStream;
158 5
            },
159 5
            $id, $fromVersion, $toVersion
160
        );
161
    }
162
163
    /**
164
     * @inheritDoc
165
     */
166
    public function close(AggregateId $id): Promise
167
    {
168
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
169
        return call(
170
            function(AggregateId $id): \Generator
171
            {
172
                $updateQuery = updateQuery(self::STREAMS_TABLE, ['closed_at' => \date('Y-m-d H:i:s')])
173
                    ->where(equalsCriteria('id', $id))
174
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)));
175
176
                $compiledQuery = $updateQuery->compile();
177
178
                /**
179
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
180
                 * @var \ServiceBus\Storage\Common\ResultSet $resultSet
181
                 */
182
                $resultSet = yield $this->adapter->execute($compiledQuery->sql(), $compiledQuery->params());
183
184
                unset($updateQuery, $compiledQuery, $resultSet);
185
            },
186
            $id
187
        );
188
    }
189
190
    /**
191
     * @inheritDoc
192
     */
193 4
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
194
    {
195
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
196 4
        return call(
197
            function(AggregateId $id, int $toVersion, bool $force): \Generator
198
            {
199
                /** @var array<string, string>|null $streamData */
200 4
                $streamData = yield from self::doLoadStream($this->adapter, $id);
201
202 4
                if(null === $streamData)
203
                {
204 1
                    throw EventStreamDoesNotExist::create($id);
205
                }
206
207
                /** @var string $streamId */
208 3
                $streamId = $streamData['id'];
209
210
                /**
211
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
212
                 * @var \ServiceBus\Storage\Common\Transaction $transaction
213
                 */
214 3
                $transaction = yield $this->adapter->transaction();
215
216
                try
217
                {
218 3
                    true === $force
219 1
                        ? yield from self::doDeleteTailEvents($transaction, $streamId, $toVersion)
220 2
                        : yield from self::doSkipEvents($transaction, $streamId, $toVersion);
221
222
                    /** restore soft deleted events */
223 3
                    yield from self::doRestoreEvents($transaction, $streamId, $toVersion);
224
225 3
                    yield $transaction->commit();
226
                }
227 1
                catch(UniqueConstraintViolationCheckFailed $exception)
228
                {
229 1
                    yield $transaction->rollback();
230
231 1
                    throw new EventStreamIntegrityCheckFailed(
232 1
                        \sprintf('Error verifying the integrity of the events stream with ID "%s"', $id),
233 1
                        (int) $exception->getCode(),
234 1
                        $exception
235
                    );
236
                }
237
                catch(\Throwable $throwable)
238
                {
239
                    yield $transaction->rollback();
240
241
                    throw $throwable;
242
                }
243
                finally
244 3
                {
245 3
                    unset($transaction);
246
                }
247 4
            },
248 4
            $id, $toVersion, $force
249
        );
250
    }
251
252
    /**
253
     * @param QueryExecutor              $queryExecutor
254
     * @param StoredAggregateEventStream $eventsStream
255
     *
256
     * @return \Generator
257
     *
258
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
259
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
260
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
261
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
262
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
263
     */
264 7
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
265
    {
266 7
        $insertQuery = insertQuery(self::STREAMS_TABLE, [
267 7
            'id'               => $eventsStream->aggregateId,
268 7
            'identifier_class' => $eventsStream->aggregateIdClass,
269 7
            'aggregate_class'  => $eventsStream->aggregateClass,
270 7
            'created_at'       => $eventsStream->createdAt,
271 7
            'closed_at'        => $eventsStream->closedAt
272
        ]);
273
274 7
        $compiledQuery = $insertQuery->compile();
275
276
        /**
277
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
278
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
279
         */
280 7
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
281
282 7
        unset($insertQuery, $compiledQuery, $resultSet);
283 7
    }
284
285
    /**
286
     * Saving events in stream
287
     *
288
     * @param QueryExecutor              $queryExecutor
289
     * @param StoredAggregateEventStream $eventsStream
290
     *
291
     * @return \Generator
292
     *
293
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
294
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
295
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
296
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
297
     */
298 7
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
299
    {
300 7
        $eventsCount = \count($eventsStream->storedAggregateEvents);
301
302 7
        if(0 !== $eventsCount)
303
        {
304
            /** @psalm-suppress TooManyTemplateParams Wrong Promise template */
305 7
            yield $queryExecutor->execute(
306 7
                self::createSaveEventQueryString($eventsCount),
307 7
                self::collectSaveEventQueryParameters($eventsStream)
308
            );
309
        }
310 7
    }
311
312
    /**
313
     * @noinspection PhpDocMissingThrowsInspection
314
     *
315
     * @param QueryExecutor $queryExecutor
316
     * @param AggregateId   $id
317
     *
318
     * @return \Generator
319
     *
320
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
321
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
322
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
323
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
324
     */
325 6
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
326
    {
327
        /** @noinspection PhpUnhandledExceptionInspection */
328 6
        $selectQuery = selectQuery(self::STREAMS_TABLE)
329 6
            ->where(equalsCriteria('id', $id))
330 6
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)));
331
332
        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
333 6
        $compiledQuery = $selectQuery->compile();
334
335
        /**
336
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
337
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
338
         */
339
        $resultSet = /** @noinspection PhpUnhandledExceptionInspection */
340 6
            yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
341
342
        /**
343
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
344
         * @psalm-var      array<string, string>|null $data
345
         * @var array $data
346
         */
347
        $data = /** @noinspection PhpUnhandledExceptionInspection */
348 6
            yield fetchOne($resultSet);
349
350 6
        return $data;
351
    }
352
353
    /**
354
     * @param QueryExecutor $queryExecutor
355
     * @param string        $streamId
356
     * @param int           $fromVersion
357
     * @param int|null      $toVersion
358
     *
359
     * @return \Generator
360
     *
361
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
362
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
363
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
364
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
365
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
366
     */
367 5
    private static function doLoadStreamEvents(
368
        QueryExecutor $queryExecutor,
369
        string $streamId,
370
        int $fromVersion,
371
        ?int $toVersion
372
    ): \Generator
373
    {
374
        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
375 5
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
376 5
            ->where(field('stream_id')->eq($streamId))
377 5
            ->andWhere(field('playhead')->gte($fromVersion))
378 5
            ->andWhere(field('canceled_at')->isNull());
379
380 5
        if(null !== $toVersion && $fromVersion < $toVersion)
381
        {
382
            $selectQuery->andWhere(field('playhead')->lte($toVersion));
383
        }
384
385
        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
386 5
        $compiledQuery = $selectQuery->compile();
387
388
        /**
389
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
390
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
391
         */
392 5
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
393
394
        /**
395
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
396
         * @psalm-var      array<int, array>|null $result
397
         * @var array $result
398
         */
399 5
        $result = yield fetchAll($resultSet);
400
401 5
        return $result;
402
    }
403
404
    /**
405
     * @noinspection PhpDocMissingThrowsInspection
406
     *
407
     * Complete removal of the "tail" events from the database
408
     *
409
     * @param QueryExecutor $executor
410
     * @param string        $streamId
411
     * @param int           $toVersion
412
     *
413
     * @return \Generator
414
     *
415
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
416
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
417
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
418
     */
419 1
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
420
    {
421
        /** @noinspection PhpUnhandledExceptionInspection */
422 1
        $deleteQuery = deleteQuery(self::STREAM_EVENTS_TABLE)
423 1
            ->where(equalsCriteria('stream_id', $streamId))
424 1
            ->andWhere(field('playhead')->gt($toVersion));
425
426 1
        $compiledQuery = $deleteQuery->compile();
427
428
        /**
429
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
430
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
431
         */
432
        $resultSet = /** @noinspection PhpUnhandledExceptionInspection */
433 1
            yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
434
435 1
        unset($deleteQuery, $compiledQuery, $resultSet);
436 1
    }
437
438
    /**
439
     * @noinspection PhpDocMissingThrowsInspection
440
     *
441
     * Soft deletion of events following the specified version
442
     *
443
     * @param QueryExecutor $executor
444
     * @param string        $streamId
445
     * @param int           $toVersion
446
     *
447
     * @return \Generator
448
     *
449
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
450
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
451
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
452
     */
453 2
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
454
    {
455
        /** @noinspection PhpUnhandledExceptionInspection */
456 2
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => \date('Y-m-d H:i:s')])
457 2
            ->where(equalsCriteria('stream_id', $streamId))
458 2
            ->andWhere(field('playhead')->gt($toVersion));
459
460 2
        $compiledQuery = $updateQuery->compile();
461
462
        /**
463
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
464
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
465
         */
466
        $resultSet = /** @noinspection PhpUnhandledExceptionInspection */
467 2
            yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
468
469 2
        unset($updateQuery, $compiledQuery, $resultSet);
470 2
    }
471
472
    /**
473
     * @noinspection PhpDocMissingThrowsInspection
474
     *
475
     * @param QueryExecutor $executor
476
     * @param string        $streamId
477
     * @param int           $toVersion
478
     *
479
     * @return \Generator
480
     *
481
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
482
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
483
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
484
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
485
     */
486 3
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
487
    {
488
        /** @noinspection PhpUnhandledExceptionInspection */
489 3
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
490 3
            ->where(equalsCriteria('stream_id', $streamId))
491 3
            ->andWhere(field('playhead')->lte($toVersion));
492
493 3
        $compiledQuery = $updateQuery->compile();
494
495
        /**
496
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
497
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
498
         */
499 3
        $resultSet = yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
500
501 3
        unset($updateQuery, $compiledQuery, $resultSet);
502 3
    }
503
504
    /**
505
     * Transform events stream array data to stored representation
506
     *
507
     * @psalm-param array<string, string>  $streamData
508
     * @psalm-param array<int, array>|null $streamEventsData
509
     *
510
     * @param DatabaseAdapter $adapter
511
     * @param array           $streamData
512
     * @param array           $streamEventsData
513
     *
514
     * @return StoredAggregateEventStream
515
     *
516
     * @throws \LogicException
517
     */
518 5
    private static function restoreEventStream(
519
        DatabaseAdapter $adapter,
520
        array $streamData,
521
        ?array $streamEventsData
522
    ): StoredAggregateEventStream
523
    {
524
        /** @psalm-var array{
525
         *     id:string,
526
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
527
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
528
         *     created_at:string,
529
         *     closed_at:string|null
530
         * } $streamData
531
         */
532
533 5
        return StoredAggregateEventStream::create(
534 5
            $streamData['id'],
535 5
            $streamData['identifier_class'],
536 5
            $streamData['aggregate_class'],
537 5
            self::restoreEvents($adapter, $streamEventsData),
538 5
            $streamData['created_at'],
539 5
            $streamData['closed_at']
540
        );
541
    }
542
543
    /**
544
     * Restore events from rows
545
     *
546
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
547
     *
548
     * @param BinaryDataDecoder $decoder
549
     * @param array|null        $eventsData
550
     *
551
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
552
     *
553
     * @throws \LogicException
554
     */
555 5
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
556
    {
557 5
        $events = [];
558
559 5
        if(true === \is_array($eventsData) && 0 !== \count($eventsData))
560
        {
561
            /**
562
             * @psalm-var array{
563
             *   id:string,
564
             *   playhead:string,
565
             *   payload:string,
566
             *   event_class:class-string,
567
             *   occured_at:string,
568
             *   recorded_at:string
569
             * } $eventRow
570
             */
571 4
            foreach($eventsData as $eventRow)
572
            {
573 4
                $playhead = (int) $eventRow['playhead'];
574
575 4
                $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']));
576
577 4
                if(true === \is_string($payload))
578
                {
579 4
                    $events[$playhead] = StoredAggregateEvent::restore(
580 4
                        $eventRow['id'],
581 4
                        $playhead,
582 4
                        $payload,
583 4
                        $eventRow['event_class'],
584 4
                        $eventRow['occured_at'],
585 4
                        $eventRow['recorded_at']
586
                    );
587
588 4
                    continue;
589
                }
590
591
                throw new \LogicException(
592
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
593
                );
594
            }
595
        }
596
597
        /** @psal-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> */
598
599 5
        return $events;
600
    }
601
602
    /**
603
     * Create a sql query to store events
604
     *
605
     * @param int $eventsCount
606
     *
607
     * @return string
608
     */
609 7
    private static function createSaveEventQueryString(int $eventsCount): string
610
    {
611 7
        return \sprintf(
612
        /** @lang text */
613 7
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
614 7
            self::STREAM_EVENTS_TABLE,
615 7
            \implode(
616 7
                ', ', \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)')
617
            )
618
        );
619
    }
620
621
    /**
622
     * Gathering parameters for sending to a request to save events
623
     *
624
     * @param StoredAggregateEventStream $eventsStream
625
     *
626
     * @return array
627
     */
628 7
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
629
    {
630 7
        $queryParameters = [];
631 7
        $rowSetIndex     = 0;
632
633
        /** @psalm-var array<int, string|int> $parameters */
634 7
        foreach(self::prepareEventRows($eventsStream) as $parameters)
635
        {
636 7
            foreach($parameters as $parameter)
637
            {
638 7
                $queryParameters[$rowSetIndex] = $parameter;
639
640 7
                $rowSetIndex++;
641
            }
642
        }
643
644 7
        return $queryParameters;
645
    }
646
647
    /**
648
     * Prepare events to insert
649
     *
650
     * @psalm-return array<int, array<int, string|int>>
651
     *
652
     * @param StoredAggregateEventStream $eventsStream
653
     *
654
     * @return array
655
     */
656 7
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
657
    {
658 7
        $eventsRows = [];
659
660 7
        foreach($eventsStream->storedAggregateEvents as $storedAggregateEvent)
661
        {
662
            /** @var StoredAggregateEvent $storedAggregateEvent */
663
664
            $row = [
665 7
                $storedAggregateEvent->eventId,
666 7
                $eventsStream->aggregateId,
667 7
                $storedAggregateEvent->playheadPosition,
668 7
                $storedAggregateEvent->eventClass,
669 7
                \base64_encode($storedAggregateEvent->eventData),
670 7
                $storedAggregateEvent->occuredAt,
671 7
                \date('Y-m-d H:i:s')
672
            ];
673
674 7
            $eventsRows[] = $row;
675
        }
676
677 7
        return $eventsRows;
678
    }
679
}
680