Completed
Push — v4.0 ( ff495d...e0bacf )
by Masiukevich
01:44
created

SqlEventStreamStore::createSaveEventQueryString()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 6
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 9
ccs 7
cts 7
cp 1
crap 1
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Storage\Sql\deleteQuery;
18
use function ServiceBus\Storage\Sql\equalsCriteria;
19
use function ServiceBus\Storage\Sql\fetchAll;
20
use function ServiceBus\Storage\Sql\fetchOne;
21
use function ServiceBus\Storage\Sql\insertQuery;
22
use function ServiceBus\Storage\Sql\selectQuery;
23
use function ServiceBus\Storage\Sql\updateQuery;
24
use Amp\Promise;
25
use ServiceBus\EventSourcing\Aggregate;
26
use ServiceBus\EventSourcing\AggregateId;
27
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
29
use ServiceBus\Storage\Common\BinaryDataDecoder;
30
use ServiceBus\Storage\Common\DatabaseAdapter;
31
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
32
use ServiceBus\Storage\Common\QueryExecutor;
33
34
/**
 * SQL-backed implementation of the event stream store.
 *
 * Streams are kept in the `event_store_stream` table and their events in the
 * `event_store_stream_events` table. Every query is executed through the injected
 * DatabaseAdapter (or the QueryExecutor it provides inside a transaction).
 */
final class SqlEventStreamStore implements EventStreamStore
{
    /** Table holding one row per aggregate event stream. */
    private const STREAMS_TABLE = 'event_store_stream';

    /** Table holding the events that belong to the streams. */
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';

    private DatabaseAdapter $adapter;

    public function __construct(DatabaseAdapter $adapter)
    {
        $this->adapter = $adapter;
    }

