Completed
Push — v4.0 ( 989be1...ff495d )
by Masiukevich
02:21
created

SqlEventStreamStore::revert()   A

Complexity

Conditions 4
Paths 1

Size

Total Lines 44
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 4.1106

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 4
eloc 22
c 3
b 0
f 0
nc 1
nop 3
dl 0
loc 44
ccs 17
cts 21
cp 0.8095
crap 4.1106
rs 9.568
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream\Store;
14
15
use function Amp\call;
16
use function Latitude\QueryBuilder\field;
17
use function ServiceBus\Storage\Sql\deleteQuery;
18
use function ServiceBus\Storage\Sql\equalsCriteria;
19
use function ServiceBus\Storage\Sql\fetchAll;
20
use function ServiceBus\Storage\Sql\fetchOne;
21
use function ServiceBus\Storage\Sql\insertQuery;
22
use function ServiceBus\Storage\Sql\selectQuery;
23
use function ServiceBus\Storage\Sql\updateQuery;
24
use Amp\Promise;
25
use ServiceBus\EventSourcing\Aggregate;
26
use ServiceBus\EventSourcing\AggregateId;
27
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamDoesNotExist;
28
use ServiceBus\EventSourcing\EventStream\Exceptions\EventStreamIntegrityCheckFailed;
29
use ServiceBus\Storage\Common\BinaryDataDecoder;
30
use ServiceBus\Storage\Common\DatabaseAdapter;
31
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
32
use ServiceBus\Storage\Common\QueryExecutor;
33
34
/**
35
 *
36
 */
