SqlEventStreamStore::doSaveEvents()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 2
eloc 5
c 2
b 0
f 0
nc 2
nop 2
dl 0
loc 10
ccs 6
cts 6
cp 1
crap 2
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Common\now;
18
use function ServiceBus\Storage\Sql\deleteQuery;
19
use function ServiceBus\Storage\Sql\equalsCriteria;
20
use function ServiceBus\Storage\Sql\fetchAll;
21
use function ServiceBus\Storage\Sql\fetchOne;
22
use function ServiceBus\Storage\Sql\insertQuery;
23
use function ServiceBus\Storage\Sql\selectQuery;
24
use function ServiceBus\Storage\Sql\updateQuery;
25
use Amp\Promise;
26
use ServiceBus\EventSourcing\Aggregate;
27
use ServiceBus\EventSourcing\AggregateId;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
29
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
30
use ServiceBus\Storage\Common\BinaryDataDecoder;
31
use ServiceBus\Storage\Common\DatabaseAdapter;
32
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
33
use ServiceBus\Storage\Common\QueryExecutor;
34
35
/**
36
 *
37
 */
38
final class SqlEventStreamStore implements EventStreamStore
39
{
40
    private const STREAMS_TABLE = 'event_store_stream';
41
42
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
43
44
    /** @var DatabaseAdapter */
45
    private $adapter;
46
47 14
    /**
     * @param DatabaseAdapter $adapter Adapter through which this store issues its queries and transactions.
     */
    public function __construct(DatabaseAdapter $adapter)
    {
        $this->adapter = $adapter;
    }
51
52
    /**
     * {@inheritdoc}
     *
     * Inserts the stream row first and its events second, both through the adapter's transactional() wrapper.
     */
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistStream = static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveStream($queryExecutor, $aggregateEventStream);
            yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
        };

        /** @psalm-suppress MixedArgumentTypeCoercion */
        return $this->adapter->transactional($persistStream);
    }
66
67
    /**
     * {@inheritdoc}
     *
     * Writes only the events of the given stream (the stream row itself is not touched), through the adapter's
     * transactional() wrapper.
     */
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
    {
        $persistEvents = static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
        {
            yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
        };

        /** @psalm-suppress MixedArgumentTypeCoercion */
        return $this->adapter->transactional($persistEvents);
    }
80
81
    /**
     * {@inheritdoc}
     *
     * Resolves to null when no stream row is found for the identifier; otherwise resolves to the stream rebuilt
     * from its row and the events selected for the requested version range.
     */
    public function load(
        AggregateId $id,
        int $fromVersion = Aggregate::START_PLAYHEAD_INDEX,
        ?int $toVersion = null
    ): Promise {
        return call(
            function() use ($id, $fromVersion, $toVersion): \Generator
            {
                /** @var array<string, string>|null $streamRow */
                $streamRow = yield from self::doLoadStream($this->adapter, $id);

                /** Nothing stored for this identifier: no need to query the events table */
                if (null === $streamRow)
                {
                    return null;
                }

                /** @var array<int, array>|null $eventRows */
                $eventRows = yield from self::doLoadStreamEvents(
                    $this->adapter,
                    (string) $streamRow['id'],
                    $fromVersion,
                    $toVersion
                );

                return self::restoreEventStream($this->adapter, $streamRow, $eventRows);
            }
        );
    }
114
115
    /**
     * {@inheritdoc}
     *
     * Sets `closed_at` to the current date/time (second precision, as produced by \date()) on the stream row
     * matching both the identifier value and the identifier class.
     */
    public function close(AggregateId $id): Promise
    {
        return call(
            function() use ($id): \Generator
            {
                $closedAt = \date('Y-m-d H:i:s');

                $compiledQuery = updateQuery(self::STREAMS_TABLE, ['closed_at' => $closedAt])
                    ->where(equalsCriteria('id', $id->toString()))
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)))
                    ->compile();

                /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
                yield $this->adapter->execute($compiledQuery->sql(), $compiledQuery->params());
            }
        );
    }
