Passed
Pull Request — master (#63)
by Teye
14:30 queued 09:12
created

LookupBuilder::jdbc()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 27
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 13
c 1
b 0
f 0
dl 0
loc 27
ccs 15
cts 15
cp 1
rs 9.8333
cc 1
nc 1
nop 10
crap 1

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you later need more or different data.

There are several approaches to avoid long parameter lists:

1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid\Lookups;
5
6
use DateTime;
7
use InvalidArgumentException;
8
use Level23\Druid\DruidClient;
9
use Level23\Druid\Lookups\ParseSpecs\CsvParseSpec;
10
use Level23\Druid\Lookups\ParseSpecs\TsvParseSpec;
11
use Level23\Druid\Lookups\ParseSpecs\ParseSpecInterface;
12
use Level23\Druid\Lookups\ParseSpecs\CustomJsonParseSpec;
13
use Level23\Druid\Lookups\ParseSpecs\SimpleJsonParseSpec;
14
15
/**
16
 * This class provides functionality to fetch lookups, create/update and delete them.
17
 */
18
class LookupBuilder
19
{
20
    /**
     * Whether the lookup is marked as injective. Passed to URI, URI prefix, JDBC and Kafka lookups by store().
     */
    protected bool $injective = false;

    /**
     * First cache timeout in milliseconds. Passed to URI, URI prefix and JDBC lookups by store().
     */
    protected int $firstCacheTimeoutMs = 0;

    /**
     * Polling period (a period string such as "PT10M", or milliseconds), or null when not configured.
     * Passed to URI, URI prefix and JDBC lookups by store().
     */
    protected int|string|null $pollPeriod = null;

    /**
     * Maximum heap percentage for the lookup, or null when not configured.
     * Passed to URI, URI prefix and JDBC lookups by store().
     */
    protected ?int $maxHeapPercentage = null;

    /**
     * Class of the lookup that store() instantiates; set by map(), kafka(), jdbc(), uri() or uriPrefix().
     *
     * @var class-string<\Level23\Druid\Lookups\LookupInterface>|null
     */
    protected ?string $lookupClass = null;

    /**
     * Positional constructor arguments for the configured lookup class.
     *
     * @var array<int,mixed>
     */
    protected array $parameters = [];

    /**
     * Parse specification used by URI and URI prefix lookups; set by csv(), tsv(), json() or customJson().
     */
    protected ?ParseSpecInterface $parseSpec = null;

    /**
     * @param \Level23\Druid\DruidClient $druidClient Client used to send the lookup API requests.
     */
    public function __construct(protected DruidClient $druidClient)
    {
    }
44
45
    /**
     * Create or update the configured lookup in the given tier.
     *
     * Druid requires a unique version identifier each time a lookup extractor factory is updated; otherwise the
     * call fails. When no version is given, the current date and time is used as version.
     *
     * @see https://druid.apache.org/docs/latest/api-reference/lookups-api
     * @see https://druid.apache.org/docs/latest/querying/lookups-cached-global
     * @see https://druid.apache.org/docs/latest/querying/kafka-extraction-namespace
     *
     * @param string      $lookupName  Name of the lookup to create or update.
     * @param string      $tier        Tier to store the lookup in.
     * @param string|null $versionName Version identifier, or null to generate one from the current date and time.
     *
     * @throws \InvalidArgumentException When no lookup is configured, or a URI (prefix) lookup has no parseSpec.
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function store(
        string $lookupName,
        string $tier = '__default',
        ?string $versionName = null
    ): void {
        $lookupClass = $this->lookupClass;

        if ($lookupClass === null) {
            throw new InvalidArgumentException('No lookup defined. Please define the lookup by using the map, kafka, jdbc, uri or uriPrefix methods!');
        }

        $isUriLookup = $lookupClass === UriLookup::class || $lookupClass === UriPrefixLookup::class;

        if ($isUriLookup && $this->parseSpec === null) {
            throw new InvalidArgumentException('Using an URI lookup, but there is no parseSpec defined! Use the csv, tsv, json or customJson methods to define the parseSpec.');
        }

        // URI (prefix) lookups receive the parseSpec as their first constructor argument.
        $leadingArguments = $isUriLookup ? [$this->parseSpec] : [];

        // Generic options appended after the lookup-specific parameters. Only the lookup types listed here
        // receive them; any other lookup (e.g. a map lookup) is constructed from its parameters only.
        $trailingArguments = match ($lookupClass) {
            UriLookup::class, UriPrefixLookup::class, JdbcLookup::class => [
                $this->pollPeriod,
                $this->maxHeapPercentage,
                $this->injective,
                $this->firstCacheTimeoutMs,
            ],
            KafkaLookup::class => [$this->injective],
            default            => [],
        };

        $lookup = new $lookupClass(...$leadingArguments, ...$this->parameters, ...$trailingArguments);

        // NOTE(review): the generated version uses the server's local time but carries a "Z" (UTC) suffix.
        // Left unchanged because switching to UTC could alter how new versions compare to previously stored
        // ones; confirm before changing.
        $payload = [
            'version'                => $versionName ?? (new DateTime())->format('Y-m-d\TH:i:s.000\Z'),
            'lookupExtractorFactory' => $lookup->toArray(),
        ];

        $this->druidClient->executeRawRequest(
            'post',
            $this->druidClient->config('coordinator_url') . '/druid/coordinator/v1/lookups/config/' . $tier . '/' . $lookupName,
            $payload,
        );
    }
115
116
    /**
     * Fetch all keys of the given lookup.
     *
     * The keys are derived from the full lookup content returned by introspect(). Druid does offer a dedicated
     * "<broker>/druid/v1/lookups/introspect/<lookup>/keys" endpoint, but its response is not valid JSON, so it
     * cannot be used here.
     *
     * @see https://github.com/apache/druid/issues/17361
     *
     * @param string $lookupName Name of the lookup to inspect.
     *
     * @return array<int,int|string|float>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function keys(string $lookupName): array
    {
        return array_keys($this->introspect($lookupName));
    }
139
140
    /**
     * Fetch all values of the given lookup.
     *
     * The values are derived from the full lookup content returned by introspect(). Druid does offer a dedicated
     * "<broker>/druid/v1/lookups/introspect/<lookup>/values" endpoint, but its response is not valid JSON, so it
     * cannot be used here.
     *
     * @see https://github.com/apache/druid/issues/17361
     *
     * @param string $lookupName Name of the lookup to inspect.
     *
     * @return array<int,int|string|float|bool|null>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function values(string $lookupName): array
    {
        return array_values($this->introspect($lookupName));
    }
163
164
    /**
     * Fetch the content of the given lookup from the broker's introspect endpoint.
     *
     * @param string $lookupName Name of the lookup to inspect.
     *
     * @return array<int|string|float,int|string|float|bool|null>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function introspect(string $lookupName): array
    {
        $brokerUrl = $this->druidClient->config('broker_url');
        $endpoint  = $brokerUrl . '/druid/v1/lookups/introspect/' . $lookupName;

        /** @var array<int|string,int|string> $content */
        $content = $this->druidClient->executeRawRequest('get', $endpoint);

        return $content;
    }
183
184
    /**
     * Remove the given lookup from the given tier. An exception is thrown when the request fails.
     *
     * @param string $lookupName Name of the lookup to delete.
     * @param string $tier       Tier the lookup belongs to.
     *
     * @return void
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function delete(string $lookupName, string $tier = '__default'): void
    {
        $coordinatorUrl = $this->druidClient->config('coordinator_url');
        $endpoint       = $coordinatorUrl . '/druid/coordinator/v1/lookups/config/' . $tier . '/' . $lookupName;

        $this->druidClient->executeRawRequest('delete', $endpoint);
    }
201
202
    /**
     * Fetch the complete lookup configuration: every tier together with all of its lookups.
     *
     * @return array<string,mixed>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function all(): array
    {
        $coordinatorUrl = $this->druidClient->config('coordinator_url');

        /** @var array<string,mixed> $configuration */
        $configuration = $this->druidClient->executeRawRequest(
            'get',
            $coordinatorUrl . '/druid/coordinator/v1/lookups/config/all'
        );

        return $configuration;
    }
