Passed
Push — master ( 306b71...ebe21d )
by Teye
04:57
created

DruidClient::getLogger()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
c 0
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid;
5
6
use JsonException;
7
use Psr\Log\LoggerInterface;
8
use InvalidArgumentException;
9
use Level23\Druid\Types\Granularity;
10
use GuzzleHttp\Client as GuzzleClient;
11
use Level23\Druid\Tasks\TaskInterface;
12
use Level23\Druid\Queries\QueryBuilder;
13
use Psr\Http\Message\ResponseInterface;
14
use Level23\Druid\Tasks\KillTaskBuilder;
15
use GuzzleHttp\Exception\ServerException;
16
use Level23\Druid\Queries\QueryInterface;
17
use Level23\Druid\Tasks\IndexTaskBuilder;
18
use Level23\Druid\Responses\TaskResponse;
19
use Level23\Druid\Metadata\MetadataBuilder;
20
use Level23\Druid\Tasks\CompactTaskBuilder;
21
use Level23\Druid\InputSources\DruidInputSource;
22
use Level23\Druid\Exceptions\QueryResponseException;
23
use Level23\Druid\InputSources\InputSourceInterface;
24
use function json_decode;
25
26
class DruidClient
{
    protected GuzzleClient $client;

    protected ?LoggerInterface $logger = null;

    /**
     * Client configuration. Values passed to the constructor are merged over these defaults.
     *
     * @var array<string,string|int>
     */
    protected array $config = [

        // Domain + optional port of the druid router. When set, it is used as the fallback for the broker,
        // coordinator and overlord urls whenever those are left empty.
        'router_url'            => '',

        // Domain + optional port. Don't add the api path like "/druid/v2"
        'broker_url'            => '',

        // Domain + optional port. Don't add the api path like "/druid/coordinator/v1"
        'coordinator_url'       => '',

        // Domain + optional port. Don't add the api path like "/druid/indexer/v1"
        'overlord_url'          => '',

        // The maximum duration in seconds of a druid query. If the response takes longer, we will close the connection.
        'timeout'               => 60,

        // The maximum duration in seconds of connecting to the druid instance.
        'connect_timeout'       => 10,

        // The number of times we will try to do a retry in case of a failure. So if retries is 2, we will try to
        // execute the query in worst case 3 times.
        //
        // First time is the normal attempt to execute the query.
        // Then we do the FIRST retry.
        // Then we do the SECOND retry.
        'retries'               => 2,

        // When a query fails to be executed, this is the delay before a query is retried.
        // Default is 500 ms, which is 0.5 seconds.
        //
        // Set to 0 to disable the delay between retries.
        'retry_delay_ms'        => 500,

        // Amount of time in seconds to wait before polling a task status again.
        'polling_sleep_seconds' => 2,
    ];

    /**
     * DruidClient constructor.
     *
     * @param array<string,string|int> $config The configuration for this client; merged over the defaults.
     * @param \GuzzleHttp\Client|null  $client Optional HTTP client. When omitted, one is built by
     *                                         makeGuzzleClient() using the (merged) configuration.
     */
    public function __construct(array $config, ?GuzzleClient $client = null)
    {
        $this->config = array_merge($this->config, $config);

        $this->client = $client ?? $this->makeGuzzleClient();
    }

    /**
     * Create a new query using the druid query builder.
     *
     * @param string             $dataSource
     * @param string|Granularity $granularity
     *
     * @return \Level23\Druid\Queries\QueryBuilder
     */
    public function query(string $dataSource = '', string|Granularity $granularity = Granularity::ALL): QueryBuilder
    {
        return new QueryBuilder($this, $dataSource, $granularity);
    }

    /**
     * Cancel the execution of a query with the given query identifier.
     *
     * The identifier is sent as a DELETE request to the broker. It is url-encoded because it becomes a path
     * segment; an identifier containing "/", "?" or "#" would otherwise address a different url.
     *
     * @param string $identifier
     *
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function cancelQuery(string $identifier): void
    {
        $this->executeRawRequest(
            'DELETE',
            $this->config('broker_url') . '/druid/v2/' . rawurlencode($identifier)
        );
    }

    /**
     * Execute a druid query and return the response.
     *
     * The query is converted to an array and POSTed as JSON to the broker. Both the query and the response
     * are logged (when a logger is set).
     *
     * @param \Level23\Druid\Queries\QueryInterface $druidQuery
     *
     * @return array<string|int,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeQuery(QueryInterface $druidQuery): array
    {
        $query = $druidQuery->toArray();

        $this->log('Executing druid query: ' . var_export($query, true));

        $result = $this->executeRawRequest('post', $this->config('broker_url') . '/druid/v2', $query);

        $this->log('Received druid response: ' . var_export($result, true));

        return $result;
    }

    /**
     * Execute a druid task and return the response.
     *
     * The task is POSTed as JSON to the overlord; the "task" field of the response is returned.
     *
     * @param \Level23\Druid\Tasks\TaskInterface $task
     *
     * @return string The task identifier
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    public function executeTask(TaskInterface $task): string
    {
        /** @var array<string,array<mixed>|int|string> $payload */
        $payload = $task->toArray();

