StatementClient   A
last analyzed

Complexity

Total Complexity 39

Size/Duplication

Total Lines 321
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 15

Importance

Changes 0
Metric Value
wmc 39
lcom 1
cbo 15
dl 0
loc 321
rs 9.28
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 2
A prepareRequest() 0 10 1
B buildQueryRequest() 0 35 6
A execute() 0 20 3
A current() 0 4 1
A advance() 0 12 3
A cancelLeafStage() 0 21 3
A setNanoseconds() 0 4 1
A getQuery() 0 4 1
A isFailed() 0 4 1
A isValid() 0 4 3
A isGone() 0 4 1
A isClosed() 0 4 1
A close() 0 8 2
A requestFailedException() 0 32 3
B detectResponse() 0 28 7
1
<?php
2
declare(strict_types=1);
3
4
/**
5
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
6
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
7
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
8
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
9
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
10
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
11
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
12
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
13
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
14
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
15
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
16
 */
17
18
namespace Ytake\PrestoClient;
19
20
use GuzzleHttp\Psr7\Uri;
21
use GuzzleHttp\Client;
22
use GuzzleHttp\Psr7\Request;
23
use GuzzleHttp\ClientInterface;
24
use GuzzleHttp\Psr7\UriNormalizer;
25
use GuzzleHttp\Exception\ClientException;
26
use GuzzleHttp\Exception\RequestException;
27
use Fig\Http\Message\StatusCodeInterface;
28
use Fig\Http\Message\RequestMethodInterface;
29
use Psr\Http\Message\ResponseInterface;
30
use Ytake\PrestoClient\Exception\QueryErrorException;
31
use Ytake\PrestoClient\Exception\RequestFailedException;
32
use Ytake\PrestoClient\Session\Property;
33
34
/**
 * Class StatementClient
 *
 * Posts a query to the statement endpoint of the session host and then follows
 * the "next" URIs found in each response until there is nothing left to fetch,
 * the client is closed, or polling fails.
 *
 * @author Yuuki Takezawa <[email protected]>
 */
class StatementClient
{
    /** Path of the statement endpoint, appended to the session host. */
    const STATEMENT_URI = '/v1/statement';

    /** @var ClientInterface HTTP client used for every request */
    private $client;

    /** @var ClientSession source of host, user, catalog, schema and extra headers */
    private $session;

    /** @var QueryResult holder that receives each response body */
    protected $queryResult;

    /** @var array<string, string> base headers rebuilt by prepareRequest() */
    protected $headers = [];

    /** @var string statement text sent as the request body by execute() */
    protected $query;

    /** @var string not read or written inside this class */
    protected $nextUri;

    /** @var bool set once fetching the next result has failed */
    private $gone = false;

    /** @var bool cleared by advance() when there is no next URI or the client is closed */
    private $valid = true;

    /** @var bool set by close() */
    private $closed = false;

    /** @var bool whether the last partial-cancel request answered 204 No Content */
    private $fulfilled = false;

    /** @var int time budget for polling a next URI, in nanoseconds (default: 5 seconds) */
    protected $nanoseconds = 5000000000;

    /**
     * StatementClient constructor.
     *
     * @param ClientSession        $session
     * @param string               $query
     * @param ClientInterface|null $client HTTP client to use; a default Guzzle client is created when omitted
     */
    public function __construct(ClientSession $session, string $query, ClientInterface $client = null)
    {
        $this->session = $session;
        $this->query = $query;
        $this->client = $client ?? new Client();
        $this->queryResult = new QueryResult();
        $this->prepareRequest();
    }

    /**
     * Rebuilds the base header set from the session.
     *
     * Headers supplied by the session are merged last, so they override the
     * user and User-Agent defaults on key collision.
     */
    private function prepareRequest()
    {
        $defaults = [
            PrestoHeaders::PRESTO_USER => $this->session->getUser(),
            'User-Agent'               => $this->session->getSource() . '/' . PrestoHeaders::VERSION,
        ];
        $this->headers = array_merge($defaults, $this->session->getHeader());
    }