220
221
    /**
     * Fetch the tier names known in the dynamic lookup configuration.
     *
     * @param bool $discover Value sent as the "discover" query parameter ("true" or "false").
     *
     * @return array<int,string>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function tiers(bool $discover = true): array
    {
        $discoverFlag   = $discover ? 'true' : 'false';
        $coordinatorUrl = $this->druidClient->config('coordinator_url');

        /** @var array<int,string> $tierNames */
        $tierNames = $this->druidClient->executeRawRequest(
            'get',
            $coordinatorUrl . '/druid/coordinator/v1/lookups/config?discover=' . $discoverFlag
        );

        return $tierNames;
    }
238
239
    /**
     * Set the first cache timeout. It is applied to URI, URI prefix and JDBC lookups when store() is called.
     *
     * @param int $ms Timeout in milliseconds.
     *
     * @return $this
     */
    public function firstCacheTimeout(int $ms): self
    {
        $this->firstCacheTimeoutMs = $ms;

        return $this;
    }
253
254
    /**
     * Mark the lookup content as injective (or not).
     *
     * Injective means every key maps to a unique value, so both keys and values are unique. When this holds,
     * Druid can apply internal optimizations.
     *
     * @param bool $injective Whether the lookup is injective.
     *
     * @return $this
     */
    public function injective(bool $injective = true): self
    {
        $this->injective = $injective;

        return $this;
    }
