Issues (52)

src/DruidClient.php (1 issue)

1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid;
5
6
use JsonException;
7
use Psr\Log\LoggerInterface;
8
use InvalidArgumentException;
9
use Level23\Druid\Types\Granularity;
10
use GuzzleHttp\Client as GuzzleClient;
11
use Level23\Druid\Tasks\TaskInterface;
12
use Level23\Druid\Queries\QueryBuilder;
13
use Psr\Http\Message\ResponseInterface;
14
use Level23\Druid\Tasks\KillTaskBuilder;
15
use Level23\Druid\Lookups\LookupBuilder;
16
use GuzzleHttp\Exception\ServerException;
17
use Level23\Druid\Queries\QueryInterface;
18
use Level23\Druid\Tasks\IndexTaskBuilder;
19
use Level23\Druid\Responses\TaskResponse;
20
use Level23\Druid\Metadata\MetadataBuilder;
21
use Level23\Druid\Tasks\CompactTaskBuilder;
22
use Level23\Druid\InputSources\DruidInputSource;
23
use Level23\Druid\Exceptions\QueryResponseException;
24
use Level23\Druid\InputSources\InputSourceInterface;
25
use function json_decode;
26
27
class DruidClient
28
{
29
    protected GuzzleClient $client;
30
31
    protected ?LoggerInterface $logger = null;
32
33
    /**
34
     * @var null|array{0:string,1:string}
35
     */
36
    protected ?array $auth = null;
37
38
    /**
39
     * @var array<string,string|int>
40
     */
41
    protected array $config = [
42
43
        // Domain + optional port or the druid router. If this is set, it will be used for the broker,
44
        // coordinator and overlord.
45
        'router_url'            => '',
46
47
        // Domain + optional port. Don't add the api path like "/druid/v2"
48
        'broker_url'            => '',
49
50
        // Domain + optional port. Don't add the api path like "/druid/coordinator/v1"
51
        'coordinator_url'       => '',
52
53
        // Domain + optional port. Don't add the api path like "/druid/indexer/v1"
54
        'overlord_url'          => '',
55
56
        // The maximum duration in seconds of a druid query. If the response takes longer, we will close the connection.
57
        'timeout'               => 60,
58
59
        // The maximum duration in seconds of connecting to the druid instance.
60
        'connect_timeout'       => 10,
61
62
        // The number of times we will try to do a retry in case of a failure. So if retries is 2, we will try to
63
        // execute the query in worst case 3 times.
64
        //
65
        // First time is the normal attempt to execute the query.
66
        // Then we do the FIRST retry.
67
        // Then we do the SECOND retry.
68
        'retries'               => 2,
69
70
        // When a query fails to be executed, this is the delay before a query is retried.
71
        // Default is 500 ms, which is 0.5 seconds.
72
        //
73
        // Set to 0 to disable the delay between retries.
74
        'retry_delay_ms'        => 500,
75
76
        // Amount of time in seconds to wait till we try and poll a task status again.
77
        'polling_sleep_seconds' => 2,
78
    ];
79
80
    /**
     * DruidClient constructor.
     *
     * The given configuration is merged over the default values of the $config property, so only the keys
     * which differ from the defaults have to be supplied.
     *
     * @param array<string,string|int> $config The configuration for this client.
     * @param \GuzzleHttp\Client|null  $client The guzzle client to use. When omitted, one is created with
     *                                         makeGuzzleClient().
     */
    public function __construct(array $config, ?GuzzleClient $client = null)
    {
        $mergedConfig = array_merge($this->config, $config);
        $this->config = $mergedConfig;

        // The config has to be in place first: makeGuzzleClient() reads the timeouts from it.
        $this->client = $client ?? $this->makeGuzzleClient();
    }
92
93
    /**
     * Store the HTTP basic auth credentials.
     *
     * Once set, executeRawRequest() passes them as the "auth" option with every request.
     *
     * @param string $username
     * @param string $password
     *
     * @return $this
     */
    public function auth(
        #[\SensitiveParameter]
        string $username,
        #[\SensitiveParameter]
        string $password
    ): self {
        $credentials = [$username, $password];
        $this->auth  = $credentials;

        return $this;
    }
111
112
    /**
     * Start a new query builder which is bound to this client.
     *
     * @param string             $dataSource  The dataSource which is handed to the query builder.
     * @param string|Granularity $granularity The granularity which is handed to the query builder.
     *
     * @return \Level23\Druid\Queries\QueryBuilder
     */
    public function query(string $dataSource = '', string|Granularity $granularity = Granularity::ALL): QueryBuilder
    {
        $builder = new QueryBuilder($this, $dataSource, $granularity);

        return $builder;
    }
124
125
    /**
     * Cancel the execution of a query with the given query identifier.
     *
     * A DELETE request is sent to "/druid/v2/{identifier}" on the configured broker (or on the router
     * when no broker url is configured).
     *
     * @param string $identifier The identifier of the query to cancel. Supply it unencoded; it is
     *                           url-encoded here before it is placed in the request path.
     *
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function cancelQuery(string $identifier): void
    {
        // The identifier becomes a path segment, so it has to be encoded. Otherwise characters such
        // as "/", "?" or "#" in an identifier would change the url which is requested.
        $url = $this->config('broker_url') . '/druid/v2/' . rawurlencode($identifier);

        $this->executeRawRequest('DELETE', $url);
    }
140
141
    /**
     * Send the given query to druid and return the decoded response.
     *
     * The query is converted to an array and POSTed as JSON to "/druid/v2" on the configured broker
     * (or on the router when no broker url is configured). Both the query and the response are logged.
     *
     * @param \Level23\Druid\Queries\QueryInterface $druidQuery
     *
     * @return array<string|int,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeQuery(QueryInterface $druidQuery): array
    {
        $payload = $druidQuery->toArray();
        $this->log('Executing druid query: ' . json_encode($payload, JSON_UNESCAPED_SLASHES));

        $endpoint = $this->config('broker_url') . '/druid/v2';
        $response = $this->executeRawRequest('post', $endpoint, $payload);
        $this->log('Received druid response: ' . var_export($response, true));

        return $response;
    }
162
163
    /**
     * Execute a druid task and return the response.
     *
     * The task is converted to an array and POSTed as JSON to "/druid/indexer/v1/task" on the configured
     * overlord (or on the router when no overlord url is configured). Both the payload and the response
     * are logged.
     *
     * @param \Level23\Druid\Tasks\TaskInterface $task
     *
     * @return string The task identifier
     * @throws \Level23\Druid\Exceptions\QueryResponseException When the request fails, or when the response
     *                                                          does not contain a task identifier.
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeTask(TaskInterface $task): string
    {
        /** @var array<string,array<mixed>|int|string> $payload */
        $payload = $task->toArray();

        $this->log('Executing druid task: ' . var_export($payload, true));

        $result = $this->executeRawRequest(
            'post',
            $this->config('overlord_url') . '/druid/indexer/v1/task',
            $payload
        );

        $this->log('Received task response: ' . var_export($result, true));

        // executeRawRequest() returns an empty array for a 202 or 204 response, and the response body is
        // not guaranteed to contain a "task" key. Report that as a response error instead of failing on
        // an undefined index and the string return type.
        $taskId = $result['task'] ?? null;
        if (!is_string($taskId)) {
            throw new QueryResponseException(
                $payload,
                'Druid did not return a task identifier for the submitted task.',
                new InvalidArgumentException('Missing "task" key in the task response.')
            );
        }

        return $taskId;
    }
