Passed
Push — v4.1 ( 0ac185...4d3fc3 )
by Masiukevich
01:44
created

SqlMigrationProcessor::up()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 1
c 1
b 0
f 1
nc 1
nop 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
<?php
2
3
/**
4
 * SQL databases adapters implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\Storage\Sql\Migration;
14
15
use Amp\Promise;
16
use Psr\Log\LoggerInterface;
17
use Psr\Log\NullLogger;
18
use ServiceBus\Storage\Common\DatabaseAdapter;
19
use ServiceBus\Storage\Common\QueryExecutor;
20
use function Amp\call;
21
use function ServiceBus\Common\invokeReflectionMethod;
22
use function ServiceBus\Common\readReflectionPropertyValue;
23
24
/**
 * Migrations executor.
 *
 * Loads the available migrations through the configured loader and executes their queries
 * inside a single storage transaction, either in the "up" or in the "down" direction.
 */
final class SqlMigrationProcessor
{
    private const DIRECTION_UP   = 'up';
    private const DIRECTION_DOWN = 'down';

    /**
     * Storage adapter used to open the transaction in which migrations are executed.
     *
     * @var DatabaseAdapter
     */
    private $storage;

    /**
     * Source of the migrations to process.
     *
     * @var SqlMigrationLoader
     */
    private $migrationsLoader;

    /**
     * Receives debug messages about skipped migrations and error messages about failed rollbacks.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param DatabaseAdapter      $storage          Storage adapter used to open the migration transaction
     * @param SqlMigrationLoader   $migrationsLoader Loader that provides the migrations to execute
     * @param LoggerInterface|null $logger           Optional logger; a NullLogger is used when omitted
     */
    public function __construct(DatabaseAdapter $storage, SqlMigrationLoader $migrationsLoader, ?LoggerInterface $logger = null)
    {
        $this->storage          = $storage;
        $this->migrationsLoader = $migrationsLoader;
        $this->logger           = $logger ?? new NullLogger();
    }

    /**
     * Execute the loaded migrations in the "up" direction.
     *
     * @return Promise<int> Resolves with the number of executed queries
     *
     * @throws \RuntimeException Incorrect migration file
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function up(): Promise
    {
        return $this->process(self::DIRECTION_UP);
    }

    /**
     * Execute the loaded migrations in the "down" direction.
     *
     * @return Promise<int> Resolves with the number of executed queries
     *
     * @throws \RuntimeException Incorrect migration file
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function down(): Promise
    {
        return $this->process(self::DIRECTION_DOWN);
    }

    /**
     * Performing migrations in a given direction (up / down).
     *
     * All statements run inside one transaction: it is committed after every migration has been
     * processed and rolled back as soon as anything throws. The original failure is always the one
     * that is rethrown; a failure of the rollback itself is only logged so that it cannot hide the
     * root cause.
     *
     * @param string $direction Name of the migration method to invoke (self::DIRECTION_UP or self::DIRECTION_DOWN)
     *
     * @return Promise<int> Resolves with the number of executed queries
     */
    private function process(string $direction): Promise
    {
        return call(
            function () use ($direction): \Generator
            {
                /** @var \ServiceBus\Storage\Common\Transaction $transaction */
                $transaction = yield $this->storage->transaction();

                try
                {
                    $executedQueries = 0;

                    /**
                     * @psalm-var array<string, \ServiceBus\Storage\Sql\Migration\Migration> $migrations
                     */
                    $migrations = yield $this->migrationsLoader->load();

                    /**
                     * NOTE(review): the table is created without a unique constraint on "version". On PostgreSQL
                     * "ON CONFLICT DO NOTHING" only skips rows that violate a unique constraint or index, so the
                     * "skip" branch below may never trigger; confirm against the target database. The same
                     * registration step is used for both directions, so changing it also affects down().
                     */
                    yield $transaction->execute(
                        'CREATE TABLE IF NOT EXISTS migration ( version varchar NOT NULL );'
                    );

                    /**
                     * @var string    $version
                     * @var Migration $migration
                     */
                    foreach ($migrations as $version => $migration)
                    {
                        /**
                         * Register the version; zero affected rows means it was already registered.
                         *
                         * @psalm-suppress InvalidScalarArgument
                         *
                         * @var \ServiceBus\Storage\Common\ResultSet $resultSet
                         */
                        $resultSet = yield $transaction->execute(
                            'INSERT INTO migration (version) VALUES (?) ON CONFLICT DO NOTHING',
                            [$version]
                        );

                        /** The migration has already been added earlier */
                        if ($resultSet->affectedRows() === 0)
                        {
                            $this->logger->debug('Skip "{version}" migration', ['version' => $version]);

                            continue;
                        }

                        /** Invoke the direction method (up/down) on the migration; presumably it fills "queries"/"params" */
                        invokeReflectionMethod($migration, $direction);

                        /** @var string[] $queries */
                        $queries = readReflectionPropertyValue($migration, 'queries');

                        /** @var array $parameters */
                        $parameters = readReflectionPropertyValue($migration, 'params');

                        foreach ($queries as $query)
                        {
                            /**
                             * Query parameters are looked up by the sha1 hash of the query text.
                             *
                             * @psalm-suppress MixedArgument
                             */
                            yield $transaction->execute($query, $parameters[\sha1($query)] ?? []);

                            $executedQueries++;
                        }
                    }

                    yield $transaction->commit();

                    return $executedQueries;
                }
                catch (\Throwable $throwable)
                {
                    try
                    {
                        yield $transaction->rollback();
                    }
                    catch (\Throwable $rollbackFailure)
                    {
                        /** A failed rollback must not replace the exception that caused it */
                        $this->logger->error(
                            'Migration transaction rollback failed: {rollbackFailureMessage}',
                            [
                                'rollbackFailureMessage' => $rollbackFailure->getMessage(),
                                'originalFailureMessage' => $throwable->getMessage(),
                            ]
                        );
                    }

                    throw $throwable;
                }
            }
        );
    }
}
159