Passed
Pull Request — main (#143)
by Andreas
01:52
created

ServerPool::sendRequest()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 29
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 21
nc 4
nop 4
dl 0
loc 29
ccs 24
cts 24
cp 1
crap 3
rs 9.584
c 0
b 0
f 0
1
<?php
/**
 * Licensed to CRATE Technology GmbH("Crate") under one or more contributor
 * license agreements.  See the NOTICE file distributed with this work for
 * additional information regarding copyright ownership.  Crate licenses
 * this file to you under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.  You may
 * obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 *
 * However, if you have executed another commercial license agreement
 * with Crate these terms will supersede the license and you may use the
 * software solely pursuant to the terms of the relevant commercial agreement.
 */
22
23
declare(strict_types=1);
24
25
namespace Crate\PDO\Http;
26
27
use Crate\PDO\Exception\RuntimeException;
28
use Crate\PDO\PDO;
29
use Crate\PDO\PDOInterface;
30
use Crate\Stdlib\BulkResponse;
31
use Crate\Stdlib\BulkResponseInterface;
32
use Crate\Stdlib\Collection;
33
use Crate\Stdlib\CollectionInterface;
34
use GuzzleHttp\Client;
35
use GuzzleHttp\ClientInterface;
36
use GuzzleHttp\Exception\BadResponseException;
37
use GuzzleHttp\Exception\ConnectException;
38
use GuzzleHttp\RequestOptions;
39
40
/**
 * Class ServerPool
 *
 * Very basic round robin implementation: each request goes to the server at
 * the head of the list, which is then rotated to the tail. When a server
 * cannot be reached the next one is tried, until every server in the pool
 * has been attempted once.
 */
final class ServerPool implements ServerInterface
{
    private const DEFAULT_SERVER = 'localhost:4200';

    /**
     * URL scheme used to build the base URI of each request; set by configure().
     *
     * @var string
     */
    private $protocol = 'http';

    /**
     * Guzzle request options merged into every request; set by configure().
     *
     * @var array
     */
    private $httpOptions = [];

    /**
     * Servers in round robin order; the head of the list is used next.
     *
     * @var string[]
     */
    private $availableServers = [];

    /**
     * @var ClientInterface
     */
    private $httpClient;

    /**
     * Client constructor.
     *
     * @param array                $servers Server addresses; DEFAULT_SERVER is used when the list is empty
     * @param ClientInterface|null $client  HTTP client to use; a new Guzzle Client is created when omitted
     */
    public function __construct(array $servers, ?ClientInterface $client = null)
    {
        if (\count($servers) === 0) {
            $servers = [self::DEFAULT_SERVER];
        }

        // micro optimization so we don't always hit the same server first
        shuffle($servers);

        // shuffle() assigns fresh keys starting at 0, so the result is already a plain list.
        $this->availableServers = $servers;
        $this->httpClient       = $client ?? new Client();
    }

    /**
     * {@Inheritdoc}
     * @throws \GuzzleHttp\Exception\ConnectException
     */
    public function execute(string $query, array $parameters): ?CollectionInterface
    {
        return $this->executeGeneric($query, $parameters, false);
    }

    /**
     * {@Inheritdoc}
     * @throws \GuzzleHttp\Exception\ConnectException
     */
    public function executeBulk(string $query, array $parameters): ?BulkResponseInterface
    {
        return $this->executeGeneric($query, $parameters, true);
    }

    /**
     * Send the statement to the pool, trying each server at most once.
     *
     * @param string $query      SQL statement
     * @param array  $parameters Statement arguments (a list of argument lists in bulk mode)
     * @param bool   $bulkMode   Whether to send the parameters as "bulk_args" and build a BulkResponse
     *
     * @return Collection|BulkResponse|null
     *
     * @throws \GuzzleHttp\Exception\ConnectException when none of the servers could be reached
     * @throws RuntimeException when a server answered with an error response
     */
    private function executeGeneric(string $query, array $parameters, bool $bulkMode = false)
    {
        $lastException = null;

        for ($remaining = \count($this->availableServers); $remaining > 0; $remaining--) {
            // Take the first available server and move it to the end of the list
            $server                   = array_shift($this->availableServers);
            $this->availableServers[] = $server;

            try {
                return $this->sendRequest($server, $query, $parameters, $bulkMode);
            } catch (ConnectException $exception) {
                // Remember the failure and move on to the next server.
                $lastException = $exception;
            } catch (BadResponseException $exception) {
                throw $this->createServerError($exception);
            }
        }

        if ($lastException === null) {
            // Only reachable with an empty pool, which the constructor prevents.
            return null;
        }

        throw new ConnectException(
            sprintf('No more servers available, exception from last server: %s', $lastException->getMessage()),
            $lastException->getRequest(),
            $lastException
        );
    }

    /**
     * Translate an HTTP error response into a RuntimeException.
     *
     * The body is expected to be JSON with "error.message" and "error.code";
     * any other body is reported verbatim instead of failing on missing keys.
     *
     * @param BadResponseException $exception
     *
     * @return RuntimeException
     */
    private function createServerError(BadResponseException $exception): RuntimeException
    {
        $response = $exception->getResponse();
        $body     = $response !== null ? (string)$response->getBody() : '';
        $json     = json_decode($body, true);

        if ($json === null && json_last_error() !== JSON_ERROR_NONE) {
            return new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
        }

        $error = \is_array($json) ? ($json['error'] ?? null) : null;

        if (!\is_array($error) || !\is_scalar($error['message'] ?? null)) {
            return new RuntimeException(
                sprintf('Server returned an unexpected error response: %s', $body),
                0,
                $exception
            );
        }

        // Exception codes must be integers under strict_types; fall back to 0 otherwise.
        $code = is_numeric($error['code'] ?? null) ? (int)$error['code'] : 0;

        return new RuntimeException((string)$error['message'], $code, $exception);
    }

    /**
     * POST the statement to the "/_sql" endpoint of one server and wrap the decoded response.
     *
     * @param string $server     Server address, appended to the configured protocol
     * @param string $query      SQL statement
     * @param array  $parameters Statement arguments
     * @param bool   $bulkMode   Whether to send the parameters as "bulk_args" and build a BulkResponse
     *
     * @return Collection|BulkResponse
     *
     * @throws RuntimeException when the response body does not decode to a JSON object or array
     */
    private function sendRequest(string $server, string $query, array $parameters, bool $bulkMode = false)
    {
        $argsName = $bulkMode ? 'bulk_args' : 'args';

        $options = array_merge($this->httpOptions, [
            'base_uri' => sprintf('%s://%s', $this->protocol, $server),
            'json'     => [
                'stmt'    => $query,
                $argsName => $parameters,
            ],
        ]);

        $response     = $this->httpClient->request('POST', '/_sql', $options);
        $rawBody      = (string)$response->getBody();
        $responseBody = json_decode($rawBody, true);

        if (!\is_array($responseBody)) {
            throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $rawBody));
        }

        if ($bulkMode) {
            return new BulkResponse(
                $responseBody['results'],
                $responseBody['cols'],
                $responseBody['duration']
            );
        }

        return new Collection(
            $responseBody['rows'],
            $responseBody['cols'],
            $responseBody['duration'],
            $responseBody['rowcount']
        );
    }

    /**
     * {@Inheritdoc}
     */
    public function getServerInfo(): array
    {
        return [
            'serverVersion' => $this->getServerVersion(),
        ];
    }

    /**
     * {@Inheritdoc}
     *
     * @throws RuntimeException when the version query does not return exactly one row
     */
    public function getServerVersion(): string
    {
        $result = $this->execute("select version['number'] from sys.nodes limit 1", []);
        $rows   = $result !== null ? $result->getRows() : [];

        if (\count($rows) !== 1) {
            throw new RuntimeException('Failed to determine server version');
        }

        return $rows[0][0];
    }

    /**
     * Reconfigure the server pool based on the attributes in PDO
     *
     * Derives the protocol from the SSL mode and rebuilds the Guzzle request
     * options (timeouts, basic auth, default schema header, TLS settings).
     *
     * @param PDOInterface $pdo
     */
    public function configure(PDOInterface $pdo): void
    {
        $sslMode = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_MODE);

        $protocol = $sslMode === PDO::CRATE_ATTR_SSL_MODE_DISABLED ? 'http' : 'https';

        $options = [
            RequestOptions::TIMEOUT         => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
            RequestOptions::CONNECT_TIMEOUT => $pdo->getAttribute(PDO::ATTR_TIMEOUT),
            RequestOptions::AUTH            => $pdo->getAttribute(PDO::CRATE_ATTR_HTTP_BASIC_AUTH) ?: null,
            RequestOptions::HEADERS         => [
                'Default-Schema' => $pdo->getAttribute(PDO::CRATE_ATTR_DEFAULT_SCHEMA),
            ],
        ];

        if ($sslMode === PDO::CRATE_ATTR_SSL_MODE_ENABLED_BUT_WITHOUT_HOST_VERIFICATION) {
            $options[RequestOptions::VERIFY] = false;
        }

        $ca         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PATH);
        $caPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CA_PASSWORD);

        if ($ca) {
            // A configured CA path replaces the "verify => false" set above.
            // NOTE(review): Guzzle documents "verify" as bool|string; confirm that the
            // [path, password] form produced when a CA password is set is actually honoured.
            $options[RequestOptions::VERIFY] = self::withOptionalPassword($ca, $caPassword);
        }

        $cert         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PATH);
        $certPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_CERT_PASSWORD);

        if ($cert) {
            $options[RequestOptions::CERT] = self::withOptionalPassword($cert, $certPassword);
        }

        $key         = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PATH);
        $keyPassword = $pdo->getAttribute(PDO::CRATE_ATTR_SSL_KEY_PASSWORD);

        if ($key) {
            $options[RequestOptions::SSL_KEY] = self::withOptionalPassword($key, $keyPassword);
        }

        $this->protocol    = $protocol;
        $this->httpOptions = $options;
    }

    /**
     * Build the value of a path-based request option: the bare path, or a
     * [path, password] pair when a password is set.
     *
     * @param mixed $path
     * @param mixed $password
     *
     * @return mixed
     */
    private static function withOptionalPassword($path, $password)
    {
        return $password ? [$path, $password] : $path;
    }
}
272