189
190
    /**
     * Execute a raw druid request and return the decoded response.
     *
     * When the request fails with a guzzle ServerException it is retried. The maximum number of retries is
     * read from the "retries" config key and the pause between two attempts from "retry_delay_ms". When
     * all retries are used, the failure is reported with an exception.
     *
     * @param string                                $method "post" or "delete" (case-insensitive). Any other
     *                                                      value results in a GET request.
     * @param string                                $url    The url where to send the "query" to.
     * @param array<string,string|int|array<mixed>> $data   Sent as JSON body for POST and as query string
     *                                                      for GET. It is not sent with DELETE.
     *
     * @return array<string,array<mixed>|string|int> The decoded response, or an empty array for a 202 or 204
     *                                               response.
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeRawRequest(string $method, string $url, array $data = []): array
    {
        // Only send the "auth" option when credentials were supplied with auth().
        $authOptions = $this->auth ? ['auth' => $this->auth] : [];
        $attempt     = 0;

        while (true) {
            try {
                $response = match (strtolower($method)) {
                    'post'   => $this->client->post($url, ['json' => $data] + $authOptions),
                    'delete' => $this->client->delete($url, $authOptions),
                    default  => $this->client->get($url, ['query' => $data] + $authOptions),
                };

                // 202 and 204 are treated as "no content"; the body is not parsed.
                if ($response->getStatusCode() == 204 || $response->getStatusCode() == 202) {
                    return [];
                }

                return $this->parseResponse($response, $data);
            } catch (ServerException $exception) {
                $maxRetries = intval($this->config('retries', 2));
                $delayMs    = intval($this->config('retry_delay_ms', 500));

                // Should we attempt a retry?
                if ($attempt++ < $maxRetries) {
                    $this->log('Query failed due to a server exception. Doing a retry. Retry attempt ' . $attempt . ' of ' . $maxRetries);
                    $this->log($exception->getMessage());
                    $this->log($exception->getTraceAsString());

                    if ($delayMs > 0) {
                        $this->log('Sleep for ' . $delayMs . ' ms');
                        // The delay is configured in milliseconds, usleep() expects microseconds.
                        $this->usleep(($delayMs * 1000));
                    }

                    continue;
                }

                /** @var ResponseInterface $errorResponse */
                $errorResponse = $exception->getResponse();

                // Bad gateway, this happens for instance when all brokers are unavailable.
                if ($errorResponse->getStatusCode() == 502) {
                    throw new QueryResponseException(
                        $data,
                        'We failed to execute druid query due to a 502 Bad Gateway response. Please try again later.',
                        $exception
                    );
                }

                /** @var array<string,string> $error */
                $error = $this->parseResponse($errorResponse, $data);

                // A formatted druid error is wrapped in our own exception.
                if (isset($error['error'], $error['errorMessage'])) {
                    throw new QueryResponseException(
                        $data,
                        sprintf('%s: %s', $error['error'], $error['errorMessage']),
                        $exception
                    );
                }

                // When it's not a formatted error response from druid we rethrow the original exception
                throw $exception;
            }
        }
    }
