Issues (52)

src/Metadata/MetadataBuilder.php (2 issues)

1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid\Metadata;
5
6
use Closure;
7
use DateTime;
8
use Exception;
9
use DateTimeInterface;
10
use InvalidArgumentException;
11
use Level23\Druid\DruidClient;
12
use Level23\Druid\Types\TimeBound;
13
use Level23\Druid\Filters\FilterBuilder;
14
use Level23\Druid\Context\ContextInterface;
15
use Level23\Druid\DataSources\DataSourceInterface;
16
use Level23\Druid\Exceptions\QueryResponseException;
17
18
class MetadataBuilder
{
    /**
     * Format of the ISO-8601 timestamps Druid returns, e.g. "2019-08-19T14:00:00.000Z".
     * The trailing "Z" is matched literally; the UTC timezone is passed explicitly when parsing.
     */
    protected const DRUID_DATE_FORMAT = 'Y-m-d\TH:i:s.v\Z';

    protected DruidClient $client;

    /**
     * @param \Level23\Druid\DruidClient $client Client used to resolve the coordinator/broker URLs and to execute
     *                                           the metadata requests.
     */
    public function __construct(DruidClient $client)
    {
        $this->client = $client;
    }

    /**
     * Return all intervals for the given dataSource, keyed by the interval string.
     *
     * The result is stored in a static cache to prevent repeated requests. The cache is keyed by the full request
     * URL (which includes the configured coordinator URL), so builders that talk to different Druid clusters do not
     * receive each other's cached results.
     *
     * Example response:
     * [
     *   "2019-08-19T14:00:00.000Z/2019-08-19T15:00:00.000Z" => [ "size" => 75208,  "count" => 4 ],
     *   "2019-08-19T13:00:00.000Z/2019-08-19T14:00:00.000Z" => [ "size" => 161870, "count" => 8 ],
     * ]
     *
     * @param string $dataSource
     *
     * @return array<string,array<string,string|int>>
     *
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function intervals(string $dataSource): array
    {
        static $intervals = [];

        $url = $this->client->config('coordinator_url')
            . '/druid/coordinator/v1/datasources/' . rawurlencode($dataSource) . '/intervals';

        if (!array_key_exists($url, $intervals)) {
            $intervals[$url] = $this->client->executeRawRequest('get', $url, ['simple' => '']);
        }

        return $intervals[$url];
    }

    /**
     * Return the time boundary for the given dataSource.
     * This finds the first and/or last occurrence of a record in the given dataSource.
     * Optionally, you can also apply a filter, for example to only see when the first and/or last occurrence
     * was for a record where a specific condition was met.
     *
     * The return type depends on the response. When TimeBound::BOTH (or null, which means the same) is given,
     * Druid answers with both timestamps and we return an array:
     * ```
     * [
     *  'minTime' => \DateTime object,
     *  'maxTime' => \DateTime object
     * ]
     * ```
     *
     * When only one bound is requested with TimeBound::MIN_TIME or TimeBound::MAX_TIME, the response holds a single
     * timestamp and we return that as a DateTime object.
     *
     * All returned DateTime objects are in UTC, matching the "Z" suffix of the timestamps Druid returns.
     *
     * @param string|\Level23\Druid\DataSources\DataSourceInterface $dataSource
     * @param string|\Level23\Druid\Types\TimeBound|null            $bound
     * @param \Closure|null                                         $filterBuilder Receives a FilterBuilder to
     *                                                                             build an optional filter.
     * @param \Level23\Druid\Context\ContextInterface|null          $context
     *
     * @return \DateTime|array<string,\DateTime> A single DateTime when the response contains exactly one
     *                                           timestamp, otherwise the timestamps keyed by the names Druid used.
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \Exception When the response is malformed or a timestamp cannot be parsed.
     */
    public function timeBoundary(
        string|DataSourceInterface $dataSource,
        null|string|TimeBound $bound = TimeBound::BOTH,
        ?Closure $filterBuilder = null,
        ?ContextInterface $context = null
    ): DateTime|array {
        $query = [
            'queryType'  => 'timeBoundary',
            'dataSource' => is_string($dataSource) ? $dataSource : $dataSource->toArray(),
        ];

        if (is_string($bound)) {
            $bound = TimeBound::from($bound);
        }

        // null and TimeBound::BOTH both mean "min and max", so only an explicit single bound is sent along.
        if ($bound !== null && $bound !== TimeBound::BOTH) {
            $query['bound'] = $bound->value;
        }

        if ($filterBuilder !== null) {
            $builder = new FilterBuilder();
            $filterBuilder($builder);
            $filter = $builder->getFilter();

            if ($filter) {
                $query['filter'] = $filter->toArray();
            }
        }

        if ($context !== null) {
            $query['context'] = $context->toArray();
        }

        $url = $this->client->config('broker_url') . '/druid/v2';

        /** @var array<int,null|array<string,string[]|string>> $response */
        $response = $this->client->executeRawRequest('post', $url, $query);

        $result = $response[0]['result'] ?? null;

        if (empty($result) || !is_array($result)) {
            throw new Exception('Received incorrect response: ' . var_export($response, true));
        }

        // A single timestamp (only one bound requested) is returned as-is instead of wrapped in an array.
        if (count($result) === 1) {
            return self::parseDruidDateTime(reset($result));
        }

        $dates = [];
        foreach ($result as $key => $dateString) {
            /** @var string $key */
            $dates[$key] = self::parseDruidDateTime($dateString);
        }

        return $dates;
    }