37
final class SqlEventStreamStore implements EventStreamStore
38
{
39
    private const STREAMS_TABLE = 'event_store_stream';
40
41
    private const STREAM_EVENTS_TABLE = 'event_store_stream_events';
42
43
    private DatabaseAdapter $adapter;
44
45 8
    public function __construct(DatabaseAdapter $adapter)
46
    {
47 8
        $this->adapter = $adapter;
48 8
    }
49
50
    /**
51
     * {@inheritdoc}
52
     */
53 7
    public function save(StoredAggregateEventStream $aggregateEventStream): Promise
54
    {
55 7
        return $this->adapter->transactional(
56
            static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
57
            {
58 7
                yield from self::doSaveStream($queryExecutor, $aggregateEventStream);
59 7
                yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
60 7
            }
61
        );
62
    }
63
64
    /**
65
     * {@inheritdoc}
66
     */
67 4
    public function append(StoredAggregateEventStream $aggregateEventStream): Promise
68
    {
69 4
        return $this->adapter->transactional(
70
            static function(QueryExecutor $queryExecutor) use ($aggregateEventStream): \Generator
71
            {
72 4
                yield from self::doSaveEvents($queryExecutor, $aggregateEventStream);
73 4
            }
74
        );
75
    }
76
77
    /**
78
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
79
     *
80
     * {@inheritdoc}
81
     */
82 5
    public function load(
83
        AggregateId $id,
84
        int $fromVersion = Aggregate::START_PLAYHEAD_INDEX,
85
        ?int $toVersion = null
86
    ): Promise
87
    {
88 5
        $adapter = $this->adapter;
89
90 5
        return call(
91
            static function(AggregateId $id, int $fromVersion, ?int $toVersion) use ($adapter): \Generator
92
            {
93 5
                $aggregateEventStream = null;
94
95
                /** @var array<string, string>|null $streamData */
96 5
                $streamData = yield from self::doLoadStream($adapter, $id);
97
98 5
                if (null !== $streamData)
99
                {
100
                    /** @var array<int, array>|null $streamEventsData */
101 5
                    $streamEventsData = yield from self::doLoadStreamEvents(
102 5
                        $adapter,
103 5
                        (string) $streamData['id'],
104
                        $fromVersion,
105
                        $toVersion
106
                    );
107
108 5
                    $aggregateEventStream = self::restoreEventStream($adapter, $streamData, $streamEventsData);
109
                }
110
111 5
                return $aggregateEventStream;
112 5
            },
113 5
            $id,
114 5
            $fromVersion,
115 5
            $toVersion
116
        );
117
    }
118
119
    /**
120
     * {@inheritdoc}
121
     */
122
    public function close(AggregateId $id): Promise
123
    {
124
        $adapter = $this->adapter;
125
126
        return call(
127
            static function(AggregateId $id) use ($adapter): \Generator
128
            {
129
                $updateQuery = updateQuery(self::STREAMS_TABLE, ['closed_at' => \date('Y-m-d H:i:s')])
130
                    ->where(equalsCriteria('id', $id->toString()))
131
                    ->andWhere(equalsCriteria('identifier_class', \get_class($id)));
132
133
                $compiledQuery = $updateQuery->compile();
134
135
                /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
136
                yield $adapter->execute($compiledQuery->sql(), $compiledQuery->params());
137
            },
138
            $id
139
        );
140
    }
141
142
    /**
143
     * {@inheritdoc}
144
     */
145 4
    public function revert(AggregateId $id, int $toVersion, bool $force): Promise
146
    {
147 4
        $adapter = $this->adapter;
148
149 4
        return call(
150
            static function(AggregateId $id, int $toVersion, bool $force) use ($adapter): \Generator
151
            {
152
                /** @var array<string, string>|null $streamData */
153 4
                $streamData = yield from self::doLoadStream($adapter, $id);
154
155 4
                if (null === $streamData)
156
                {
157 1
                    throw EventStreamDoesNotExist::create($id);
158
                }
159
160
                /** @var string $streamId */
161 3
                $streamId = $streamData['id'];
162
163
                try
164
                {
165 3
                    yield $adapter->transactional(
166
                        static function(QueryExecutor $queryExecutor) use ($force, $streamId, $toVersion): \Generator
167
                        {
168 3
                            true === $force
169 1
                                ? yield from self::doDeleteTailEvents($queryExecutor, $streamId, $toVersion)
170 2
                                : yield from self::doSkipEvents($queryExecutor, $streamId, $toVersion);
171
172
                            /** restore soft deleted events */
173 3
                            yield from self::doRestoreEvents($queryExecutor, $streamId, $toVersion);
174 3
                        }
175
                    );
176
                }
177
                catch (UniqueConstraintViolationCheckFailed $exception)
178
                {
179
                    throw new EventStreamIntegrityCheckFailed(
180
                        \sprintf('Error verifying the integrity of the events stream with ID "%s"', $id->toString()),
181
                        (int) $exception->getCode(),
182
                        $exception
183
                    );
184
                }
185 4
            },
186 4
            $id,
187 4
            $toVersion,
188 4
            $force
189
        );
190
    }
191
192
    /**
193
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
194
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
195
     * @throws \ServiceBus\Storage\Common\Exceptions\IncorrectParameterCast
196
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
197
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
198
     */
199 7
    private static function doSaveStream(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
200
    {
201 7
        $insertQuery = insertQuery(self::STREAMS_TABLE, [
202 7
            'id'               => $eventsStream->aggregateId,
203 7
            'identifier_class' => $eventsStream->aggregateIdClass,
204 7
            'aggregate_class'  => $eventsStream->aggregateClass,
205 7
            'created_at'       => $eventsStream->createdAt,
206 7
            'closed_at'        => $eventsStream->closedAt,
207
        ]);
208
209 7
        $compiledQuery = $insertQuery->compile();
210
211
        /**
212
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
213
         *
214
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
215
         */
216 7
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
217
218 7
        unset($insertQuery, $compiledQuery, $resultSet);
219 7
    }
220
221
    /**
222
     * Saving events in stream.
223
     *
224
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
225
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
226
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
227
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
228
     */
229 7
    private static function doSaveEvents(QueryExecutor $queryExecutor, StoredAggregateEventStream $eventsStream): \Generator
230
    {
231 7
        $eventsCount = \count($eventsStream->storedAggregateEvents);
232
233 7
        if (0 !== $eventsCount)
234
        {
235
            /** @psalm-suppress MixedTypeCoercion Invalid params() docblock */
236 7
            yield $queryExecutor->execute(
237 7
                self::createSaveEventQueryString($eventsCount),
238 7
                self::collectSaveEventQueryParameters($eventsStream)
239
            );
240
        }
241 7
    }
242
243
    /**
244
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
245
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
246
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
247
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
248
     */
249 6
    private static function doLoadStream(QueryExecutor $queryExecutor, AggregateId $id): \Generator
250
    {
251 6
        $selectQuery = selectQuery(self::STREAMS_TABLE)
252 6
            ->where(equalsCriteria('id', $id->toString()))
253 6
            ->andWhere(equalsCriteria('identifier_class', \get_class($id)));
254
255
        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
256 6
        $compiledQuery = $selectQuery->compile();
257
258
        /**
259
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
260
         *
261
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
262
         */
263 6
        $resultSet =  yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
264
265
        /**
266
         * @psalm-var      array<string, string>|null $data
267
         *
268
         * @var array $data
269
         */
270 6
        $data = yield fetchOne($resultSet);
271
272 6
        return $data;
273
    }
274
275
    /**
276
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
277
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
278
     * @throws \ServiceBus\Storage\Common\Exceptions\ResultSetIterationFailed
279
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
280
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
281
     */
282 5
    private static function doLoadStreamEvents(
283
        QueryExecutor $queryExecutor,
284
        string $streamId,
285
        int $fromVersion,
286
        ?int $toVersion
287
    ): \Generator {
288
        /** @var \Latitude\QueryBuilder\Query\SelectQuery $selectQuery */
289 5
        $selectQuery = selectQuery(self::STREAM_EVENTS_TABLE)
290 5
            ->where(field('stream_id')->eq($streamId))
291 5
            ->andWhere(field('playhead')->gte($fromVersion))
292 5
            ->andWhere(field('canceled_at')->isNull());
293
294 5
        if (null !== $toVersion && $fromVersion < $toVersion)
295
        {
296
            $selectQuery->andWhere(field('playhead')->lte($toVersion));
297
        }
298
299
        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
300 5
        $compiledQuery = $selectQuery->compile();
301
302
        /**
303
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
304
         *
305
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
306
         */
307 5
        $resultSet = yield $queryExecutor->execute($compiledQuery->sql(), $compiledQuery->params());
308
309
        /**
310
         * @psalm-var      array<int, array>|null $result
311
         *
312
         * @var array $result
313
         */
314 5
        $result = yield fetchAll($resultSet);
315
316 5
        return $result;
317
    }
318
319
    /**
320
     * Complete removal of the "tail" events from the database
321
     *
322
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
323
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
324
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
325
     */
326 1
    private static function doDeleteTailEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
327
    {
328 1
        $deleteQuery = deleteQuery(self::STREAM_EVENTS_TABLE)
329 1
            ->where(equalsCriteria('stream_id', $streamId))
330 1
            ->andWhere(field('playhead')->gt($toVersion));
331
332 1
        $compiledQuery = $deleteQuery->compile();
333
334
        /**
335
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
336
         *
337
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
338
         */
339 1
        $resultSet = yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
340
341 1
        unset($deleteQuery, $compiledQuery, $resultSet);
342 1
    }
343
344
    /**
345
     * Soft deletion of events following the specified version
346
     *
347
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
348
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
349
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
350
     */
351 2
    private static function doSkipEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
352
    {
353 2
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => \date('Y-m-d H:i:s')])
354 2
            ->where(equalsCriteria('stream_id', $streamId))
355 2
            ->andWhere(field('playhead')->gt($toVersion));
356
357
        /** @var \Latitude\QueryBuilder\Query $compiledQuery */
358 2
        $compiledQuery = $updateQuery->compile();
359
360
        /**
361
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
362
         *
363
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
364
         */
365 2
        $resultSet = yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
366
367 2
        unset($updateQuery, $compiledQuery, $resultSet);
368 2
    }
