ServerPool::getServerInfo()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 4
ccs 0
cts 3
cp 0
crap 2
rs 10
1
<?php
2
/**
3
 * Licensed to CRATE Technology GmbH("Crate") under one or more contributor
4
 * license agreements.  See the NOTICE file distributed with this work for
5
 * additional information regarding copyright ownership.  Crate licenses
6
 * this file to you under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.  You may
8
 * obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
15
 * License for the specific language governing permissions and limitations
16
 * under the License.
17
 *
18
 * However, if you have executed another commercial license agreement
19
 * with Crate these terms will supersede the license and you may use the
20
 * software solely pursuant to the terms of the relevant commercial agreement.
21
 */
22
23
declare(strict_types=1);
24
25
namespace Crate\PDO\Http;
26
27
use Crate\PDO\Exception\RuntimeException;
28
use Crate\PDO\PDOCrateDB;
29
use Crate\PDO\PDOInterface;
30
use Crate\Stdlib\BulkResponse;
31
use Crate\Stdlib\BulkResponseInterface;
32
use Crate\Stdlib\Collection;
33
use Crate\Stdlib\CollectionInterface;
34
use GuzzleHttp\Client;
35
use GuzzleHttp\ClientInterface;
36
use GuzzleHttp\Exception\BadResponseException;
37
use GuzzleHttp\Exception\ConnectException;
38
use GuzzleHttp\RequestOptions;
39
use PDO;
40
41
/**
42
 * Class ServerPool
43
 *
44
 * Very basic round robin implementation
45
 */
46
final class ServerPool implements ServerInterface
{
    private const DEFAULT_SERVER = 'localhost:4200';

    /**
     * URL scheme used to reach the servers; set by configure().
     *
     * @var string
     */
    private $protocol = 'http';

    /**
     * Guzzle request options merged into every request; set by configure().
     *
     * @var array
     */
    private $httpOptions = [];

    /**
     * Servers in the order in which they will be tried next.
     *
     * @var string[]
     */
    private $availableServers = [];

    /**
     * @var ClientInterface
     */
    private $httpClient;

    /**
     * Client constructor.
     *
     * @param string[]             $servers List of "host:port" entries; falls back to
     *                                      DEFAULT_SERVER when empty.
     * @param ClientInterface|null $client  HTTP client to use; a default Guzzle client
     *                                      is created when omitted.
     */
    public function __construct(array $servers, ?ClientInterface $client = null)
    {
        if (\count($servers) === 0) {
            $servers = [self::DEFAULT_SERVER];
        }

        // micro optimization so we don't always hit the same server first;
        // shuffle() also re-indexes the array, so the result is a plain list
        shuffle($servers);

        $this->availableServers = $servers;
        $this->httpClient       = $client ?? new Client();
    }

    /**
     * {@inheritdoc}
     *
     * @throws ConnectException When none of the servers could be reached.
     * @throws RuntimeException When a server answered with an error response.
     */
    public function execute(string $query, array $parameters): ?CollectionInterface
    {
        return $this->executeGeneric($query, $parameters, false);
    }

    /**
     * {@inheritdoc}
     *
     * @throws ConnectException When none of the servers could be reached.
     * @throws RuntimeException When a server answered with an error response.
     */
    public function executeBulk(string $query, array $parameters): ?BulkResponseInterface
    {
        return $this->executeGeneric($query, $parameters, true);
    }

    /**
     * Send the statement to the pool, trying each server at most once.
     *
     * The server at the head of the list is picked and immediately moved to the
     * tail, so consecutive calls rotate through the pool. A connection failure
     * moves on to the next server; an error response from a server aborts the
     * rotation and is converted into a RuntimeException.
     *
     * @param string $query      SQL statement.
     * @param array  $parameters Statement arguments (a list of argument lists in bulk mode).
     * @param bool   $bulk_mode  Whether to send the parameters as bulk arguments.
     *
     * @return Collection|BulkResponse|null
     *
     * @throws ConnectException When none of the servers could be reached.
     * @throws RuntimeException When a server answered with an error response.
     */
    private function executeGeneric(string $query, array $parameters, bool $bulk_mode = false)
    {
        $lastConnectException = null;
        $attempts             = \count($this->availableServers);

        for ($i = 0; $i < $attempts; $i++) {
            // Take the first available server and move it to the end of the stack
            $server                   = array_shift($this->availableServers);
            $this->availableServers[] = $server;

            try {
                return $this->sendRequest($server, $query, $parameters, $bulk_mode);
            } catch (ConnectException $exception) {
                // Caught before the BadResponseException: remember it and try the next server.
                $lastConnectException = $exception;
            } catch (BadResponseException $exception) {
                throw $this->createExceptionFromBadResponse($exception);
            }
        }

        if ($lastConnectException !== null) {
            throw new ConnectException(
                sprintf(
                    'No more servers available, exception from last server: %s',
                    $lastConnectException->getMessage()
                ),
                $lastConnectException->getRequest(),
                $lastConnectException
            );
        }

        // Only reachable with an empty server list, which the constructor prevents.
        return null;
    }

