Passed
Push — master ( 8dc2dd...11f141 )
by Teye
05:33 queued 01:20
created

DruidClient::executeQuery()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 5
c 0
b 0
f 0
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 10
cc 1
nc 1
nop 1
crap 1
1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid;
5
6
use Psr\Log\LoggerInterface;
7
use InvalidArgumentException;
8
use Level23\Druid\Types\Granularity;
9
use GuzzleHttp\Client as GuzzleClient;
10
use Level23\Druid\Tasks\TaskInterface;
11
use Level23\Druid\Queries\QueryBuilder;
12
use Psr\Http\Message\ResponseInterface;
13
use Level23\Druid\Tasks\KillTaskBuilder;
14
use GuzzleHttp\Exception\ServerException;
15
use Level23\Druid\Queries\QueryInterface;
16
use Level23\Druid\Tasks\IndexTaskBuilder;
17
use Level23\Druid\Responses\TaskResponse;
18
use Level23\Druid\Metadata\MetadataBuilder;
19
use Level23\Druid\Tasks\CompactTaskBuilder;
20
use Level23\Druid\InputSources\DruidInputSource;
21
use Level23\Druid\Exceptions\QueryResponseException;
22
23
class DruidClient
24
{
25
    /**
     * The HTTP client which is used to send the requests to druid.
     *
     * @var GuzzleClient
     */
    protected $client;

    /**
     * Optional logger. When it is set, log() writes debug messages to it.
     *
     * @var LoggerInterface|null
     */
    protected $logger;

    /**
     * The default configuration. The values given to the constructor are merged over these defaults.
     *
     * @var array<string, int|string|null>
     */
    protected $config = [
        // Domain + optional port of the druid router. If this is set, it will be used for the broker,
        // coordinator and overlord whenever their own url is left empty.
        'router_url' => '',

        // Broker: domain + optional port. Don't add the api path like "/druid/v2"
        'broker_url' => '',

        // Coordinator: domain + optional port. Don't add the api path like "/druid/coordinator/v1"
        'coordinator_url' => '',

        // Overlord: domain + optional port. Don't add the api path like "/druid/indexer/v1"
        'overlord_url' => '',

        // The maximum duration of a druid query. If the response takes longer, we will close the connection.
        'timeout' => 60,

        // The maximum duration of connecting to the druid instance.
        'connect_timeout' => 10,

        // The number of times we will retry in case of a failure. So if retries is 2, we will try to
        // execute the query 3 times in the worst case:
        //  - the first time is the normal attempt to execute the query,
        //  - then we do the FIRST retry,
        //  - then we do the SECOND retry.
        'retries' => 2,

        // When a query fails to be executed, this is the delay before the query is retried.
        // Default is 500 ms, which is 0.5 seconds. Set to 0 to disable the delay between retries.
        'retry_delay_ms' => 500,

        // Supply the druid version which you are sending your queries to. Based on this version we can
        // enable / disable certain new features if your server supports them.
        'version' => null,
    ];
96
97
    /**
     * DruidClient constructor.
     *
     * @param array                   $config The configuration for this client. The given values are merged
     *                                        over the default configuration of this class.
     * @param \GuzzleHttp\Client|null $client The guzzle client to use. When omitted (or null), a client is
     *                                        created with makeGuzzleClient().
     */
    public function __construct(array $config, ?GuzzleClient $client = null)
    {
        $this->config = array_merge($this->config, $config);

        // The config has to be merged first: makeGuzzleClient() reads the timeouts from it.
        $this->client = $client ?? $this->makeGuzzleClient();
    }
109
110
    /**
     * Start building a new druid query for the given dataSource.
     *
     * @param string $dataSource  The name of the dataSource which the query builder receives.
     * @param string $granularity The granularity which the query builder receives. Defaults to
     *                            Granularity::ALL.
     *
     * @return \Level23\Druid\Queries\QueryBuilder A new builder which is bound to this client.
     */
    public function query(string $dataSource, string $granularity = Granularity::ALL): QueryBuilder
    {
        $builder = new QueryBuilder($this, $dataSource, $granularity);

        return $builder;
    }
122
123
    /**
     * Cancel the execution of a query with the given query identifier.
     *
     * A DELETE request is sent to "<broker_url>/druid/v2/<identifier>".
     *
     * @param string $identifier The identifier of the query which should be cancelled.
     *
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function cancelQuery(string $identifier): void
    {
        // The identifier is supplied by the caller and becomes a path segment of the url. Encode it, so
        // characters like "/", "?" or "#" cannot change the endpoint which we are calling.
        $url = $this->config('broker_url') . '/druid/v2/' . rawurlencode($identifier);

        $this->executeRawRequest('DELETE', $url);
    }
138
139
    /**
     * Send the given query to the druid broker and return the decoded response.
     *
     * The query is converted to an array and posted to "<broker_url>/druid/v2". Both the query and the
     * response are handed to log().
     *
     * @param \Level23\Druid\Queries\QueryInterface $druidQuery The query to execute.
     *
     * @return array The result of executeRawRequest() for this query.
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    public function executeQuery(QueryInterface $druidQuery): array
    {
        $payload = $druidQuery->toArray();
        $this->log('Executing druid query', ['query' => $payload]);

        $endpoint = $this->config('broker_url') . '/druid/v2';
        $response = $this->executeRawRequest('post', $endpoint, $payload);
        $this->log('Received druid response', ['response' => $response]);

        return $response;
    }
159
160
    /**
     * Execute a druid task and return the identifier of the created task.
     *
     * The task is converted to an array and posted to "<overlord_url>/druid/indexer/v1/task".
     *
     * @param \Level23\Druid\Tasks\TaskInterface $task The task to execute.
     *
     * @return string The task identifier
     * @throws \Level23\Druid\Exceptions\QueryResponseException When the request fails, or when the response
     *                                                          does not contain a task identifier.
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeTask(TaskInterface $task): string
    {
        $payload = $task->toArray();

        $this->log('Executing druid task', ['task' => $payload]);

        $result = $this->executeRawRequest(
            'post',
            $this->config('overlord_url') . '/druid/indexer/v1/task',
            $payload
        );

        $this->log('Received task response', ['response' => $result]);

        // executeRawRequest() returns an empty array for a 202 / 204 response, and the body is not
        // guaranteed to contain a "task" key. Fail with a descriptive exception instead of an
        // undefined index notice followed by a TypeError on the string return type.
        if (!isset($result['task']) || !is_string($result['task'])) {
            throw new QueryResponseException(
                $payload,
                'The druid response for the submitted task does not contain a task identifier.',
                new InvalidArgumentException('Missing or invalid "task" key in the response.')
            );
        }

        return $result['task'];
    }
184
185
    /**
     * Execute a raw druid request and return the response.
     *
     * When the server responds with a server exception (guzzle's ServerException), the request is retried
     * up to "retries" times, waiting "retry_delay_ms" milliseconds between the attempts. When all attempts
     * failed, the last failure is reported by throwing an exception.
     *
     * @param string $method POST, DELETE or GET (case insensitive). Any other value is sent as a GET request.
     * @param string $url    The url where to send the "query" to.
     * @param array  $data   The data to send. It is sent as json body for POST and as query string for GET.
     *                       It is not sent for DELETE.
     *
     * @return array The decoded response, or an empty array for a 202 or 204 response.
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    public function executeRawRequest(string $method, string $url, array $data = []): array
    {
        // Normalise the method once, instead of for every comparison and every attempt.
        $verb    = strtolower($method);
        $retries = 0;

        // Every iteration is one attempt. We only get to a next iteration by the "continue" in the
        // retry branch below; all other paths return or throw.
        while (true) {
            try {
                if ($verb === 'post') {
                    $response = $this->client->post($url, ['json' => $data]);
                } elseif ($verb === 'delete') {
                    $response = $this->client->delete($url);
                } else {
                    $response = $this->client->get($url, ['query' => $data]);
                }

                // These responses are treated as "no content": there is nothing to parse.
                $statusCode = $response->getStatusCode();
                if ($statusCode === 204 || $statusCode === 202) {
                    return [];
                }

                return $this->parseResponse($response, $data);
            } catch (ServerException $exception) {
                $configRetries = intval($this->config('retries', 2));
                $configDelay   = intval($this->config('retry_delay_ms', 500));

                // Should we attempt a retry?
                if ($retries++ < $configRetries) {
                    $this->log(
                        'Query failed due to a server exception. Doing a retry. Retry attempt ' . $retries . ' of ' . $configRetries,
                        [$exception->getMessage(), $exception->getTraceAsString()]
                    );

                    if ($configDelay > 0) {
                        $this->log('Sleep for ' . $configDelay . ' ms');
                        // The delay is configured in milliseconds, usleep() expects microseconds.
                        $this->usleep(($configDelay * 1000));
                    }

                    continue;
                }

                /** @var ResponseInterface $response */
                $response = $exception->getResponse();

                // Bad gateway, this happens for instance when all brokers are unavailable.
                if ($response->getStatusCode() === 502) {
                    throw new QueryResponseException(
                        $data,
                        'We failed to execute druid query due to a 502 Bad Gateway response. Please try again later.',
                        $exception
                    );
                }

                $error = $this->parseResponse($response, $data);

                // When its not a formatted error response from druid we rethrow the original exception
                if (!isset($error['error'], $error['errorMessage'])) {
                    throw $exception;
                }

                throw new QueryResponseException(
                    $data,
                    sprintf('%s: %s', $error['error'], $error['errorMessage']),
                    $exception
                );
            }
        }
    }
