1 | <?php |
||
2 | declare(strict_types=1); |
||
3 | |||
4 | namespace Level23\Druid; |
||
5 | |||
6 | use JsonException; |
||
7 | use Psr\Log\LoggerInterface; |
||
8 | use InvalidArgumentException; |
||
9 | use Level23\Druid\Types\Granularity; |
||
10 | use GuzzleHttp\Client as GuzzleClient; |
||
11 | use Level23\Druid\Tasks\TaskInterface; |
||
12 | use Level23\Druid\Queries\QueryBuilder; |
||
13 | use Psr\Http\Message\ResponseInterface; |
||
14 | use Level23\Druid\Tasks\KillTaskBuilder; |
||
15 | use Level23\Druid\Lookups\LookupBuilder; |
||
16 | use GuzzleHttp\Exception\ServerException; |
||
17 | use Level23\Druid\Queries\QueryInterface; |
||
18 | use Level23\Druid\Tasks\IndexTaskBuilder; |
||
19 | use Level23\Druid\Responses\TaskResponse; |
||
20 | use Level23\Druid\Metadata\MetadataBuilder; |
||
21 | use Level23\Druid\Tasks\CompactTaskBuilder; |
||
22 | use Level23\Druid\InputSources\DruidInputSource; |
||
23 | use Level23\Druid\Exceptions\QueryResponseException; |
||
24 | use Level23\Druid\InputSources\InputSourceInterface; |
||
25 | use function json_decode; |
||
26 | |||
class DruidClient
{
    protected GuzzleClient $client;

    protected ?LoggerInterface $logger = null;

    /**
     * HTTP basic auth credentials as [username, password], or null when no credentials were given via auth().
     *
     * @var null|array{0:string,1:string}
     */
    protected ?array $auth = null;

    /**
     * @var array<string,string|int>
     */
    protected array $config = [

        // Domain + optional port of the druid router. It is used for the broker, coordinator and overlord
        // whenever their own url below is left empty.
        'router_url' => '',

        // Domain + optional port. Don't add the api path like "/druid/v2"
        'broker_url' => '',

        // Domain + optional port. Don't add the api path like "/druid/coordinator/v1"
        'coordinator_url' => '',

        // Domain + optional port. Don't add the api path like "/druid/indexer/v1"
        'overlord_url' => '',

        // The maximum duration in seconds of a druid query. If the response takes longer, we will close the connection.
        'timeout' => 60,

        // The maximum duration in seconds of connecting to the druid instance.
        'connect_timeout' => 10,

        // The number of times we will try to do a retry in case of a failure. So if retries is 2, we will try to
        // execute the query in worst case 3 times.
        //
        // First time is the normal attempt to execute the query.
        // Then we do the FIRST retry.
        // Then we do the SECOND retry.
        'retries' => 2,

        // When a query fails to be executed, this is the delay before a query is retried.
        // Default is 500 ms, which is 0.5 seconds.
        //
        // Set to 0 to disable the delay between retries.
        'retry_delay_ms' => 500,

        // Amount of time in seconds to wait till we try and poll a task status again.
        'polling_sleep_seconds' => 2,
    ];

    /**
     * DruidClient constructor.
     *
     * @param array<string,string|int> $config The configuration for this client. Given keys override the
     *                                         defaults declared in the $config property.
     * @param \GuzzleHttp\Client|null  $client Optional preconfigured Guzzle client. When omitted, a client is
     *                                         built from the timeout settings via makeGuzzleClient().
     */
    public function __construct(array $config, ?GuzzleClient $client = null)
    {
        $this->config = array_merge($this->config, $config);

        $this->client = $client ?? $this->makeGuzzleClient();
    }

    /**
     * Provide HTTP basic auth credentials.
     *
     * The credentials are sent as Guzzle's "auth" request option with every request made through
     * executeRawRequest().
     *
     * @param string $username
     * @param string $password
     *
     * @return $this
     */
    public function auth(
        #[\SensitiveParameter]
        string $username,
        #[\SensitiveParameter]
        string $password
    ): self {
        $this->auth = [$username, $password];

        return $this;
    }

