Completed
Push — v3.2 ( e2e0a9...eca451 )
by Masiukevich
02:18
created

SqlEventStreamStore::load()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 31
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 16
nc 1
nop 3
dl 0
loc 31
ccs 17
cts 17
cp 1
crap 2
rs 9.7333
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Storage\Sql\deleteQuery;
18
use function ServiceBus\Storage\Sql\equalsCriteria;
19
use function ServiceBus\Storage\Sql\fetchAll;
20
use function ServiceBus\Storage\Sql\fetchOne;
21
use function ServiceBus\Storage\Sql\insertQuery;
22
use function ServiceBus\Storage\Sql\selectQuery;
23
use function ServiceBus\Storage\Sql\updateQuery;
24
use Amp\Promise;
25
use ServiceBus\EventSourcing\Aggregate;
26
use ServiceBus\EventSourcing\AggregateId;
27
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
29
use ServiceBus\Storage\Common\BinaryDataDecoder;
30
use ServiceBus\Storage\Common\DatabaseAdapter;
31
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
32
use ServiceBus\Storage\Common\QueryExecutor;
33
use ServiceBus\Storage\Common\Transaction;
34
35
/**
36
 *
37
 */
38
final class SqlEventStreamStore implements EventStreamStore
39
{
40
    private const STREAMS_TABLE = 'event_store_stream';
41
42
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
43
44
    /**
45
     * @var DatabaseAdapter
46
     */
47
    private $adapter;
48
49
    /**
     * Bind the store to the database adapter through which all of its queries are issued.
     *
     * @param DatabaseAdapter $adapter Adapter kept on the instance for every later operation
     */
    public function __construct(DatabaseAdapter $adapter)
    {
        $this->adapter = $adapter;
    }
56
57
    /**
     * {@inheritdoc}
     *
     * Writes the stream row first and its events second, both from within the callback
     * handed to the adapter's transactional() method.
     */
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistStreamWithEvents = static function(Transaction $tx) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveStream($tx, $aggregateEventStream);
            yield from self::doSaveEvents($tx, $aggregateEventStream);
        };

        return $this->adapter->transactional($persistStreamWithEvents);
    }
70
71
    /**
     * {@inheritdoc}
     *
     * Only the events of the given stream are written; the stream row itself is not touched.
     * The insert runs inside the callback handed to the adapter's transactional() method.
     */
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistEvents = static function(Transaction $tx) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveEvents($tx, $aggregateEventStream);
        };

        return $this->adapter->transactional($persistEvents);
    }
83
84
    /**
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * {@inheritdoc}
     *
     * The promise resolves to the restored stream, or to null when doLoadStream() returns
     * no data for the given identifier.
     */
    public function load(AggregateId $id, int $fromVersion = Aggregate::START_PLAYHEAD_INDEX, ?int $toVersion = null): Promise
    {
        $adapter = $this->adapter;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id, int $fromVersion, ?int $toVersion) use ($adapter): \Generator
            {
                /** @var array<string, string>|null $streamRow */
                $streamRow = yield from self::doLoadStream($adapter, $id);

                // Nothing stored for this identifier: there is no stream to restore.
                if (null === $streamRow)
                {
                    return null;
                }

                /** @var array<int, array>|null $eventRows */
                $eventRows = yield from self::doLoadStreamEvents(
                    $adapter,
                    (string) $streamRow['id'],
                    $fromVersion,
                    $toVersion
                );

                return self::restoreEventStream($adapter, $streamRow, $eventRows);
            },
            $id,
            $fromVersion,
            $toVersion
        );
    }
122
123
    /**
     * {@inheritdoc}
     *
     * Sets `closed_at` to the current date/time (as formatted by date('Y-m-d H:i:s')) on the
     * stream row matching both the identifier value and the identifier class.
     */
    public function close(AggregateId $id): Promise
    {
        $adapter = $this->adapter;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id) use ($adapter): \Generator
            {
                $closedAt = \date('Y-m-d H:i:s');

                $query = updateQuery(self::STREAMS_TABLE, ['closed_at' => $closedAt])
                    ->where(equalsCriteria('id', $id->toString()))
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
                    ->compile();

                /**
                 * @psalm-suppress TooManyTemplateParams Wrong Promise template
                 * @psalm-suppress MixedTypeCoercion Invalid params() docblock
                 */
                yield $adapter->execute($query->sql(), $query->params());
            },
            $id
        );
    }