134
135
    /**
     * {@inheritdoc}
     *
     * Rolls the stream back to $toVersion. Events with a playhead greater than $toVersion are physically deleted
     * when $force is true and marked as canceled otherwise; afterwards the canceled mark is cleared on every
     * event with a playhead up to and including $toVersion.
     *
     * The returned promise fails with EventStreamDoesNotExist when the stream row is not found, and with
     * EventStreamIntegrityCheckFailed when the storage reports a unique constraint violation.
     */
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
    {
        return call(
            function() use ($id, $toVersion, $force): \Generator
            {
                /** @var array<string, string>|null $streamData */
                $streamData = yield from self::doLoadStream($this->adapter, $id);

                if (null === $streamData)
                {
                    throw EventStreamDoesNotExist::create($id);
                }

                /** @var string $streamId */
                $streamId = $streamData['id'];

                $rollback = static function(QueryExecutor $queryExecutor) use ($force, $streamId, $toVersion): \Generator
                {
                    if ($force)
                    {
                        yield from self::doDeleteTailEvents($queryExecutor, $streamId, $toVersion);
                    }
                    else
                    {
                        yield from self::doSkipEvents($queryExecutor, $streamId, $toVersion);
                    }

                    /** restore soft deleted events */
                    yield from self::doRestoreEvents($queryExecutor, $streamId, $toVersion);
                };

                try
                {
                    /** @psalm-suppress MixedArgumentTypeCoercion */
                    yield $this->adapter->transactional($rollback);
                }
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    $message = \sprintf(
                        'Error verifying the integrity of the events stream with ID "%s"',
                        $id->toString()
                    );

                    throw new EventStreamIntegrityCheckFailed($message, (int) $exception->getCode(), $exception);
                }
            }
        );
    }
180
181
    /**
     * Insert the stream row (identifier, identifier class, aggregate class, creation and closing dates).
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $streamRow = [
            'id'               => $eventsStream->aggregateId,
            'identifier_class' => $eventsStream->aggregateIdClass,
            'aggregate_class'  => $eventsStream->aggregateClass,
            'created_at'       => $eventsStream->createdAt,
            'closed_at'        => $eventsStream->closedAt,
        ];

        $compiledQuery = insertQuery(self::STREAMS_TABLE, $streamRow)->compile();

        /**
         * The result set of the insert is not needed.
         *
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
209
210
    /**
     * Store all events of the stream with one multi-row INSERT. Does nothing when the stream carries no events.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
    {
        $eventsCount = \count($eventsStream->storedAggregateEvents);

        if (0 === $eventsCount)
        {
            return;
        }

        $sql        = self::createSaveEventQueryString($eventsCount);
        $parameters = self::collectSaveEventQueryParameters($eventsStream);

        /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
        yield $queryExecutor->execute($sql, $parameters);
    }
231
232
    /**
     * Select the stream row matching both the identifier value and the identifier class.
     *
     * Returns (as the generator result) whatever fetchOne() yields for the result set; callers in this class
     * treat null as "stream not found".
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
    {
        $byId              = equalsCriteria('id', $id->toString());
        $byIdentifierClass = equalsCriteria('identifier_class', \get_class($id));

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = selectQuery(self::STREAMS_TABLE)
            ->where($byId)
            ->andWhere($byIdentifierClass)
            ->compile();

        /**
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());

        /**
         * @psalm-var array<string, string>|null $streamRow
         *
         * @var array $streamRow
         */
        $streamRow = yield fetchOne($resultSet);

        return $streamRow;
    }
263
264
    /**
     * Select the non-canceled events of a stream whose playhead is at least $fromVersion and, when an upper bound
     * applies, at most $toVersion.
     *
     * NOTE(review): the upper bound is applied only when $toVersion is strictly greater than $fromVersion, so a
     * request with $toVersion === $fromVersion is not limited from above - confirm this is intended.
     * NOTE(review): the query specifies no ORDER BY, so row order is whatever the database returns.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private static function doLoadStreamEvents(
        QueryExecutor $queryExecutor,
        string $streamId,
        int $fromVersion,
        ?int $toVersion
    ): \Generator {
        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
            ->where(field('stream_id')->eq($streamId))
            ->andWhere(field('playhead')->gte($fromVersion))
            ->andWhere(field('canceled_at')->isNull());

        $hasUpperBound = null !== $toVersion && $fromVersion < $toVersion;

        if ($hasUpperBound)
        {
            /** The builder is used in place here: the return value of andWhere() is deliberately not reassigned */
            $selectQuery->andWhere(field('playhead')->lte($toVersion));
        }

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = $selectQuery->compile();

        /**
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         *
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
         */
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());

        /**
         * @psalm-var array<int, array>|null $eventRows
         *
         * @var array $eventRows
         */
        $eventRows = yield fetchAll($resultSet);

        return $eventRows;
    }
