Passed
Pull Request — main (#143)
by Andreas
12:11
created

ServerPool::execute_bulk()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 36
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 15.566

Importance

Changes 0
Metric Value
cc 6
eloc 20
c 0
b 0
f 0
nc 5
nop 2
dl 0
loc 36
ccs 5
cts 14
cp 0.357
crap 15.566
rs 8.9777
1
<?php
2
/**
3
 * Licensed to CRATE Technology GmbH("Crate") under one or more contributor
4
 * license agreements.  See the NOTICE file distributed with this work for
5
 * additional information regarding copyright ownership.  Crate licenses
6
 * this file to you under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.  You may
8
 * obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
15
 * License for the specific language governing permissions and limitations
16
 * under the License.
17
 *
18
 * However, if you have executed another commercial license agreement
19
 * with Crate these terms will supersede the license and you may use the
20
 * software solely pursuant to the terms of the relevant commercial agreement.
21
 */
22
23
declare(strict_types=1);
24
25
namespace Crate\PDO\Http;
26
27
use Crate\PDO\Exception\RuntimeException;
28
use Crate\PDO\PDO;
29
use Crate\PDO\PDOInterface;
30
use Crate\Stdlib\BulkResponse;
31
use Crate\Stdlib\BulkResponseInterface;
32
use Crate\Stdlib\Collection;
33
use Crate\Stdlib\CollectionInterface;
34
use GuzzleHttp\Client;
35
use GuzzleHttp\ClientInterface;
36
use GuzzleHttp\Exception\BadResponseException;
37
use GuzzleHttp\Exception\ConnectException;
38
use GuzzleHttp\RequestOptions;
39
40
/**
41
 * Class ServerPool
42
 *
43
 * Very basic round robin implementation
44
 */
45
final class ServerPool implements ServerInterface
46
{
47
    private const DEFAULT_SERVER = 'localhost:4200';
48
49
    /**
50
     * @var string
51
     */
52
    private $protocol = 'http';
53
54
    /**
55
     * @var array
56
     */
57
    private $httpOptions = [];
58
59
    /**
60
     * @var string[]
61
     */
62
    private $availableServers = [];
63
64
    /**
65
     * @var Client
66
     */
67
    private $httpClient;
68
69
    /**
70
     * Client constructor.
71
     *
72
     * @param array $servers
73 26
     * @param ClientInterface|null $client
74
     */
75 26
    public function __construct(array $servers, ClientInterface $client = null)
76
    {
77
        if (\count($servers) === 0) {
78
            $servers = [self::DEFAULT_SERVER];
79
        }
80 26
81
        // micro optimization so we don't always hit the same server first
82 26
        shuffle($servers);
83 26
84
        foreach ($servers as $server) {
85
            $this->availableServers[] = $server;
86 26
        }
87
88
        $this->httpClient = $client ?: new Client();
89
    }
90
91
    /**
92
     * {@Inheritdoc}
93 26
     * @throws \GuzzleHttp\Exception\ConnectException
94
     */
95 26
    public function execute(string $query, array $parameters): CollectionInterface
96
    {
97 26
        $numServers = count($this->availableServers) - 1;
98
99 26
        for ($i = 0; $i <= $numServers; $i++) {
100
            // always get the first available server
101
            $server = $this->availableServers[0];
102 26
103
            // Move the selected server to the end of the stack
104 26
            $this->availableServers[] = array_shift($this->availableServers);
105 26
106 26
            try {
107 26
                return $this->send_request($server, $query, $parameters, false);
108 26
0 ignored issues
show
Coding Style introduced by
Blank line found at end of control structure
Loading history...
109 26
            } catch (ConnectException $exception) {
110 26
                // Catch it before the BadResponseException but do nothing.
111
                continue;
112
            } catch (BadResponseException $exception) {
113 26
                $body = (string)$exception->getResponse()->getBody();
114 22
                $json = json_decode($body, true);
115
116 22
                if ($json === null && json_last_error() !== JSON_ERROR_NONE) {
117 22
                    throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
118 22
                }
119 22
120 22
                $errorCode    = $json['error']['code'];
121 22
                $errorMessage = $json['error']['message'];
122 6
123
                throw new RuntimeException($errorMessage, $errorCode, $exception);
124 4
            }
125 2
        }
126 2
127 2
        throw new ConnectException(
128
            sprintf('No more servers available, exception from last server: %s', $exception->getMessage()),
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $exception does not seem to be defined for all execution paths leading up to this point.
Loading history...
129 2
            $exception->getRequest(),
130
            $exception
131
        );
132
    }
133 2
134 2
    /**
135
     * {@Inheritdoc}
136 2
     * @throws \GuzzleHttp\Exception\ConnectException
137
     */
138
    public function execute_bulk(string $query, array $parameters): BulkResponseInterface
0 ignored issues
show
Coding Style introduced by
Method name "ServerPool::execute_bulk" is not in camel caps format
Loading history...
139
    {
140 2
        $numServers = count($this->availableServers) - 1;
141 2
142 2
        for ($i = 0; $i <= $numServers; $i++) {
143 2
            // always get the first available server
144 2
            $server = $this->availableServers[0];
145
146
            // Move the selected server to the end of the stack
147
            $this->availableServers[] = array_shift($this->availableServers);
148
149
            try {
150
                return $this->send_request($server, $query, $parameters, true);
151
0 ignored issues
show
Coding Style introduced by
Blank line found at end of control structure
Loading history...
152
            } catch (ConnectException $exception) {
153
                // Catch it before the BadResponseException but do nothing.
154
                continue;
155
            } catch (BadResponseException $exception) {
156
                $body = (string)$exception->getResponse()->getBody();
157
                $json = json_decode($body, true);
158
159
                if ($json === null && json_last_error() !== JSON_ERROR_NONE) {
160
                    throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
161
                }
162
163
                $errorCode = $json['error']['code'];
164
                $errorMessage = $json['error']['message'];
165
166
                throw new RuntimeException($errorMessage, $errorCode, $exception);
167
            }
168
        }
169
170
        throw new ConnectException(
171
            sprintf('No more servers available, exception from last server: %s', $exception->getMessage()),
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $exception does not seem to be defined for all execution paths leading up to this point.
Loading history...
172
            $exception->getRequest(),
173
            $exception
174
        );
175
    }
176 26
177
    private function send_request(string $server, string $query, array $parameters, bool $bulk_mode = false)
0 ignored issues
show
Coding Style introduced by
Method name "ServerPool::send_request" is not in camel caps format
Loading history...
178 26
    {
179
        $args_name = 'args';
180 26
        if ($bulk_mode) {
181
            $args_name = 'bulk_args';
182 26
        }
183 26
        $options = array_merge($this->httpOptions, [
184 26
            'base_uri' => sprintf('%s://%s', $this->protocol, $server),
185 26
            'json'     => [
186 26
                'stmt' => $query,
187 26
                $args_name => $parameters,
188 26
            ],
189 26
        ]);
190
191 26
        $response     = $this->httpClient->request('POST', '/_sql', $options);
192 4
        $responseBody = json_decode((string)$response->getBody(), true);
193
194
        if ($bulk_mode) {
195 26
            return new BulkResponse(
196 26
                $responseBody['results'],
197
                $responseBody['cols'],
198 26
                $responseBody['duration']
199 4
            );
200 2
        } else {
201
            return new Collection(
202 4
                $responseBody['rows'],
203
                $responseBody['cols'],
204
                $responseBody['duration'],
205
                $responseBody['rowcount']
206 26
            );
207 26
        }
208
    }
209 26
210 4
    /**
211 2
     * {@Inheritdoc}
212
     */
213 4
    public function getServerInfo(): array
214
    {
215
        return [
216
            'serverVersion' => $this->getServerVersion(),
217 26
        ];
218 26
    }
219
220 26
    /**
221 4
     * {@Inheritdoc}
222 2
     */
223
    public function getServerVersion(): string
224 4
    {
225
        $result = $this->execute("select version['number'] from sys.nodes limit 1", []);
226
227
        if (count($result->getRows()) !== 1) {
228 26
            throw new RuntimeException('Failed to determine server version');
229 26
        }
230
231
        return $result->getRows()[0][0];
232
    }
233
234
    /**
235
     * Reconfigure the the server pool based on the attributes in PDO
236
     *
237
     * @param PDOInterface $pdo
238
     */
239
    public function configure(PDOInterface $pdo): void
240
    {
241
        $sslMode = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_MODE);
242
243
        $protocol = $sslMode === PDO::CRATE_ATTR_SSL_MODE_DISABLED ? 'http' : 'https';
244
245
        $options = [
246
            RequestOptions::TIMEOUT         => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
247
            RequestOptions::CONNECT_TIMEOUT => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
248
            RequestOptions::AUTH            => $pdo->getAttribute(PDO::CRATE_ATTR_HTTP_BASIC_AUTH) ?: null,
249
            RequestOptions::HEADERS         => [
250
                'Default-Schema' => $pdo->getAttribute(PDO::CRATE_ATTR_DEFAULT_SCHEMA),
251
            ],
252
        ];
253
254
        if ($sslMode === PDO::CRATE_ATTR_SSL_MODE_ENABLED_BUT_WITHOUT_HOST_VERIFICATION) {
255
            $options['verify'] = false;
256
        }
257
258
        $ca         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PATH);
259
        $caPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PASSWORD);
260
261
        if ($ca) {
262
            if ($caPassword) {
263
                $options[RequestOptions::VERIFY] = [$ca, $caPassword];
264
            } else {
265
                $options[RequestOptions::VERIFY] = $ca;
266
            }
267
        }
268
269
        $cert         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PATH);
270
        $certPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PASSWORD);
271
272
        if ($cert) {
273
            if ($certPassword) {
274
                $options[RequestOptions::CERT] = [$cert, $certPassword];
275
            } else {
276
                $options[RequestOptions::CERT] = $cert;
277
            }
278
        }
279
280
        $key         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PATH);
281
        $keyPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PASSWORD);
282
283
        if ($key) {
284
            if ($keyPassword) {
285
                $options[RequestOptions::SSL_KEY] = [$key, $keyPassword];
286
            } else {
287
                $options[RequestOptions::SSL_KEY] = $key;
288
            }
289
        }
290
291
        $this->protocol    = $protocol;
292
        $this->httpOptions = $options;
293
    }
294
}
295