269
270
    /**
     * Set how often the lookup polls for updates. Applied to JDBC, URI and URI prefix lookups.
     *
     * @param string|int $period Period between polls, for example "PT10M" for every ten minutes or a number of
     *                           milliseconds such as 600000. Without a poll period the data is fetched only once.
     *
     * @return $this
     */
    public function pollPeriod(int|string $period): self
    {
        $this->pollPeriod = $period;

        return $this;
    }
284
285
    /**
     * Set the maximum heap percentage for the lookup. Applied to JDBC, URI and URI prefix lookups.
     *
     * @param int $maxHeapPercentage Maximum percentage of the heap the lookup should use. When the lookup grows
     *                               beyond it, warnings are written to the logs of the affected service.
     *
     * @return $this
     */
    public function maxHeapPercentage(int $maxHeapPercentage): self
    {
        $this->maxHeapPercentage = $maxHeapPercentage;

        return $this;
    }
300
301
    /**
     * List the names of all lookups configured in the given tier.
     *
     * @param string $tier Tier whose lookups are listed.
     *
     * @return array<int,string>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function names(string $tier = '__default'): array
    {
        $coordinatorUrl = $this->druidClient->config('coordinator_url');

        /** @var array<int,string> $lookupNames */
        $lookupNames = $this->druidClient->executeRawRequest(
            'get',
            $coordinatorUrl . '/druid/coordinator/v1/lookups/config/' . $tier
        );

        return $lookupNames;
    }
320
321
    /**
     * Fetch the current Druid configuration of a single lookup.
     *
     * @param string $name Name of the lookup.
     * @param string $tier Tier the lookup belongs to.
     *
     * @return array<string,mixed>
     * @throws \GuzzleHttp\Exception\GuzzleException
     * @throws \Level23\Druid\Exceptions\QueryResponseException
     */
    public function get(string $name, string $tier = '__default'): array
    {
        $coordinatorUrl = $this->druidClient->config('coordinator_url');

        /** @var array<string,mixed> $lookupConfig */
        $lookupConfig = $this->druidClient->executeRawRequest(
            'get',
            $coordinatorUrl . '/druid/coordinator/v1/lookups/config/' . $tier . '/' . $name
        );

        return $lookupConfig;
    }
