Passed
Pull Request — master (#50)
by Teye
15:39
created

MetadataBuilder::rowCount()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 17
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 17
ccs 9
cts 9
cp 1
rs 10
c 0
b 0
f 0
cc 3
nc 3
nop 3
crap 3
1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid\Metadata;
5
6
use DateTime;
7
use Exception;
8
use DateTimeInterface;
9
use InvalidArgumentException;
10
use Level23\Druid\DruidClient;
11
use Level23\Druid\Types\TimeBound;
12
use Level23\Druid\Filters\FilterBuilder;
13
use Level23\Druid\Context\ContextInterface;
14
use Level23\Druid\DataSources\DataSourceInterface;
15
use Level23\Druid\Exceptions\QueryResponseException;
16
17
/**
 * Retrieves metadata about druid dataSources: known dataSources, their intervals, time boundaries,
 * row counts and their structure (dimensions and metrics).
 *
 * All requests are executed using the given DruidClient.
 */
class MetadataBuilder
{
    /**
     * Format of the timestamps in a timeBoundary response, for example "2019-08-19T14:00:00.123Z".
     * The "v" specifier parses the milliseconds, so timestamps with a non-zero fraction are accepted as well.
     */
    private const TIME_FORMAT = 'Y-m-d\TH:i:s.v\Z';

    /**
     * @param \Level23\Druid\DruidClient $client The client which is used to execute all requests.
     */
    public function __construct(protected DruidClient $client)
    {
    }

    /**
     * Return all intervals for the given dataSource.
     * Return an array containing the interval.
     *
     * The result is stored in a static cache to prevent multiple requests. Note that this cache is keyed by
     * the dataSource name only and lives as long as the php process: it is shared by all instances of this class
     * and it is never refreshed.
     *
     * Example response:
     * [
     *   "2019-08-19T14:00:00.000Z/2019-08-19T15:00:00.000Z" => [ "size" => 75208,  "count" => 4 ],
     *   "2019-08-19T13:00:00.000Z/2019-08-19T14:00:00.000Z" => [ "size" => 161870, "count" => 8 ],
     * ]
     *
     * @param string $dataSource
     *
     * @return array<string,array<string,string|int>>
     *
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function intervals(string $dataSource): array
    {
        static $intervals = [];

        if (!array_key_exists($dataSource, $intervals)) {
            $url = $this->client->config('coordinator_url')
                . '/druid/coordinator/v1/datasources/' . urlencode($dataSource) . '/intervals';

            $intervals[$dataSource] = $this->client->executeRawRequest('get', $url, ['simple' => '']);
        }

        return $intervals[$dataSource];
    }

    /**
     * Return the time boundary for the given dataSource.
     * This finds the first and/or last occurrence of a record in the given dataSource.
     * Optionally, you can also apply a filter. For example, to only see when the first and/or last occurrence
     * was for a record where a specific condition was met.
     *
     * The return type varies per given $bound. If TimeBound::BOTH was given (or null, which is the same),
     * we will return an array with the minTime and maxTime:
     * ```
     * array(
     *  'minTime' => \DateTime object,
     *  'maxTime' => \DateTime object
     * )
     * ```
     *
     * If only one time was requested with either TimeBound::MIN_TIME or TimeBound::MAX_TIME, we will return
     * a DateTime object.
     *
     * Technically, a single DateTime is returned when the response contains exactly one time, otherwise an
     * array is returned which uses the keys as they were given in the response.
     *
     * @param string|\Level23\Druid\DataSources\DataSourceInterface $dataSource
     * @param string|\Level23\Druid\Types\TimeBound|null            $bound         A TimeBound, or its string value.
     * @param \Closure|null                                         $filterBuilder Receives a FilterBuilder as
     *                                                                             only argument.
     * @param \Level23\Druid\Context\ContextInterface|null          $context
     *
     * @return ($bound is null|"both"|\Level23\Druid\Types\TimeBound::BOTH ? array<string,\DateTime> : \DateTime)
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \Exception When the response is incorrect, or when it contains a time which we cannot parse.
     */
    public function timeBoundary(
        string|DataSourceInterface $dataSource,
        null|string|TimeBound $bound = TimeBound::BOTH,
        ?\Closure $filterBuilder = null,
        ?ContextInterface $context = null
    ): DateTime|array {

        $query = [
            'queryType'  => 'timeBoundary',
            'dataSource' => is_string($dataSource) ? $dataSource : $dataSource->toArray(),
        ];

        if (is_string($bound)) {
            $bound = TimeBound::from($bound);
        }

        // null and BOTH are treated the same: in that case we do not send a "bound" at all.
        if ($bound !== null && $bound !== TimeBound::BOTH) {
            $query['bound'] = $bound->value;
        }

        if ($filterBuilder !== null) {
            $builder = new FilterBuilder();
            $filterBuilder($builder);
            $filter = $builder->getFilter();

            // Only add a filter when the closure actually built one.
            if ($filter) {
                $query['filter'] = $filter->toArray();
            }
        }

        if ($context !== null) {
            $query['context'] = $context->toArray();
        }

        $url = $this->client->config('broker_url') . '/druid/v2';

        /** @var array<int,null|array<string,string[]|string>> $response */
        $response = $this->client->executeRawRequest('post', $url, $query);

        $times = $response[0]['result'] ?? null;

        if (!is_array($times) || $times === []) {
            throw new Exception('Received incorrect response: ' . var_export($response, true));
        }

        // Exactly one time in the response: return it as a plain DateTime object.
        if (count($times) === 1) {
            return $this->parseTime(reset($times));
        }

        $result = [];
        foreach ($times as $key => $dateString) {
            /** @var string $key */
            $result[$key] = $this->parseTime($dateString);
        }

        return $result;
    }

    /**
     * Convert a time as received in a timeBoundary response to a DateTime object.
     *
     * @param mixed $dateString The raw value from the response, expected to be a string like
     *                          "2019-08-19T14:00:00.000Z".
     *
     * @return \DateTime
     * @throws \Exception When the given value is not a string or when it does not match the expected format.
     */
    private function parseTime(mixed $dateString): DateTime
    {
        if (!is_string($dateString)) {
            throw new Exception('Failed to parse time: ' . var_export($dateString, true));
        }

        $date = DateTime::createFromFormat(self::TIME_FORMAT, $dateString);

        // createFromFormat() returns false when the string does not match the format.
        if ($date === false) {
            throw new Exception('Failed to parse time: ' . $dateString);
        }

        return $date;
    }

    /**
     * Returns a map of segment intervals contained within the specified interval to a map of segment metadata to a set
     * of server names that contain the segment for an interval.
     * The latest intervals will come as first, the oldest as last.
     *
     * Example response:
     *
     * Array
     * (
     *     [2017-01-01T00:00:00.000Z/2017-01-02T00:00:00.000Z] => Array
     *         (
     *             [traffic-conversions_2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z_2019-05-15T11:29:56.874Z] =>
     *             Array
     *                 (
     *                     [metadata] => Array
     *                         (
     *                             [dataSource] => traffic-conversions
     *                             [interval] => 2017-01-01T00:00:00.000Z/2017-01-02T00:00:00.000Z
     *                             [version] => 2019-05-15T11:29:56.874Z
     *                             [loadSpec] => Array
     *                                 (
     *                                     [type] => s3_zip
     *                                     [bucket] => level23-druid-data
     *                                     [key] =>
     *                                     druid/segments/traffic-conversions/2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z/2019-05-15T11:29:56.874Z/0/index.zip
     *                                     [S3Schema] => s3n
     *                                 )
     *
     *                             [dimensions] =>
     *                             country_iso,flags,mccmnc,offer_id,product_type_id,promo_id,test_data_id,test_data_reason,third_party_id
     *                             [metrics] => conversion_time,conversions,revenue_external,revenue_internal
     *                             [shardSpec] => Array
     *                                 (
     *                                     [type] => numbered
     *                                     [partitionNum] => 0
     *                                     [partitions] => 0
     *                                 )
     *
     *                             [binaryVersion] => 9
     *                             [size] => 272709
     *                             [identifier] =>
     *                             traffic-conversions_2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z_2019-05-15T11:29:56.874Z
     *                         )
     *
     *                     [servers] => Array
     *                         (
     *                             [0] => 172.31.23.160:8083
     *                             [1] => 172.31.3.204:8083
     *                         )
     *
     *                 )
     *
     *         )
     *
     * )
     *
     * @param string $dataSource
     * @param string $interval A raw interval string, like "2017-01-01T00:00:00.000Z/2017-01-02T00:00:00.000Z"
     *
     * @return array<string,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function interval(string $dataSource, string $interval): array
    {
        $url = $this->client->config('coordinator_url') .
            '/druid/coordinator/v1/datasources/' . urlencode($dataSource) .
            '/intervals/' . urlencode($interval);

        return $this->client->executeRawRequest('get', $url, ['full' => '']);
    }

    /**
     * Get the columns for the given interval, based on a segmentMetadata query. Only the columns of the first
     * row of the response are used. The name of each column is added to its details as "field".
     *
     * This will return something like this:
     *
     *   Array
     *  (
     *      0 => Array
     *          (
     *              [field] => __time
     *              [type] => LONG
     *              [hasMultipleValues] =>
     *              [size] => 0
     *              [cardinality] =>
     *              [minValue] =>
     *              [maxValue] =>
     *              [errorMessage] =>
     *          )
     *      1 => Array
     *          (
     *              [field] => cityName
     *              [type] => STRING
     *              [hasMultipleValues] =>
     *              [size] => 0
     *              [cardinality] => 59
     *              [minValue] => af
     *              [maxValue] => zm
     *              [errorMessage] =>
     *          )
     *  )
     *
     * @param string                             $dataSource
     * @param \DateTimeInterface|int|string      $start
     * @param \DateTimeInterface|int|string|null $stop
     *
     * @return array<int,array<string,string>>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \Exception
     */
    protected function getColumnsForInterval(
        string $dataSource,
        DateTimeInterface|int|string $start,
        DateTimeInterface|int|string|null $stop = null
    ): array {
        $response = $this->client->query($dataSource)
            ->interval($start, $stop)
            ->segmentMetadata();

        $rows = $response->data();

        /** @var array<string,array<string,string>> $rawColumns */
        $rawColumns = $rows[0]['columns'] ?? [];

        $columns = [];
        foreach ($rawColumns as $field => $details) {
            // The column name is the key in the response. Add it to the details as "field".
            $columns[] = array_merge($details, ['field' => $field]);
        }

        return $columns;
    }

    /**
     * Return the total number of rows for the given interval
     *
     * This is the sum of the "numRows" of all rows in the segmentMetadata response. Rows without "numRows"
     * are ignored.
     *
     * @param string                             $dataSource The name of the dataSource where you want to count the
     *                                                       rows for
     * @param \DateTimeInterface|int|string      $start      The start of the interval.
     * @param \DateTimeInterface|int|string|null $stop       The end of the interval, or null when it was given as a
     *                                                       "date/date" interval in the $start parameter.
     *
     * @return int
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \Exception
     */
    public function rowCount(
        string $dataSource,
        DateTimeInterface|int|string $start,
        DateTimeInterface|int|string|null $stop = null
    ): int {
        $response = $this->client->query($dataSource)
            ->interval($start, $stop)
            ->segmentMetadata();

        $totalRows = 0;
        foreach ($response->data() as $row) {
            if (isset($row['numRows'])) {
                $totalRows += (int)$row['numRows'];
            }
        }

        return $totalRows;
    }

    /**
     * Return the druid interval by the shorthand "first" or "last".
     *
     * We will return something like "2017-01-01T00:00:00.000Z/2017-01-02T00:00:00.000Z"
     *
     * We will return an empty string when no interval data is found.
     *
     * @param string $dataSource
     * @param string $shortHand "first" or "last" (case-insensitive)
     *
     * @return string
     * @throws \InvalidArgumentException When an unsupported shorthand is given.
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    protected function getIntervalByShorthand(string $dataSource, string $shortHand): string
    {
        // Get the interval which we will use to do a "structure" scan.
        $shortHand = strtolower($shortHand);
        if ($shortHand !== 'last' && $shortHand !== 'first') {
            throw new InvalidArgumentException('Only shorthand "first" and "last" are supported!');
        }

        $intervals = $this->intervals($dataSource);

        // "last" maps to the first key of the list and "first" to the last key. This matches a list in which
        // the most recent interval comes first, like in the example response of intervals().
        $interval = $shortHand === 'last' ? array_key_first($intervals) : array_key_last($intervals);

        return (string)($interval ?? '');
    }

    /**
     * Generate a structure class for the given dataSource.
     *
     * The names of the dimensions and metrics are read from the metadata of the first segment in the interval.
     * Their types are retrieved using getColumnsForInterval().
     *
     * @param string $dataSource
     * @param string $interval "last", "first" or a raw interval string as returned by druid.
     *
     * @return \Level23\Druid\Metadata\Structure
     * @throws \InvalidArgumentException When no interval could be determined.
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function structure(string $dataSource, string $interval = 'last'): Structure
    {
        // shorthand given? Then retrieve the real interval for them.
        if (in_array(strtolower($interval), ['first', 'last'], true)) {
            $interval = $this->getIntervalByShorthand($dataSource, $interval);
        }

        if (empty($interval)) {
            throw new InvalidArgumentException(
                'Error, interval "' . $interval . '" is invalid. Maybe there are no intervals for this dataSource?'
            );
        }

        $rawStructure = $this->interval($dataSource, $interval);

        // The response is keyed by interval. We only use the first one.
        $structureData = reset($rawStructure);
        if (!is_array($structureData) || $structureData === []) {
            throw new QueryResponseException([],
                'We failed to retrieve a correct structure for dataSource: ' . $dataSource . '.' . PHP_EOL .
                'Failed to parse raw interval structure data: ' . var_export($rawStructure, true)
            );
        }

        // Within the interval the data is keyed by segment identifier. Again we only use the first one.
        /** @var array<string|string[]> $data */
        $data = reset($structureData);

        $dimensionFields = explode(',', $data['metadata']['dimensions'] ?? '');
        $metricFields    = explode(',', $data['metadata']['metrics'] ?? '');

        $dimensions = [];
        $metrics    = [];

        $columns = $this->getColumnsForInterval($dataSource, $interval);

        foreach ($columns as $info) {
            // Numeric column names are converted to integers when they are used as array key. Cast them back
            // to a string so that we can do a strict comparison with the names from the metadata.
            $column = (string)$info['field'];

            if (in_array($column, $dimensionFields, true)) {
                $dimensions[$column] = $info['type'];
            }
            if (in_array($column, $metricFields, true)) {
                $metrics[$column] = $info['type'];
            }
        }

        return new Structure($dataSource, $dimensions, $metrics);
    }

    /**
     * Return a list of all known dataSources
     *
     * @return array<string>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function dataSources(): array
    {
        $url = $this->client->config('coordinator_url') . '/druid/coordinator/v1/datasources';

        /** @var array<int,string> $dataSources */
        $dataSources = $this->client->executeRawRequest('get', $url);

        return $dataSources;
    }
}