Passed
Pull Request — main (#143)
by Andreas
01:39
created

ServerPool::sendRequest()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 29
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 21
nc 4
nop 4
dl 0
loc 29
ccs 24
cts 24
cp 1
crap 3
rs 9.584
c 0
b 0
f 0
1
<?php
2
/**
3
 * Licensed to CRATE Technology GmbH("Crate") under one or more contributor
4
 * license agreements.  See the NOTICE file distributed with this work for
5
 * additional information regarding copyright ownership.  Crate licenses
6
 * this file to you under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.  You may
8
 * obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
15
 * License for the specific language governing permissions and limitations
16
 * under the License.
17
 *
18
 * However, if you have executed another commercial license agreement
19
 * with Crate these terms will supersede the license and you may use the
20
 * software solely pursuant to the terms of the relevant commercial agreement.
21
 */
22
23
declare(strict_types=1);
24
25
namespace Crate\PDO\Http;
26
27
use Crate\PDO\Exception\RuntimeException;
28
use Crate\PDO\PDO;
29
use Crate\PDO\PDOInterface;
30
use Crate\Stdlib\BulkResponse;
31
use Crate\Stdlib\BulkResponseInterface;
32
use Crate\Stdlib\Collection;
33
use Crate\Stdlib\CollectionInterface;
34
use GuzzleHttp\Client;
35
use GuzzleHttp\ClientInterface;
36
use GuzzleHttp\Exception\BadResponseException;
37
use GuzzleHttp\Exception\ConnectException;
38
use GuzzleHttp\RequestOptions;
39
40
/**
 * Class ServerPool
 *
 * Very basic round robin implementation: every request is sent to the server
 * at the head of the list, which is then rotated to the tail. Servers that
 * cannot be reached are skipped until every server has been tried once.
 */
final class ServerPool implements ServerInterface
{
    private const DEFAULT_SERVER = 'localhost:4200';

    /**
     * URL scheme used to build the base URI of every request.
     *
     * @var string
     */
    private $protocol = 'http';

    /**
     * Guzzle request options applied to every request (set by configure()).
     *
     * @var array
     */
    private $httpOptions = [];

    /**
     * Servers in round-robin order; the head is the next one to be used.
     *
     * @var string[]
     */
    private $availableServers = [];

    /**
     * @var ClientInterface
     */
    private $httpClient;

    /**
     * Client constructor.
     *
     * @param array                $servers List of "host:port" strings; falls back to
     *                                      the default server when empty.
     * @param ClientInterface|null $client  HTTP client to use; a plain Guzzle client
     *                                      is created when omitted.
     */
    public function __construct(array $servers, ?ClientInterface $client = null)
    {
        if (\count($servers) === 0) {
            $servers = [self::DEFAULT_SERVER];
        }

        // micro optimization so we don't always hit the same server first
        // (shuffle() also reindexes the array to 0..n-1)
        shuffle($servers);

        $this->availableServers = $servers;
        $this->httpClient       = $client ?? new Client();
    }

    /**
     * {@inheritdoc}
     *
     * @throws \GuzzleHttp\Exception\ConnectException When none of the servers could be reached.
     * @throws RuntimeException                       When a server answered with an error response.
     */
    public function execute(string $query, array $parameters): CollectionInterface
    {
        return $this->dispatch($query, $parameters, false);
    }

    /**
     * {@inheritdoc}
     *
     * @throws \GuzzleHttp\Exception\ConnectException When none of the servers could be reached.
     * @throws RuntimeException                       When a server answered with an error response.
     */
    public function executeBulk(string $query, array $parameters): BulkResponseInterface
    {
        return $this->dispatch($query, $parameters, true);
    }

    /**
     * Try each server once, in round-robin order, until one accepts the request.
     *
     * @param string $query      SQL statement to send.
     * @param array  $parameters Statement arguments (or bulk arguments in bulk mode).
     * @param bool   $bulkMode   Whether to send the parameters as bulk arguments.
     *
     * @return Collection|BulkResponse
     *
     * @throws ConnectException When every server failed with a connection error.
     * @throws RuntimeException When a server answered with an error response.
     */
    private function dispatch(string $query, array $parameters, bool $bulkMode)
    {
        $lastException = null;
        $attempts      = \count($this->availableServers);

        for ($i = 0; $i < $attempts; $i++) {
            // Take the first available server and move it to the end of the stack
            $server                   = array_shift($this->availableServers);
            $this->availableServers[] = $server;

            try {
                return $this->sendRequest($server, $query, $parameters, $bulkMode);
            } catch (ConnectException $exception) {
                // Remember the failure and fall through to the next server.
                $lastException = $exception;
            } catch (BadResponseException $exception) {
                throw $this->createServerException($exception);
            }
        }

        if ($lastException === null) {
            // Only reachable if the pool is empty, which the constructor prevents.
            throw new RuntimeException('No servers available');
        }

        throw new ConnectException(
            sprintf('No more servers available, exception from last server: %s', $lastException->getMessage()),
            $lastException->getRequest(),
            $lastException
        );
    }

    /**
     * Translate an HTTP error response into a RuntimeException.
     *
     * The error code and message are taken from the "error" object of the JSON
     * body when present; otherwise the raw body is reported.
     *
     * @param BadResponseException $exception
     *
     * @return RuntimeException
     */
    private function createServerException(BadResponseException $exception): RuntimeException
    {
        $response = $exception->getResponse();
        $body     = $response === null ? '' : (string)$response->getBody();
        $json     = json_decode($body, true);

        if ($json === null && json_last_error() !== JSON_ERROR_NONE) {
            return new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
        }

        $errorCode    = $json['error']['code'] ?? null;
        $errorMessage = $json['error']['message'] ?? null;

        if (!\is_int($errorCode) || !\is_string($errorMessage)) {
            // Valid JSON, but not the error structure this client understands.
            return new RuntimeException(
                sprintf('Server returned unexpected error response: %s', $body),
                0,
                $exception
            );
        }

        return new RuntimeException($errorMessage, $errorCode, $exception);
    }

    /**
     * POST a statement to the "/_sql" endpoint of one server and wrap the result.
     *
     * @param string $server     "host:port" of the server to contact.
     * @param string $query      SQL statement to send.
     * @param array  $parameters Sent as "args", or as "bulk_args" in bulk mode.
     * @param bool   $bulkMode   Whether this is a bulk request.
     *
     * @return Collection|BulkResponse
     *
     * @throws RuntimeException When the response body does not decode to an array.
     */
    private function sendRequest(string $server, string $query, array $parameters, bool $bulkMode = false)
    {
        $argsName = $bulkMode ? 'bulk_args' : 'args';

        $options = array_merge($this->httpOptions, [
            'base_uri' => sprintf('%s://%s', $this->protocol, $server),
            'json'     => [
                'stmt'    => $query,
                $argsName => $parameters,
            ],
        ]);

        $response     = $this->httpClient->request('POST', '/_sql', $options);
        $rawBody      = (string)$response->getBody();
        $responseBody = json_decode($rawBody, true);

        if (!\is_array($responseBody)) {
            throw new RuntimeException(sprintf('Server returned an unexpected response: %s', $rawBody));
        }

        if ($bulkMode) {
            return new BulkResponse(
                $responseBody['results'],
                $responseBody['cols'],
                $responseBody['duration']
            );
        }

        return new Collection(
            $responseBody['rows'],
            $responseBody['cols'],
            $responseBody['duration'],
            $responseBody['rowcount']
        );
    }

    /**
     * {@inheritdoc}
     */
    public function getServerInfo(): array
    {
        return [
            'serverVersion' => $this->getServerVersion(),
        ];
    }

    /**
     * {@inheritdoc}
     *
     * @throws RuntimeException When the version query does not return exactly one row.
     */
    public function getServerVersion(): string
    {
        $result = $this->execute("select version['number'] from sys.nodes limit 1", []);
        $rows   = $result->getRows();

        if (\count($rows) !== 1) {
            throw new RuntimeException('Failed to determine server version');
        }

        return $rows[0][0];
    }

    /**
     * Reconfigure the server pool based on the attributes in PDO
     *
     * Derives the URL scheme from the SSL mode and rebuilds the Guzzle request
     * options (timeouts, basic auth, default schema header, TLS settings).
     *
     * @param PDOInterface $pdo
     */
    public function configure(PDOInterface $pdo): void
    {
        $sslMode = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_MODE);

        $protocol = $sslMode === PDO::CRATE_ATTR_SSL_MODE_DISABLED ? 'http' : 'https';

        $options = [
            RequestOptions::TIMEOUT         => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
            RequestOptions::CONNECT_TIMEOUT => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
            RequestOptions::AUTH            => $pdo->getAttribute(PDO::CRATE_ATTR_HTTP_BASIC_AUTH) ?: null,
            RequestOptions::HEADERS         => [
                'Default-Schema' => $pdo->getAttribute(PDO::CRATE_ATTR_DEFAULT_SCHEMA),
            ],
        ];

        if ($sslMode === PDO::CRATE_ATTR_SSL_MODE_ENABLED_BUT_WITHOUT_HOST_VERIFICATION) {
            $options[RequestOptions::VERIFY] = false;
        }

        $ca         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PATH);
        $caPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PASSWORD);

        if ($ca) {
            // A configured CA path replaces the "verify = false" set above.
            // NOTE(review): Guzzle documents `verify` as bool|string; confirm that the
            // [path, password] form is actually honoured by the handler in use.
            $options[RequestOptions::VERIFY] = self::pathWithPassword($ca, $caPassword);
        }

        $cert         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PATH);
        $certPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PASSWORD);

        if ($cert) {
            $options[RequestOptions::CERT] = self::pathWithPassword($cert, $certPassword);
        }

        $key         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PATH);
        $keyPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PASSWORD);

        if ($key) {
            $options[RequestOptions::SSL_KEY] = self::pathWithPassword($key, $keyPassword);
        }

        $this->protocol    = $protocol;
        $this->httpOptions = $options;
    }

    /**
     * Build the value of a path-based TLS option.
     *
     * @param mixed $path     Path attribute as read from PDO.
     * @param mixed $password Password attribute as read from PDO.
     *
     * @return mixed The path alone when no password is set, otherwise a
     *               two-element [path, password] array.
     */
    private static function pathWithPassword($path, $password)
    {
        return $password ? [$path, $password] : $path;
    }
}
293