Issues (52)

src/Lookups/LookupBuilder.php (1 issue)

1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid\Lookups;
5
6
use DateTime;
use DateTimeZone;
use InvalidArgumentException;
use Level23\Druid\DruidClient;
use Level23\Druid\Lookups\ParseSpecs\CsvParseSpec;
use Level23\Druid\Lookups\ParseSpecs\TsvParseSpec;
use Level23\Druid\Lookups\ParseSpecs\ParseSpecInterface;
use Level23\Druid\Lookups\ParseSpecs\CustomJsonParseSpec;
use Level23\Druid\Lookups\ParseSpecs\SimpleJsonParseSpec;
14
15
/**
 * This class provides functionality to fetch lookups, create/update and delete them.
 *
 * A lookup is configured fluently: select the lookup type with one of the map, kafka, jdbc, uri or uriPrefix
 * methods, optionally define a parse spec (csv, tsv, json, customJson) and caching options, and finally call
 * store() to send the configuration to the coordinator.
 */
class LookupBuilder
{
    protected bool $injective = false;

    protected int $firstCacheTimeoutMs = 0;

    protected int|string|null $pollPeriod = null;

    protected int|null $maxHeapPercentage = null;

    /**
     * @var class-string<\Level23\Druid\Lookups\LookupInterface>|null
     */
    protected ?string $lookupClass = null;

    /**
     * Lookup specific constructor arguments for $lookupClass, in constructor order.
     *
     * @var array<int,mixed>
     */
    protected array $parameters = [];

    protected ?ParseSpecInterface $parseSpec = null;

    public function __construct(protected DruidClient $druidClient)
    {
    }