    /**
     * Parse a timestamp as returned by Druid (e.g. "2019-08-19T14:00:00.123Z") into a UTC DateTime.
     *
     * @param mixed $dateString The raw value taken from the Druid response.
     *
     * @return \DateTime
     * @throws \Exception When the value is not a string, does not match the expected format, or describes an
     *                    impossible date (which createFromFormat() would otherwise silently roll over).
     */
    protected static function parseDruidDateTime(mixed $dateString): DateTime
    {
        if (!is_string($dateString)) {
            throw new Exception('Failed to parse time: ' . var_export($dateString, true));
        }

        $date = DateTime::createFromFormat(self::DRUID_DATE_FORMAT, $dateString, new \DateTimeZone('UTC'));

        // createFromFormat() "succeeds" on out-of-range values (e.g. Feb 30) and only reports a warning.
        // getLastErrors() returns false (PHP >= 8.2) or an array with zero counts when everything went fine.
        $errors = DateTime::getLastErrors();

        if ($date === false || (is_array($errors) && $errors['warning_count'] > 0)) {
            throw new Exception('Failed to parse time: ' . $dateString);
        }

        return $date;
    }

    /**
     * Returns a map of segment intervals contained within the specified interval to a map of segment metadata to a set
     * of server names that contain the segment for an interval.
     * The latest intervals will come as first, the oldest as last.
     *
     * Example response:
     *
     * Array
     * (
     *     [2017-01-01T00:00:00.000Z/2017-01-02T00:00:00.000Z] => Array
     *         (
     *             [traffic-conversions_2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z_2019-05-15T11:29:56.874Z] =>
     *             Array
     *                 (
     *                     [metadata] => Array
     *                         (
     *                             [dataSource] => traffic-conversions
     *                             [interval] => 2017-01-01T00:00:00.000Z/2017-01-02T00:00:00.000Z
     *                             [version] => 2019-05-15T11:29:56.874Z
     *                             [loadSpec] => Array
     *                                 (
     *                                     [type] => s3_zip
     *                                     [bucket] => level23-druid-data
     *                                     [key] =>
     *                                     druid/segments/traffic-conversions/2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z/2019-05-15T11:29:56.874Z/0/index.zip
     *                                     [S3Schema] => s3n
     *                                 )
     *
     *                             [dimensions] =>
     *                             country_iso,flags,mccmnc,offer_id,product_type_id,promo_id,test_data_id,test_data_reason,third_party_id
     *                             [metrics] => conversion_time,conversions,revenue_external,revenue_internal
     *                             [shardSpec] => Array
     *                                 (
     *                                     [type] => numbered
     *                                     [partitionNum] => 0
     *                                     [partitions] => 0
     *                                 )
     *
     *                             [binaryVersion] => 9
     *                             [size] => 272709
     *                             [identifier] =>
     *                             traffic-conversions_2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z_2019-05-15T11:29:56.874Z
     *                         )
     *
     *                     [servers] => Array
     *                         (
     *                             [0] => 172.31.23.160:8083
     *                             [1] => 172.31.3.204:8083
     *                         )
     *
     *                 )
     *
     *         )
     *
     * )
     *
     * @param string $dataSource
     * @param string $interval
     *
     * @return array<string,array<mixed>|string|int>
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    public function interval(string $dataSource, string $interval): array
    {
        // rawurlencode() is the RFC 3986 encoding for path segments (a space becomes %20, not "+").
        $url = $this->client->config('coordinator_url')
            . '/druid/coordinator/v1/datasources/' . rawurlencode($dataSource)
            . '/intervals/' . rawurlencode($interval);

        return $this->client->executeRawRequest('get', $url, ['full' => '']);
    }

    /**
     * Get the columns for the given interval by running a segmentMetadata query. Each column's info is returned
     * with its name added under the "field" key. This will return something like this:
     *
     *  Array
     *  (
     *      0 => Array
     *          (
     *              [field] => __time
     *              [type] => LONG
     *              [hasMultipleValues] =>
     *              [size] => 0
     *              [cardinality] =>
     *              [minValue] =>
     *              [maxValue] =>
     *              [errorMessage] =>
     *          )
     *      1 => Array
     *          (
     *              [field] => delta
     *              [type] => LONG
     *              [hasMultipleValues] =>
     *              [size] => 0
     *              [cardinality] =>
     *              [minValue] =>
     *              [maxValue] =>
     *              [errorMessage] =>
     *          )
     *      2 => Array
     *          (
     *              [field] => cityName
     *              [type] => STRING
     *              [hasMultipleValues] =>
     *              [size] => 0
     *              [cardinality] => 59
     *              [minValue] => af
     *              [maxValue] => zm
     *              [errorMessage] =>
     *          )
     *      3 => Array
     *          (
     *              [field] => comment
     *              [type] => STRING
     *              [hasMultipleValues] =>
     *              [size] => 0
     *              [cardinality] => 84
     *              [minValue] =>
     *              [maxValue] => 74807
     *              [errorMessage] =>
     *          )
     *      4 => Array
     *          (
     *              [field] => added
     *              [type] => LONG
     *              [hasMultipleValues] =>
     *              [size] => 0
     *              [cardinality] =>
     *              [minValue] =>
     *              [maxValue] =>
     *              [errorMessage] =>
     *          )
     *  )
     *
     * Only the first row of the segmentMetadata response is inspected; an empty list is returned when it has no
     * columns.
     *
     * @param string                             $dataSource
     * @param \DateTimeInterface|int|string      $start
     * @param \DateTimeInterface|int|string|null $stop
     *
     * @return array<int,array<string,string>>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \Exception
     */
    protected function getColumnsForInterval(
        string $dataSource,
        DateTimeInterface|int|string $start,
        DateTimeInterface|int|string|null $stop = null
    ): array {
        $rows = $this->client->query($dataSource)
            ->interval($start, $stop)
            ->segmentMetadata()
            ->data();

        /** @var array<string,array<string,array<string,string>>> $row */
        $row = $rows[0] ?? [];

        $columns = [];
        foreach ($row['columns'] ?? [] as $field => $info) {
            $columns[] = array_merge($info, ['field' => $field]);
        }

        return $columns;
    }