369
370
    /**
371
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
372
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
373
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
374
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
375
     */
376 3
    private static function doRestoreEvents(QueryExecutor $executor, string $streamId, int $toVersion): \Generator
377
    {
378 3
        $updateQuery = updateQuery(self::STREAM_EVENTS_TABLE, ['canceled_at' => null])
379 3
            ->where(equalsCriteria('stream_id', $streamId))
380 3
            ->andWhere(field('playhead')->lte($toVersion));
381
382 3
        $compiledQuery = $updateQuery->compile();
383
384
        /**
385
         * @psalm-suppress MixedTypeCoercion Invalid params() docblock
386
         *
387
         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
388
         */
389 3
        $resultSet = yield $executor->execute($compiledQuery->sql(), $compiledQuery->params());
390
391 3
        unset($updateQuery, $compiledQuery, $resultSet);
392 3
    }
393
394
    /**
395
     * Transform events stream array data to stored representation.
396
     *
397
     * @psalm-param array<string, string>  $streamData
398
     * @psalm-param array<int, array>|null $streamEventsData
399
     *
400
     * @throws \LogicException
401
     */
402 5
    private static function restoreEventStream(
403
        DatabaseAdapter $adapter,
404
        array $streamData,
405
        ?array $streamEventsData
406
    ): StoredAggregateEventStream {
407
        /** @psalm-var array{
408
         *     id:string,
409
         *     identifier_class:class-string<\ServiceBus\EventSourcing\AggregateId>,
410
         *     aggregate_class:class-string<\ServiceBus\EventSourcing\Aggregate>,
411
         *     created_at:string,
412
         *     closed_at:string|null
413
         * } $streamData
414
         */
415
416 5
        return new StoredAggregateEventStream(
417 5
            $streamData['id'],
418 5
            $streamData['identifier_class'],
419 5
            $streamData['aggregate_class'],
420 5
            self::restoreEvents($adapter, $streamEventsData),
421 5
            $streamData['created_at'],
422 5
            $streamData['closed_at']
423
        );
424
    }
