Completed
Pull Request — master (#4)
by yuuki
03:34
created

StatementClient::detectResponse()   C

Complexity

Conditions 7
Paths 16

Size

Total Lines 28
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 28
rs 6.7272
c 0
b 0
f 0
cc 7
eloc 20
nc 16
nop 1
1
<?php
2
declare(strict_types=1);
3
4
/**
5
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
6
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
7
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
8
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
9
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
10
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
11
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
12
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
13
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
14
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
15
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
16
 */
17
18
namespace Ytake\PrestoClient;
19
20
use GuzzleHttp\Psr7\Uri;
21
use GuzzleHttp\Client;
22
use GuzzleHttp\Psr7\Request;
23
use GuzzleHttp\ClientInterface;
24
use GuzzleHttp\Psr7\UriNormalizer;
25
use GuzzleHttp\Exception\ClientException;
26
use GuzzleHttp\Exception\RequestException;
27
use Fig\Http\Message\StatusCodeInterface;
28
use Fig\Http\Message\RequestMethodInterface;
29
use Psr\Http\Message\ResponseInterface;
30
use Ytake\PrestoClient\Exception\QueryErrorException;
31
use Ytake\PrestoClient\Exception\RequestFailedException;
32
use Ytake\PrestoClient\Session\Property;
33
34
/**
35
 * Class StatementClient
36
 *
37
 * @author Yuuki Takezawa <[email protected]>
38
 */
39
class StatementClient
40
{
41
    const STATEMENT_URI = '/v1/statement';
42
43
    /** @var ClientInterface */
44
    private $client;
45
46
    /** @var ClientSession */
47
    private $session;
48
49
    /** @var QueryResult */
50
    protected $queryResult;
51
52
    /** @var array<string, string> */
53
    protected $headers = [];
54
55
    /** @var string */
56
    protected $query;
57
58
    /** @var string */
59
    protected $nextUri;
60
61
    /** @var bool */
62
    private $gone = false;
63
64
    /** @var bool */
65
    private $valid = true;
66
67
    /** @var bool */
68
    private $closed = false;
69
70
    /** @var bool */
71
    private $fulfilled = false;
72
73
    /** @var int */
74
    protected $nanoseconds = 5000000000;
75
76
    /**
77
     * PrestoClient constructor.
78
     *
79
     * @param ClientSession        $session
80
     * @param string               $query
81
     * @param ClientInterface|null $client
82
     */
83
    public function __construct(ClientSession $session, string $query, ClientInterface $client = null)
84
    {
85
        $this->session = $session;
86
        $this->query = $query;
87
        $this->client = (is_null($client)) ? new Client : $client;
88
        $this->queryResult = new QueryResult();
89
        $this->prepareRequest();
90
    }
91
92
    private function prepareRequest()
93
    {
94
        $this->headers = [
95
            PrestoHeaders::PRESTO_USER => $this->session->getUser(),
96
            'User-Agent'               => $this->session->getSource() . '/' . PrestoHeaders::VERSION,
97
        ];
98
    }
99
100
    /**
101
     * @param Request $request
102
     *
103
     * @return Request
104
     */
105
    protected function buildQueryRequest(Request $request): Request
106
    {
107
        $sessionTransaction = $this->session->getTransactionId();
108
        $transactionId = is_null($sessionTransaction) ? 'NONE' : $sessionTransaction->toString();
109
        $request = $request->withAddedHeader(PrestoHeaders::PRESTO_CATALOG, $this->session->getCatalog())
110
            ->withAddedHeader(PrestoHeaders::PRESTO_SCHEMA, $this->session->getSchema())
111
            ->withAddedHeader(PrestoHeaders::PRESTO_SOURCE, $this->session->getSource())
112
            ->withAddedHeader(PrestoHeaders::PRESTO_TRANSACTION_ID, $transactionId);
113
        $sessionProperty = $this->session->getProperty();
114
        if (count($sessionProperty)) {
115
            $sessions = [];
116
            /** @var Property $property */
117
            foreach ($sessionProperty as $property) {
118
                $sessions[] = $property->getKey() . '=' . $property->getValue();
119
            }
120
            $request = $request->withAddedHeader(
121
                PrestoHeaders::PRESTO_SESSION,
122
                implode(',', $sessions)
123
            );
124
        }
125
        $preparedStatements = $this->session->getPreparedStatement();
126
        if (count($preparedStatements)) {
127
            $statements = [];
128
            foreach ($preparedStatements as $preparedStatement) {
129
                $statements[] = urlencode($preparedStatement->getKey())
130
                    . '=' . urlencode($preparedStatement->getValue());
131
            }
132
            $request = $request->withAddedHeader(
133
                PrestoHeaders::PRESTO_PREPARED_STATEMENT,
134
                implode(',', $statements)
135
            );
136
        }
137
138
        return $request;
139
    }
140
141
    /**
142
     * @param int  $timeout
143
     * @param bool $debug
144
     *
145
     * @return void
146
     * @throws QueryErrorException
147
     */
148
    public function execute(int $timeout = 500000, bool $debug = false)
149
    {
150
        $normalize = UriNormalizer::normalize(
151
            new Uri($this->session->getHost() . StatementClient::STATEMENT_URI),
152
            UriNormalizer::REMOVE_DUPLICATE_SLASHES
153
        );
154
        $request = new Request(RequestMethodInterface::METHOD_POST, $normalize, $this->headers);
155
        try {
156
            $response = $this->client->send($this->buildQueryRequest($request), [
157
                'timeout' => $timeout,
158
                'body'    => $this->query,
159
                'debug'   => $debug,
160
            ]);
161
            if ($response->getStatusCode() === StatusCodeInterface::STATUS_OK) {
162
                $this->queryResult->set($response->getBody()->getContents());
163
            }
164
        } catch (ClientException $e) {
165
            throw new QueryErrorException($e->getMessage(), $e->getCode(), $e);
166
        }
167
    }
168
169
    /**
170
     * @return QueryResult
171
     * @throws QueryErrorException
172
     */
173
    public function current(): QueryResult
174
    {
175
        return $this->queryResult;
176
    }
177
178
    /**
179
     * @return bool
180
     */
181
    public function advance(): bool
182
    {
183
        $nextUri = $this->current()->getNextUri();
184
        if (is_null($nextUri) || $this->isClosed()) {
185
            $this->valid = false;
186
187
            return false;
188
        }
189
        $this->prepareRequest();
190
191
        return $this->detectResponse($nextUri);
192
    }
193
194
    /**
195
     * @param int  $timeout
196
     * @param bool $debug
197
     *
198
     * @return bool
199
     */
200
    public function cancelLeafStage(int $timeout = 500000, bool $debug = false): bool
201
    {
202
        if (!$this->isClosed()) {
203
            $cancelUri = $this->current()->getPartialCancelUri();
204
            if (is_null($cancelUri)) {
205
                return false;
206
            }
207
            $promise = $this->client->deleteAsync($cancelUri, [
208
                'timeout' => $timeout,
209
                'debug'   => $debug,
210
            ]);
211
            $promise->then(function (ResponseInterface $response) {
212
                $this->fulfilled = (StatusCodeInterface::STATUS_NO_CONTENT === $response->getStatusCode());
213
            }, function (RequestException $e) {
214
                throw new RequestFailedException($e->getMessage(), $e->getCode(), $e);
215
            });
216
            $promise->wait();
217
        }
218
219
        return $this->fulfilled;
220
    }
221
222
    /**
223
     * @param int $nanoseconds
224
     */
225
    public function setNanoseconds(int $nanoseconds)
226
    {
227
        $this->nanoseconds = $nanoseconds;
228
    }
229
230
    /**
231
     * @return string
232
     */
233
    public function getQuery(): string
234
    {
235
        return $this->query;
236
    }
237
238
    /**
239
     * @return bool
240
     */
241
    public function isFailed(): bool
242
    {
243
        return $this->queryResult->getError() !== null;
244
    }
245
246
    /**
247
     * @return bool
248
     */
249
    public function isValid(): bool
250
    {
251
        return $this->valid && (!$this->isGone()) && (!$this->isClosed());
252
    }
253
254
    /**
255
     * @return bool
256
     */
257
    public function isGone(): bool
258
    {
259
        return $this->gone;
260
    }
261
262
    /**
263
     * @return bool
264
     */
265
    public function isClosed(): bool
266
    {
267
        return $this->closed;
268
    }
269
270
    /**
271
     * close
272
     * HTTP method DELETE
273
     */
274
    public function close()
275
    {
276
        $uri = $this->current()->getNextUri();
277
        if (!is_null($uri)) {
278
            $this->client->deleteAsync($uri)->wait();
279
        }
280
        $this->closed = true;
281
    }
282
283
    /**
284
     * @param string                 $message
285
     * @param string                 $uri
286
     * @param ResponseInterface|null $response
287
     *
288
     * @return RequestFailedException
289
     */
290
    private function requestFailedException(
291
        string $message,
292
        string $uri,
293
        ResponseInterface $response = null
294
    ): RequestFailedException {
295
        $this->gone = true;
296
        if ($response) {
297
            if (!$response->getBody()->getSize()) {
298
                return new RequestFailedException(
299
                    sprintf(
300
                        "Error %s at %s returned an invalid response: %s [Error: %s]",
301
                        $message,
302
                        $uri,
303
                        $response->getStatusCode(),
304
                        $response->getBody()->getContents()
305
                    )
306
                );
307
            }
308
309
            return new RequestFailedException(
310
                sprintf(
311
                    "Error %s at %s returned %s: %s",
312
                    $message,
313
                    $uri,
314
                    $response->getStatusCode(),
315
                    $response->getBody()->getContents()
316
                )
317
            );
318
        }
319
320
        return new RequestFailedException('server error.');
321
    }
322
323
    /**
324
     * @param string $nextUri
325
     *
326
     * @return bool
327
     */
328
    private function detectResponse(string $nextUri): bool
329
    {
330
        $start = microtime(true);
331
        $cause = null;
332
        $attempts = 0;
333
        do {
334
            if ($attempts > 0) {
335
                usleep($attempts * 100);
336
            }
337
            $attempts++;
338
            try {
339
                $response = $this->client->get($nextUri);
340
                if ($response->getStatusCode() === StatusCodeInterface::STATUS_OK) {
341
                    $this->queryResult->set($response->getBody()->getContents());
342
343
                    return true;
344
                }
345
            } catch (ClientException $e) {
346
                $cause = $e;
347
                if ($e->getCode() != StatusCodeInterface::STATUS_SERVICE_UNAVAILABLE) {
348
                    throw $this->requestFailedException("fetching next", $nextUri, $e->getResponse());
349
                }
350
            }
351
        } while (((microtime(true) - $start) < $this->nanoseconds) && !$this->isClosed());
352
353
        $this->gone = true;
354
        throw new \RuntimeException('Error fetching next', 0, $cause);
355
    }
356
}
357