Passed
Push — master ( dcfc52...d45e42 )
by Teye
05:06
created

DruidClient   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 485
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 8
Bugs 3 Features 0
Metric Value
wmc 44
eloc 129
dl 0
loc 485
ccs 140
cts 140
cp 1
rs 8.8798
c 8
b 3
f 0

22 Methods

Rating   Name   Duplication   Size   Complexity  
A reindex() 0 20 3
A auth() 0 9 1
A getLogger() 0 3 1
A cancelQuery() 0 5 1
A config() 0 10 3
A executeTask() 0 17 1
C executeRawRequest() 0 64 13
A __construct() 0 5 2
A setLogger() 0 5 1
A taskStatus() 0 7 1
A usleep() 0 3 1
A pollTaskStatus() 0 12 3
A metadata() 0 3 1
A parseResponse() 0 23 4
A query() 0 3 1
A makeGuzzleClient() 0 7 1
A executeQuery() 0 11 1
A kill() 0 3 1
A setGuzzleClient() 0 5 1
A compact() 0 3 1
A index() 0 3 1
A log() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like DruidClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
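As a rough illustration of Extract Class, the retry bookkeeping inside executeRawRequest() (the `retries` and `retry_delay_ms` settings plus the sleep call) could move into its own small class. The sketch below is an assumption, not code from the package: `RetryPolicy`, its constructor parameters and the injectable sleeper are hypothetical names, and it retries on a caller-chosen exception class rather than on Guzzle's ServerException.

```php
<?php
declare(strict_types=1);

// Hypothetical helper, not part of the Level23\Druid package: the retry
// bookkeeping pulled out of the client into a class of its own.
final class RetryPolicy
{
    /** @var callable(int):void */
    private $sleeper;

    public function __construct(
        private int $maxRetries = 2,
        private int $delayMs = 500,
        ?callable $sleeper = null
    ) {
        // The sleeper is injectable so that tests do not have to wait for real.
        $this->sleeper = $sleeper ?? static function (int $microSeconds): void {
            usleep($microSeconds);
        };
    }

    /**
     * Run $operation. When it throws an exception of class $retryOn, try again,
     * at most $maxRetries extra times. Any other exception is rethrown at once.
     *
     * @param callable():mixed $operation
     * @param class-string     $retryOn
     */
    public function run(callable $operation, string $retryOn = \RuntimeException::class): mixed
    {
        $attempt = 0;

        while (true) {
            try {
                return $operation();
            } catch (\Throwable $exception) {
                // Give up on unrelated exceptions, or once the retries are used up.
                if (!($exception instanceof $retryOn) || $attempt++ >= $this->maxRetries) {
                    throw $exception;
                }

                if ($this->delayMs > 0) {
                    ($this->sleeper)($this->delayMs * 1000);
                }
            }
        }
    }
}
```

With such a class in place, the request method would only need to wrap its HTTP call in `run()`, which also removes the need for a `goto` label. Whether that trade-off is worth an extra class is a judgment call for the maintainers.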

While breaking up the class, it is a good idea to analyze how other classes use DruidClient, and based on these observations, apply Extract Interface, too.
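A minimal sketch of Extract Interface, assuming that a task builder only needs one capability from the client: submitting a task and getting an identifier back. All names here (`TaskExecutor`, `RecordingTaskExecutor`, `KillTask`) are hypothetical, and the array payload is a simplification; the real executeTask() accepts a TaskInterface.

```php
<?php
declare(strict_types=1);

// Hypothetical interface: only the single method a task builder would need.
interface TaskExecutor
{
    /** @param array<string,mixed> $payload */
    public function executeTask(array $payload): string;
}

// Hypothetical in-memory implementation, useful as a test double.
final class RecordingTaskExecutor implements TaskExecutor
{
    /** @var array<int,array<string,mixed>> */
    public array $submitted = [];

    public function executeTask(array $payload): string
    {
        $this->submitted[] = $payload;

        return 'task-' . count($this->submitted);
    }
}

// Hypothetical consumer that depends on the interface, not on the concrete client.
final class KillTask
{
    public function __construct(private TaskExecutor $executor, private string $dataSource)
    {
    }

    public function execute(): string
    {
        // Illustrative payload only, not the complete Druid task specification.
        return $this->executor->executeTask(['type' => 'kill', 'dataSource' => $this->dataSource]);
    }
}
```

Consumers written against the narrow interface can be tested without an HTTP client, and the concrete client is free to change as long as it keeps implementing the interface.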

<?php
declare(strict_types=1);

namespace Level23\Druid;

use JsonException;
use Psr\Log\LoggerInterface;
use InvalidArgumentException;
use Level23\Druid\Types\Granularity;
use GuzzleHttp\Client as GuzzleClient;
use Level23\Druid\Tasks\TaskInterface;
use Level23\Druid\Queries\QueryBuilder;
use Psr\Http\Message\ResponseInterface;
use Level23\Druid\Tasks\KillTaskBuilder;
use GuzzleHttp\Exception\ServerException;
use Level23\Druid\Queries\QueryInterface;
use Level23\Druid\Tasks\IndexTaskBuilder;
use Level23\Druid\Responses\TaskResponse;
use Level23\Druid\Metadata\MetadataBuilder;
use Level23\Druid\Tasks\CompactTaskBuilder;
use Level23\Druid\InputSources\DruidInputSource;
use Level23\Druid\Exceptions\QueryResponseException;
use Level23\Druid\InputSources\InputSourceInterface;
use function json_decode;

class DruidClient
{
    protected GuzzleClient $client;

    protected ?LoggerInterface $logger = null;

    /**
     * @var null|array{0:string,1:string}
     */
    protected ?array $auth = null;

    /**
     * @var array<string,string|int>
     */
    protected array $config = [

        // Domain + optional port of the druid router. If this is set, it will be used for the broker,
        // coordinator and overlord.
        'router_url'            => '',

        // Domain + optional port. Don't add the api path like "/druid/v2"
        'broker_url'            => '',

        // Domain + optional port. Don't add the api path like "/druid/coordinator/v1"
        'coordinator_url'       => '',

        // Domain + optional port. Don't add the api path like "/druid/indexer/v1"
        'overlord_url'          => '',

        // The maximum duration in seconds of a druid query. If the response takes longer, we will close the connection.
        'timeout'               => 60,

        // The maximum duration in seconds of connecting to the druid instance.
        'connect_timeout'       => 10,

        // The number of times we will try to do a retry in case of a failure. So if retries is 2, we will try to
        // execute the query in worst case 3 times.
        //
        // First time is the normal attempt to execute the query.
        // Then we do the FIRST retry.
        // Then we do the SECOND retry.
        'retries'               => 2,

        // When a query fails to be executed, this is the delay before a query is retried.
        // Default is 500 ms, which is 0.5 seconds.
        //
        // Set to 0 to disable the delay between retries.
        'retry_delay_ms'        => 500,

        // Amount of time in seconds to wait till we try and poll a task status again.
        'polling_sleep_seconds' => 2,
    ];

    /**
     * DruidClient constructor.
     *
     * @param array<string,string|int> $config The configuration for this client.
     * @param \GuzzleHttp\Client|null  $client
     */
    public function __construct(array $config, ?GuzzleClient $client = null)
    {
        $this->config = array_merge($this->config, $config);

        $this->client = $client ?: $this->makeGuzzleClient();
    }

    /**
     * Provide HTTP basic auth credentials.
     *
     * @param string $username
     * @param string $password
     *
     * @return $this
     */
    public function auth(
        #[\SensitiveParameter]
        string $username,
        #[\SensitiveParameter]
        string $password
    ): self {
        $this->auth = [$username, $password];

        return $this;
    }

    /**
     * Create a new query using the druid query builder.
     *
     * @param string             $dataSource
     * @param string|Granularity $granularity
     *
     * @return \Level23\Druid\Queries\QueryBuilder
     */
    public function query(string $dataSource = '', string|Granularity $granularity = Granularity::ALL): QueryBuilder
    {
        return new QueryBuilder($this, $dataSource, $granularity);
    }

    /**
     * Cancel the execution of a query with the given query identifier.
     *
     * @param string $identifier
     *
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function cancelQuery(string $identifier): void
    {
        $this->executeRawRequest(
            'DELETE',
            $this->config('broker_url') . '/druid/v2/' . $identifier
        );
    }

    /**
     * Execute a druid query and return the response.
     *
     * @param \Level23\Druid\Queries\QueryInterface $druidQuery
     *
     * @return array<string|int,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeQuery(QueryInterface $druidQuery): array
    {
        $query = $druidQuery->toArray();

        $this->log('Executing druid query: ' . var_export($query, true));

        $result = $this->executeRawRequest('post', $this->config('broker_url') . '/druid/v2', $query);

        $this->log('Received druid response: ' . var_export($result, true));

        return $result;
    }

    /**
     * Execute a druid task and return the response.
     *
     * @param \Level23\Druid\Tasks\TaskInterface $task
     *
     * @return string The task identifier
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    public function executeTask(TaskInterface $task): string
    {
        /** @var array<string,array<mixed>|int|string> $payload */
        $payload = $task->toArray();

        $this->log('Executing druid task: ' . var_export($payload, true));

        /** @var string[] $result */
        $result = $this->executeRawRequest(
            'post',
            $this->config('overlord_url') . '/druid/indexer/v1/task',
            $payload
        );

        $this->log('Received task response: ' . var_export($result, true));

        return $result['task'];
    }

    /**
     * Execute a raw druid request and return the response.
     *
     * @param string                                $method POST, DELETE or GET. Any other value is sent as GET.
     * @param string                                $url    The url where to send the "query" to.
     * @param array<string,string|int|array<mixed>> $data   The data to send: the JSON body for POST, the query
     *                                                      string for GET. It is not sent with DELETE.
     *
     * @return array<string,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function executeRawRequest(string $method, string $url, array $data = []): array
    {
        $retries = 0;

        begin:
        try {
            if (strtolower($method) == 'post') {
                $response = $this->client->post($url, array_merge([
                    'json' => $data,
                ], $this->auth ? ['auth' => $this->auth] : []));
            } elseif (strtolower($method) == 'delete') {
                $response = $this->client->delete($url, $this->auth ? ['auth' => $this->auth] : []);
            } else {
                $response = $this->client->get($url, array_merge([
                    'query' => $data,
                ], $this->auth ? ['auth' => $this->auth] : []));
            }

            if ($response->getStatusCode() == 204 || $response->getStatusCode() == 202) {
                return [];
            }

            return $this->parseResponse($response, $data);
        } catch (ServerException $exception) {

            $configRetries = intval($this->config('retries', 2));
            $configDelay   = intval($this->config('retry_delay_ms', 500));
            // Should we attempt a retry?
            if ($retries++ < $configRetries) {
                $this->log('Query failed due to a server exception. Doing a retry. Retry attempt ' . $retries . ' of ' . $configRetries);
                $this->log($exception->getMessage());
                $this->log($exception->getTraceAsString());

                if ($configDelay > 0) {
                    $this->log('Sleep for ' . $configDelay . ' ms');
                    $this->usleep(($configDelay * 1000));
                }
                goto begin;
            }

            /** @var ResponseInterface $response */
            $response = $exception->getResponse();

            // Bad gateway, this happens for instance when all brokers are unavailable.
            if ($response->getStatusCode() == 502) {
                throw new QueryResponseException(
                    $data,
                    'We failed to execute druid query due to a 502 Bad Gateway response. Please try again later.',
                    $exception
                );
            }

            /** @var array<string,string> $error */
            $error = $this->parseResponse($response, $data);

            // When it's not a formatted error response from druid we rethrow the original exception
            if (!isset($error['error'], $error['errorMessage'])) {
                throw $exception;
            }

            throw new QueryResponseException(
                $data,
                sprintf('%s: %s', $error['error'], $error['errorMessage']),
                $exception
            );
        }
    }

    /**
     * @param int $microSeconds
     *
     * @codeCoverageIgnore
     */
    protected function usleep(int $microSeconds): void
    {
        usleep($microSeconds);
    }

    /**
     * @param \Psr\Log\LoggerInterface|null $logger
     *
     * @return DruidClient
     */
    public function setLogger(?LoggerInterface $logger = null): DruidClient
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Return the logger if one is set.
     *
     * @return \Psr\Log\LoggerInterface|null
     */
    public function getLogger(): ?LoggerInterface
    {
        return $this->logger;
    }

    /**
     * Set a custom guzzle client which should be used.
     *
     * @param GuzzleClient $client
     *
     * @return \Level23\Druid\DruidClient
     */
    public function setGuzzleClient(GuzzleClient $client): DruidClient
    {
        $this->client = $client;

        return $this;
    }

    /**
     * Get the value of the config key
     *
     * @param string     $key
     * @param mixed|null $default
     *
     * @return mixed|null
     */
    public function config(string $key, mixed $default = null): mixed
    {
        // when the broker, coordinator or overlord url is empty, then use the router url.
        $routerFallback = in_array($key, ['broker_url', 'coordinator_url', 'overlord_url']);

        if ($routerFallback) {
            return $this->config[$key] ?: $this->config('router_url', $default);
        }

        return $this->config[$key] ?? $default;
    }

    /**
     * @return \GuzzleHttp\Client
     */
    protected function makeGuzzleClient(): GuzzleClient
    {
        return new GuzzleClient([
            'timeout'         => $this->config('timeout', 60),
            'connect_timeout' => $this->config('connect_timeout', 10),
            'headers'         => [
                'User-Agent' => 'level23 druid client package',
            ],
        ]);
    }

    /**
     * @param \Psr\Http\Message\ResponseInterface   $response
     * @param array<string,string|int|array<mixed>> $query
     *
     * @return array<string,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    protected function parseResponse(ResponseInterface $response, array $query = []): array
    {
        $contents = $response->getBody()->getContents();
        try {
            $row = json_decode($contents, true, 512, JSON_THROW_ON_ERROR) ?: [];

            if (!is_array($row)) {
                throw new InvalidArgumentException('We failed to parse response!');
            }
        } catch (InvalidArgumentException|JsonException $exception) {
            $this->log('We failed to decode druid response. ');
            $this->log('Status code: ' . $response->getStatusCode());
            $this->log('Response body: ' . $contents);

            throw new QueryResponseException(
                $query,
                'Failed to parse druid response. Invalid json? Status code(' . $response->getStatusCode() . '). ' .
                'Response body: ' . $contents,
                $exception
            );
        }

        return $row;
    }

    /**
     * Log a message
     *
     * @param string                                $message
     * @param array<string,array<mixed>|string|int> $context
     */
    protected function log(string $message, array $context = []): void
    {
        $this->logger?->debug($message, $context);
    }

    /**
     * @return \Level23\Druid\Metadata\MetadataBuilder
     */
    public function metadata(): MetadataBuilder
    {
        return new MetadataBuilder($this);
    }

    /**
     * Fetch the status of a druid task.
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function taskStatus(string $taskId): TaskResponse
    {
        $url = $this->config('overlord_url') . '/druid/indexer/v1/task/' . urlencode($taskId) . '/status';

        $response = $this->executeRawRequest('get', $url);

        return new TaskResponse($response);
    }

    /**
     * Waits till a druid task completes and returns the status of it.
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function pollTaskStatus(string $taskId): TaskResponse
    {
        while (true) {
            $status = $this->taskStatus($taskId);

            if ($status->getStatus() != 'RUNNING') {
                break;
            }
            sleep(intval($this->config('polling_sleep_seconds')));
        }

        return $status;
        // Reported issue (Comprehensibility, Best Practice): The variable $status does not seem to be
        // defined for all execution paths leading up to this point.
    }

    /**
     * Build a new compact task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\CompactTaskBuilder
     */
    public function compact(string $dataSource): CompactTaskBuilder
    {
        return new CompactTaskBuilder($this, $dataSource);
    }

    /**
     * Create a kill task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\KillTaskBuilder
     */
    public function kill(string $dataSource): KillTaskBuilder
    {
        return new KillTaskBuilder($this, $dataSource);
    }

    /**
     * Create an index task
     *
     * @param string                                           $dataSource
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function index(string $dataSource, InputSourceInterface $inputSource): IndexTaskBuilder
    {
        return new IndexTaskBuilder($this, $dataSource, $inputSource);
    }

    /**
     * Create a re-index task for druid.
     *
     * The $start and $stop dates are checked if they match a valid interval. Otherwise, there is a
     * risk of data loss.
     *
     * We will return a string with the task job identifier, or an exception is thrown in case of an error.
     * Example:
     * "index_traffic-conversions-2019-03-18T16:26:05.186Z"
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function reindex(string $dataSource): IndexTaskBuilder
    {
        $structure = $this->metadata()->structure($dataSource);

        $builder = new IndexTaskBuilder(
            $this,
            $dataSource,
            new DruidInputSource($dataSource)
        );

        $builder->timestamp('__time', 'auto');
        foreach ($structure->dimensions as $dimension => $type) {
            $builder->dimension($dimension, $type);
        }

        foreach ($structure->metrics as $metric => $type) {
            $builder->sum($metric, $metric, $type);
        }

        return $builder;
    }
}
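
The analyzer's note on pollTaskStatus() appears to be a false positive, since the `while (true)` body always assigns `$status` before the `break`. One way to make that obvious to both readers and static analysis is a `do ... while` loop, whose body is guaranteed to run once. The sketch below is a stand-alone assumption for illustration: `pollUntilDone`, its callable parameters and the plain string status are hypothetical and replace the real taskStatus() call and TaskResponse object.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-alone version of the polling loop.
 *
 * @param callable():string  $fetchStatus  Returns the current task status.
 * @param callable(int):void $sleep        Waits the given number of seconds.
 */
function pollUntilDone(callable $fetchStatus, callable $sleep, int $sleepSeconds = 2): string
{
    do {
        // Assigned on every pass, and the body always runs at least once.
        $status  = $fetchStatus();
        $running = $status === 'RUNNING';

        if ($running) {
            $sleep($sleepSeconds);
        }
    } while ($running);

    return $status;
}
```

The behavior matches the original loop: the status is fetched, and the function only sleeps while the task still reports RUNNING.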