425
426
    /**
427
     * Restore events from rows.
428
     *
429
     * @psalm-return array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent>
430
     *
431
     * @throws \LogicException
432
     *
433
     * @return \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent[]
434
     */
435 5
    private static function restoreEvents(BinaryDataDecoder $decoder, ?array $eventsData): array
436
    {
437 5
        $events = [];
438
439 5
        if (true === \is_array($eventsData) && 0 !== \count($eventsData))
440
        {
441
            /**
442
             * @psalm-var array{
443
             *   id:string,
444
             *   playhead:string,
445
             *   payload:string,
446
             *   event_class:class-string,
447
             *   occured_at:string,
448
             *   recorded_at:string
449
             * } $eventRow
450
             */
451 4
            foreach ($eventsData as $eventRow)
452
            {
453 4
                $playhead = (int) $eventRow['playhead'];
454
455 4
                $payload = \base64_decode($decoder->unescapeBinary($eventRow['payload']));
456
457 4
                if (true === \is_string($payload))
458
                {
459 4
                    $events[$playhead] = StoredAggregateEvent::restore(
460 4
                        $eventRow['id'],
461
                        $playhead,
462
                        $payload,
463 4
                        $eventRow['event_class'],
464 4
                        $eventRow['occured_at'],
465 4
                        $eventRow['recorded_at']
466
                    );
467
468 4
                    continue;
469
                }
470
471
                throw new \LogicException(
472
                    \sprintf('Unable to decode event content with ID: %s', $eventRow['id'])
473
                );
474
            }
475
        }
476
477
        /** @psal-var array<int, \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEvent> */
478
479 5
        return $events;
480
    }
481
482
    /**
483
     * Create a sql query to store events.
484
     */
485 7
    private static function createSaveEventQueryString(int $eventsCount): string
486
    {
487 7
        return \sprintf(
488
        /** @lang text */
489 7
            'INSERT INTO %s (id, stream_id, playhead, event_class, payload, occured_at, recorded_at) VALUES %s',
490 7
            self::STREAM_EVENTS_TABLE,
491 7
            \implode(
492 7
                ', ',
493 7
                \array_fill(0, $eventsCount, '(?, ?, ?, ?, ?, ?, ?)')
494
            )
495
        );
496
    }
497
498
    /**
499
     * Gathering parameters for sending to a request to save events.
500
     */
501 7
    private static function collectSaveEventQueryParameters(StoredAggregateEventStream $eventsStream): array
502
    {
503 7
        $queryParameters = [];
504 7
        $rowSetIndex     = 0;
505
506
        /** @psalm-var array<int, string|int|float|null> $parameters */
507 7
        foreach (self::prepareEventRows($eventsStream) as $parameters)
508
        {
509 7
            foreach ($parameters as $parameter)
510
            {
511 7
                $queryParameters[$rowSetIndex] = $parameter;
512
513 7
                $rowSetIndex++;
514
            }
515
        }
516
517 7
        return $queryParameters;
518
    }
519
520
    /**
521
     * Prepare events to insert.
522
     *
523
     * @psalm-return array<int, array<int, string|int>>
524
     */
525 7
    private static function prepareEventRows(StoredAggregateEventStream $eventsStream): array
526
    {
527 7
        $eventsRows = [];
528
529 7
        foreach ($eventsStream->storedAggregateEvents as $storedAggregateEvent)
530
        {
531
            /** @var StoredAggregateEvent $storedAggregateEvent */
532
            $row = [
533 7
                $storedAggregateEvent->eventId,
534 7
                $eventsStream->aggregateId,
535 7
                $storedAggregateEvent->playheadPosition,
536 7
                $storedAggregateEvent->eventClass,
537 7
                \base64_encode($storedAggregateEvent->eventData),
538 7
                $storedAggregateEvent->occuredAt,
539 7
                \date('Y-m-d H:i:s'),
540
            ];
541
542 7
            $eventsRows[] = $row;
543
        }
544
545 7
        return $eventsRows;
546
    }
547
}
548