    /**
     * Create a new query using the druid query builder.
     *
     * @param string             $dataSource
     * @param string|Granularity $granularity
     *
     * @return \Level23\Druid\Queries\QueryBuilder
     */
    public function query(string $dataSource = '', string|Granularity $granularity = Granularity::ALL): QueryBuilder
    {
        return new QueryBuilder($this, $dataSource, $granularity);
    }

    /**
     * Cancel the execution of a query with the given query identifier.
     *
     * Sends a DELETE request to "<broker_url>/druid/v2/<identifier>".
     *
     * @param string $identifier
     *
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function cancelQuery(string $identifier): void
    {
        // The identifier is a single path segment: encode it so characters such as "/", "?", "#" or spaces
        // cannot change which URL is requested.
        $this->executeRawRequest(
            'DELETE',
            $this->config('broker_url') . '/druid/v2/' . rawurlencode($identifier)
        );
    }

    /**
     * Execute a druid query and return the response.
     *
     * @param \Level23\Druid\Queries\QueryInterface $druidQuery
     *
     * @return array<string|int,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeQuery(QueryInterface $druidQuery): array
    {
        $query = $druidQuery->toArray();

        $this->log('Executing druid query: ' . json_encode($query, JSON_UNESCAPED_SLASHES));

        $result = $this->executeRawRequest('post', $this->config('broker_url') . '/druid/v2', $query);

        // NOTE(review): the result is serialised with var_export() even when no logger is set; for large
        // results this may be costly — consider guarding it on the presence of a logger.
        $this->log('Received druid response: ' . var_export($result, true));

        return $result;
    }

    /**
     * Execute a druid task and return the response.
     *
     * The task payload is POSTed to "<overlord_url>/druid/indexer/v1/task" and the "task" field of the
     * response is returned.
     *
     * @param \Level23\Druid\Tasks\TaskInterface $task
     *
     * @return string The task identifier
     * @throws \Level23\Druid\Exceptions\QueryResponseException When druid fails or does not return a task identifier.
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeTask(TaskInterface $task): string
    {
        /** @var array<string,array<mixed>|int|string> $payload */
        $payload = $task->toArray();

        $this->log('Executing druid task: ' . var_export($payload, true));

        $result = $this->executeRawRequest(
            'post',
            $this->config('overlord_url') . '/druid/indexer/v1/task',
            $payload
        );

        $this->log('Received task response: ' . var_export($result, true));

        // An empty (e.g. 202/204) or unexpected response has no usable task id; report that explicitly
        // instead of failing on an undefined array key / invalid return type.
        if (!isset($result['task']) || !is_string($result['task'])) {
            throw new QueryResponseException(
                $payload,
                'Druid did not return a task identifier for the submitted task.'
            );
        }