307
308
    /**
     * Physically delete every event of the stream whose playhead is greater than $toVersion.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $compiledQuery = deleteQuery(self::STREAM_EVENTS_TABLE)
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * The result set of the delete is not needed.
         *
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
332
333
    /**
     * Soft-delete every event of the stream whose playhead is greater than $toVersion by stamping `canceled_at`
     * with the current date/time (second precision, as produced by \date()).
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $canceledAt = \date('Y-m-d H:i:s');

        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
        $compiledQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => $canceledAt])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->gt($toVersion))
            ->compile();

        /**
         * The result set of the update is not needed.
         *
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
358
359
    /**
     * Clear the soft-delete mark (`canceled_at` set to null) on every event of the stream whose playhead is less
     * than or equal to $toVersion.
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
    {
        $compiledQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
            ->where(equalsCriteria('stream_id', $streamId))
            ->andWhere(field('playhead')->lte($toVersion))
            ->compile();

        /**
         * The result set of the update is not needed.
         *
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
         */
        yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
    }
382
383
    /**
     * Build the stored stream representation from a stream row and the rows of its events.
     *
     * The adapter is passed on to restoreEvents(), which uses it to unescape the binary payload of each event.
     *
     * @psalm-param array<string, string>  $streamData
     * @psalm-param array<int, array>|null $streamEventsData
     *
     * @throws \LogicException
     */
    private static function restoreEventStream(
        DatabaseAdapter $adapter,
        array $streamData,
        ?array $streamEventsData
    ): StoredAggregateEventStream {
        /** @psalm-var array{
         *     id:string,
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
         *     created_at:string,
         *     closed_at:string|null
         * } $streamData
         */
        $storedEvents = self::restoreEvents($adapter, $streamEventsData);

        return new StoredAggregateEventStream(
            $streamData['id'],
            $streamData['identifier_class'],
            $streamData['aggregate_class'],
            $storedEvents,
            $streamData['created_at'],
            $streamData['closed_at']
        );
    }
414
415
    /**
     * Restore events from rows.
     *
     * Each row's payload is unescaped by the decoder and base64-decoded; the resulting events are keyed by their
     * playhead. The select that produces the rows carries no ORDER BY, so the map is explicitly sorted by
     * playhead before being returned: iterating the result always walks the events in version order.
     * A null or empty row set yields an empty array.
     *
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
     *
     * @throws \LogicException
     *
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
     */
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
    {
        /** @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> $events */
        $events = [];

        /**
         * @psalm-var array{
         *   id:string,
         *   playhead:string,
         *   payload:string,
         *   event_class:class-string,
         *   occured_at:string,
         *   recorded_at:string
         * } $eventRow
         */
        foreach ($eventsData ?? [] as $eventRow)
        {
            $playhead = (int) $eventRow['playhead'];
            $payload  = (string) \base64_decode($decoder->unescapeBinary($eventRow['payload']));

            $events[$playhead] = StoredAggregateEvent::restore(
                $eventRow['id'],
                $playhead,
                $payload,
                $eventRow['event_class'],
                $eventRow['occured_at'],
                $eventRow['recorded_at']
            );
        }

        /** Do not depend on the order in which the database returned the rows */
        \ksort($events, \SORT_NUMERIC);

        return $events;
    }
460
461
    /**
     * Build the multi-row INSERT statement for the events table: one group of seven positional placeholders
     * per event, in the column order id, stream_id, playhead, event_class, payload, occured_at, recorded_at.
     */
    private static function createSaveEventQueryString(int $eventsCount): string
    {
        $rowPlaceholders = '(?, ?, ?, ?, ?, ?, ?)';
        $valueGroups     = \implode(', ', \array_fill(0, $eventsCount, $rowPlaceholders));

        return \sprintf(
        /** @lang text */
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
            self::STREAM_EVENTS_TABLE,
            $valueGroups
        );
    }
476
477
    /**
     * Flatten the prepared event rows into a single zero-based list of positional parameters, row after row,
     * matching the placeholder order of the multi-row INSERT.
     */
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
    {
        $flattened = [];

        /** @psalm-var array<int, string|int|float|null> $rowValues */
        foreach (self::prepareEventRows($eventsStream) as $rowValues)
        {
            foreach ($rowValues as $value)
            {
                $flattened[] = $value;
            }
        }

        return $flattened;
    }
498
499
    /**
     * Turn every stored event of the stream into a positional row for the events INSERT:
     * event id, stream (aggregate) id, playhead, event class, base64-encoded payload, occurrence date and the
     * recording date taken from now() at the moment the row is built.
     *
     * @psalm-return array<int, array<int, string|int>>
     */
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
    {
        $rows     = [];
        $streamId = $eventsStream->aggregateId;

        /** @var StoredAggregateEvent $event */
        foreach ($eventsStream->storedAggregateEvents as $event)
        {
            $rows[] = [
                $event->eventId,
                $streamId,
                $event->playheadPosition,
                $event->eventClass,
                \base64_encode($event->eventData),
                $event->occuredAt,
                now()->format('Y-m-d H:i:s.u'),
            ];
        }

        return $rows;
    }
526
}
527