262
263
    /**
     * Pause the execution for the given number of microseconds.
     *
     * This is a thin wrapper around PHP's usleep(). executeRawRequest() calls it between retries.
     *
     * @param int $microSeconds The number of microseconds to wait.
     *
     * @codeCoverageIgnore
     */
    protected function usleep(int $microSeconds): void
    {
        usleep($microSeconds);
    }
272
273
    /**
     * Set the logger which receives the debug messages of this client.
     *
     * @param LoggerInterface $logger The logger to use from now on.
     *
     * @return DruidClient This instance, so calls can be chained.
     */
    public function setLogger(LoggerInterface $logger): DruidClient
    {
        $this->logger = $logger;

        return $this;
    }
284
285
    /**
     * Replace the guzzle client which is used to send the requests.
     *
     * @param GuzzleClient $client The guzzle client to use from now on.
     *
     * @return \Level23\Druid\DruidClient This instance, so calls can be chained.
     */
    public function setGuzzleClient(GuzzleClient $client): DruidClient
    {
        $this->client = $client;

        return $this;
    }
298
299
    /**
     * Get the value of the config key
     *
     * For "broker_url", "coordinator_url" and "overlord_url" the value of "router_url" is used when the
     * requested url itself is empty.
     *
     * @param string $key     The name of the config setting.
     * @param mixed  $default The value to return when the setting (or, for the url keys, the router url
     *                        which is used as fallback) is not set or null.
     *
     * @return mixed|null
     */
    public function config(string $key, $default = null)
    {
        $routerFallbackKeys = ['broker_url', 'coordinator_url', 'overlord_url'];

        // When the broker, coordinator or overlord url is empty, then use the router url. empty() also
        // covers a missing key, so no undefined index notice is raised for it.
        if (in_array($key, $routerFallbackKeys, true) && empty($this->config[$key])) {
            $key = 'router_url';
        }

        return $this->config[$key] ?? $default;
    }