        return $result['task'];
    }

    /**
     * Execute a raw druid request and return the response.
     *
     * "post" sends $data as a JSON body, "delete" sends no data, and any other method is sent as GET with
     * $data as query string. On a Guzzle ServerException the request is retried up to the "retries" config
     * value, sleeping "retry_delay_ms" between attempts. After the last attempt:
     *  - a 502 response becomes a QueryResponseException;
     *  - a druid error body containing "error" and "errorMessage" becomes a QueryResponseException;
     *  - anything else rethrows the original ServerException.
     *
     * @param string                                 $method POST, DELETE or GET (case-insensitive).
     * @param string                                 $url    The url where to send the "query" to.
     * @param array<string,string|int|array<mixed>> $data   The data to POST or GET.
     *
     * @return array<string,array<mixed>|string|int> The decoded response, or [] for 202/204 responses.
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeRawRequest(string $method, string $url, array $data = []): array
    {
        $retries = 0;

        // Only add the auth option when credentials were configured via auth().
        $authOptions = $this->auth ? ['auth' => $this->auth] : [];

        while (true) {
            try {
                $response = match (strtolower($method)) {
                    'post'   => $this->client->post($url, array_merge(['json' => $data], $authOptions)),
                    'delete' => $this->client->delete($url, $authOptions),
                    default  => $this->client->get($url, array_merge(['query' => $data], $authOptions)),
                };

                // 202 Accepted / 204 No Content: there is no body to decode.
                if (in_array($response->getStatusCode(), [202, 204], true)) {
                    return [];
                }

                return $this->parseResponse($response, $data);
            } catch (ServerException $exception) {
                $configRetries = intval($this->config('retries', 2));
                $configDelay   = intval($this->config('retry_delay_ms', 500));

                // Should we attempt a retry?
                if ($retries++ < $configRetries) {
                    $this->log('Query failed due to a server exception. Doing a retry. Retry attempt ' . $retries . ' of ' . $configRetries);
                    $this->log($exception->getMessage());
                    $this->log($exception->getTraceAsString());

                    if ($configDelay > 0) {
                        $this->log('Sleep for ' . $configDelay . ' ms');
                        $this->usleep($configDelay * 1000);
                    }

                    continue;
                }

                /** @var ResponseInterface $response */
                $response = $exception->getResponse();

                // Bad gateway, this happens for instance when all brokers are unavailable.
                if ($response->getStatusCode() === 502) {
                    throw new QueryResponseException(
                        $data,
                        'We failed to execute druid query due to a 502 Bad Gateway response. Please try again later.',
                        $exception
                    );
                }

                /** @var array<string,string> $error */
                $error = $this->parseResponse($response, $data);

                // When it's not a formatted error response from druid we rethrow the original exception
                if (!isset($error['error'], $error['errorMessage'])) {
                    throw $exception;
                }

                throw new QueryResponseException(
                    $data,
                    sprintf('%s: %s', $error['error'], $error['errorMessage']),
                    $exception
                );
            }
        }
    }

    /**
     * Sleep for the given number of microseconds. Wrapped in a method so it can be overridden (e.g. in tests).
     *
     * @param int $microSeconds
     *
     * @codeCoverageIgnore
     */
    protected function usleep(int $microSeconds): void
    {
        usleep($microSeconds);
    }

    /**
     * Set (or clear, by passing null) the logger used for debug messages.
     *
     * @param \Psr\Log\LoggerInterface|null $logger
     *
     * @return DruidClient
     */
    public function setLogger(?LoggerInterface $logger = null): DruidClient
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Return the logger if one is set.
     *
     * @return \Psr\Log\LoggerInterface|null
     */
    public function getLogger(): ?LoggerInterface
    {
        return $this->logger;
    }

    /**
     * Set a custom guzzle client which should be used.
     *
     * @param GuzzleClient $client
     *
     * @return \Level23\Druid\DruidClient
     */
    public function setGuzzleClient(GuzzleClient $client): DruidClient
    {
        $this->client = $client;

        return $this;
    }

    /**
     * Get the value of the config key.
     *
     * For "broker_url", "coordinator_url" and "overlord_url" an empty value falls back to "router_url".
     *
     * @param string     $key
     * @param mixed|null $default Returned when the key (or, for the url keys, the router url) is not set.
     *
     * @return mixed|null
     */
    public function config(string $key, mixed $default = null): mixed
    {
        // When the broker, coordinator or overlord url is empty, then use the router url.
        if (in_array($key, ['broker_url', 'coordinator_url', 'overlord_url'], true)) {
            return ($this->config[$key] ?? '') ?: $this->config('router_url', $default);
        }

        return $this->config[$key] ?? $default;
    }

    /**
     * Build the default Guzzle client using the "timeout" and "connect_timeout" settings.
     *
     * @return \GuzzleHttp\Client
     */
    protected function makeGuzzleClient(): GuzzleClient
    {
        return new GuzzleClient([
            'timeout'         => $this->config('timeout', 60),
            'connect_timeout' => $this->config('connect_timeout', 10),
            'headers'         => [
                'User-Agent' => 'level23 druid client package',
            ],
        ]);
    }

    /**
     * Decode a JSON response body into an array.
     *
     * A body that decodes to a falsy value yields []; a body that is not valid JSON or does not decode to an
     * array results in a QueryResponseException that includes the status code and the raw body.
     *
     * @param \Psr\Http\Message\ResponseInterface    $response
     * @param array<string,string|int|array<mixed>> $query The request data, attached to any thrown exception.
     *
     * @return array<string,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    protected function parseResponse(ResponseInterface $response, array $query = []): array
    {
        $contents = $response->getBody()->getContents();
        try {
            $row = json_decode($contents, true, 512, JSON_THROW_ON_ERROR) ?: [];

            if (!is_array($row)) {
                throw new InvalidArgumentException('We failed to parse response!');
            }
        } catch (InvalidArgumentException|JsonException $exception) {
            $this->log('We failed to decode druid response. ');
            $this->log('Status code: ' . $response->getStatusCode());
            $this->log('Response body: ' . $contents);

            throw new QueryResponseException(
                $query,
                'Failed to parse druid response. Invalid json? Status code(' . $response->getStatusCode() . '). ' .
                'Response body: ' . $contents,
                $exception
            );
        }

        return $row;
    }

    /**
     * Log a message at debug level when a logger is set; otherwise this is a no-op.
     *
     * @param string                                  $message
     * @param array<string,array<mixed>|string|int> $context
     */
    protected function log(string $message, array $context = []): void
    {
        $this->logger?->debug($message, $context);
    }

    /**
     * Return a MetadataBuilder bound to this client.
     *
     * @return \Level23\Druid\Metadata\MetadataBuilder
     */
    public function metadata(): MetadataBuilder
    {
        return new MetadataBuilder($this);
    }

    /**
     * Fetch the status of a druid task.
     *
     * Sends a GET request to "<overlord_url>/druid/indexer/v1/task/<taskId>/status".
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function taskStatus(string $taskId): TaskResponse
    {
        $url = $this->config('overlord_url') . '/druid/indexer/v1/task/' . urlencode($taskId) . '/status';

        $response = $this->executeRawRequest('get', $url);

        return new TaskResponse($response);
    }

    /**
     * Waits till a druid task is no longer RUNNING and returns its last status.
     *
     * Between polls it sleeps "polling_sleep_seconds" seconds. There is no upper bound on the total wait.
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function pollTaskStatus(string $taskId): TaskResponse
    {
        // Fetch once up-front so $status is always defined when returned.
        $status = $this->taskStatus($taskId);

        while ($status->getStatus() === 'RUNNING') {
            sleep(intval($this->config('polling_sleep_seconds')));
            $status = $this->taskStatus($taskId);
        }

        return $status;
    }

    /**
     * Build a new compact task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\CompactTaskBuilder
     */
    public function compact(string $dataSource): CompactTaskBuilder
    {
        return new CompactTaskBuilder($this, $dataSource);
    }

    /**
     * Create a kill task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\KillTaskBuilder
     */
    public function kill(string $dataSource): KillTaskBuilder
    {
        return new KillTaskBuilder($this, $dataSource);
    }

    /**
     * Create an index task.
     *
     * @param string                                            $dataSource
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function index(string $dataSource, InputSourceInterface $inputSource): IndexTaskBuilder
    {
        return new IndexTaskBuilder($this, $dataSource, $inputSource);
    }

    /**
     * Return a LookupBuilder instance. With this class you can do your lookup management, such as store, list and
     * delete lookups.
     *
     * @return \Level23\Druid\Lookups\LookupBuilder
     */
    public function lookup(): LookupBuilder
    {
        return new LookupBuilder($this);
    }

    /**
     * Create a re-index task for druid.
     *
     * The current structure of the data source is fetched through the metadata builder. Its dimensions are
     * added as dimensions and its metrics as sum aggregations, with "__time" as timestamp column. The data is
     * read back from the same data source via a DruidInputSource.
     *
     * The $start and $stop dates are checked if they match a valid interval. Otherwise, there is a
     * risk of data loss.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function reindex(string $dataSource): IndexTaskBuilder
    {
        $structure = $this->metadata()->structure($dataSource);

        $builder = new IndexTaskBuilder(
            $this,
            $dataSource,
            new DruidInputSource($dataSource)
        );

        $builder->timestamp('__time', 'auto');
        foreach ($structure->dimensions as $dimension => $type) {
            $builder->dimension($dimension, $type);
        }

        foreach ($structure->metrics as $metric => $type) {
            $builder->sum($metric, $metric, $type);
        }

        return $builder;
    }
}