268
269
    /**
     * Pause the execution for the given number of microseconds.
     *
     * This is a thin wrapper around PHP's usleep(); executeRawRequest() uses it for the delay between
     * two retries.
     *
     * @param int $microSeconds
     *
     * @codeCoverageIgnore
     */
    protected function usleep(int $microSeconds): void
    {
        usleep($microSeconds);
    }
278
279
    /**
     * Set the logger which receives the debug messages of this client.
     *
     * @param \Psr\Log\LoggerInterface|null $logger The logger to use, or null to stop logging.
     *
     * @return DruidClient
     */
    public function setLogger(?LoggerInterface $logger = null): DruidClient
    {
        $this->logger = $logger;

        return $this;
    }
290
291
    /**
     * Get the logger which is currently in use.
     *
     * @return \Psr\Log\LoggerInterface|null The logger, or null when none is set.
     */
    public function getLogger(): ?LoggerInterface
    {
        return $this->logger;
    }
300
301
    /**
     * Replace the guzzle client which is used to send the requests.
     *
     * @param GuzzleClient $client
     *
     * @return \Level23\Druid\DruidClient
     */
    public function setGuzzleClient(GuzzleClient $client): DruidClient
    {
        $this->client = $client;

        return $this;
    }
314
315
    /**
     * Get the value of the config key
     *
     * For the keys "broker_url", "coordinator_url" and "overlord_url" the value of "router_url" is
     * returned when the requested url is empty.
     *
     * @param string     $key
     * @param mixed|null $default Returned when the key (or, for the url keys, "router_url") is not set.
     *
     * @return mixed|null
     */
    public function config(string $key, mixed $default = null): mixed
    {
        return match ($key) {
            // when the broker, coordinator or overlord url is empty, then use the router url.
            'broker_url', 'coordinator_url', 'overlord_url'
                => $this->config[$key] ?: $this->config('router_url', $default),

            default => $this->config[$key] ?? $default,
        };
    }
334
335
    /**
     * Build the default guzzle client.
     *
     * The "timeout" and "connect_timeout" options are read from the config, and a fixed User-Agent header
     * is set.
     *
     * @return \GuzzleHttp\Client
     */
    protected function makeGuzzleClient(): GuzzleClient
    {
        $options = [
            'timeout'         => $this->config('timeout', 60),
            'connect_timeout' => $this->config('connect_timeout', 10),
            'headers'         => [
                'User-Agent' => 'level23 druid client package',
            ],
        ];

        return new GuzzleClient($options);
    }