    /**
     * Return the total number of rows for the given interval, summed over all rows of the segmentMetadata
     * response that report a "numRows" value.
     *
     * @param string                             $dataSource The name of the dataSource where you want to count the
     *                                                       rows for
     * @param \DateTimeInterface|int|string      $start      The start of the interval.
     * @param \DateTimeInterface|int|string|null $stop       The end of the interval, or null when it was given as a
     *                                                       "date/date" interval in the $start parameter.
     *
     * @return int
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \Exception
     */
    public function rowCount(
        string $dataSource,
        DateTimeInterface|int|string $start,
        DateTimeInterface|int|string|null $stop = null
    ): int {
        $response = $this->client->query($dataSource)
            ->interval($start, $stop)
            ->segmentMetadata();

        $totalRows = 0;
        foreach ($response->data() as $row) {
            if (isset($row['numRows'])) {
                $totalRows += (int)$row['numRows'];
            }
        }

        return $totalRows;
    }

    /**
     * Return the druid interval by the shorthand "first" or "last" (case-insensitive).
     *
     * We will return something like "2017-01-01T00:00:00.000Z/2017-01-02T00:00:00.000Z".
     *
     * We will return an empty string (and log a warning when a logger is configured) when no interval data is found.
     *
     * @param string $dataSource
     * @param string $shortHand
     *
     * @return string
     * @throws \InvalidArgumentException When the shorthand is not "first" or "last".
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \GuzzleHttp\Exception\GuzzleException
     */
    protected function getIntervalByShorthand(string $dataSource, string $shortHand): string
    {
        // Get the interval which we will use to do a "structure" scan.
        $shortHand = strtolower($shortHand);
        if ($shortHand !== 'last' && $shortHand !== 'first') {
            throw new InvalidArgumentException('Only shorthand "first" and "last" are supported!');
        }

        $rawIntervals = $this->intervals($dataSource);
        $intervals    = array_keys($rawIntervals);

        // The newest interval comes first, so "last" is the first key and "first" is the final key.
        $result = (string)(($shortHand === 'last' ? array_key_first($rawIntervals) : array_key_last($rawIntervals)) ?? '');

        if ($result === '') {
            $this->client->getLogger()?->warning(
                'Failed to get ' . $shortHand . ' interval! ' .
                'We got ' . count($rawIntervals) . ' intervals: ' . var_export($intervals, true)
            );
        }

        return $result;
    }