    /**
     * Inserts the stream row and then its events inside a single transaction.
     *
     * {@inheritdoc}
     */
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        return $this->adapter->transactional(
            static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
            {
                yield from self::doSaveStream($queryExecutor, $aggregateEventStream);
                yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
            }
        );
    }

    /**
     * Inserts only the events of the given stream (the stream row itself is not touched),
     * inside a transaction.
     *
     * {@inheritdoc}
     */
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        return $this->adapter->transactional(
            static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
            {
                yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
            }
        );
    }

    /**
     * Loads the stream row and its non-canceled events.
     *
     * The promise resolves to a StoredAggregateEventStream, or to null when no stream row
     * matches the identifier.
     *
     * {@inheritdoc}
     */
    public function load(
        AggregateId $id,
        int $fromVersion = Aggregate::START_PLAYHEAD_INDEX,
        ?int $toVersion = null
    ): Promise {
        $adapter = $this->adapter;

        return call(
            static function(AggregateId $id, int $fromVersion, ?int $toVersion) use ($adapter): \Generator
            {
                /** @var array<string, string>|null $streamData */
                $streamData = yield from self::doLoadStream($adapter, $id);

                if (null === $streamData)
                {
                    return null;
                }

                /** @var array<int, array>|null $streamEventsData */
                $streamEventsData = yield from self::doLoadStreamEvents(
                    $adapter,
                    (string) $streamData['id'],
                    $fromVersion,
                    $toVersion
                );

                return self::restoreEventStream($adapter, $streamData, $streamEventsData);
            },
            $id,
            $fromVersion,
            $toVersion
        );
    }

    /**
     * Writes the current date/time (format `Y-m-d H:i:s`) into `closed_at` of the stream row
     * matching the identifier value and identifier class.
     *
     * {@inheritdoc}
     */
    public function close(AggregateId $id): Promise
    {
        $adapter = $this->adapter;

        return call(
            static function(AggregateId $id) use ($adapter): \Generator
            {
                $updateQuery = updateQuery(self::STREAMS_TABLE, ['closed_at' => \date('Y-m-d H:i:s')])
                    ->where(equalsCriteria('id', $id->toString()))
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)));

                $compiledQuery = $updateQuery->compile();

                /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
                yield $adapter->execute($compiledQuery->sql(), $compiledQuery->params());
            },
            $id
        );
    }

    /**
     * Rolls the stream back to the specified version.
     *
     * With $force the events after $toVersion are deleted; otherwise they are only marked as
     * canceled. In both cases the cancellation mark is then cleared for events up to $toVersion.
     * Fails with EventStreamDoesNotExist when the stream row is missing, and converts a unique
     * constraint violation into EventStreamIntegrityCheckFailed.
     *
     * {@inheritdoc}
     */
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
    {
        $adapter = $this->adapter;

        return call(
            static function(AggregateId $id, int $toVersion, bool $force) use ($adapter): \Generator
            {
                /** @var array<string, string>|null $streamData */
                $streamData = yield from self::doLoadStream($adapter, $id);

                if (null === $streamData)
                {
                    throw EventStreamDoesNotExist::create($id);
                }

                /** @var string $streamId */
                $streamId = $streamData['id'];

                try
                {
                    yield $adapter->transactional(
                        static function(QueryExecutor $queryExecutor) use ($force, $streamId, $toVersion): \Generator
                        {
                            if (true === $force)
                            {
                                yield from self::doDeleteTailEvents($queryExecutor, $streamId, $toVersion);
                            }
                            else
                            {
                                yield from self::doSkipEvents($queryExecutor, $streamId, $toVersion);
                            }

                            /** restore soft deleted events */
                            yield from self::doRestoreEvents($queryExecutor, $streamId, $toVersion);
                        }
                    );
                }
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    throw new EventStreamIntegrityCheckFailed(
                        \sprintf('Error verifying the integrity of the events stream with ID "%s"', $id->toString()),
                        (int) $exception->getCode(),
                        $exception
                    );
                }
            },
            $id,
            $toVersion,
            $force
        );
    }

    /**
     * Inserts the stream row (id, identifier class, aggregate class, created/closed timestamps).
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $insertQuery = insertQuery(self::STREAMS_TABLE, [
            'id'               => $eventsStream->aggregateId,
            'identifier_class' => $eventsStream->aggregateIdClass,
            'aggregate_class'  => $eventsStream->aggregateClass,
            'created_at'       => $eventsStream->createdAt,
            'closed_at'        => $eventsStream->closedAt,
        ]);

        $compiledQuery = $insertQuery->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
    }

    /**
     * Saving events in stream.
     *
     * All events are written with one multi-row INSERT; nothing is executed when the stream
     * carries no events.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $eventsCount = \count($eventsStream->storedAggregateEvents);

        if (0 === $eventsCount)
        {
            return;
        }

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $queryExecutor->execute(
            self::createSaveEventQueryString($eventsCount),
            self::collectSaveEventQueryParameters($eventsStream)
        );
    }

    /**
     * Selects the stream row matching the identifier value and identifier class.
     *
     * The generator returns whatever fetchOne() resolves to for the executed query
     * (annotated as a row array or null).
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
    {
        $selectQuery = selectQuery(self::STREAMS_TABLE)
            ->where(equalsCriteria('id', $id->toString()))
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)));

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = $selectQuery->compile();

        /**
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());

        /**
         * @psalm-var      array<string, string>|null $data
         *
         * @var array $data
         */
        $data = yield fetchOne($resultSet);

        return $data;
    }

    /**
     * Selects the non-canceled events of a stream starting at $fromVersion.
     *
     * The upper bound is only applied when $toVersion is given and greater than $fromVersion.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private static function doLoadStreamEvents(
        QueryExecutor $queryExecutor,
        string $streamId,
        int $fromVersion,
        ?int $toVersion
    ): \Generator {
        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
            ->where(field('stream_id')->eq($streamId))
            ->andWhere(field('playhead')->gte($fromVersion))
            ->andWhere(field('canceled_at')->isNull());

        // NOTE(review): with $toVersion === $fromVersion the upper bound is skipped and every event
        // from $fromVersion onwards is selected - confirm whether `<=` was intended.
        if (null !== $toVersion && $fromVersion < $toVersion)
        {
            $selectQuery->andWhere(field('playhead')->lte($toVersion));
        }

        // NOTE(review): no ORDER BY is applied here, so rows arrive in whatever order the database
        // returns them - confirm that consumers do not rely on playhead order.

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = $selectQuery->compile();

        /**
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());

        /**
         * @psalm-var      array<int, array>|null $result
         *
         * @var array $result
         */
        $result = yield fetchAll($resultSet);

        return $result;
    }

    /**
     * Complete removal of the "tail" events from the database.
     *
     * Deletes every event of the stream whose playhead is greater than $toVersion.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $deleteQuery = deleteQuery(self::STREAM_EVENTS_TABLE)
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion));

        $compiledQuery = $deleteQuery->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }

    /**
     * Soft deletion of events following the specified version.
     *
     * Sets `canceled_at` to the current date/time for every event of the stream whose playhead
     * is greater than $toVersion.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => \date('Y-m-d H:i:s')])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion));

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = $updateQuery->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }

    /**
     * Clears `canceled_at` for every event of the stream whose playhead is less than or equal
     * to $toVersion.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->lte($toVersion));

        $compiledQuery = $updateQuery->compile();

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }

    /**
     * Transform events stream array data to stored representation.
     *
     * @psalm-param array<string, string>  $streamData
     * @psalm-param array<int, array>|null $streamEventsData
     *
     * @throws \LogicException An event payload could not be decoded
     */
    private static function restoreEventStream(
        DatabaseAdapter $adapter,
        array $streamData,
        ?array $streamEventsData
    ): StoredAggregateEventStream {
        /** @psalm-var array{
         *     id:string,
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
         *     created_at:string,
         *     closed_at:string|null
         * } $streamData
         */

        return new StoredAggregateEventStream(
            $streamData['id'],
            $streamData['identifier_class'],
            $streamData['aggregate_class'],
            self::restoreEvents($adapter, $streamEventsData),
            $streamData['created_at'],
            $streamData['closed_at']
        );
    }

    /**
     * Restore events from rows.
     *
     * The resulting array is keyed by the event playhead. Payloads are un-escaped with the
     * decoder and then base64-decoded in strict mode, so a payload containing characters outside
     * the base64 alphabet is reported instead of being silently truncated.
     *
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
     *
     * @throws \LogicException An event payload is not valid base64
     *
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
     */
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
    {
        $events = [];

        if (null === $eventsData)
        {
            return $events;
        }

        /**
         * @psalm-var array{
         *   id:string,
         *   playhead:string,
         *   payload:string,
         *   event_class:class-string,
         *   occured_at:string,
         *   recorded_at:string
         * } $eventRow
         */
        foreach ($eventsData as $eventRow)
        {
            $playhead = (int) $eventRow['playhead'];

            // Strict mode: without it base64_decode() drops invalid characters instead of failing,
            // which would make the integrity check below unreachable.
            $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']), true);

            if (false === $payload)
            {
                throw new \LogicException(
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
                );
            }

            $events[$playhead] = StoredAggregateEvent::restore(
                $eventRow['id'],
                $playhead,
                $payload,
                $eventRow['event_class'],
                $eventRow['occured_at'],
                $eventRow['recorded_at']
            );
        }

        /** @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> $events */

        return $events;
    }

    /**
     * Create a sql query to store events.
     *
     * Produces one `(?, ?, ?, ?, ?, ?, ?)` placeholder group per event, joined with commas.
     */
    private static function createSaveEventQueryString(int $eventsCount): string
    {
        $placeholders = \implode(
            ', ',
            \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)')
        );

        return \sprintf(
        /** @lang text */
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
            self::STREAM_EVENTS_TABLE,
            $placeholders
        );
    }

    /**
     * Gathering parameters for sending to a request to save events.
     *
     * Flattens the per-event rows into a single zero-based list matching the placeholder order
     * of the multi-row INSERT.
     */
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
    {
        $queryParameters = [];

        /** @psalm-var array<int, string|int|float|null> $parameters */
        foreach (self::prepareEventRows($eventsStream) as $parameters)
        {
            foreach ($parameters as $parameter)
            {
                $queryParameters[] = $parameter;
            }
        }

        return $queryParameters;
    }

    /**
     * Prepare events to insert.
     *
     * Each row follows the column order of the INSERT statement: id, stream_id, playhead,
     * event_class, payload (base64-encoded), occured_at, recorded_at (current date/time).
     *
     * @psalm-return array<int, array<int, string|int>>
     */
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
    {
        $eventsRows = [];

        foreach ($eventsStream->storedAggregateEvents as $storedAggregateEvent)
        {
            /** @var StoredAggregateEvent $storedAggregateEvent */
            $eventsRows[] = [
                $storedAggregateEvent->eventId,
                $eventsStream->aggregateId,
                $storedAggregateEvent->playheadPosition,
                $storedAggregateEvent->eventClass,
                \base64_encode($storedAggregateEvent->eventData),
                $storedAggregateEvent->occuredAt,
                \date('Y-m-d H:i:s'),
            ];
        }

        return $eventsRows;
    }
}
545