318
319
    /**
     * Create the guzzle client which is used when no client was given to the constructor.
     *
     * The "timeout" and "connect_timeout" settings are read from the config (defaulting to 60 and 10),
     * and a fixed User-Agent header is configured.
     *
     * @return \GuzzleHttp\Client
     */
    protected function makeGuzzleClient(): GuzzleClient
    {
        $options = [
            'timeout'         => $this->config('timeout', 60),
            'connect_timeout' => $this->config('connect_timeout', 10),
            'headers'         => ['User-Agent' => 'level23 druid client package'],
        ];

        return new GuzzleClient($options);
    }
332
333
    /**
     * Decode the json body of the given response.
     *
     * @param \Psr\Http\Message\ResponseInterface $response The response of which the body is decoded.
     * @param array                               $query    The data which was sent. It is only used to
     *                                                      construct the exception when decoding fails.
     *
     * @return array The decoded body. A body which decodes to an "empty" value results in an empty array.
     * @throws \Level23\Druid\Exceptions\QueryResponseException When the body is not valid json.
     */
    protected function parseResponse(ResponseInterface $response, array $query = []): array
    {
        $body    = $response->getBody()->getContents();
        $decoded = \json_decode($body, true) ?: [];

        // Happy path first: the body was decoded without errors.
        if (\json_last_error() === \JSON_ERROR_NONE) {
            return (array)$decoded;
        }

        // Keep the json error as "previous" exception, so the cause stays available to the caller.
        $cause  = new InvalidArgumentException('json_decode error: ' . \json_last_error_msg());
        $status = $response->getStatusCode();

        $this->log('We failed to decode druid response. ');
        $this->log('Status code: ' . $status);
        $this->log('Response body: ' . $body);

        throw new QueryResponseException(
            $query,
            'Failed to parse druid response. Invalid json? Status code(' . $status . '). ' .
            'Response body: ' . $body,
            $cause
        );
    }