    /**
     * Convert an HTTP error response into the driver's RuntimeException.
     *
     * The body is expected to be a JSON document with an "error" object holding
     * "message" and "code". Anything else is reported with the raw body so the
     * caller still gets a RuntimeException instead of a notice or TypeError.
     *
     * @param BadResponseException $exception Exception raised by the HTTP client.
     *
     * @return RuntimeException Exception to throw, chained to the original one.
     */
    private function createExceptionFromBadResponse(BadResponseException $exception): RuntimeException
    {
        $response = $exception->getResponse();
        $body     = $response !== null ? (string)$response->getBody() : '';
        $json     = json_decode($body, true);

        if ($json === null && json_last_error() !== JSON_ERROR_NONE) {
            return new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
        }

        $error = \is_array($json) ? ($json['error'] ?? null) : null;

        if (!\is_array($error) || !isset($error['message'])) {
            // Valid JSON, but not the expected error document
            return new RuntimeException(
                sprintf('Server returned an unexpected error response: %s', $body),
                0,
                $exception
            );
        }

        return new RuntimeException((string)$error['message'], (int)($error['code'] ?? 0), $exception);
    }

    /**
     * POST the statement to the "/_sql" endpoint of a single server.
     *
     * @param string $server     Server address, appended to the configured protocol.
     * @param string $query      SQL statement.
     * @param array  $parameters Statement arguments.
     * @param bool   $bulk_mode  Send the parameters as "bulk_args" instead of "args"
     *                           and build a BulkResponse from the reply.
     *
     * @return Collection|BulkResponse
     *
     * @throws RuntimeException When the response body is not a JSON document.
     */
    private function sendRequest(string $server, string $query, array $parameters, bool $bulk_mode = false)
    {
        $argsName = $bulk_mode ? 'bulk_args' : 'args';

        $options = array_merge($this->httpOptions, [
            'base_uri' => sprintf('%s://%s', $this->protocol, $server),
            'json'     => [
                'stmt'    => $query,
                $argsName => $parameters,
            ],
        ]);

        $response     = $this->httpClient->request('POST', '/_sql', $options);
        $rawBody      = (string)$response->getBody();
        $responseBody = json_decode($rawBody, true);

        if (!\is_array($responseBody)) {
            throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $rawBody));
        }

        if ($bulk_mode) {
            return new BulkResponse(
                $responseBody['results'],
                $responseBody['cols'],
                $responseBody['duration']
            );
        }

        return new Collection(
            $responseBody['rows'],
            $responseBody['cols'],
            $responseBody['duration'],
            $responseBody['rowcount']
        );
    }

    /**
     * {@inheritdoc}
     */
    public function getServerInfo(): array
    {
        return [
            'serverVersion' => $this->getServerVersion(),
        ];
    }

    /**
     * {@inheritdoc}
     *
     * @throws RuntimeException When the version query does not yield exactly one row.
     */
    public function getServerVersion(): string
    {
        $result = $this->execute("select version['number'] from sys.nodes limit 1", []);
        $rows   = $result !== null ? $result->getRows() : [];

        if (\count($rows) !== 1) {
            throw new RuntimeException('Failed to determine server version');
        }

        return $rows[0][0];
    }

    /**
     * Reconfigure the server pool based on the attributes in PDO
     *
     * Derives the protocol from the SSL mode and rebuilds the request options
     * (timeouts, basic auth, default schema header, TLS settings) that are
     * merged into every subsequent request.
     *
     * @param PDOInterface $pdo
     */
    public function configure(PDOInterface $pdo): void
    {
        $sslMode = $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_SSL_MODE);
        $timeout = $pdo->getAttribute(PDO::ATTR_TIMEOUT);

        $options = [
            RequestOptions::TIMEOUT         => $timeout,
            RequestOptions::CONNECT_TIMEOUT => $timeout,
            RequestOptions::AUTH            => $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_HTTP_BASIC_AUTH) ?: null,
            RequestOptions::HEADERS         => [
                'Default-Schema' => $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_DEFAULT_SCHEMA),
            ],
        ];

        if ($sslMode === PDOCrateDB::CRATE_ATTR_SSL_MODE_ENABLED_BUT_WITHOUT_HOST_VERIFICATION) {
            $options[RequestOptions::VERIFY] = false;
        }

        $ca = $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_SSL_CA_PATH);

        if ($ca) {
            // A configured CA replaces the `false` that may have been set above.
            // NOTE(review): Guzzle documents `verify` as bool|string; confirm how
            // the [path, password] form is handled by the HTTP handler in use.
            $options[RequestOptions::VERIFY] = self::withOptionalPassword(
                $ca,
                $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_SSL_CA_PASSWORD)
            );
        }

        $cert = $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_SSL_CERT_PATH);

        if ($cert) {
            $options[RequestOptions::CERT] = self::withOptionalPassword(
                $cert,
                $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_SSL_CERT_PASSWORD)
            );
        }

        $key = $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_SSL_KEY_PATH);

        if ($key) {
            $options[RequestOptions::SSL_KEY] = self::withOptionalPassword(
                $key,
                $pdo->getAttribute(PDOCrateDB::CRATE_ATTR_SSL_KEY_PASSWORD)
            );
        }

        $this->protocol    = $sslMode === PDOCrateDB::CRATE_ATTR_SSL_MODE_DISABLED ? 'http' : 'https';
        $this->httpOptions = $options;
    }

    /**
     * Build the value of a file based TLS request option.
     *
     * @param mixed $path     Path of the file.
     * @param mixed $password Password protecting the file, if any.
     *
     * @return mixed The path alone, or a [path, password] pair when a password is set.
     */
    private static function withOptionalPassword($path, $password)
    {
        return $password ? [$path, $password] : $path;
    }
}
273