Passed
Push — v4.1 ( 588d2a...f00803 )
by Masiukevich
01:49
created

SqlMigrationProcessor::process()   A

Complexity

Conditions 4
Paths 1

Size

Total Lines 49
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 4
eloc 17
c 1
b 0
f 1
nc 1
nop 1
dl 0
loc 49
ccs 0
cts 25
cp 0
crap 20
rs 9.7
1
<?php
2
3
/**
4
 * SQL databases adapters implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\Storage\Sql\Migration;
14
15
use Amp\Promise;
16
use Psr\Log\LoggerInterface;
17
use ServiceBus\Storage\Common\DatabaseAdapter;
18
use ServiceBus\Storage\Common\QueryExecutor;
19
use function ServiceBus\Common\invokeReflectionMethod;
20
use function ServiceBus\Common\readReflectionPropertyValue;
21
22
/**
23
 * Migrations executor
24
 */
25
final class SqlMigrationProcessor
26
{
27
    private const DIRECTION_UP   = 'up';
28
    private const DIRECTION_DOWN = 'down';
29
30
    /**
31
     * @var DatabaseAdapter
32
     */
33
    private $storage;
34
35
    /** @var SqlMigrationLoader */
36
    private $migrationsLoader;
37
38
    /** @var LoggerInterface */
39
    private $logger;
40
41
    public function __construct(DatabaseAdapter $storage, SqlMigrationLoader $migrationsLoader, LoggerInterface $logger)
42
    {
43
        $this->storage          = $storage;
44
        $this->migrationsLoader = $migrationsLoader;
45
        $this->logger           = $logger;
46
    }
47
48
    /**
49
     * @throws \RuntimeException Incorrect migration file
50
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
51
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
52
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
53
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
54
     */
55
    public function up(): Promise
56
    {
57
        return $this->process(self::DIRECTION_UP);
58
    }
59
60
    /**
61
     * @throws \RuntimeException Incorrect migration file
62
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
63
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
64
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
65
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
66
     */
67
    public function down(): Promise
68
    {
69
        return $this->process(self::DIRECTION_DOWN);
70
    }
71
72
    /**
73
     * Performing migrations in a given direction (up / down)
74
     *
75
     * @return Promise<void>
76
     */
77
    private function process(string $direction): Promise
78
    {
79
        /** @psalm-suppress InvalidArgument */
80
        return $this->storage->transactional(
81
            function(QueryExecutor $queryExecutor) use ($direction): \Generator
82
            {
83
                /** @var Migration[] $migrations */
84
                $migrations = yield from $this->migrationsLoader->load();
85
86
                yield $queryExecutor->execute(
87
                    'CREATE TABLE IF NOT EXISTS migration ( version varchar NOT NULL );'
88
                );
89
90
                /**
91
                 * @var string    $version
92
                 * @var Migration $migration
93
                 */
94
                foreach($migrations as $version => $migration)
95
                {
96
                    /**
97
                     * @psalm-suppress InvalidScalarArgument
98
                     *
99
                     * @var \ServiceBus\Storage\Common\ResultSet $resultSet
100
                     */
101
                    $resultSet = yield $queryExecutor->execute(
102
                        'INSERT INTO migration (version) VALUES (?) ON CONFLICT DO NOTHING',
103
                        [$version]
104
                    );
105
106
                    /** Миграция была добавлена ранее */
107
                    if($resultSet->affectedRows() === 0)
108
                    {
109
                        $this->logger->debug('Skip "{version}" migration', ['version' => $version]);
110
111
                        continue;
112
                    }
113
114
                    invokeReflectionMethod($migration, $direction);
115
116
                    /** @var string[] $queries */
117
                    $queries = readReflectionPropertyValue($migration, 'queries');
118
119
                    /** @var array $parameters */
120
                    $parameters = readReflectionPropertyValue($migration, 'params');
121
122
                    foreach($queries as $query)
123
                    {
124
                        /** @psalm-suppress MixedArgument */
125
                        yield $queryExecutor->execute($query, $parameters[\sha1($query)] ?? []);
126
                    }
127
                }
128
            }
129
        );
130
    }
131
132
133
}