Completed
Push — v4.1 ( f3a0c7...588d2a )
by Masiukevich
07:33
created

AmpPostgreSQLAdapter::transactional()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 38
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 14
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 38
ccs 13
cts 13
cp 1
crap 2
rs 9.7998
1
<?php
2
3
/**
4
 * SQL databases adapters implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\Storage\Sql\AmpPosgreSQL;
14
15
use Amp\Postgres\PgSqlCommandResult;
16
use Amp\Postgres\PooledResultSet;
17
use Amp\Postgres\PqCommandResult;
18
use Amp\Sql\ResultSet as AmpResultSet;
19
use function Amp\call;
20
use function Amp\Postgres\pool;
21
use Amp\Coroutine;
22
use Amp\Postgres\ConnectionConfig;
23
use Amp\Postgres\Pool;
24
use Amp\Promise;
25
use Psr\Log\LoggerInterface;
26
use Psr\Log\NullLogger;
27
use ServiceBus\Storage\Common\DatabaseAdapter;
28
use ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions;
29
use ServiceBus\Storage\Common\StorageConfiguration;
30
31
/**
 * Non-blocking PostgreSQL adapter built on top of the amphp/postgres connection pool.
 *
 * The pool is created lazily on first use (see pool()) and closed when the adapter is destroyed.
 *
 * @see https://github.com/amphp/postgres
 */
final class AmpPostgreSQLAdapter implements DatabaseAdapter
{
    /** @var StorageConfiguration */
    private $configuration;

    /**
     * Lazily created connection pool; stays null until the first query or transaction.
     *
     * @var Pool|null
     */
    private $pool = null;

    /** @var LoggerInterface */
    private $logger;

    /**
     * @param StorageConfiguration $configuration Connection settings (host, port, credentials, query parameters)
     * @param LoggerInterface|null $logger        Optional logger; a NullLogger is used when omitted
     *
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions When ext-pgsql is not loaded
     */
    public function __construct(StorageConfiguration $configuration, ?LoggerInterface $logger = null)
    {
        // @codeCoverageIgnoreStart
        if (\extension_loaded('pgsql') === false)
        {
            throw new InvalidConfigurationOptions('ext-pgsql must be installed');
        }
        // @codeCoverageIgnoreEnd

        $this->configuration = $configuration;
        $this->logger        = $logger ?? new NullLogger();
    }

    /**
     * Closes the connection pool, if one was ever created.
     */
    public function __destruct()
    {
        /** @psalm-suppress RedundantConditionGivenDocblockType Null in case of error */
        if ($this->pool !== null)
        {
            $this->pool->close();
        }
    }

    /**
     * {@inheritdoc}
     *
     * Logs the query at debug level, executes it through the pool and wraps the driver result in
     * AmpPostgreSQLResultSet. Any failure is converted with adaptAmpThrowable() before being rethrown.
     *
     * @psalm-suppress MixedReturnTypeCoercion
     */
    public function execute(string $queryString, array $parameters = []): Promise
    {
        return call(
            function () use ($queryString, $parameters): \Generator
            {
                try
                {
                    $this->logger->debug($queryString, $parameters);

                    /** @var AmpResultSet|PgSqlCommandResult|PooledResultSet|PqCommandResult $resultSet */
                    $resultSet = yield $this->pool()->execute($queryString, $parameters);

                    return new AmpPostgreSQLResultSet($resultSet);
                }
                catch (\Throwable $throwable)
                {
                    throw adaptAmpThrowable($throwable);
                }
            }
        );
    }

    /**
     * {@inheritdoc}
     *
     * Opens a transaction via transaction(), runs the generator returned by $function inside it and
     * commits. If the callable or the commit fails, the transaction is rolled back and the original
     * exception is rethrown.
     *
     * Opening the transaction goes through transaction(), so a failure at that stage is converted with
     * adaptAmpThrowable(), the same way execute() and transaction() report failures.
     *
     * @psalm-suppress MixedReturnTypeCoercion
     */
    public function transactional(callable $function): Promise
    {
        return call(
            function () use ($function): \Generator
            {
                /** @var AmpPostgreSQLTransaction $transaction */
                $transaction = yield $this->transaction();

                try
                {
                    /** @var \Generator<mixed> $generator */
                    $generator = $function($transaction);

                    /**
                     * @psalm-suppress MixedArgumentTypeCoercion
                     * @psalm-suppress InvalidArgument
                     */
                    yield new Coroutine($generator);

                    yield $transaction->commit();
                }
                catch (\Throwable $throwable)
                {
                    try
                    {
                        yield $transaction->rollback();
                    }
                    catch (\Throwable $rollbackThrowable)
                    {
                        // A failed rollback must not hide the error that triggered it: log it and
                        // fall through to rethrow the original exception.
                        $this->logger->error(
                            'Transaction rollback failed: {rollbackError}',
                            ['rollbackError' => $rollbackThrowable->getMessage()]
                        );
                    }

                    throw $throwable;
                }
                finally
                {
                    // Drop the local reference to the transaction wrapper as soon as the work is done.
                    unset($transaction);
                }
            }
        );
    }

    /**
     * {@inheritdoc}
     *
     * Logs the BEGIN statement at debug level, starts a transaction on the pool and resolves with an
     * AmpPostgreSQLTransaction wrapper. Failures are converted with adaptAmpThrowable().
     *
     * @psalm-suppress MixedReturnTypeCoercion
     */
    public function transaction(): Promise
    {
        return call(
            function (): \Generator
            {
                try
                {
                    $this->logger->debug('BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED');

                    /** @var \Amp\Postgres\Transaction $transaction */
                    $transaction = yield $this->pool()->beginTransaction();

                    return new AmpPostgreSQLTransaction($transaction, $this->logger);
                }
                // @codeCoverageIgnoreStart
                catch (\Throwable $throwable)
                {
                    throw adaptAmpThrowable($throwable);
                }
                // @codeCoverageIgnoreEnd
            }
        );
    }

    /**
     * {@inheritdoc}
     *
     * Accepts either an escaped bytea string or a stream resource; a resource is read in full from
     * offset 0 before being unescaped with pg_unescape_bytea().
     */
    public function unescapeBinary($payload): string
    {
        if (\is_resource($payload) === true)
        {
            $payload = \stream_get_contents($payload, -1, 0);
        }

        return \pg_unescape_bytea((string) $payload);
    }

    /**
     * Receive connection pool.
     *
     * The pool is built on the first call from the stored configuration and reused afterwards.
     * `max_connections` and `idle_timeout` are read from the configuration's query parameters and
     * fall back to the Pool defaults when absent.
     */
    private function pool(): Pool
    {
        if ($this->pool === null)
        {
            $queryData = $this->configuration->queryParameters;

            $maxConnectionsCount = (int) ($queryData['max_connections'] ?? Pool::DEFAULT_MAX_CONNECTIONS);
            $idleTimeout         = (int) ($queryData['idle_timeout'] ?? Pool::DEFAULT_IDLE_TIMEOUT);

            $this->pool = pool(
                new ConnectionConfig(
                    (string) $this->configuration->host,
                    $this->configuration->port ?? ConnectionConfig::DEFAULT_PORT,
                    $this->configuration->username,
                    $this->configuration->password,
                    $this->configuration->databaseName
                ),
                $maxConnectionsCount,
                $idleTimeout
            );
        }

        return $this->pool;
    }
}
215