Completed
Pull Request — master (#20)
by Teye
07:49 queued 01:45
created

DruidClient   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 416
Duplicated Lines 0 %

Test Coverage

Coverage 99.07%

Importance

Changes 5
Bugs 3 Features 0
Metric Value
wmc 33
eloc 110
c 5
b 3
f 0
dl 0
loc 416
ccs 107
cts 108
cp 0.9907
rs 9.76

17 Methods

Rating   Name   Duplication   Size   Complexity  
A reindex() 0 16 3
A config() 0 10 3
A executeTask() 0 15 1
B executeRawRequest() 0 65 9
A __construct() 0 5 2
A setLogger() 0 5 1
A taskStatus() 0 7 1
A usleep() 0 3 1
A metadata() 0 3 1
A parseResponse() 0 19 3
A makeGuzzleClient() 0 7 1
A executeQuery() 0 11 1
A query() 0 3 1
A kill() 0 3 1
A setGuzzleClient() 0 5 1
A compact() 0 3 1
A log() 0 4 2
1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid;
5
6
use Psr\Log\LoggerInterface;
7
use InvalidArgumentException;
8
use Level23\Druid\Types\Granularity;
9
use GuzzleHttp\Client as GuzzleClient;
10
use Level23\Druid\Tasks\TaskInterface;
11
use Level23\Druid\Queries\QueryBuilder;
12
use Psr\Http\Message\ResponseInterface;
13
use Level23\Druid\Tasks\KillTaskBuilder;
14
use GuzzleHttp\Exception\ServerException;
15
use Level23\Druid\Queries\QueryInterface;
16
use Level23\Druid\Tasks\IndexTaskBuilder;
17
use Level23\Druid\Responses\TaskResponse;
18
use Level23\Druid\Metadata\MetadataBuilder;
19
use Level23\Druid\Tasks\CompactTaskBuilder;
20
use Level23\Druid\InputSources\DruidInputSource;
21
use Level23\Druid\Exceptions\QueryResponseException;
22
23
class DruidClient
{
    /**
     * The guzzle client which is used to send all requests to druid.
     *
     * @var GuzzleClient
     */
    protected $client;

    /**
     * Optional logger. When set, debug information about the requests is written to it.
     *
     * @var LoggerInterface|null
     */
    protected $logger = null;

    /**
     * The client configuration. Values given to the constructor are merged over these defaults.
     *
     * @var array<string, int|string|null>
     */
    protected $config = [

        /**
         * Domain + optional port of the druid router. If this is set, it will be used for the broker,
         * coordinator and overlord when their own url is empty.
         */
        'router_url'      => '',

        /**
         * Domain + optional port. Don't add the api path like "/druid/v2"
         */
        'broker_url'      => '',

        /**
         * Domain + optional port. Don't add the api path like "/druid/coordinator/v1"
         */
        'coordinator_url' => '',

        /**
         * Domain + optional port. Don't add the api path like "/druid/indexer/v1"
         */
        'overlord_url'    => '',

        /**
         * The maximum duration of a druid query. If the response takes longer, we will close the connection.
         */
        'timeout'         => 60,

        /**
         * The maximum duration of connecting to the druid instance.
         */
        'connect_timeout' => 10,

        /**
         * The number of times we will try to do a retry in case of a failure. So if retries is 2, we will try to
         * execute the query in worst case 3 times.
         *
         * First time is the normal attempt to execute the query.
         * Then we do the FIRST retry.
         * Then we do the SECOND retry.
         */
        'retries'         => 2,

        /**
         * When a query fails to be executed, this is the delay before a query is retried.
         * Default is 500 ms, which is 0.5 seconds.
         *
         * Set to 0 to disable the delay between retries.
         */
        'retry_delay_ms'  => 500,

        /**
         * Supply the druid version which you are sending your queries to.
         * Based on this druid version, we can enable / disable certain new features if your
         * server supports this.
         */
        'version'         => null,
    ];

    /**
     * DruidClient constructor.
     *
     * @param array                   $config The configuration for this client. Merged over the defaults above.
     * @param \GuzzleHttp\Client|null $client Optional guzzle client. When omitted, one is created using the
     *                                        "timeout" and "connect_timeout" settings.
     */
    public function __construct(array $config, ?GuzzleClient $client = null)
    {
        $this->config = array_merge($this->config, $config);

        $this->client = $client ?? $this->makeGuzzleClient();
    }

    /**
     * Create a new query using the druid query builder.
     *
     * @param string $dataSource
     * @param string $granularity
     *
     * @return \Level23\Druid\Queries\QueryBuilder
     */
    public function query(string $dataSource, $granularity = Granularity::ALL): QueryBuilder
    {
        return new QueryBuilder($this, $dataSource, $granularity);
    }

    /**
     * Execute a druid query against the broker ("/druid/v2") and return the decoded response.
     *
     * @param \Level23\Druid\Queries\QueryInterface $druidQuery
     *
     * @return array
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    public function executeQuery(QueryInterface $druidQuery): array
    {
        $query = $druidQuery->toArray();

        $this->log('Executing druid query', ['query' => $query]);

        $result = $this->executeRawRequest('post', $this->config('broker_url') . '/druid/v2', $query);

        $this->log('Received druid response', ['response' => $result]);

        return $result;
    }

    /**
     * Submit a druid task to the overlord ("/druid/indexer/v1/task") and return its identifier.
     *
     * @param \Level23\Druid\Tasks\TaskInterface $task
     *
     * @return string The task identifier (the "task" key of the overlord response).
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    public function executeTask(TaskInterface $task): string
    {
        $payload = $task->toArray();

        $this->log('Executing druid task', ['task' => $payload]);

        $result = $this->executeRawRequest(
            'post',
            $this->config('overlord_url') . '/druid/indexer/v1/task',
            $payload
        );

        $this->log('Received task response', ['response' => $result]);

        return $result['task'];
    }

    /**
     * Execute a raw druid request and return the response.
     *
     * On a guzzle ServerException the request is retried up to "retries" times, sleeping
     * "retry_delay_ms" milliseconds between attempts. When all attempts fail, the exception is
     * translated (see convertServerException()) and thrown.
     *
     * @param string $method POST or GET (case-insensitive; anything other than POST is sent as GET).
     * @param string $url    The url where to send the "query" to.
     * @param array  $data   The data to POST (as JSON body) or GET (as query string).
     *
     * @return array The decoded response, or an empty array for a "204 No Content" response.
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    public function executeRawRequest(string $method, string $url, array $data = []): array
    {
        $retries = 0;

        while (true) {
            try {
                return $this->sendRequest($method, $url, $data);
            } catch (ServerException $exception) {
                // Config values are loosely typed (int|string|null); normalise them before using
                // them in arithmetic, comparisons and log messages.
                $configRetries = (int)$this->config('retries', 2);
                $configDelay   = (int)$this->config('retry_delay_ms', 500);

                // Should we attempt a retry?
                if ($retries++ < $configRetries) {
                    $this->log(
                        'Query failed due to a server exception. Doing a retry. Retry attempt ' . $retries . ' of ' . $configRetries,
                        [$exception->getMessage(), $exception->getTraceAsString()]
                    );

                    if ($configDelay > 0) {
                        $this->log('Sleep for ' . $configDelay . ' ms');
                        $this->usleep($configDelay * 1000);
                    }

                    continue;
                }

                throw $this->convertServerException($exception, $data);
            }
        }
    }

    /**
     * Send a single request (without retrying) and decode the response.
     *
     * @param string $method POST or GET.
     * @param string $url    The url to send the request to.
     * @param array  $data   JSON body for POST, query string parameters otherwise.
     *
     * @return array
     * @throws \Level23\Druid\Exceptions\QueryResponseException|\GuzzleHttp\Exception\GuzzleException
     */
    private function sendRequest(string $method, string $url, array $data): array
    {
        if (strtolower($method) === 'post') {
            $response = $this->client->post($url, [
                'json' => $data,
            ]);
        } else {
            $response = $this->client->get($url, [
                'query' => $data,
            ]);
        }

        // "No Content" responses have no body to decode.
        if ($response->getStatusCode() === 204) {
            return [];
        }

        return $this->parseResponse($response, $data);
    }

    /**
     * Translate a server exception, after all retries are exhausted, into the exception we throw.
     *
     * - 502 Bad Gateway becomes a QueryResponseException with a generic message.
     * - A druid formatted error ("error" + "errorMessage" keys) becomes a QueryResponseException.
     * - Anything else returns the original exception so it can be rethrown.
     *
     * @param \GuzzleHttp\Exception\ServerException $exception
     * @param array                                 $data The payload that was sent.
     *
     * @return \Throwable The exception which should be thrown.
     * @throws \Level23\Druid\Exceptions\QueryResponseException When the error body is not valid JSON.
     */
    private function convertServerException(ServerException $exception, array $data): \Throwable
    {
        $response = $exception->getResponse();

        // Defensive: without a response there is nothing to inspect.
        if (!$response instanceof ResponseInterface) {
            return $exception;
        }

        // Bad gateway, this happens for instance when all brokers are unavailable.
        if ($response->getStatusCode() === 502) {
            return new QueryResponseException(
                $data,
                'We failed to execute druid query due to a 502 Bad Gateway response. Please try again later.',
                $exception
            );
        }

        $error = $this->parseResponse($response, $data);

        // When it's not a formatted error response from druid we rethrow the original exception.
        if (!isset($error['error'], $error['errorMessage'])) {
            return $exception;
        }

        return new QueryResponseException(
            $data,
            sprintf('%s: %s', $error['error'], $error['errorMessage']),
            $exception
        );
    }

    /**
     * Sleep for the given number of microseconds. Wrapped in a method so tests can override it.
     *
     * @param int $microSeconds
     *
     * @codeCoverageIgnore
     */
    protected function usleep(int $microSeconds): void
    {
        usleep($microSeconds);
    }

    /**
     * Set the logger which receives debug messages about the requests.
     *
     * @param LoggerInterface $logger
     *
     * @return DruidClient
     */
    public function setLogger(LoggerInterface $logger): DruidClient
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Set a custom guzzle client which should be used.
     *
     * @param GuzzleClient $client
     *
     * @return \Level23\Druid\DruidClient
     */
    public function setGuzzleClient(GuzzleClient $client): DruidClient
    {
        $this->client = $client;

        return $this;
    }

    /**
     * Get the value of the config key.
     *
     * For "broker_url", "coordinator_url" and "overlord_url" an empty (falsy) value falls back on
     * "router_url". Other keys return $default only when the value is missing or null.
     *
     * @param string $key
     * @param mixed  $default
     *
     * @return mixed|null
     */
    public function config(string $key, $default = null)
    {
        // When the broker, coordinator or overlord url is empty, then use the router url.
        if (in_array($key, ['broker_url', 'coordinator_url', 'overlord_url'], true)) {
            return ($this->config[$key] ?? null) ?: $this->config('router_url', $default);
        }

        return $this->config[$key] ?? $default;
    }

    /**
     * Build the default guzzle client from the timeout settings.
     *
     * @return \GuzzleHttp\Client
     */
    protected function makeGuzzleClient(): GuzzleClient
    {
        return new GuzzleClient([
            'timeout'         => $this->config('timeout', 60),
            'connect_timeout' => $this->config('connect_timeout', 10),
            'headers'         => [
                'User-Agent' => 'level23 druid client package',
            ],
        ]);
    }

    /**
     * Decode the JSON body of a response into an array.
     *
     * An empty/falsy decoded value results in an empty array.
     *
     * @param \Psr\Http\Message\ResponseInterface $response
     * @param array                               $query The payload that was sent (used in the exception).
     *
     * @return array
     * @throws \Level23\Druid\Exceptions\QueryResponseException When the body is not valid JSON.
     */
    protected function parseResponse(ResponseInterface $response, array $query = []): array
    {
        $contents = $response->getBody()->getContents();
        try {
            $row = \GuzzleHttp\json_decode($contents, true) ?: [];
        } catch (InvalidArgumentException $exception) {
            $this->log('We failed to decode druid response. ');
            $this->log('Status code: ' . $response->getStatusCode());
            $this->log('Response body: ' . $contents);

            throw new QueryResponseException(
                $query,
                'Failed to parse druid response. Invalid json? Status code(' . $response->getStatusCode() . '). ' .
                'Response body: ' . $contents,
                $exception
            );
        }

        return (array)$row;
    }

    /**
     * Log a debug message, but only when a logger has been set.
     *
     * @param string $message
     * @param array  $context
     */
    protected function log(string $message, array $context = []): void
    {
        if ($this->logger) {
            $this->logger->debug($message, $context);
        }
    }

    /**
     * Create a metadata builder bound to this client.
     *
     * @return \Level23\Druid\Metadata\MetadataBuilder
     */
    public function metadata(): MetadataBuilder
    {
        return new MetadataBuilder($this);
    }

    /**
     * Fetch the status of a druid task from the overlord.
     *
     * @param string $taskId
     *
     * @return \Level23\Druid\Responses\TaskResponse
     * @throws \Exception|\GuzzleHttp\Exception\GuzzleException
     */
    public function taskStatus(string $taskId): TaskResponse
    {
        $url = $this->config('overlord_url') . '/druid/indexer/v1/task/' . urlencode($taskId) . '/status';

        $response = $this->executeRawRequest('get', $url);

        return new TaskResponse($response);
    }

    /**
     * Build a new compact task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\CompactTaskBuilder
     */
    public function compact(string $dataSource): CompactTaskBuilder
    {
        return new CompactTaskBuilder($this, $dataSource);
    }

    /**
     * Create a kill task.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\KillTaskBuilder
     */
    public function kill(string $dataSource): KillTaskBuilder
    {
        return new KillTaskBuilder($this, $dataSource);
    }

    /**
     * Create a re-index task builder for the given data source.
     *
     * The structure of the data source is fetched via the metadata builder and copied onto the
     * returned builder. Every dimension is added with its type, and every metric is added through
     * sum() on itself. The data source itself is used as the (druid) input source.
     *
     * @param string $dataSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function reindex(string $dataSource): IndexTaskBuilder
    {
        $structure = $this->metadata()->structure($dataSource);

        $builder = new IndexTaskBuilder($this, $dataSource, DruidInputSource::class);
        $builder->fromDataSource($dataSource);

        foreach ($structure->dimensions as $dimension => $type) {
            $builder->dimension($dimension, $type);
        }

        foreach ($structure->metrics as $metric => $type) {
            $builder->sum($metric, $metric, $type);
        }

        return $builder;
    }
}