    /**
     * Decorates the initial request with the session-specific headers:
     * catalog, schema, source, transaction id and, when present, session
     * properties and prepared statements.
     *
     * @param Request $request
     *
     * @return Request
     */
    protected function buildQueryRequest(Request $request): Request
    {
        $sessionTransaction = $this->session->getTransactionId();
        // Without a transaction on the session the literal 'NONE' is sent.
        $transactionId = is_null($sessionTransaction) ? 'NONE' : $sessionTransaction->toString();
        $request = $request->withAddedHeader(PrestoHeaders::PRESTO_CATALOG, $this->session->getCatalog())
            ->withAddedHeader(PrestoHeaders::PRESTO_SCHEMA, $this->session->getSchema())
            ->withAddedHeader(PrestoHeaders::PRESTO_SOURCE, $this->session->getSource())
            ->withAddedHeader(PrestoHeaders::PRESTO_TRANSACTION_ID, $transactionId);

        $sessionProperty = $this->session->getProperty();
        if (count($sessionProperty)) {
            $sessions = [];
            /** @var Property $property */
            foreach ($sessionProperty as $property) {
                $sessions[] = $property->getKey() . '=' . $property->getValue();
            }
            // All properties travel in one comma-separated header value.
            $request = $request->withAddedHeader(
                PrestoHeaders::PRESTO_SESSION,
                implode(',', $sessions)
            );
        }

        $preparedStatements = $this->session->getPreparedStatement();
        if (count($preparedStatements)) {
            $statements = [];
            foreach ($preparedStatements as $preparedStatement) {
                // Unlike session properties, both sides of a prepared statement are URL-encoded.
                $statements[] = urlencode($preparedStatement->getKey())
                    . '=' . urlencode($preparedStatement->getValue());
            }
            $request = $request->withAddedHeader(
                PrestoHeaders::PRESTO_PREPARED_STATEMENT,
                implode(',', $statements)
            );
        }

        return $request;
    }