149
150
    /**
     * {@inheritdoc}
     *
     * Fails with EventStreamDoesNotExist when no stream data is found for the identifier.
     * Otherwise, inside one transactional() callback, events after $toVersion are either
     * deleted ($force === true) or soft-deleted, and events up to $toVersion have their
     * `canceled_at` mark cleared. A unique constraint violation raised along the way is
     * rethrown as EventStreamIntegrityCheckFailed.
     */
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
    {
        $adapter = $this->adapter;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id, int $toVersion, bool $force) use ($adapter): \Generator
            {
                /** @var array<string, string>|null $streamRow */
                $streamRow = yield from self::doLoadStream($adapter, $id);

                if (null === $streamRow)
                {
                    throw EventStreamDoesNotExist::create($id);
                }

                /** @var string $streamId */
                $streamId = $streamRow['id'];

                $rewindStream = static function(Transaction $transaction) use ($force, $streamId, $toVersion): \Generator
                {
                    if (true === $force)
                    {
                        yield from self::doDeleteTailEvents($transaction, $streamId, $toVersion);
                    }
                    else
                    {
                        yield from self::doSkipEvents($transaction, $streamId, $toVersion);
                    }

                    /** restore soft deleted events */
                    yield from self::doRestoreEvents($transaction, $streamId, $toVersion);
                };

                try
                {
                    yield $adapter->transactional($rewindStream);
                }
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    throw new EventStreamIntegrityCheckFailed(
                        \sprintf('Error verifying the integrity of the events stream with ID "%s"', $id->toString()),
                        (int) $exception->getCode(),
                        $exception
                    );
                }
            },
            $id,
            $toVersion,
            $force
        );
    }
200
201
    /**
     * Insert the stream header row (identifier, classes and timestamps) into the streams table.
     *
     * @param QueryExecutor              $queryExecutor Executor (adapter or transaction) that runs the insert
     * @param StoredAggregateEventStream $eventsStream  Stream whose header fields are written
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $streamRow = [
            'id'               => $eventsStream->aggregateId,
            'identifier_class' => $eventsStream->aggregateIdClass,
            'aggregate_class'  => $eventsStream->aggregateClass,
            'created_at'       => $eventsStream->createdAt,
            'closed_at'        => $eventsStream->closedAt,
        ];

        $query = insertQuery(self::STREAMS_TABLE, $streamRow)->compile();

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $queryExecutor->execute($query->sql(), $query->params());
    }
235
236
    /**
     * Persist all events of the stream with a single multi-row INSERT.
     *
     * When the stream carries no events, no query is executed at all.
     *
     * @param QueryExecutor              $queryExecutor Executor (adapter or transaction) that runs the insert
     * @param StoredAggregateEventStream $eventsStream  Stream whose events are written
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $eventsCount = \count($eventsStream->storedAggregateEvents);

        if (0 === $eventsCount)
        {
            return;
        }

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $queryExecutor->execute(
            self::createSaveEventQueryString($eventsCount),
            self::collectSaveEventQueryParameters($eventsStream)
        );
    }
265
266
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Fetch the stream header row matching both the identifier value and the identifier class.
     *
     * The generator returns whatever fetchOne() resolves to for the executed select
     * (annotated in this file as array<string, string>|null).
     *
     * @param QueryExecutor $queryExecutor Executor that runs the select
     * @param AggregateId   $id            Identifier of the aggregate whose stream is looked up
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
    {
        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @var \Latitude\QueryBuilder\Query $query
         */
        $query = selectQuery(self::STREAMS_TABLE)
            ->where(equalsCriteria('id', $id->toString()))
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
            ->compile();

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($query->sql(), $query->params());

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-var      array<string, string>|null $streamRow
         *
         * @var array $streamRow
         */
        $streamRow = yield fetchOne($resultSet);

        return $streamRow;
    }
