Completed
Push — master ( 18b606...1191a0 )
by yuuki
04:00
created

StatementClient::buildQueryRequest()   B

Complexity

Conditions 6
Paths 8

Size

Total Lines 33
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 33
rs 8.439
c 0
b 0
f 0
cc 6
eloc 23
nc 8
nop 1
1
<?php
2
declare(strict_types=1);
3
4
namespace Ytake\PrestoClient;
5
6
use Fig\Http\Message\StatusCodeInterface;
7
use GuzzleHttp\Psr7\Uri;
8
use GuzzleHttp\Client;
9
use GuzzleHttp\Psr7\Request;
10
use GuzzleHttp\ClientInterface;
11
use GuzzleHttp\Psr7\UriNormalizer;
12
use GuzzleHttp\Exception\ClientException;
13
use Guzzle\Http\Message\RequestInterface;
14
use GuzzleHttp\Exception\RequestException;
15
use Psr\Http\Message\ResponseInterface;
16
use Ytake\PrestoClient\Exception\QueryErrorException;
17
use Ytake\PrestoClient\Exception\RequestFailedException;
18
use Ytake\PrestoClient\Session\Property;
19
20
/**
21
 * Class StatementClient
22
 */
23
class StatementClient
24
{
25
    const STATEMENT_URI = '/v1/statement';
26
27
    /** @var ClientInterface */
28
    private $client;
29
30
    /** @var ClientSession */
31
    private $session;
32
33
    /** @var QueryResult */
34
    protected $queryResult;
35
36
    /** @var string[] */
37
    protected $headers = [];
38
39
    /** @var string */
40
    protected $query;
41
42
    /** @var string */
43
    protected $nextUri;
44
45
    /** @var bool */
46
    private $gone = false;
47
48
    /** @var bool */
49
    private $valid = true;
50
51
    /** @var bool */
52
    private $closed = false;
53
54
    /** @var bool */
55
    private $fulfilled = false;
56
57
    /** @var int */
58
    protected $nanoseconds = 5000000000;
59
60
    /**
61
     * PrestoClient constructor.
62
     *
63
     * @param ClientSession        $session
64
     * @param string               $query
65
     * @param ClientInterface|null $client
66
     */
67
    public function __construct(ClientSession $session, string $query, ClientInterface $client = null)
68
    {
69
        $this->session = $session;
70
        $this->query = $query;
71
        $this->client = (is_null($client)) ? new Client : $client;
72
        $this->queryResult = new QueryResult();
73
        $this->prepareRequest();
74
    }
75
76
    private function prepareRequest()
77
    {
78
        $this->headers = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array(\Ytake\PrestoClien...PrestoHeaders::VERSION) of type array<string|integer,str..."User-Agent":"string"}> is incompatible with the declared type array<integer,string> of property $headers.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
79
            PrestoHeaders::PRESTO_USER => $this->session->getUser(),
80
            'User-Agent'               => $this->session->getSource() . '/' . PrestoHeaders::VERSION,
81
        ];
82
    }
83
84
    /**
85
     * @param Request $request
86
     *
87
     * @return Request
88
     */
89
    protected function buildQueryRequest(Request $request): Request
90
    {
91
        $transactionId = is_null($this->session->getTransactionId()) ? 'NONE' : $this->session->getTransactionId()->toString();
92
        $request = $request->withAddedHeader(PrestoHeaders::PRESTO_CATALOG, $this->session->getCatalog())
93
            ->withAddedHeader(PrestoHeaders::PRESTO_SCHEMA, $this->session->getSchema())
94
            ->withAddedHeader(PrestoHeaders::PRESTO_SOURCE, $this->session->getSource())
95
            ->withAddedHeader(PrestoHeaders::PRESTO_TRANSACTION_ID, $transactionId);
96
        $sessionProperty = $this->session->getProperty();
97
        if (count($sessionProperty)) {
98
            $sessions = [];
99
            /** @var Property $property */
100
            foreach ($sessionProperty as $property) {
101
                $sessions[] = $property->getKey() . '=' . $property->getValue();
102
            }
103
            $request = $request->withAddedHeader(
104
                PrestoHeaders::PRESTO_SESSION,
105
                implode(',', $sessions)
106
            );
107
        }
108
        $preparedStatements = $this->session->getPreparedStatement();
109
        if (count($preparedStatements)) {
110
            $statements = [];
111
            foreach ($preparedStatements as $preparedStatement) {
112
                $statements[] = urlencode($preparedStatement->getKey()) . '=' . urlencode($preparedStatement->getValue());
113
            }
114
            $request = $request->withAddedHeader(
115
                PrestoHeaders::PRESTO_PREPARED_STATEMENT,
116
                implode(',', $statements)
117
            );
118
        }
119
120
        return $request;
121
    }
122
123
    /**
124
     * @param int  $timeout
125
     * @param bool $debug
126
     *
127
     * @return void
128
     * @throws QueryErrorException
129
     */
130
    public function execute(int $timeout = 500000, bool $debug = false)
131
    {
132
        $normalize = UriNormalizer::normalize(
133
            new Uri($this->session->getHost() . StatementClient::STATEMENT_URI),
134
            UriNormalizer::REMOVE_DUPLICATE_SLASHES
135
        );
136
        $request = new Request(RequestInterface::POST, $normalize, $this->headers);
137
        try {
138
            $response = $this->client->send($this->buildQueryRequest($request), [
139
                'timeout' => $timeout,
140
                'body'    => $this->query,
141
                'debug'   => $debug,
142
            ]);
143
            if ($response->getStatusCode() === StatusCodeInterface::STATUS_OK) {
144
                $this->queryResult->set($response->getBody()->getContents());
145
            }
146
        } catch (ClientException $e) {
147
            throw new QueryErrorException($e->getMessage(), $e->getCode(), $e);
148
        }
149
    }