    /**
     * Posts the query to the statement endpoint and stores the response body
     * in the query result when the server answers 200 OK. Any other
     * non-error status leaves the query result untouched.
     *
     * @param int  $timeout value passed as the HTTP client's 'timeout' request option
     * @param bool $debug   value passed as the HTTP client's 'debug' request option
     *
     * @return void
     * @throws QueryErrorException when the HTTP client raises a ClientException
     */
    public function execute(int $timeout = 500000, bool $debug = false)
    {
        // Collapse duplicate slashes that appear when the host ends with '/'.
        $normalize = UriNormalizer::normalize(
            new Uri($this->session->getHost() . self::STATEMENT_URI),
            UriNormalizer::REMOVE_DUPLICATE_SLASHES
        );
        $request = new Request(RequestMethodInterface::METHOD_POST, $normalize, $this->headers);
        try {
            $response = $this->client->send($this->buildQueryRequest($request), [
                'timeout' => $timeout,
                'body'    => $this->query,
                'debug'   => $debug,
            ]);
            if ($response->getStatusCode() === StatusCodeInterface::STATUS_OK) {
                $this->queryResult->set($response->getBody()->getContents());
            }
        } catch (ClientException $e) {
            throw new QueryErrorException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Returns the query result holder, which reflects the most recently
     * stored response.
     *
     * @return QueryResult
     */
    public function current(): QueryResult
    {
        return $this->queryResult;
    }

    /**
     * Fetches the next result if the current one points to a next URI.
     *
     * @return bool true when a new result was stored; false when there is no
     *              next URI or the client has been closed
     * @throws RequestFailedException when fetching fails with anything other than 503
     * @throws \RuntimeException      when the polling time budget runs out
     */
    public function advance(): bool
    {
        $nextUri = $this->current()->getNextUri();
        if (is_null($nextUri) || $this->isClosed()) {
            $this->valid = false;

            return false;
        }
        // Refresh the base headers from the session before polling.
        $this->prepareRequest();

        return $this->detectResponse($nextUri);
    }

    /**
     * Sends a DELETE to the partial-cancel URI of the current result.
     *
     * @param int  $timeout value passed as the HTTP client's 'timeout' request option
     * @param bool $debug   value passed as the HTTP client's 'debug' request option
     *
     * @return bool false when the current result has no partial-cancel URI;
     *              otherwise whether a cancel request has answered 204 No Content
     *              (the remembered value is returned when the client is closed)
     */
    public function cancelLeafStage(int $timeout = 500000, bool $debug = false): bool
    {
        if ($this->isClosed()) {
            return $this->fulfilled;
        }
        $cancelUri = $this->current()->getPartialCancelUri();
        if (is_null($cancelUri)) {
            return false;
        }
        $promise = $this->client->deleteAsync($cancelUri, [
            'timeout' => $timeout,
            'debug'   => $debug,
        ]);
        $promise->then(function (ResponseInterface $response) {
            $this->fulfilled = (StatusCodeInterface::STATUS_NO_CONTENT === $response->getStatusCode());
        }, function (RequestException $e) {
            throw new RequestFailedException($e->getMessage(), $e->getCode(), $e);
        });
        // NOTE(review): wait() is called on the original promise, not on the one
        // returned by then(), so a rejection appears to surface as the underlying
        // HTTP client exception rather than RequestFailedException — confirm intent.
        $promise->wait();

        return $this->fulfilled;
    }

    /**
     * Sets the time budget used while polling a next URI.
     *
     * @param int $nanoseconds
     */
    public function setNanoseconds(int $nanoseconds)
    {
        $this->nanoseconds = $nanoseconds;
    }

    /**
     * @return string the statement text given to the constructor
     */
    public function getQuery(): string
    {
        return $this->query;
    }

    /**
     * @return bool true when the current query result carries an error
     */
    public function isFailed(): bool
    {
        return $this->queryResult->getError() !== null;
    }

    /**
     * @return bool true while the client is still valid, not gone and not closed
     */
    public function isValid(): bool
    {
        return $this->valid && (!$this->isGone()) && (!$this->isClosed());
    }

    /**
     * @return bool true once fetching a next result has failed
     */
    public function isGone(): bool
    {
        return $this->gone;
    }

    /**
     * @return bool true once close() has been called
     */
    public function isClosed(): bool
    {
        return $this->closed;
    }

    /**
     * close
     * HTTP method DELETE
     *
     * Sends a DELETE to the next URI of the current result, if there is one,
     * and marks the client as closed either way.
     */
    public function close()
    {
        $uri = $this->current()->getNextUri();
        if (!is_null($uri)) {
            $this->client->deleteAsync($uri)->wait();
        }
        $this->closed = true;
    }

    /**
     * Marks the client as gone and builds the exception describing a failed request.
     *
     * @param string                 $message short description of the attempted action
     * @param string                 $uri     URI that was requested
     * @param ResponseInterface|null $response response attached to the failure, if any
     *
     * @return RequestFailedException
     */
    private function requestFailedException(
        string $message,
        string $uri,
        ResponseInterface $response = null
    ): RequestFailedException {
        $this->gone = true;
        if ($response === null) {
            return new RequestFailedException('server error.');
        }

        // A body with no (known) size is reported as an invalid response.
        $format = $response->getBody()->getSize()
            ? "Error %s at %s returned %s: %s"
            : "Error %s at %s returned an invalid response: %s [Error: %s]";

        return new RequestFailedException(
            sprintf($format, $message, $uri, $response->getStatusCode(), $response->getBody()->getContents())
        );
    }

    /**
     * Polls the given URI until it answers 200 OK, retrying while the server
     * answers 503 Service Unavailable, until the time budget is used up or the
     * client is closed.
     *
     * @param string $nextUri
     *
     * @return bool true once a 200 response has been stored in the query result
     * @throws RequestFailedException when the request fails with anything other than 503
     * @throws \RuntimeException      when no 200 response arrived within the time budget
     */
    private function detectResponse(string $nextUri): bool
    {
        $start = microtime(true);
        $cause = null;
        $attempts = 0;
        do {
            if ($attempts > 0) {
                // usleep() takes microseconds: the pause grows by 100µs per attempt.
                // NOTE(review): this is a very short back-off — confirm milliseconds were not intended.
                usleep($attempts * 100);
            }
            $attempts++;
            try {
                $response = $this->client->get($nextUri, ['headers' => $this->headers]);
                if ($response->getStatusCode() === StatusCodeInterface::STATUS_OK) {
                    $this->queryResult->set($response->getBody()->getContents());

                    return true;
                }
            } catch (RequestException $e) {
                // RequestException covers both 4xx (ClientException) and 5xx
                // (ServerException); catching only the former made the 503
                // retry below unreachable.
                $cause = $e;
                if ((int) $e->getCode() !== StatusCodeInterface::STATUS_SERVICE_UNAVAILABLE) {
                    throw $this->requestFailedException("fetching next", $nextUri, $e->getResponse());
                }
            }
            // microtime(true) measures seconds; scale to nanoseconds so the
            // comparison uses the same unit as the configured budget.
            $elapsedNanoseconds = (microtime(true) - $start) * 1000000000;
        } while (($elapsedNanoseconds < $this->nanoseconds) && !$this->isClosed());

        $this->gone = true;
        throw new \RuntimeException('Error fetching next', 0, $cause);
    }
}
360