309
310
    /**
     * Fetch the non-canceled event rows of a stream, starting at a given playhead.
     *
     * Selected rows satisfy: stream_id = $streamId, playhead >= $fromVersion and
     * canceled_at IS NULL. The upper bound (playhead <= $toVersion) is added only when
     * $toVersion is given AND is strictly greater than $fromVersion; otherwise every
     * event from $fromVersion onwards is selected. No ordering is requested from the database.
     *
     * @param QueryExecutor $queryExecutor Executor that runs the select
     * @param string        $streamId      Stream identifier
     * @param int           $fromVersion   Lowest playhead to include
     * @param int|null      $toVersion     Highest playhead to include (see the condition above)
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doLoadStreamEvents(
        QueryExecutor $queryExecutor,
        string $streamId,
        int $fromVersion,
        ?int $toVersion
    ): \Generator {
        $belongsToStream = field('stream_id')->eq($streamId);
        $fromPlayhead    = field('playhead')->gte($fromVersion);
        $notCanceled     = field('canceled_at')->isNull();

        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
            ->where($belongsToStream)
            ->andWhere($fromPlayhead)
            ->andWhere($notCanceled);

        if (null !== $toVersion && $fromVersion < $toVersion)
        {
            // NOTE(review): the returned builder is discarded, so this relies on andWhere()
            // modifying $selectQuery in place — confirm against the query builder in use.
            $selectQuery->andWhere(field('playhead')->lte($toVersion));
        }

        /** @var \Latitude\QueryBuilder\Query $query */
        $query = $selectQuery->compile();

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($query->sql(), $query->params());

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-var      array<int, array>|null $eventRows
         *
         * @var array $eventRows
         */
        $eventRows = yield fetchAll($resultSet);

        return $eventRows;
    }
362
363
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Hard-delete the "tail" of a stream: every event row whose playhead is greater than $toVersion.
     *
     * @param QueryExecutor $executor  Executor (adapter or transaction) that runs the delete
     * @param string        $streamId  Stream identifier
     * @param int           $toVersion Last playhead that is kept
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        /** @noinspection PhpUnhandledExceptionInspection */
        $query = deleteQuery(self::STREAM_EVENTS_TABLE)
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($query->sql(), $query->params());
    }
398
399
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Soft-delete the events following the specified version: rows with a playhead greater
     * than $toVersion get `canceled_at` set to the current date/time.
     *
     * @param QueryExecutor $executor  Executor (adapter or transaction) that runs the update
     * @param string        $streamId  Stream identifier
     * @param int           $toVersion Last playhead that stays active
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $canceledAt = \date('Y-m-d H:i:s');

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @var \Latitude\QueryBuilder\Query $query
         */
        $query = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => $canceledAt])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($query->sql(), $query->params());
    }
435
436
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Clear the soft-delete mark: rows with a playhead less than or equal to $toVersion
     * get `canceled_at` reset to null.
     *
     * @param QueryExecutor $executor  Executor (adapter or transaction) that runs the update
     * @param string        $streamId  Stream identifier
     * @param int           $toVersion Highest playhead that is re-activated
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        /** @noinspection PhpUnhandledExceptionInspection */
        $query = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->lte($toVersion))
            ->compile();

        /**
         * @psalm-suppress TooManyTemplateParams Wrong Promise template
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($query->sql(), $query->params());
    }
469
470
    /**
     * Build the stored stream representation from a stream row and its event rows.
     *
     * The adapter is handed to restoreEvents() as the binary decoder for event payloads.
     *
     * @psalm-param array<string, string>  $streamData
     * @psalm-param array<int, array>|null $streamEventsData
     *
     * @param DatabaseAdapter $adapter          Adapter, used here only to unescape binary payloads
     * @param array           $streamData       Stream header row
     * @param array|null      $streamEventsData Event rows; null is treated as "no events"
     *
     * @throws \LogicException
     *
     * @return StoredAggregateEventStream
     */
    private static function restoreEventStream(
        DatabaseAdapter $adapter,
        array $streamData,
        ?array $streamEventsData
    ): StoredAggregateEventStream {
        /** @psalm-var array{
         *     id:string,
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
         *     created_at:string,
         *     closed_at:string|null
         * } $streamData
         */
        $streamId        = $streamData['id'];
        $identifierClass = $streamData['identifier_class'];
        $aggregateClass  = $streamData['aggregate_class'];

        $storedEvents = self::restoreEvents($adapter, $streamEventsData);

        return StoredAggregateEventStream::create(
            $streamId,
            $identifierClass,
            $aggregateClass,
            $storedEvents,
            $streamData['created_at'],
            $streamData['closed_at']
        );
    }