341
342
    /**
     * Configure a map lookup that store() builds from the given key => value pairs.
     *
     * @param array<int|float|string,int|string|float|bool|null> $map The lookup content.
     *
     * @return $this
     */
    public function map(array $map): self
    {
        $this->parameters  = [$map];
        $this->lookupClass = MapLookup::class;

        return $this;
    }
356
357
    /**
     * Configure a Kafka lookup.
     *
     * @see https://druid.apache.org/docs/latest/querying/kafka-extraction-namespace
     *
     * @param string                   $kafkaTopic      Kafka topic the lookup data is read from.
     * @param string|array<int,string> $servers         Kafka server(s), for example
     *                                                  ["kafka1.service:9092", "kafka2.service:9092"].
     * @param array<string,scalar>     $kafkaProperties Additional, optional Kafka properties.
     * @param int                      $connectTimeout  How long to wait for the initial connection.
     *
     * @return $this
     */
    public function kafka(
        string $kafkaTopic,
        string|array $servers,
        array $kafkaProperties = [],
        int $connectTimeout = 0
    ): LookupBuilder {
        $this->parameters  = [$kafkaTopic, $servers, $kafkaProperties, $connectTimeout];
        $this->lookupClass = KafkaLookup::class;

        return $this;
    }
381
382
    /**
     * Configure a JDBC lookup.
     *
     * @see https://druid.apache.org/docs/latest/querying/lookups-cached-global/#jdbc-lookup
     *
     * @param string      $connectUri         Connection URI, for example "jdbc:mysql://localhost:3306/druid".
     * @param string|null $username           Username for the connection, or null when not used.
     * @param string|null $password           Password for the connection, or null when not used.
     * @param string      $table              Table the data is read from.
     * @param string      $keyColumn          Column used as the lookup key.
     * @param string      $valueColumn        Column used as the lookup value.
     * @param string|null $filter             Optional filter (like a WHERE clause) applied when querying the
     *                                        database.
     * @param string|null $tsColumn           Optional column holding a datetime. Druid uses it to fetch only rows
     *                                        changed since the previous poll, which reduces database load and is
     *                                        highly recommended.
     * @param int|null    $jitterSeconds      Maximum random delay (0 up to this many seconds) added to spread the
     *                                        database load more evenly.
     * @param int|null    $loadTimeoutSeconds Maximum time (in seconds) to query and populate the lookup values. On a
     *                                        lookup update, Druid keeps serving the old lookup for up to this long
     *                                        until the new one has loaded.
     *
     * @return $this
     */
    public function jdbc(
        string $connectUri,
        string|null $username,
        string|null $password,
        string $table,
        string $keyColumn,
        string $valueColumn,
        ?string $filter = null,
        ?string $tsColumn = null,
        ?int $jitterSeconds = null,
        ?int $loadTimeoutSeconds = null
    ): self {
        // store() spreads these positionally into the JdbcLookup constructor, so their order matters.
        $this->parameters = [
            $connectUri, $username, $password,
            $table, $keyColumn, $valueColumn,
            $filter, $tsColumn,
            $jitterSeconds, $loadTimeoutSeconds,
        ];
        $this->lookupClass = JdbcLookup::class;

        return $this;
    }
438
439
    /**
     * Configure a URI lookup that reads a single file.
     *
     * A file format must also be configured via the `csv`, `tsv`, `json` or `customJson` methods; store() refuses
     * a URI lookup without one.
     *
     * @param string $uri Location of the lookup file: a file, HDFS, S3 or GCS path.
     *
     * @return $this
     */
    public function uri(string $uri): self
    {
        $this->parameters  = [$uri];
        $this->lookupClass = UriLookup::class;

        return $this;
    }
454
455
    /**
     * Configure a URI lookup that reads the files matching a pattern under a URI prefix.
     *
     * A file format must also be configured via the `csv`, `tsv`, `json` or `customJson` methods; store() refuses
     * a URI prefix lookup without one.
     *
     * @param string      $uriPrefix URI prefix pointing at a directory (or other searchable resource) holding the
     *                               lookup files.
     * @param string|null $fileRegex Regex the file names under $uriPrefix must match, for example "*.json".
     *
     * @return $this
     */
    public function uriPrefix(string $uriPrefix, ?string $fileRegex = null): self
    {
        $this->parameters  = [$uriPrefix, $fileRegex];
        $this->lookupClass = UriPrefixLookup::class;

        return $this;
    }