    /**
     * Generate a structure class for the given dataSource.
     *
     * Dimensions and metrics are taken from the segment metadata of the (first) segment in the interval; their
     * types come from a segmentMetadata query over the same interval.
     *
     * @param string $dataSource
     * @param string $interval "last", "first" or a raw interval string as returned by druid.
     *
     * @return \Level23\Druid\Metadata\Structure
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     * @throws \InvalidArgumentException When no usable interval could be determined.
     */
    public function structure(string $dataSource, string $interval = 'last'): Structure
    {
        // shorthand given? Then retrieve the real interval for them.
        if (in_array(strtolower($interval), ['first', 'last'], true)) {
            $interval = $this->getIntervalByShorthand($dataSource, $interval);
        }

        if (empty($interval)) {
            throw new InvalidArgumentException(
                'Error, interval "' . $interval . '" is invalid. Maybe there are no intervals for this dataSource?'
            );
        }

        $rawStructure = $this->interval($dataSource, $interval);

        $structureData = reset($rawStructure);
        if (!$structureData || !is_array($structureData)) {
            throw new QueryResponseException([],
                'We failed to retrieve a correct structure for dataSource: ' . $dataSource . '.' . PHP_EOL .
                'Failed to parse raw interval structure data: ' . var_export($rawStructure, true)
            );
        }

        /** @var array{metadata?: array{dimensions?: string, metrics?: string}} $data */
        $data = reset($structureData);

        $dimensionFields = explode(',', $data['metadata']['dimensions'] ?? '');
        $metricFields    = explode(',', $data['metadata']['metrics'] ?? '');

        $dimensions = [];
        $metrics    = [];

        foreach ($this->getColumnsForInterval($dataSource, $interval) as $info) {
            // Numeric column names become integer array keys; cast so the strict lookups below still match.
            $column = (string)$info['field'];

            if (in_array($column, $dimensionFields, true)) {
                $dimensions[$column] = $info['type'];
            }
            if (in_array($column, $metricFields, true)) {
                $metrics[$column] = $info['type'];
            }
        }

        return new Structure($dataSource, $dimensions, $metrics);
    }

    /**
     * Return a list of all known dataSources, as reported by the coordinator.
     *
     * @return array<string>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function dataSources(): array
    {
        $url = $this->client->config('coordinator_url') . '/druid/coordinator/v1/datasources';

        /** @var array<int,string> $dataSources */
        $dataSources = $this->client->executeRawRequest('get', $url);

        return $dataSources;
    }
}