365
366
    /**
     * Write a debug message to the logger. Nothing happens when no logger is set.
     *
     * @param string $message The message to log.
     * @param array  $context Extra context which is handed to the logger together with the message.
     */
    protected function log(string $message, array $context = []): void
    {
        if (!$this->logger) {
            return;
        }

        $this->logger->debug($message, $context);
    }
378
379
    /**
     * Create a new metadata builder which is bound to this client.
     *
     * @return \Level23\Druid\Metadata\MetadataBuilder
     */
    public function metadata(): MetadataBuilder
    {
        $builder = new MetadataBuilder($this);

        return $builder;
    }
386
387
    /**
     * Fetch the status of a druid task.
     *
     * A GET request is sent to "<overlord_url>/druid/indexer/v1/task/<taskId>/status", where the task id
     * is url encoded.
     *
     * @param string $taskId The identifier of the task.
     *
     * @return \Level23\Druid\Responses\TaskResponse The response, wrapped in a TaskResponse object.
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function taskStatus(string $taskId): TaskResponse
    {
        $overlord = $this->config('overlord_url');
        $endpoint = $overlord . '/druid/indexer/v1/task/' . urlencode($taskId) . '/status';

        return new TaskResponse($this->executeRawRequest('get', $endpoint));
    }
403
404
    /**
     * Start building a compact task for the given dataSource.
     *
     * @param string $dataSource The name of the dataSource which the task builder receives.
     *
     * @return \Level23\Druid\Tasks\CompactTaskBuilder A new builder which is bound to this client.
     */
    public function compact(string $dataSource): CompactTaskBuilder
    {
        $builder = new CompactTaskBuilder($this, $dataSource);

        return $builder;
    }
415
416
    /**
     * Start building a kill task for the given dataSource.
     *
     * @param string $dataSource The name of the dataSource which the task builder receives.
     *
     * @return \Level23\Druid\Tasks\KillTaskBuilder A new builder which is bound to this client.
     */
    public function kill(string $dataSource): KillTaskBuilder
    {
        $builder = new KillTaskBuilder($this, $dataSource);

        return $builder;
    }
427
428
    /**
     * Create a re-index task builder for the given dataSource.
     *
     * The structure of the dataSource is requested using metadata()->structure(). The returned builder
     * reads from the same dataSource (DruidInputSource), and is pre-filled with:
     *  - every dimension of the structure, added with dimension();
     *  - every metric of the structure, added with sum() using the metric name as both arguments.
     *
     * No task is executed here; only the builder is returned.
     *
     * NOTE(review): a previous version of this documentation warned that the interval of a re-index task
     * has to match a valid interval, because otherwise there is a risk of data loss. The interval is not
     * handled in this method - presumably it is set on the returned builder; confirm.
     *
     * @param string $dataSource The name of the dataSource to re-index.
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function reindex(string $dataSource): IndexTaskBuilder
    {
        // Fetch the structure first: when this fails, there is no need to create a builder.
        $layout = $this->metadata()->structure($dataSource);

        $task = new IndexTaskBuilder($this, $dataSource, DruidInputSource::class);
        $task->fromDataSource($dataSource);

        foreach ($layout->dimensions as $name => $dataType) {
            $task->dimension($name, $dataType);
        }

        foreach ($layout->metrics as $name => $dataType) {
            $task->sum($name, $name, $dataType);
        }

        return $task;
    }
460
}