474
475
    /**
     * Declare that the file read by a URI or URI prefix lookup is a CSV file.
     *
     * When both $skipHeaderRows and $hasHeaderRow are set, rows are skipped first. For example, with
     * $skipHeaderRows = 2 and $hasHeaderRow = true, Druid skips two lines and reads the column names from the third.
     *
     * @param array<int,string>|null $columns        Column names of the CSV file, or null together with
     *                                               $hasHeaderRow = true to read them from the file.
     * @param string|null            $keyColumn      Column holding the key.
     * @param string|null            $valueColumn    Column holding the value.
     * @param bool                   $hasHeaderRow   Whether the column names can be read from the file's header row.
     * @param int                    $skipHeaderRows Number of leading rows to skip.
     *
     * @return $this
     */
    public function csv(
        ?array $columns = null,
        ?string $keyColumn = null,
        ?string $valueColumn = null,
        bool $hasHeaderRow = false,
        int $skipHeaderRows = 0
    ): LookupBuilder {
        $this->parseSpec = new CsvParseSpec(
            $columns,
            $keyColumn,
            $valueColumn,
            $hasHeaderRow,
            $skipHeaderRows
        );

        return $this;
    }
503
504
    /**
     * Declare that the file read by a URI or URI prefix lookup is a JSON file with custom key and value fields.
     *
     * @param string $keyFieldName   Name of the field holding the key.
     * @param string $valueFieldName Name of the field holding the value.
     *
     * @return $this
     */
    public function customJson(string $keyFieldName, string $valueFieldName): LookupBuilder
    {
        $this->parseSpec = new CustomJsonParseSpec($keyFieldName, $valueFieldName);

        return $this;
    }
520
521
    /**
     * Declare that the file read by a URI or URI prefix lookup contains simple JSON key => value objects,
     * one per line, for example:
     *
     * ```
     * {"foo": "bar"}
     * {"baz": "bat"}
     * {"buck": "truck"}
     * ```
     *
     * @return $this
     */
    public function json(): LookupBuilder
    {
        $this->parseSpec = new SimpleJsonParseSpec();

        return $this;
    }
539
540
    /**
     * Declare that the file read by a URI or URI prefix lookup is a TSV file.
     *
     * When both $skipHeaderRows and $hasHeaderRow are set, rows are skipped first. For example, with
     * $skipHeaderRows = 2 and $hasHeaderRow = true, Druid skips two lines and reads the column names from the third.
     *
     * @param array<int,string>|null $columns        Column names of the TSV file, or null (the default) together
     *                                               with $hasHeaderRow = true to read them from the file.
     * @param string|null            $keyColumn      Column holding the key.
     * @param string|null            $valueColumn    Column holding the value.
     * @param string                 $delimiter      Field delimiter used in the file.
     * @param string                 $listDelimiter  List delimiter used in the file.
     * @param bool                   $hasHeaderRow   Whether the column names can be read from the file's header row.
     * @param int                    $skipHeaderRows Number of leading rows to skip.
     *
     * @return $this
     */
    public function tsv(
        // Defaults to null, like csv(), so header-row based files can be configured without passing null.
        ?array $columns = null,
        ?string $keyColumn = null,
        ?string $valueColumn = null,
        string $delimiter = "\t",
        string $listDelimiter = "\x01",
        bool $hasHeaderRow = false,
        int $skipHeaderRows = 0
    ): LookupBuilder {
        $this->parseSpec = new TsvParseSpec(
            $columns,
            $keyColumn,
            $valueColumn,
            $delimiter,
            $listDelimiter,
            $hasHeaderRow,
            $skipHeaderRows
        );

        return $this;
    }
580
}