507
508
    /**
     * Restore events from rows.
     *
     * Each row's payload is unescaped with the given decoder and base64-decoded, then turned
     * into a StoredAggregateEvent. The result is keyed by playhead and sorted by it in
     * ascending order. A null or empty row set yields an empty array.
     *
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
     *
     * @param BinaryDataDecoder $decoder    Decoder used to unescape the stored binary payload
     * @param array|null        $eventsData Event rows as fetched from storage
     *
     * @throws \LogicException When a payload is not valid base64
     *
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
     */
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
    {
        $events = [];

        if (null === $eventsData)
        {
            return $events;
        }

        /**
         * @psalm-var array{
         *   id:string,
         *   playhead:string,
         *   payload:string,
         *   event_class:class-string,
         *   occured_at:string,
         *   recorded_at:string
         * } $eventRow
         */
        foreach ($eventsData as $eventRow)
        {
            $playhead = (int) $eventRow['playhead'];

            // Strict mode is required for the check below to mean anything: in the default
            // lenient mode base64_decode() silently drops invalid characters instead of
            // returning false, so a corrupted payload would pass through unnoticed.
            $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']), true);

            if (false === $payload)
            {
                throw new \LogicException(
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
                );
            }

            $events[$playhead] = StoredAggregateEvent::restore(
                $eventRow['id'],
                $playhead,
                $payload,
                $eventRow['event_class'],
                $eventRow['occured_at'],
                $eventRow['recorded_at']
            );
        }

        // The select that produces these rows requests no ordering, so row order is up to
        // the database. Sort by playhead so that iteration follows the event sequence.
        \ksort($events);

        /** @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> $events */
        return $events;
    }
566
567
    /**
     * Build the multi-row INSERT statement used to store events.
     *
     * The statement contains one "(?, ?, ?, ?, ?, ?, ?)" placeholder group per event,
     * joined with ", ".
     *
     * @param int $eventsCount Number of placeholder groups (rows) to generate
     *
     * @return string
     */
    private static function createSaveEventQueryString(int $eventsCount): string
    {
        $rowPlaceholders = \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)');
        $valuesClause    = \implode(', ', $rowPlaceholders);

        return \sprintf(
            /** @lang text */
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
            self::STREAM_EVENTS_TABLE,
            $valuesClause
        );
    }
586
587
    /**
     * Flatten the per-event rows into one sequential parameter list for the multi-row INSERT.
     *
     * Values appear row by row, in the order produced by prepareEventRows(), under
     * consecutive integer keys starting at 0.
     *
     * @param StoredAggregateEventStream $eventsStream Stream whose events are being saved
     *
     * @return array
     */
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
    {
        $flatParameters = [];

        /** @psalm-var array<int, string|int|float|null> $rowParameters */
        foreach (self::prepareEventRows($eventsStream) as $rowParameters)
        {
            foreach ($rowParameters as $value)
            {
                $flatParameters[] = $value;
            }
        }

        return $flatParameters;
    }
612
613
    /**
     * Prepare events to insert.
     *
     * Produces one positional row per stored event, in the column order used by the INSERT
     * statement: id, stream_id, playhead, event_class, payload, occured_at, recorded_at.
     * The payload is base64-encoded and recorded_at is the current date/time at the moment
     * the row is built.
     *
     * @psalm-return array<int, array<int, string|int>>
     *
     * @param StoredAggregateEventStream $eventsStream Stream whose events are converted to rows
     *
     * @return array
     */
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
    {
        $rows = [];

        /** @var StoredAggregateEvent $event */
        foreach ($eventsStream->storedAggregateEvents as $event)
        {
            $rows[] = [
                $event->eventId,
                $eventsStream->aggregateId,
                $event->playheadPosition,
                $event->eventClass,
                \base64_encode($event->eventData),
                $event->occuredAt,
                \date('Y-m-d H:i:s'),
            ];
        }

        return $rows;
    }
644
}
645