Completed
Push — v3.3 ( a2928b...399132 )
by Masiukevich
02:26
created

SqlEventStreamStore::prepareEventRows()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 21
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 2
eloc 12
c 2
b 0
f 0
nc 2
nop 1
dl 0
loc 21
ccs 12
cts 12
cp 1
crap 2
rs 9.8666
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Storage\Sql\deleteQuery;
18
use function ServiceBus\Storage\Sql\equalsCriteria;
19
use function ServiceBus\Storage\Sql\fetchAll;
20
use function ServiceBus\Storage\Sql\fetchOne;
21
use function ServiceBus\Storage\Sql\insertQuery;
22
use function ServiceBus\Storage\Sql\selectQuery;
23
use function ServiceBus\Storage\Sql\updateQuery;
24
use Amp\Promise;
25
use ServiceBus\EventSourcing\Aggregate;
26
use ServiceBus\EventSourcing\AggregateId;
27
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
29
use ServiceBus\Storage\Common\BinaryDataDecoder;
30
use ServiceBus\Storage\Common\DatabaseAdapter;
31
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
32
use ServiceBus\Storage\Common\QueryExecutor;
33
34
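/**
 * SQL implementation of EventStreamStore.
 *
 * Stream rows are kept in the "event_store_stream" table and their events in the
 * "event_store_stream_events" table; all queries are executed through the injected DatabaseAdapter.
 */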
37
final class SqlEventStreamStore implements EventStreamStore
38
{
39
    /** Table holding one row per event stream (id, identifier/aggregate classes, created_at, closed_at). */
    private const STREAMS_TABLE = 'event_store_stream';
40
41
    /** Table holding the individual events of every stream, addressed by stream_id and playhead. */
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
42
43
    /**
     * Adapter through which every query and transaction of this store is executed.
     *
     * @var DatabaseAdapter
     */
46
    private $adapter;
47
48
    /**
     * Binds the store to the database adapter that will run all of its queries.
     *
     * @param DatabaseAdapter $adapter
     */
    public function __construct(DatabaseAdapter $adapter)
    {
        $this->adapter = $adapter;
    }
55
56
    /**
     * {@inheritdoc}
     *
     * The stream row is inserted first, then its events; both steps run inside the same
     * transactional() call of the adapter.
     */
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistStreamWithEvents = static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveStream($queryExecutor, $aggregateEventStream);
            yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
        };

        return $this->adapter->transactional($persistStreamWithEvents);
    }
69
70
    /**
     * {@inheritdoc}
     *
     * Only the events are written; the stream row itself is not touched.
     */
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistEvents = static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
        };

        return $this->adapter->transactional($persistEvents);
    }
82
83
    /**
     * {@inheritdoc}
     *
     * The promise resolves with null when no stream row is found for the identifier; otherwise with the
     * stream restored from its row and the matching event rows.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     */
    public function load(AggregateId $id, int $fromVersion = Aggregate::START_PLAYHEAD_INDEX, ?int $toVersion = null): Promise
    {
        $adapter = $this->adapter;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id, int $fromVersion, ?int $toVersion) use ($adapter): \Generator
            {
                /** @var array<string, string>|null $streamData */
                $streamData = yield from self::doLoadStream($adapter, $id);

                // Unknown stream: nothing to restore.
                if (null === $streamData)
                {
                    return null;
                }

                /** @var array<int, array>|null $streamEventsData */
                $streamEventsData = yield from self::doLoadStreamEvents(
                    $adapter,
                    (string) $streamData['id'],
                    $fromVersion,
                    $toVersion
                );

                return self::restoreEventStream($adapter, $streamData, $streamEventsData);
            },
            $id,
            $fromVersion,
            $toVersion
        );
    }
121
122
    /**
     * {@inheritdoc}
     *
     * Writes the current date/time (as formatted by date('Y-m-d H:i:s')) into the closed_at column of the
     * stream row matching both the identifier value and the identifier class.
     */
    public function close(AggregateId $id): Promise
    {
        $adapter = $this->adapter;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id) use ($adapter): \Generator
            {
                $closedAt = \date('Y-m-d H:i:s');

                $compiledQuery = updateQuery(self::STREAMS_TABLE, ['closed_at' => $closedAt])
                    ->where(equalsCriteria('id', $id->toString()))
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
                    ->compile();

                /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
                yield $adapter->execute($compiledQuery->sql(), $compiledQuery->params());
            },
            $id
        );
    }
145
146
    /**
     * {@inheritdoc}
     *
     * Inside one transaction the events after $toVersion are either deleted ($force === true) or marked
     * with canceled_at ($force === false); afterwards canceled_at is cleared for every event up to and
     * including $toVersion.
     *
     * Throws EventStreamDoesNotExist when the stream row is missing, and converts a unique constraint
     * violation raised by the transaction into EventStreamIntegrityCheckFailed.
     */
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
    {
        $adapter = $this->adapter;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id, int $toVersion, bool $force) use ($adapter): \Generator
            {
                /** @var array<string, string>|null $streamData */
                $streamData = yield from self::doLoadStream($adapter, $id);

                if (null === $streamData)
                {
                    throw EventStreamDoesNotExist::create($id);
                }

                /** @var string $streamId */
                $streamId = $streamData['id'];

                $rollback = static function(QueryExecutor $queryExecutor) use ($force, $streamId, $toVersion): \Generator
                {
                    if (true === $force)
                    {
                        yield from self::doDeleteTailEvents($queryExecutor, $streamId, $toVersion);
                    }
                    else
                    {
                        yield from self::doSkipEvents($queryExecutor, $streamId, $toVersion);
                    }

                    /** restore soft deleted events */
                    yield from self::doRestoreEvents($queryExecutor, $streamId, $toVersion);
                };

                try
                {
                    yield $adapter->transactional($rollback);
                }
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    throw new EventStreamIntegrityCheckFailed(
                        \sprintf('Error verifying the integrity of the events stream with ID "%s"', $id->toString()),
                        (int) $exception->getCode(),
                        $exception
                    );
                }
            },
            $id,
            $toVersion,
            $force
        );
    }
196
197
    /**
     * Inserts the stream row (identifier, identifier class, aggregate class, creation and closing dates).
     *
     * @param QueryExecutor              $queryExecutor
     * @param StoredAggregateEventStream $eventsStream
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $streamRow = [
            'id'               => $eventsStream->aggregateId,
            'identifier_class' => $eventsStream->aggregateIdClass,
            'aggregate_class'  => $eventsStream->aggregateClass,
            'created_at'       => $eventsStream->createdAt,
            'closed_at'        => $eventsStream->closedAt,
        ];

        $compiledQuery = insertQuery(self::STREAMS_TABLE, $streamRow)->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
230
231
    /**
     * Stores all events of the stream with a single multi-row INSERT; does nothing when there are no events.
     *
     * @param QueryExecutor              $queryExecutor
     * @param StoredAggregateEventStream $eventsStream
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $eventsCount = \count($eventsStream->storedAggregateEvents);

        // An INSERT without any VALUES tuple would be invalid SQL, so skip the query entirely.
        if (0 === $eventsCount)
        {
            return;
        }

        $sql        = self::createSaveEventQueryString($eventsCount);
        $parameters = self::collectSaveEventQueryParameters($eventsStream);

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $queryExecutor->execute($sql, $parameters);
    }
257
258
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Fetches the stream row whose id and identifier_class match the given aggregate identifier.
     *
     * The generator returns the row as an array, or null when fetchOne() finds nothing.
     *
     * @param QueryExecutor $queryExecutor
     * @param AggregateId   $id
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
    {
        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @var \Latitude\QueryBuilder\Query $compiledQuery
         */
        $compiledQuery = selectQuery(self::STREAMS_TABLE)
            ->where(equalsCriteria('id', $id->toString()))
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
            ->compile();

        /**
         * @noinspection PhpUnhandledExceptionInspection
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());

        /**
         * @noinspection PhpUnhandledExceptionInspection
         * @psalm-var    array<string, string>|null $streamRow
         *
         * @var array $streamRow
         */
        $streamRow = yield fetchOne($resultSet);

        return $streamRow;
    }
299
300
    /**
     * Loads the non-canceled event rows of a stream whose playhead lies in the requested range.
     *
     * The lower bound ($fromVersion) is always applied. The upper bound is applied only when $toVersion is
     * given and is not smaller than $fromVersion; this includes the single-version range
     * $fromVersion === $toVersion. An inverted range is treated as "no upper bound".
     *
     * @param QueryExecutor $queryExecutor
     * @param string        $streamId
     * @param int           $fromVersion Lowest playhead to include
     * @param int|null      $toVersion   Highest playhead to include, null for no limit
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doLoadStreamEvents(
        QueryExecutor $queryExecutor,
        string $streamId,
        int $fromVersion,
        ?int $toVersion
    ): \Generator {
        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
            ->where(field('stream_id')->eq($streamId))
            ->andWhere(field('playhead')->gte($fromVersion))
            ->andWhere(field('canceled_at')->isNull());

        // "<=" (not "<"): a request for exactly one version must still be capped at that version,
        // otherwise every later event of the stream would be returned as well.
        if (null !== $toVersion && $fromVersion <= $toVersion)
        {
            $selectQuery->andWhere(field('playhead')->lte($toVersion));
        }

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = $selectQuery->compile();

        /**
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());

        /**
         * @psalm-var      array<int, array>|null $result
         *
         * @var array $result
         */
        $result = yield fetchAll($resultSet);

        return $result;
    }
350
351
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Hard-deletes every event of the stream whose playhead is greater than $toVersion.
     *
     * @param QueryExecutor $executor
     * @param string        $streamId
     * @param int           $toVersion
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        /** @noinspection PhpUnhandledExceptionInspection */
        $compiledQuery = deleteQuery(self::STREAM_EVENTS_TABLE)
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * @noinspection PhpUnhandledExceptionInspection
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
385
386
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Soft-deletes every event of the stream whose playhead is greater than $toVersion by stamping its
     * canceled_at column with the current date/time.
     *
     * @param QueryExecutor $executor
     * @param string        $streamId
     * @param int           $toVersion
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $canceledAt = \date('Y-m-d H:i:s');

        /**
         * @noinspection PhpUnhandledExceptionInspection
         *
         * @var \Latitude\QueryBuilder\Query $compiledQuery
         */
        $compiledQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => $canceledAt])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * @noinspection PhpUnhandledExceptionInspection
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
421
422
    /**
     * @noinspection PhpDocMissingThrowsInspection
     *
     * Clears canceled_at (sets it to NULL) on every event of the stream whose playhead is less than or
     * equal to $toVersion, making previously soft-deleted events visible again.
     *
     * @param QueryExecutor $executor
     * @param string        $streamId
     * @param int           $toVersion
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return \Generator
     */
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        /** @noinspection PhpUnhandledExceptionInspection */
        $compiledQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->lte($toVersion))
            ->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
454
455
    /**
     * Builds a StoredAggregateEventStream from a stream row and the rows of its events.
     *
     * @psalm-param array<string, string>  $streamData
     * @psalm-param array<int, array>|null $streamEventsData
     *
     * @param DatabaseAdapter $adapter          Passed on to restoreEvents() as the binary payload decoder
     * @param array           $streamData       Row of the streams table
     * @param array|null      $streamEventsData Rows of the stream events table
     *
     * @throws \LogicException
     *
     * @return StoredAggregateEventStream
     */
    private static function restoreEventStream(
        DatabaseAdapter $adapter,
        array $streamData,
        ?array $streamEventsData
    ): StoredAggregateEventStream {
        /** @psalm-var array{
         *     id:string,
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
         *     created_at:string,
         *     closed_at:string|null
         * } $streamData
         */
        $streamId        = $streamData['id'];
        $identifierClass = $streamData['identifier_class'];
        $aggregateClass  = $streamData['aggregate_class'];
        $storedEvents    = self::restoreEvents($adapter, $streamEventsData);

        return StoredAggregateEventStream::create(
            $streamId,
            $identifierClass,
            $aggregateClass,
            $storedEvents,
            $streamData['created_at'],
            $streamData['closed_at']
        );
    }
492
493
    /**
     * Restores stored events from database rows.
     *
     * Each payload is unescaped with the decoder and then base64-decoded. The result is indexed by
     * playhead and sorted by it in ascending order, so the order does not depend on the order in which
     * the rows were returned.
     *
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
     *
     * @param BinaryDataDecoder $decoder
     * @param array|null        $eventsData Event rows; null or an empty array yields an empty result
     *
     * @throws \LogicException When a payload is not valid base64
     *
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
     */
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
    {
        /** @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> $events */
        $events = [];

        if (null === $eventsData)
        {
            return $events;
        }

        /**
         * @psalm-var array{
         *   id:string,
         *   playhead:string,
         *   payload:string,
         *   event_class:class-string,
         *   occured_at:string,
         *   recorded_at:string
         * } $eventRow
         */
        foreach ($eventsData as $eventRow)
        {
            $playhead = (int) $eventRow['playhead'];

            // Strict mode: without it base64_decode() silently drops invalid characters instead of
            // returning false, so a corrupted payload would never be detected here.
            $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']), true);

            if (false === $payload)
            {
                throw new \LogicException(
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
                );
            }

            $events[$playhead] = StoredAggregateEvent::restore(
                $eventRow['id'],
                $playhead,
                $payload,
                $eventRow['event_class'],
                $eventRow['occured_at'],
                $eventRow['recorded_at']
            );
        }

        // Row order is not guaranteed by the caller; keep the events in playhead order.
        \ksort($events);

        return $events;
    }
551
552
    /**
     * Builds the multi-row INSERT statement for the events table with one seven-placeholder tuple per event.
     *
     * @param int $eventsCount Number of value tuples to generate
     *
     * @return string
     */
    private static function createSaveEventQueryString(int $eventsCount): string
    {
        $rowPlaceholders = \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)');
        $valuesClause    = \implode(', ', $rowPlaceholders);

        return \sprintf(
            /** @lang text */
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
            self::STREAM_EVENTS_TABLE,
            $valuesClause
        );
    }
571
572
    /**
     * Flattens the prepared event rows into one zero-indexed parameter list, row after row, matching the
     * placeholder order of the INSERT statement.
     *
     * @param StoredAggregateEventStream $eventsStream
     *
     * @return array
     */
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
    {
        $flattened = [];

        /** @psalm-var array<int, string|int|float|null> $rowValues */
        foreach (self::prepareEventRows($eventsStream) as $rowValues)
        {
            foreach ($rowValues as $value)
            {
                $flattened[] = $value;
            }
        }

        return $flattened;
    }
597
598
    /**
     * Converts every stored event of the stream into a positional row for the INSERT statement:
     * event id, stream id, playhead, event class, base64-encoded payload, occurrence date and the
     * current date/time as the recording date.
     *
     * @psalm-return array<int, array<int, string|int>>
     *
     * @param StoredAggregateEventStream $eventsStream
     *
     * @return array
     */
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
    {
        $rows = [];

        /** @var StoredAggregateEvent $event */
        foreach ($eventsStream->storedAggregateEvents as $event)
        {
            $rows[] = [
                $event->eventId,
                $eventsStream->aggregateId,
                $event->playheadPosition,
                $event->eventClass,
                \base64_encode($event->eventData),
                $event->occuredAt,
                \date('Y-m-d H:i:s'),
            ];
        }

        return $rows;
    }
629
}
630