        $this->log('Executing druid task: ' . var_export($payload, true));

        /** @var string[] $result */
        $result = $this->executeRawRequest(
            'post',
            $this->config('overlord_url') . '/druid/indexer/v1/task',
            $payload
        );

        $this->log('Received task response: ' . var_export($result, true));

        return $result['task'];
    }

    /**
     * Execute a raw druid request and return the response.
     *
     * When the server answers with a 5xx (Guzzle ServerException) the request is retried up to the configured
     * "retries" times, sleeping "retry_delay_ms" milliseconds between attempts (no sleep when 0). Once the
     * retries are exhausted:
     *  - a 502 response is turned into a QueryResponseException;
     *  - a druid formatted error body ("error" + "errorMessage") is turned into a QueryResponseException;
     *  - anything else rethrows the original ServerException.
     *
     * @param string                                $method POST, DELETE or GET (case-insensitive). Any other value
     *                                                      is sent as GET.
     * @param string                                $url    The url where to send the "query" to.
     * @param array<string,string|int|array<mixed>> $data   Sent as JSON body for POST, as query string for GET and
     *                                                      ignored for DELETE.
     *
     * @return array<string,array<mixed>|string|int> The decoded JSON response, or an empty array for a
     *                                               202 or 204 response.
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeRawRequest(string $method, string $url, array $data = []): array
    {
        $retries = 0;

        while (true) {
            try {
                $response = match (strtolower($method)) {
                    'post'   => $this->client->post($url, ['json' => $data]),
                    'delete' => $this->client->delete($url),
                    default  => $this->client->get($url, ['query' => $data]),
                };

                // "No Content" and "Accepted" responses have no body worth decoding.
                if ($response->getStatusCode() == 204 || $response->getStatusCode() == 202) {
                    return [];
                }

                return $this->parseResponse($response, $data);
            } catch (ServerException $exception) {
                $configRetries = intval($this->config('retries', 2));
                $configDelay   = intval($this->config('retry_delay_ms', 500));

                // Should we attempt a retry?
                if ($retries++ < $configRetries) {
                    $this->log('Query failed due to a server exception. Doing a retry. Retry attempt ' . $retries . ' of ' . $configRetries);
                    $this->log($exception->getMessage());
                    $this->log($exception->getTraceAsString());

                    if ($configDelay > 0) {
                        $this->log('Sleep for ' . $configDelay . ' ms');
                        $this->usleep(($configDelay * 1000));
                    }

                    continue;
                }

                /** @var ResponseInterface $response */
                $response = $exception->getResponse();

                // Bad gateway, this happens for instance when all brokers are unavailable.
                if ($response->getStatusCode() == 502) {
                    throw new QueryResponseException(
                        $data,
                        'We failed to execute druid query due to a 502 Bad Gateway response. Please try again later.',
                        $exception
                    );
                }

                /** @var array<string,string> $error */
                $error = $this->parseResponse($response, $data);

                // When it's not a formatted error response from druid we rethrow the original exception
                if (!isset($error['error'], $error['errorMessage'])) {
                    throw $exception;
                }

                throw new QueryResponseException(
                    $data,
                    sprintf('%s: %s', $error['error'], $error['errorMessage']),
                    $exception
                );
            }
        }
    }

    /**
     * Sleep for the given number of microseconds. Wrapped in a method so it can be overridden (e.g. in tests).
     *
     * @param int $microSeconds
     *
     * @codeCoverageIgnore
     */
    protected function usleep(int $microSeconds): void
    {
        usleep($microSeconds);
    }

    /**
     * Set (or clear, by passing null) the logger used for debug logging.
     *
     * @param \Psr\Log\LoggerInterface|null $logger
     *
     * @return DruidClient
     */
    public function setLogger(?LoggerInterface $logger = null): DruidClient
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Return the logger if one is set.
     *
     * @return \Psr\Log\LoggerInterface|null
     */
    public function getLogger(): ?LoggerInterface
    {
        return $this->logger;
    }

    /**
     * Set a custom guzzle client which should be used.
     *
     * @param GuzzleClient $client
     *
     * @return \Level23\Druid\DruidClient
     */
    public function setGuzzleClient(GuzzleClient $client): DruidClient
    {
        $this->client = $client;

        return $this;
    }

    /**
     * Get the value of the config key.
     *
     * For "broker_url", "coordinator_url" and "overlord_url" an empty value falls back to "router_url".
     *
     * @param string     $key
     * @param mixed|null $default Returned when the key is not configured.
     *
     * @return mixed|null
     */
    public function config(string $key, mixed $default = null): mixed
    {
        // when the broker, coordinator or overlord url is empty, then use the router url.
        $routerFallback = in_array($key, ['broker_url', 'coordinator_url', 'overlord_url'], true);

        if ($routerFallback) {
            return $this->config[$key] ?: $this->config('router_url', $default);
        }

        return $this->config[$key] ?? $default;
    }

    /**
     * Build the default Guzzle client using the configured timeouts.
     *
     * @return \GuzzleHttp\Client
     */
    protected function makeGuzzleClient(): GuzzleClient
    {
        return new GuzzleClient([
            'timeout'         => $this->config('timeout', 60),
            'connect_timeout' => $this->config('connect_timeout', 10),
            'headers'         => [
                'User-Agent' => 'level23 druid client package',
            ],
        ]);
    }

    /**
     * Decode the JSON body of a druid response into an array.
     *
     * An empty/falsy decoded value becomes an empty array. Invalid JSON or a non-array JSON value results in a
     * QueryResponseException (the status code and body are logged first).
     *
     * @param \Psr\Http\Message\ResponseInterface   $response
     * @param array<string,string|int|array<mixed>> $query    The request data, attached to a thrown exception.
     *
     * @return array<string,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    protected function parseResponse(ResponseInterface $response, array $query = []): array
    {
        $contents = $response->getBody()->getContents();
        try {
            $row = json_decode($contents, true, 512, JSON_THROW_ON_ERROR) ?: [];

            if (!is_array($row)) {
                throw new InvalidArgumentException('We failed to parse response!');
            }
        } catch (InvalidArgumentException|JsonException $exception) {
            $this->log('We failed to decode druid response. ');
            $this->log('Status code: ' . $response->getStatusCode());
            $this->log('Response body: ' . $contents);

            throw new QueryResponseException(
                $query,
                'Failed to parse druid response. Invalid json? Status code(' . $response->getStatusCode() . '). ' .
                'Response body: ' . $contents,
                $exception
            );
        }

        return $row;
    }

    /**
     * Log a message at debug level, when a logger is set.
     *
     * @param string                                $message
     * @param array<string,array<mixed>|string|int> $context
     */
    protected function log(string $message, array $context = []): void
    {
        $this->logger?->debug($message, $context);
    }

    /**
     * @return \Level23\Druid\Metadata\MetadataBuilder
     */
    public function metadata(): MetadataBuilder
    {
        return new MetadataBuilder($this);
    }

    /**
     * Fetch the status of a druid task.
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function taskStatus(string $taskId): TaskResponse
    {
        $url = $this->config('overlord_url') . '/druid/indexer/v1/task/' . urlencode($taskId) . '/status';

        $response = $this->executeRawRequest('get', $url);

        return new TaskResponse($response);
    }

    /**
     * Waits till a druid task completes and returns the status of it.
     *
     * The status is fetched once. While it reports "RUNNING", the method sleeps
     * "polling_sleep_seconds" seconds and fetches it again. There is no upper bound on the total wait.
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function pollTaskStatus(string $taskId): TaskResponse
    {
        // Fetch before looping so $status is always defined when returned.
        $status = $this->taskStatus($taskId);

        while ($status->getStatus() == 'RUNNING') {
            sleep(intval($this->config('polling_sleep_seconds')));

            $status = $this->taskStatus($taskId);
        }

        return $status;
    }

    /**
     * Build a new compact task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\CompactTaskBuilder
     */
    public function compact(string $dataSource): CompactTaskBuilder
    {
        return new CompactTaskBuilder($this, $dataSource);
    }

    /**
     * Create a kill task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\KillTaskBuilder
     */
    public function kill(string $dataSource): KillTaskBuilder
    {
        return new KillTaskBuilder($this, $dataSource);
    }

    /**
     * Create an index task
     *
     * @param string                                           $dataSource
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function index(string $dataSource, InputSourceInterface $inputSource): IndexTaskBuilder
    {
        return new IndexTaskBuilder($this, $dataSource, $inputSource);
    }

    /**
     * Create a re-index task builder for druid.
     *
     * The current structure of the data source is fetched via the metadata builder. That structure is copied
     * into a new IndexTaskBuilder that reads from the same data source (DruidInputSource):
     *  - "__time" is used as timestamp with format "auto";
     *  - every dimension is added with its type;
     *  - every metric is re-aggregated with a sum aggregator of its type.
     *
     * The builder is returned as-is and nothing is executed here. Once a task built from it is submitted with
     * executeTask(), the returned task identifier looks like:
     * "index_traffic-conversions-2019-03-18T16:26:05.186Z"
     *
     * NOTE(review): no interval validation is performed here. Make sure the interval configured on the
     * builder is the one you intend to re-index, to avoid data loss.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function reindex(string $dataSource): IndexTaskBuilder
    {
        $structure = $this->metadata()->structure($dataSource);

        $builder = new IndexTaskBuilder(
            $this,
            $dataSource,
            new DruidInputSource($dataSource)
        );

        $builder->timestamp('__time', 'auto');
        foreach ($structure->dimensions as $dimension => $type) {
            $builder->dimension($dimension, $type);
        }

        foreach ($structure->metrics as $metric => $type) {
            $builder->sum($metric, $metric, $type);
        }

        return $builder;
    }
}