348
349
    /**
     * Decode the JSON body of the given response into an array.
     *
     * A body which decodes to an "empty" value (such as null, false, 0 or an empty string) results in an
     * empty array. Invalid JSON, or JSON which decodes to a non-empty value that is not an array, is logged
     * and reported with a QueryResponseException.
     *
     * @param \Psr\Http\Message\ResponseInterface   $response
     * @param array<string,string|int|array<mixed>> $query The query which was executed. It is only used to
     *                                                     build the exception.
     *
     * @return array<string,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    protected function parseResponse(ResponseInterface $response, array $query = []): array
    {
        $body = $response->getBody()->getContents();

        try {
            $decoded = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

            if (!$decoded) {
                return [];
            }

            if (is_array($decoded)) {
                return $decoded;
            }

            throw new InvalidArgumentException('We failed to parse response!');
        } catch (InvalidArgumentException|JsonException $exception) {
            $this->log('We failed to decode druid response. ');
            $this->log('Status code: ' . $response->getStatusCode());
            $this->log('Response body: ' . $body);

            throw new QueryResponseException(
                $query,
                'Failed to parse druid response. Invalid json? Status code(' . $response->getStatusCode() . '). ' .
                'Response body: ' . $body,
                $exception
            );
        }
    }
380
381
    /**
     * Write a debug message to the logger. Nothing happens when no logger is set.
     *
     * @param string                                $message
     * @param array<string,array<mixed>|string|int> $context
     */
    protected function log(string $message, array $context = []): void
    {
        if ($this->logger === null) {
            return;
        }

        $this->logger->debug($message, $context);
    }
391
392
    /**
     * Create a metadata builder which is bound to this client.
     *
     * @return \Level23\Druid\Metadata\MetadataBuilder
     */
    public function metadata(): MetadataBuilder
    {
        $builder = new MetadataBuilder($this);

        return $builder;
    }
399
400
    /**
     * Request the current status of a druid task.
     *
     * A GET request is sent to "/druid/indexer/v1/task/{taskId}/status" on the configured overlord (or
     * on the router when no overlord url is configured).
     *
     * @param string $taskId The task identifier. It is url-encoded before it is placed in the url.
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function taskStatus(string $taskId): TaskResponse
    {
        $url = sprintf(
            '%s/druid/indexer/v1/task/%s/status',
            $this->config('overlord_url'),
            urlencode($taskId)
        );

        $rawStatus = $this->executeRawRequest('get', $url);

        return new TaskResponse($rawStatus);
    }
416
417
    /**
     * Waits till a druid task completes and returns the status of it.
     *
     * The status is requested with taskStatus() for as long as the task reports "RUNNING". Between two
     * requests we sleep for the number of seconds in the "polling_sleep_seconds" config key.
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse The first status which is not "RUNNING".
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function pollTaskStatus(string $taskId): TaskResponse
    {
        // Fetch the status once before the loop, so $status is defined on every path to the return.
        $status = $this->taskStatus($taskId);

        while ($status->getStatus() == 'RUNNING') {
            sleep(intval($this->config('polling_sleep_seconds')));

            $status = $this->taskStatus($taskId);
        }

        return $status;
    }
438
439
    /**
     * Create a compact task builder for the given dataSource, bound to this client.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\CompactTaskBuilder
     */
    public function compact(string $dataSource): CompactTaskBuilder
    {
        $builder = new CompactTaskBuilder($this, $dataSource);

        return $builder;
    }
450
451
    /**
     * Create a kill task builder for the given dataSource, bound to this client.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\KillTaskBuilder
     */
    public function kill(string $dataSource): KillTaskBuilder
    {
        $builder = new KillTaskBuilder($this, $dataSource);

        return $builder;
    }
462
463
    /**
     * Create an index task builder for the given dataSource and input source, bound to this client.
     *
     * @param string                                           $dataSource
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function index(string $dataSource, InputSourceInterface $inputSource): IndexTaskBuilder
    {
        $builder = new IndexTaskBuilder($this, $dataSource, $inputSource);

        return $builder;
    }
475
476
    /**
     * Create a LookupBuilder instance which is bound to this client. With this class you can do your lookup
     * management, such as store, list and delete lookups.
     *
     * @return \Level23\Druid\Lookups\LookupBuilder
     */
    public function lookup(): LookupBuilder
    {
        $builder = new LookupBuilder($this);

        return $builder;
    }
486
487
    /**
     * Create a re-index task builder for druid.
     *
     * The structure of the dataSource is requested using the metadata builder. The returned index task
     * builder reads from a DruidInputSource for the same dataSource, uses "__time" as timestamp column
     * with format "auto", and is pre-filled with every dimension of the structure. Every metric of the
     * structure is added as a sum aggregation which keeps the name of the metric.
     *
     * No task is executed here; only the builder is returned.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function reindex(string $dataSource): IndexTaskBuilder
    {
        $structure = $this->metadata()->structure($dataSource);

        $inputSource = new DruidInputSource($dataSource);
        $builder     = new IndexTaskBuilder($this, $dataSource, $inputSource);

        $builder->timestamp('__time', 'auto');

        foreach ($structure->dimensions as $name => $dimensionType) {
            $builder->dimension($name, $dimensionType);
        }

        foreach ($structure->metrics as $name => $metricType) {
            $builder->sum($name, $name, $metricType);
        }

        return $builder;
    }
524
}