    /**
     * This will create or update a lookup.
     * Assign a unique version identifier each time you update a lookup extractor factory. Otherwise, the call will
     * fail. If no version was specified, we will automatically use the current UTC date and time (with millisecond
     * precision) as version.
     *
     * @see https://druid.apache.org/docs/latest/api-reference/lookups-api
     * @see https://druid.apache.org/docs/latest/querying/lookups-cached-global
     * @see https://druid.apache.org/docs/latest/querying/kafka-extraction-namespace
     *
     * @param string      $lookupName
     * @param string      $tier
     * @param string|null $versionName
     *
     * @throws \InvalidArgumentException When no lookup is defined, or a URI lookup has no parseSpec.
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function store(
        string $lookupName,
        string $tier = '__default',
        ?string $versionName = null
    ): void {
        $lookup = $this->buildLookup();

        $payload = [
            'version'                => $versionName ?? $this->generateVersion(),
            'lookupExtractorFactory' => $lookup->toArray(),
        ];

        $this->druidClient->executeRawRequest(
            'post',
            $this->lookupConfigUrl($tier, $lookupName),
            $payload,
        );
    }

    /**
     * Return all keys for the given lookup.
     *
     * @param string $lookupName
     *
     * @return array<int,int|string>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function keys(string $lookupName): array
    {
        /**
         * Druid facilitates an endpoint for the keys:
         * <broker>/druid/v1/lookups/introspect/<lookup>/keys
         *
         * Unfortunately the response is not valid json. Therefore, we cannot use it.
         *
         * @see https://github.com/apache/druid/issues/17361
         */
        return array_keys($this->introspect($lookupName));
    }

    /**
     * Return all values for the given lookup.
     *
     * @param string $lookupName
     *
     * @return array<int,int|string|float|bool|null>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function values(string $lookupName): array
    {
        /**
         * Druid facilitates an endpoint for the values:
         * <broker>/druid/v1/lookups/introspect/<lookup>/values
         *
         * Unfortunately the response is not valid json. Therefore, we cannot use it.
         *
         * @see https://github.com/apache/druid/issues/17361
         */
        return array_values($this->introspect($lookupName));
    }

    /**
     * Return the content of the lookup (key => value) as reported by the broker.
     *
     * @param string $lookupName
     *
     * @return array<int|string,int|string|float|bool|null>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function introspect(string $lookupName): array
    {
        /** @var array<int|string,int|string|float|bool|null> $response */
        $response = $this->druidClient->executeRawRequest(
            'get',
            $this->druidClient->config('broker_url') . '/druid/v1/lookups/introspect/' . rawurlencode($lookupName),
        );

        return $response;
    }

    /**
     * Delete the given lookup in the given tier. When this fails an exception is thrown.
     *
     * @param string $lookupName
     * @param string $tier
     *
     * @return void
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function delete(string $lookupName, string $tier = '__default'): void
    {
        $this->druidClient->executeRawRequest(
            'delete',
            $this->lookupConfigUrl($tier, $lookupName),
        );
    }

    /**
     * Return all tiers and all of their lookups in one large configuration array.
     *
     * @return array<string,mixed>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function all(): array
    {
        /** @var array<string,mixed> $response */
        $response = $this->druidClient->executeRawRequest(
            'get',
            $this->lookupConfigUrl('all'),
        );

        return $response;
    }

    /**
     * Return a list of known tier names in the dynamic configuration.
     *
     * @param bool $discover When true, Druid also reports the tiers that are currently active in the cluster, in
     *                       addition to the ones known in the dynamic configuration.
     *
     * @return array<int,string>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function tiers(bool $discover = true): array
    {
        /** @var array<int,string> $response */
        $response = $this->druidClient->executeRawRequest(
            'get',
            $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config?discover=' . ($discover ? 'true' : 'false'),
        );

        return $response;
    }

    /**
     * For Uri and JDBC lookups it is possible to define a first cache timeout.
     * With this method you can set it.
     *
     * @param int $ms
     *
     * @return $this
     */
    public function firstCacheTimeout(int $ms): self
    {
        $this->firstCacheTimeoutMs = $ms;

        return $this;
    }

    /**
     * For Uri, JDBC and Kafka lookups it is possible to define if the content is injective or not.
     * Injective means that each key item points to a unique value. So each key and value is unique.
     * If so, Druid can make internal optimizations.
     *
     * @param bool $injective
     *
     * @return $this
     */
    public function injective(bool $injective = true): self
    {
        $this->injective = $injective;

        return $this;
    }

    /**
     * Set the polling period for the lookup to configure. This is applied for JDBC, URI and URIPrefix lookups.
     *
     * @param string|int $period Period between polling for updates. For example PT10M for every 10 minutes, or use
     *                           milliseconds like 600000. When not given, the data is fetched only once.
     *
     * @return $this
     */
    public function pollPeriod(int|string $period): self
    {
        $this->pollPeriod = $period;

        return $this;
    }

    /**
     * Set the max heap percentage for the lookup to configure. This is applied for JDBC, URI and URIPrefix lookups.
     *
     * @param int $maxHeapPercentage The maximum percentage of heap size that the lookup should consume. If the lookup
     *                               grows beyond this size, warning messages will be logged in the respective service
     *                               logs.
     *
     * @return $this
     */
    public function maxHeapPercentage(int $maxHeapPercentage): self
    {
        $this->maxHeapPercentage = $maxHeapPercentage;

        return $this;
    }

    /**
     * Return all lookup names defined under the given tier.
     *
     * @param string $tier
     *
     * @return array<int,string>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function names(string $tier = '__default'): array
    {
        /** @var array<int,string> $response */
        $response = $this->druidClient->executeRawRequest(
            'get',
            $this->lookupConfigUrl($tier),
        );

        return $response;
    }

    /**
     * Return the lookup as it is currently configured in Druid.
     *
     * @param string $name
     * @param string $tier
     *
     * @return array<string,mixed>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function get(string $name, string $tier = '__default'): array
    {
        /** @var array<string,mixed> $response */
        $response = $this->druidClient->executeRawRequest(
            'get',
            $this->lookupConfigUrl($tier, $name),
        );

        return $response;
    }

    /**
     * Configure a map lookup.
     *
     * @param array<int|string,int|string|float|bool|null> $map
     *
     * @return $this
     */
    public function map(array $map): self
    {
        $this->lookupClass = MapLookup::class;
        $this->parameters  = [$map];

        return $this;
    }

    /**
     * Configure a kafka lookup. Use injective() to mark the lookup as injective.
     *
     * @see https://druid.apache.org/docs/latest/querying/kafka-extraction-namespace
     *
     * @param string                   $kafkaTopic      The Kafka topic to read the data from
     * @param string|array<int,string> $servers         The kafka server(s), for example ["kafka1.service:9092",
     *                                                  "kafka2.service:9092"]
     * @param array<string,scalar>     $kafkaProperties Other optional kafka properties.
     * @param int                      $connectTimeout  How long to wait for an initial connection
     *
     * @return $this
     */
    public function kafka(
        string $kafkaTopic,
        string|array $servers,
        array $kafkaProperties = [],
        int $connectTimeout = 0
    ): self {
        $this->lookupClass = KafkaLookup::class;
        $this->parameters  = [$kafkaTopic, $servers, $kafkaProperties, $connectTimeout];

        return $this;
    }

    /**
     * Configure a new JDBC lookup.
     *
     * @see https://druid.apache.org/docs/latest/querying/lookups-cached-global/#jdbc-lookup
     *
     * @param string      $connectUri             The URI where to connect to. For example
     *                                            "jdbc:mysql://localhost:3306/druid"
     * @param string|null $username               The username for the connection, or null when not used.
     * @param string|null $password               The password for the connection, or null when not used.
     * @param string      $table                  The table where to retrieve the data from.
     * @param string      $keyColumn              The column from the table which is used as key for the lookup.
     * @param string      $valueColumn            The column from the table which is used as value from the lookup.
     * @param string|null $filter                 Specify a filter (like a where statement) which should be used in the
     *                                            query to fetch the data from the database.
     * @param string|null $tsColumn               Specify a column which contains a datetime. Druid will use this to
     *                                            only fetch rows from the database which have been changed since the
     *                                            last poll request. This reduces database load and is highly
     *                                            recommended!
     * @param int|null    $jitterSeconds          How much jitter to add (in seconds) up to maximum as a delay (actual
     *                                            value will be used as random from 0 to jitterSeconds), used to
     *                                            distribute db load more evenly.
     * @param int|null    $loadTimeoutSeconds     How much time (in seconds) it can take to query and populate lookup
     *                                            values. It will be helpful in lookup updates. On lookup update, it
     *                                            will wait maximum of loadTimeoutSeconds for new lookup to come up and
     *                                            continue serving from old lookup until new lookup successfully loads.
     *
     * @return $this
     */
    public function jdbc(
        string $connectUri,
        ?string $username,
        ?string $password,
        string $table,
        string $keyColumn,
        string $valueColumn,
        ?string $filter = null,
        ?string $tsColumn = null,
        ?int $jitterSeconds = null,
        ?int $loadTimeoutSeconds = null
    ): self {
        $this->lookupClass = JdbcLookup::class;
        $this->parameters  = [
            $connectUri,
            $username,
            $password,
            $table,
            $keyColumn,
            $valueColumn,
            $filter,
            $tsColumn,
            $jitterSeconds,
            $loadTimeoutSeconds,
        ];

        return $this;
    }

    /**
     * Configure a new URI lookup. Do not forget to specify the file specification by calling the `csv`, `tsv`, `json`
     * or `customJson` methods.
     *
     * @param string $uri URI for the lookup file. Can be a file, HDFS, S3 or GCS path.
     *
     * @return $this
     */
    public function uri(string $uri): self
    {
        $this->lookupClass = UriLookup::class;
        $this->parameters  = [$uri];

        return $this;
    }

    /**
     * Configure a new URI lookup for files matching a given pattern.
     *
     * Do not forget to specify the file specification by calling the `csv`, `tsv`, `json`
     * or `customJson` methods.
     *
     * @param string      $uriPrefix A URI prefix that specifies a directory or other searchable resource where lookup
     *                               files are located
     * @param string|null $fileRegex Regex for matching the file name under uriPrefix, for example "*.json"
     *
     * @return $this
     */
    public function uriPrefix(string $uriPrefix, ?string $fileRegex = null): self
    {
        $this->lookupClass = UriPrefixLookup::class;
        $this->parameters  = [$uriPrefix, $fileRegex];

        return $this;
    }

    /**
     * Specify that the file which is being parsed by a URI or URIPrefix lookup is a CSV file.
     * If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you
     * set skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column
     * information from the third line.
     *
     * @param array<int,string>|null $columns        The list of columns in the csv file, or use null and set
     *                                               $hasHeaderRow to true to fetch it automatically.
     * @param string|null            $keyColumn      The name of the column containing the key
     * @param string|null            $valueColumn    The name of the column containing the value
     * @param bool                   $hasHeaderRow   Set to true to indicate that column information can be extracted
     *                                               from the input files' header row
     * @param int                    $skipHeaderRows Number of header rows to be skipped
     *
     * @return $this
     */
    public function csv(
        ?array $columns,
        ?string $keyColumn = null,
        ?string $valueColumn = null,
        bool $hasHeaderRow = false,
        int $skipHeaderRows = 0
    ): self {
        $this->parseSpec = new CsvParseSpec($columns, $keyColumn, $valueColumn, $hasHeaderRow, $skipHeaderRows);

        return $this;
    }

    /**
     * Specify that the file which is being parsed by a URI or URIPrefix lookup is a JSON file, where each record
     * holds the key and the value in the given fields.
     *
     * @param string $keyFieldName   The field name of the key
     * @param string $valueFieldName The field name of the value
     *
     * @return $this
     */
    public function customJson(
        string $keyFieldName,
        string $valueFieldName
    ): self {
        $this->parseSpec = new CustomJsonParseSpec($keyFieldName, $valueFieldName);

        return $this;
    }

    /**
     * Specify that the file which is being parsed by a URI or URIPrefix lookup is a JSON file containing key => value
     * items. For example:
     *
     * ```
     * {"foo": "bar"}
     * {"baz": "bat"}
     * {"buck": "truck"}
     * ```
     *
     * @return $this
     */
    public function json(): self
    {
        $this->parseSpec = new SimpleJsonParseSpec();

        return $this;
    }

    /**
     * Specify that the file which is being parsed by a URI or URIPrefix lookup is a TSV file.
     *
     * If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you
     * set skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column
     * information from the third line.
     *
     * @param array<int,string>|null $columns        The list of columns in the TSV file, or use null and set
     *                                               $hasHeaderRow to true to fetch it automatically.
     * @param string|null            $keyColumn      The name of the column containing the key
     * @param string|null            $valueColumn    The name of the column containing the value
     * @param string                 $delimiter      The delimiter in the file
     * @param string                 $listDelimiter  The list delimiter in the file
     * @param bool                   $hasHeaderRow   Set to true to indicate that column information can be extracted
     *                                               from the input files' header row
     * @param int                    $skipHeaderRows Number of header rows to be skipped
     *
     * @return $this
     */
    public function tsv(
        ?array $columns,
        ?string $keyColumn = null,
        ?string $valueColumn = null,
        string $delimiter = "\t",
        string $listDelimiter = "\x01",
        bool $hasHeaderRow = false,
        int $skipHeaderRows = 0
    ): self {
        $this->parseSpec = new TsvParseSpec(
            $columns,
            $keyColumn,
            $valueColumn,
            $delimiter,
            $listDelimiter,
            $hasHeaderRow,
            $skipHeaderRows
        );

        return $this;
    }

    /**
     * Instantiate the configured lookup class with its lookup specific parameters plus the options that apply to
     * that lookup type.
     *
     * @return \Level23\Druid\Lookups\LookupInterface
     * @throws \InvalidArgumentException When no lookup is defined, or a URI lookup has no parseSpec.
     */
    private function buildLookup(): LookupInterface
    {
        $lookupClass = $this->lookupClass;

        if ($lookupClass === null) {
            throw new InvalidArgumentException('No lookup defined. Please define the lookup by using the map, kafka, jdbc, uri or uriPrefix methods!');
        }

        return match ($lookupClass) {
            // URI based lookups take the parse spec first, then their own parameters, then the cache options.
            UriLookup::class, UriPrefixLookup::class => new $lookupClass(
                $this->parseSpec ?? throw new InvalidArgumentException('Using a URI lookup, but there is no parseSpec defined! Use the csv, tsv, json or customJson methods to define the parseSpec.'),
                ...$this->parameters,
                ...$this->cacheOptions(),
            ),
            JdbcLookup::class => new $lookupClass(...$this->parameters, ...$this->cacheOptions()),
            // Kafka lookups only support the injective flag of all the generic options.
            KafkaLookup::class => new $lookupClass(...[...$this->parameters, $this->injective]),
            default => new $lookupClass(...$this->parameters),
        };
    }

    /**
     * The options shared by JDBC, URI and URIPrefix lookups, in the order they are passed to those lookup
     * constructors after the lookup specific parameters.
     *
     * @return array<int,mixed>
     */
    private function cacheOptions(): array
    {
        return [
            $this->pollPeriod,
            $this->maxHeapPercentage,
            $this->injective,
            $this->firstCacheTimeoutMs,
        ];
    }

    /**
     * Generate a default version identifier: the current time in UTC, formatted as ISO-8601 with milliseconds.
     * The clock is explicitly set to UTC because the trailing "Z" designates UTC; the millisecond part reduces
     * version collisions when a lookup is stored several times within one second.
     */
    private function generateVersion(): string
    {
        return (new DateTime('now', new DateTimeZone('UTC')))->format('Y-m-d\TH:i:s.v\Z');
    }

    /**
     * Build a coordinator lookup-config URL from the given path segments. Every segment is percent-encoded so that
     * tier or lookup names containing characters like "/", "?", "#" or spaces cannot alter the request path.
     *
     * @param string ...$segments
     *
     * @return string
     */
    private function lookupConfigUrl(string ...$segments): string
    {
        $encoded = array_map(static fn (string $segment): string => rawurlencode($segment), $segments);

        return $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config/' . implode('/', $encoded);
    }
}