150
151
    /**
152
     * @return QueryResult
153
     * @throws QueryErrorException
154
     */
155
    public function current(): QueryResult
156
    {
157
        return $this->queryResult;
158
    }
159
160
    /**
161
     * @return bool
162
     */
163
    public function advance(): bool
164
    {
165
        $nextUri = $this->current()->getNextUri();
166
        if (is_null($nextUri) || $this->isClosed()) {
167
            $this->valid = false;
168
169
            return false;
170
        }
171
        $this->prepareRequest();
172
        $start = microtime(true);
173
        $cause = null;
174
        $attempts = 0;
175
        do {
176
            if ($attempts > 0) {
177
                usleep($attempts * 100);
178
            }
179
            $attempts++;
180
            try {
181
                $response = $this->client->get($nextUri);
182
                if ($response->getStatusCode() === StatusCodeInterface::STATUS_OK) {
183
                    $this->queryResult->set($response->getBody()->getContents());
184
185
                    return true;
186
                }
187
            } catch (ClientException $e) {
188
                $cause = $e;
189
                if ($e->getCode() != StatusCodeInterface::STATUS_SERVICE_UNAVAILABLE) {
190
                    throw $this->requestFailedException("fetching next", $nextUri, $e->getResponse());
0 ignored issues
show
Bug introduced by
It seems like $e->getResponse() can be null; however, requestFailedException() does not accept null, maybe add an additional type check?

Unless you are absolutely sure that the expression can never be null because of other conditions, we strongly recommend to add an additional type check to your code:

/** @return stdClass|null */
function mayReturnNull() { }

function doesNotAcceptNull(stdClass $x) { }

// With potential error.
function withoutCheck() {
    $x = mayReturnNull();
    doesNotAcceptNull($x); // Potential error here.
}

// Safe - Alternative 1
function withCheck1() {
    $x = mayReturnNull();
    if ( ! $x instanceof stdClass) {
        throw new \LogicException('$x must be defined.');
    }
    doesNotAcceptNull($x);
}

// Safe - Alternative 2
function withCheck2() {
    $x = mayReturnNull();
    if ($x instanceof stdClass) {
        doesNotAcceptNull($x);
    }
}
Loading history...
191
                }
192
            }
193
        } while (((microtime(true) - $start) < $this->nanoseconds) && !$this->isClosed());
194
195
        $this->gone = true;
196
        throw new \RuntimeException('Error fetching next', 0, $cause);
197
    }
198
199
    /**
200
     * @param int  $timeout
201
     * @param bool $debug
202
     *
203
     * @return bool
204
     */
205
    public function cancelLeafStage(int $timeout = 500000, bool $debug = false): bool
206
    {
207
        if (!$this->isClosed()) {
208
            $cancelUri = $this->current()->getPartialCancelUri();
209
            if (is_null($cancelUri)) {
210
                return false;
211
            }
212
            $promise = $this->client->deleteAsync($cancelUri, [
213
                'timeout' => $timeout,
214
                'debug'   => $debug,
215
            ]);
216
            $promise->then(function (ResponseInterface $response) {
217
                $this->fulfilled = (StatusCodeInterface::STATUS_NO_CONTENT === $response->getStatusCode());
218
            }, function (RequestException $e) {
219
                throw new RequestFailedException($e->getMessage(), $e->getCode(), $e);
220
            });
221
            $promise->wait();
222
        }
223
224
        return $this->fulfilled;
225
    }
226
227
    /**
228
     * @param int $nanoseconds
229
     */
230
    public function setNanoseconds(int $nanoseconds)
231
    {
232
        $this->nanoseconds = $nanoseconds;
233
    }
234
235
    /**
236
     * @return string
237
     */
238
    public function getQuery(): string
239
    {
240
        return $this->query;
241
    }
242
243
    /**
244
     * @return bool
245
     */
246
    public function isFailed(): bool
247
    {
248
        return $this->queryResult->getError() != null;
249
    }
250
251
    /**
252
     * @return bool
253
     */
254
    public function isValid(): bool
255
    {
256
        return $this->valid && (!$this->isGone()) && (!$this->isClosed());
257
    }
258
259
    /**
260
     * @return bool
261
     */
262
    public function isGone(): bool
263
    {
264
        return $this->gone;
265
    }
266
267
    /**
268
     * @return bool
269
     */
270
    public function isClosed(): bool
271
    {
272
        return $this->closed;
273
    }
274
275
    /**
276
     * close
277
     * HTTP method DELETE
278
     */
279
    public function close()
280
    {
281
        $uri = $this->current()->getNextUri();
282
        if (!is_null($uri)) {
283
            $this->client->deleteAsync($uri)->wait();
284
        }
285
        $this->closed = true;
286
    }
287
288
    /**
289
     * @param string            $message
290
     * @param string            $uri
291
     * @param ResponseInterface $response
292
     *
293
     * @return RequestFailedException
294
     */
295
    private function requestFailedException(
296
        string $message,
297
        string $uri,
298
        ResponseInterface $response
299
    ): RequestFailedException {
300
        $this->gone = true;
301
        if (!$response->getBody()->getSize()) {
302
            return new RequestFailedException(
303
                sprintf("Error %s at %s returned an invalid response: %s [Error: %s]",
304
                    $message,
305
                    $uri,
306
                    $response->getStatusCode(),
307
                    $response->getBody()->getContents()
308
                )
309
            );
310
        }
311
312
        return new RequestFailedException(
313
            sprintf("Error %s at %s returned %s: %s",
314
                $message,
315
                $uri,
316
                $response->getStatusCode(),
317
                $response->getBody()->getContents()
318
            )
319
        );
320
    }
321
}
322