Passed
Pull Request — main (#143)
by Andreas
11:58
created

ServerPool::executeBulk()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
<?php
2
/**
3
 * Licensed to CRATE Technology GmbH("Crate") under one or more contributor
4
 * license agreements.  See the NOTICE file distributed with this work for
5
 * additional information regarding copyright ownership.  Crate licenses
6
 * this file to you under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.  You may
8
 * obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
15
 * License for the specific language governing permissions and limitations
16
 * under the License.
17
 *
18
 * However, if you have executed another commercial license agreement
19
 * with Crate these terms will supersede the license and you may use the
20
 * software solely pursuant to the terms of the relevant commercial agreement.
21
 */
22
23
declare(strict_types=1);
24
25
namespace Crate\PDO\Http;
26
27
use Crate\PDO\Exception\RuntimeException;
28
use Crate\PDO\PDO;
29
use Crate\PDO\PDOInterface;
30
use Crate\Stdlib\BulkResponse;
31
use Crate\Stdlib\BulkResponseInterface;
32
use Crate\Stdlib\Collection;
33
use Crate\Stdlib\CollectionInterface;
34
use GuzzleHttp\Client;
35
use GuzzleHttp\ClientInterface;
36
use GuzzleHttp\Exception\BadResponseException;
37
use GuzzleHttp\Exception\ConnectException;
38
use GuzzleHttp\RequestOptions;
39
40
/**
41
 * Class ServerPool
42
 *
43
 * Very basic round robin implementation
44
 */
45
final class ServerPool implements ServerInterface
46
{
47
    private const DEFAULT_SERVER = 'localhost:4200';
48
49
    /**
50
     * @var string
51
     */
52
    private $protocol = 'http';
53
54
    /**
55
     * @var array
56
     */
57
    private $httpOptions = [];
58
59
    /**
60
     * @var string[]
61
     */
62
    private $availableServers = [];
63
64
    /**
65
     * @var Client
66
     */
67
    private $httpClient;
68
69
    /**
70
     * Client constructor.
71
     *
72
     * @param array $servers
73
     * @param ClientInterface|null $client
74
     */
75 28
    public function __construct(array $servers, ClientInterface $client = null)
76
    {
77 28
        if (\count($servers) === 0) {
78
            $servers = [self::DEFAULT_SERVER];
79
        }
80
81
        // micro optimization so we don't always hit the same server first
82 28
        shuffle($servers);
83
84 28
        foreach ($servers as $server) {
85 28
            $this->availableServers[] = $server;
86
        }
87
88 28
        $this->httpClient = $client ?: new Client();
89
    }
90
91
    /**
92
     * {@Inheritdoc}
93
     * @throws \GuzzleHttp\Exception\ConnectException
94
     */
95 26
    public function execute(string $query, array $parameters): CollectionInterface
96
    {
97 26
        return $this->executeGeneric($query, $parameters, false);
98
    }
99
100
    /**
101
     * {@Inheritdoc}
102
     * @throws \GuzzleHttp\Exception\ConnectException
103
     */
104 2
    public function executeBulk(string $query, array $parameters): BulkResponseInterface
105
    {
106 2
        return $this->executeGeneric($query, $parameters, true);
107
    }
108
109
    /**
110
     * {@Inheritdoc}
111
     * @throws \GuzzleHttp\Exception\ConnectException
112
     */
113 28
    private function executeGeneric(string $query, array $parameters, bool $bulk_mode = false)
114
    {
115 28
        $numServers = count($this->availableServers) - 1;
116
117 28
        for ($i = 0; $i <= $numServers; $i++) {
118
            // always get the first available server
119 28
            $server = $this->availableServers[0];
120
121
            // Move the selected server to the end of the stack
122 28
            $this->availableServers[] = array_shift($this->availableServers);
123
124
            try {
125 28
                return $this->sendRequest($server, $query, $parameters, $bulk_mode);
126 6
            } catch (ConnectException $exception) {
127
                // Catch it before the BadResponseException but do nothing.
128 4
                continue;
129 2
            } catch (BadResponseException $exception) {
130 2
                $body = (string)$exception->getResponse()->getBody();
131 2
                $json = json_decode($body, true);
132
133 2
                if ($json === null && json_last_error() !== JSON_ERROR_NONE) {
134
                    throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
135
                }
136
137 2
                $errorCode = $json['error']['code'];
138 2
                $errorMessage = $json['error']['message'];
139
140 2
                throw new RuntimeException($errorMessage, $errorCode, $exception);
141
            }
142
        }
143
144 2
        throw new ConnectException(
145 2
            sprintf('No more servers available, exception from last server: %s', $exception->getMessage()),
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $exception does not seem to be defined for all execution paths leading up to this point.
Loading history...
146 2
            $exception->getRequest(),
147 2
            $exception
148 2
        );
149
    }
150
151 28
    private function sendRequest(string $server, string $query, array $parameters, bool $bulk_mode = false)
152
    {
153 28
        $args_name = 'args';
154 28
        if ($bulk_mode) {
155 2
            $args_name = 'bulk_args';
156
        }
157 28
        $options = array_merge($this->httpOptions, [
158 28
            'base_uri' => sprintf('%s://%s', $this->protocol, $server),
159 28
            'json'     => [
160 28
                'stmt' => $query,
161 28
                $args_name => $parameters,
162 28
            ],
163 28
        ]);
164
165 28
        $response     = $this->httpClient->request('POST', '/_sql', $options);
166 24
        $responseBody = json_decode((string)$response->getBody(), true);
167
168 24
        if ($bulk_mode) {
169 2
            return new BulkResponse(
170 2
                $responseBody['results'],
171 2
                $responseBody['cols'],
172 2
                $responseBody['duration']
173 2
            );
174
        } else {
175 22
            return new Collection(
176 22
                $responseBody['rows'],
177 22
                $responseBody['cols'],
178 22
                $responseBody['duration'],
179 22
                $responseBody['rowcount']
180 22
            );
181
        }
182
    }
183
184
    /**
185
     * {@Inheritdoc}
186
     */
187
    public function getServerInfo(): array
188
    {
189
        return [
190
            'serverVersion' => $this->getServerVersion(),
191
        ];
192
    }
193
194
    /**
195
     * {@Inheritdoc}
196
     */
197
    public function getServerVersion(): string
198
    {
199
        $result = $this->execute("select version['number'] from sys.nodes limit 1", []);
200
201
        if (count($result->getRows()) !== 1) {
202
            throw new RuntimeException('Failed to determine server version');
203
        }
204
205
        return $result->getRows()[0][0];
206
    }
207
208
    /**
209
     * Reconfigure the the server pool based on the attributes in PDO
210
     *
211
     * @param PDOInterface $pdo
212
     */
213 28
    public function configure(PDOInterface $pdo): void
214
    {
215 28
        $sslMode = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_MODE);
216
217 28
        $protocol = $sslMode === PDO::CRATE_ATTR_SSL_MODE_DISABLED ? 'http' : 'https';
218
219 28
        $options = [
220 28
            RequestOptions::TIMEOUT         => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
221 28
            RequestOptions::CONNECT_TIMEOUT => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
222 28
            RequestOptions::AUTH            => $pdo->getAttribute(PDO::CRATE_ATTR_HTTP_BASIC_AUTH) ?: null,
223 28
            RequestOptions::HEADERS         => [
224 28
                'Default-Schema' => $pdo->getAttribute(PDO::CRATE_ATTR_DEFAULT_SCHEMA),
225 28
            ],
226 28
        ];
227
228 28
        if ($sslMode === PDO::CRATE_ATTR_SSL_MODE_ENABLED_BUT_WITHOUT_HOST_VERIFICATION) {
229 4
            $options['verify'] = false;
230
        }
231
232 28
        $ca         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PATH);
233 28
        $caPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PASSWORD);
234
235 28
        if ($ca) {
236 4
            if ($caPassword) {
237 2
                $options[RequestOptions::VERIFY] = [$ca, $caPassword];
238
            } else {
239 4
                $options[RequestOptions::VERIFY] = $ca;
240
            }
241
        }
242
243 28
        $cert         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PATH);
244 28
        $certPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PASSWORD);
245
246 28
        if ($cert) {
247 4
            if ($certPassword) {
248 2
                $options[RequestOptions::CERT] = [$cert, $certPassword];
249
            } else {
250 4
                $options[RequestOptions::CERT] = $cert;
251
            }
252
        }
253
254 28
        $key         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PATH);
255 28
        $keyPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PASSWORD);
256
257 28
        if ($key) {
258 4
            if ($keyPassword) {
259 2
                $options[RequestOptions::SSL_KEY] = [$key, $keyPassword];
260
            } else {
261 4
                $options[RequestOptions::SSL_KEY] = $key;
262
            }
263
        }
264
265 28
        $this->protocol    = $protocol;
266 28
        $this->httpOptions = $options;
267
    }
268
}
269