Passed
Pull Request — master (#38)
by Teye
14:51
created

IndexTaskBuilder::inputSource()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
c 0
b 0
f 0
dl 0
loc 5
ccs 3
cts 3
cp 1
rs 10
cc 1
nc 1
nop 1
crap 1
1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid\Tasks;
5
6
use Closure;
7
use InvalidArgumentException;
8
use Level23\Druid\DruidClient;
9
use Level23\Druid\Types\DataType;
10
use Level23\Druid\Context\TaskContext;
11
use Level23\Druid\Concerns\HasInterval;
12
use Level23\Druid\Concerns\HasAggregations;
13
use Level23\Druid\Concerns\HasTuningConfig;
14
use Level23\Druid\Transforms\TransformSpec;
15
use Level23\Druid\Dimensions\TimestampSpec;
16
use Level23\Druid\InputFormats\FlattenSpec;
17
use Level23\Druid\Types\MultiValueHandling;
18
use Level23\Druid\Transforms\TransformBuilder;
19
use Level23\Druid\Dimensions\SpatialDimension;
20
use Level23\Druid\InputFormats\CsvInputFormat;
21
use Level23\Druid\InputFormats\TsvInputFormat;
22
use Level23\Druid\InputFormats\OrcInputFormat;
23
use Level23\Druid\Concerns\HasQueryGranularity;
24
use Level23\Druid\InputFormats\JsonInputFormat;
25
use Level23\Druid\InputSources\DruidInputSource;
26
use Level23\Druid\Collections\IntervalCollection;
27
use Level23\Druid\Concerns\HasSegmentGranularity;
28
use Level23\Druid\Collections\TransformCollection;
29
use Level23\Druid\InputFormats\ParquetInputFormat;
30
use Level23\Druid\Granularities\UniformGranularity;
31
use Level23\Druid\InputFormats\ProtobufInputFormat;
32
use Level23\Druid\Collections\AggregationCollection;
33
use Level23\Druid\InputSources\InputSourceInterface;
34
use Level23\Druid\InputFormats\InputFormatInterface;
35
use Level23\Druid\Granularities\ArbitraryGranularity;
36
use Level23\Druid\Collections\SpatialDimensionCollection;
37
38
class IndexTaskBuilder extends TaskBuilder
39
{
40
    use HasSegmentGranularity, HasQueryGranularity, HasInterval, HasTuningConfig, HasAggregations;
41
42
    /**
43
     * @var array<array<string,string|bool>>
44
     */
45
    protected array $dimensions = [];
46
47
    protected SpatialDimensionCollection $spatialDimensions;
48
49
    /**
50
     * The data source where we will write to.
51
     *
52
     * @var string
53
     */
54
    protected string $dataSource;
55
56
    protected ?InputSourceInterface $inputSource;
57
58
    protected bool $rollup = false;
59
60
    /**
61
     * Whether this task should be executed parallel.
62
     *
63
     * @var bool
64
     */
65
    protected bool $parallel = false;
66
67
    /**
68
     * @var TransformSpec|null
69
     */
70
    protected ?TransformSpec $transformSpec = null;
71
72
    /**
73
     * @var TimestampSpec|null
74
     */
75
    protected ?TimestampSpec $timestampSpec = null;
76
77
    /**
78
     * Here we remember which type of granularity we want.
79
     * By default, this is UniformGranularity.
80
     *
81
     * @var string
82
     */
83
    protected string $granularityType = UniformGranularity::class;
84
85
    /**
86
     * @var \Level23\Druid\InputFormats\InputFormatInterface|null
87
     */
88
    protected ?InputFormatInterface $inputFormat = null;
89
90
    /**
91
     * @var bool
92
     */
93
    protected bool $appendToExisting = false;
94
95
    /**
96
     * IndexTaskBuilder constructor.
97
     *
98
     * @param DruidClient               $druidClient
99
     * @param string                    $toDataSource Data source where the data will be imported in.
100
     * @param InputSourceInterface|null $inputSource
101
     */
102 33
    public function __construct(
        DruidClient $druidClient,
        string $toDataSource,
        ?InputSourceInterface $inputSource = null
    ) {
        // Start with an empty collection so spatialDimension() always has something to add to.
        $this->spatialDimensions = new SpatialDimensionCollection();

        // The input source is optional here; it can still be supplied later through inputSource().
        $this->inputSource = $inputSource;
        $this->dataSource  = $toDataSource;
        $this->client      = $druidClient;
    }
112
113
    /**
114
     * Add a dimension.
115
     *
116
     * @param string $name
117
     * @param string $type
118
     *
119
     * @return $this
120
     */
121 1
    public function dimension(string $name, string $type = DataType::STRING): IndexTaskBuilder
    {
        // Run the type through DataType::validate() before anything is stored.
        $validatedType = DataType::validate($type);

        $this->dimensions[] = ['name' => $name, 'type' => $validatedType];

        return $this;
    }
127
128
    /**
129
     * Add a multi-value dimension.
130
     *
131
     * @param string $name
132
     * @param string $type
133
     * @param string $multiValueHandling Multi-value handling mode, validated by MultiValueHandling::validate().
134
     * @param bool   $createBitmapIndex
135
     *
136
     * @return $this
137
     */
138 2
    public function multiValueDimension(
        string $name,
        string $type = DataType::STRING,
        string $multiValueHandling = MultiValueHandling::SORTED_ARRAY,
        bool $createBitmapIndex = true
    ): IndexTaskBuilder {
        // Validate both values up front (type first, then handling) so nothing is stored when either is rejected.
        $validatedType     = DataType::validate($type);
        $validatedHandling = MultiValueHandling::validate($multiValueHandling);

        $dimension = [
            'name'               => $name,
            'type'               => $validatedType,
            'multiValueHandling' => $validatedHandling,
            'createBitmapIndex'  => $createBitmapIndex,
        ];

        $this->dimensions[] = $dimension;

        return $this;
    }
153
154
    /**
155
     * Add a spatial dimension.
156
     *
157
     * @param string   $name Name of the dimension.
158
     * @param string[] $dims Field names where latitude,longitude data are read from.
159
     *
160
     * @return $this
161
     */
162 1
    public function spatialDimension(string $name, array $dims): IndexTaskBuilder
    {
        // Wrap the name and its source fields in a SpatialDimension and register it.
        $spatialDimension = new SpatialDimension($name, $dims);
        $this->spatialDimensions->add($spatialDimension);

        return $this;
    }
168
169
    /**
170
     * Enable append mode. When this is set, we will add the data retrieved from the input source to the segments, instead
171
     * of overwriting the data in the segments.
172
     *
173
     * @return $this
174
     * @deprecated Use appendToExisting() instead.
175
     */
176 1
    public function append(): IndexTaskBuilder
    {
        // Deprecated alias: delegate to appendToExisting(), whose default argument enables append mode.
        $this->appendToExisting();

        return $this;
    }
182
183
    /**
184
     * @param string      $column       Input row field to read the primary timestamp from. Regardless of the name of
185
     *                                  this input field, the primary timestamp will always be stored as a column named
186
     *                                  __time in your Druid datasource.
187
     * @param string      $format       Timestamp format. Options are:
188
     *                                  - iso: ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
189
     *                                  - posix: seconds since epoch
190
     *                                  - millis: milliseconds since epoch
191
     *                                  - micro: microseconds since epoch
192
     *                                  - nano: nanoseconds since epoch
193
     *                                  - auto: automatically detects ISO (either 'T' or space separator) or millis
194
     *                                  format
195
     *                                  - any Joda DateTimeFormat string
196
     * @param null|string $missingValue Timestamp to use for input records that have a null or missing timestamp
197
     *                                  column. Should be in ISO8601 format, like "2000-01-01T01:02:03.456", even if
198
     *                                  you have specified something else for format. Since Druid requires a primary
199
     *                                  timestamp, this setting can be useful for ingesting datasets that do not have
200
     *                                  any per-record timestamps at all.
201
     *
202
     * @return $this
203
     */
204 9
    public function timestamp(string $column, string $format, ?string $missingValue = null): IndexTaskBuilder
    {
        // Replaces any previously configured timestamp spec.
        $spec = new TimestampSpec($column, $format, $missingValue);

        $this->timestampSpec = $spec;

        return $this;
    }
210
211
    /**
212
     * Specify that we use JSON as input format.
213
     *
214
     * @param FlattenSpec|null        $flattenSpec Specifies flattening configuration for nested JSON data. See
215
     *                                             flattenSpec for more info.
216
     * @param array<string,bool>|null $features    JSON parser features supported by Jackson library. Those features
217
     *                                             will be applied when parsing the input JSON data.
218
     *
219
     * @see https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features
220
     */
221 1
    public function jsonFormat(?FlattenSpec $flattenSpec = null, ?array $features = null): self
    {
        // Overwrites whichever input format was selected before.
        $format = new JsonInputFormat($flattenSpec, $features);

        $this->inputFormat = $format;

        return $this;
    }
227
228
    /**
229
     * Specify that we use CSV as input format.
230
     *
231
     * @param string[]|null $columns               Specifies the columns of the data. The columns should be in the same
232
     *                                             order with the columns of your data.
233
     * @param string|null   $listDelimiter         A custom delimiter for multi-value dimensions.
234
     * @param bool|null     $findColumnsFromHeader If this is set, the task will find the column names from the header
235
     *                                             row. Note that skipHeaderRows will be applied before finding column
236
     *                                             names from the header. For example, if you set skipHeaderRows to 2
237
     *                                             and findColumnsFromHeader to true, the task will skip the first two
238
     *                                             lines and then extract column information from the third line.
239
     *                                             columns will be ignored if this is set to true.
240
     * @param int           $skipHeaderRows        If this is set, the task will skip the first skipHeaderRows rows.
241
     */
242 1
    public function csvFormat(
        ?array $columns = null,
        ?string $listDelimiter = null,
        ?bool $findColumnsFromHeader = null,
        int $skipHeaderRows = 0
    ): self {
        // Overwrites whichever input format was selected before.
        $format = new CsvInputFormat(
            $columns,
            $listDelimiter,
            $findColumnsFromHeader,
            $skipHeaderRows
        );

        $this->inputFormat = $format;

        return $this;
    }
252
253
    /**
254
     * Specify that we use TSV as input format.
255
     *
256
     * @param array<string>|null $columns               Specifies the columns of the data. The columns should be in the
257
     *                                                  same order with the columns of your data.
258
     * @param string|null        $delimiter             A custom delimiter for data values.
259
     * @param string|null        $listDelimiter         A custom delimiter for multi-value dimensions.
260
     * @param bool|null          $findColumnsFromHeader If this is set, the task will find the column names from the
261
     *                                                  header row. Note that skipHeaderRows will be applied before
262
     *                                                  finding column names from the header. For example, if you set
263
     *                                                  skipHeaderRows to 2 and findColumnsFromHeader to true, the task
264
     *                                                  will skip the first two lines and then extract column
265
     *                                                  information from the third line. columns will be ignored if
266
     *                                                  this is set to true.
267
     * @param int                $skipHeaderRows        If this is set, the task will skip the first skipHeaderRows
268
     *                                                  rows.
269
     */
270 1
    public function tsvFormat(
        ?array $columns = null,
        ?string $delimiter = null,
        ?string $listDelimiter = null,
        ?bool $findColumnsFromHeader = null,
        int $skipHeaderRows = 0
    ): self {
        // Overwrites whichever input format was selected before.
        $format = new TsvInputFormat($columns, $delimiter, $listDelimiter, $findColumnsFromHeader, $skipHeaderRows);

        $this->inputFormat = $format;

        return $this;
    }
287
288
    /**
289
     * Specify that we use ORC as input format.
290
     *
291
     * To use the ORC input format, load the Druid Orc extension ( druid-orc-extensions).
292
     *
293
     * @param FlattenSpec|null $flattenSpec    Specifies flattening configuration for nested ORC data. See flattenSpec
294
     *                                         for more info.
295
     * @param bool|null        $binaryAsString Specifies if the binary orc column which is not logically marked as a
296
     *                                         string should be treated as a UTF-8 encoded string. Default is false.
297
     */
298 1
    public function orcFormat(?FlattenSpec $flattenSpec = null, ?bool $binaryAsString = null): self
    {
        // Overwrites whichever input format was selected before.
        $format = new OrcInputFormat($flattenSpec, $binaryAsString);

        $this->inputFormat = $format;

        return $this;
    }
304
305
    /**
306
     * Specify that we use Parquet as input format.
307
     *
308
     * To use the Parquet input format load the Druid Parquet extension (druid-parquet-extensions).
309
     *
310
     * @param FlattenSpec|null $flattenSpec    Define a flattenSpec to extract nested values from a Parquet file. Note
311
     *                                         that only 'path' expression are supported ('jq' is unavailable).
312
     * @param bool|null        $binaryAsString Specifies if the bytes parquet column which is not logically marked as a
313
     *                                         string or enum type should be treated as a UTF-8 encoded string.
314
     */
315 1
    public function parquetFormat(?FlattenSpec $flattenSpec = null, ?bool $binaryAsString = null): self
    {
        // Overwrites whichever input format was selected before.
        $format = new ParquetInputFormat($flattenSpec, $binaryAsString);

        $this->inputFormat = $format;

        return $this;
    }
321
322
    /**
323
     * Specify that we use Protobuf as input format.
324
     *
325
     * You need to include the druid-protobuf-extensions as an extension to use the Protobuf input format.
326
     *
327
     * @param array<string,string> $protoBytesDecoder Specifies how to decode bytes to Protobuf record. See below for
328
     *                                                an example.
329
     * @param FlattenSpec|null     $flattenSpec       Define a flattenSpec to extract nested values from a Protobuf
329
     *                                                record. Note that only 'path' expressions are supported ('jq' is
331
     *                                                unavailable).
332
     *
333
     * Example $protoBytesDecoder value:
334
     * ```
335
     * [
336
     *     "type" => "file",
337
     *     "descriptor" => "file:///tmp/metrics.desc",
338
     *     "protoMessageType" => "Metrics"
339
     * ]
340
     * ```
341
     *
342
     * @see https://druid.apache.org/docs/latest/ingestion/data-formats.html#protobuf
343
     */
344 1
    public function protobufFormat(array $protoBytesDecoder, ?FlattenSpec $flattenSpec = null): self
    {
        // Overwrites whichever input format was selected before.
        $format = new ProtobufInputFormat($protoBytesDecoder, $flattenSpec);

        $this->inputFormat = $format;

        return $this;
    }
350
351
    /**
352
     * @param \Level23\Druid\Context\TaskContext|array<string,string|int|bool> $context
353
     *
354
     * @return \Level23\Druid\Tasks\TaskInterface
355
     */
356 11
    protected function buildTask($context): TaskInterface
    {
        // Assembles an IndexTask from the builder state. Throws InvalidArgumentException when the
        // query granularity, interval, timestamp spec, segment granularity (uniform granularity only)
        // or input source is missing; the checks run in exactly that order.

        // A plain array is accepted for convenience and wrapped into a TaskContext.
        if (is_array($context)) {
            $context = new TaskContext($context);
        }

        if ($this->queryGranularity === null) {
            throw new InvalidArgumentException('You have to specify a queryGranularity value!');
        }

        if ($this->interval === null) {
            throw new InvalidArgumentException('You have to specify an interval!');
        }

        if ($this->timestampSpec === null) {
            throw new InvalidArgumentException('You have to specify an timestamp column!');
        }

        // Class names are plain strings, so compare them strictly.
        if ($this->granularityType === ArbitraryGranularity::class) {
            $granularity = new ArbitraryGranularity(
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        } else {
            // Every other granularity type is built as a uniform granularity, which also needs a
            // segment granularity.
            if ($this->segmentGranularity === null) {
                throw new InvalidArgumentException('You have to specify a segmentGranularity value!');
            }

            $granularity = new UniformGranularity(
                $this->segmentGranularity,
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        }

        // Read the nullable property once into a local. The null-coalescing read behaves like isset():
        // it yields null for both "null" and "never initialised", and lets us work with a non-null
        // InputSourceInterface below.
        $inputSource = $this->inputSource ?? null;

        // Without an input source there is nothing to ingest, so refuse to build the task.
        if ($inputSource === null) {
            throw new InvalidArgumentException(
                'No InputSource known. You have to supply an input source!.'
            );
        }

        // Do we want to read data from druid, and is no interval set on the input source yet? Then fill
        // it with our own interval. We assume this is a reindex task.
        if ($inputSource instanceof DruidInputSource && $inputSource->getInterval() === null) {
            $inputSource->setInterval($this->interval);
        }

        $task = new IndexTask(
            $this->dataSource,
            $inputSource,
            $granularity,
            $this->timestampSpec,
            $this->transformSpec,
            $this->tuningConfig,
            $context,
            new AggregationCollection(...$this->aggregations),
            $this->dimensions,
            $this->taskId,
            $this->inputFormat,
            $this->spatialDimensions
        );

        // Both flags are only pushed to the task when they were enabled on the builder.
        if ($this->parallel) {
            $task->setParallel($this->parallel);
        }

        if ($this->appendToExisting) {
            $task->setAppendToExisting($this->appendToExisting);
        }

        return $task;
    }
430
431
    /**
432
     * Call this with a closure. Your closure will receive a TransformBuilder, which allows you to
433
     * specify a transform which needs to be applied when this indexing job is executed. Optionally you can
434
     * also specify a filter on which records this transform needs to be applied.
435
     *
436
     * Note: calling this method more than once will overwrite the previous data!
437
     *
438
     * @param \Closure $transformBuilder
439
     *
440
     * @return $this
441
     */
442 2
    public function transform(Closure $transformBuilder): IndexTaskBuilder
    {
        // Let the caller describe the transforms (and an optional filter) on a fresh builder.
        $builder = new TransformBuilder();
        $transformBuilder($builder);

        // Fetch the transforms once and reuse them for both the emptiness check and the collection.
        $transforms = $builder->getTransforms();

        // Nothing was defined: leave the current transform spec untouched.
        if (!$transforms) {
            return $this;
        }

        $this->transformSpec = new TransformSpec(
            new TransformCollection(...$transforms),
            $builder->getFilter()
        );

        return $this;
    }
458
459
    /**
460
     * Enable rollup mode
461
     *
462
     * @return $this
463
     */
464 2
    public function rollup(): IndexTaskBuilder
    {
        // The flag is handed to the granularity object when the task is built.
        $this->rollup = true;

        return $this;
    }
470
471
    /**
472
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
473
     *
474
     * @return $this
475
     */
476 2
    public function inputSource(InputSourceInterface $inputSource): IndexTaskBuilder
    {
        // Replaces any input source given to the constructor or set by an earlier call.
        $this->inputSource = $inputSource;

        return $this;
    }
482
483
    /**
484
     * Execute this index task as parallel batch.
485
     *
486
     * @return $this
487
     */
488 5
    public function parallel(): IndexTaskBuilder
    {
        // Only remembered here; the task itself is switched to parallel when it is built.
        $this->parallel = true;

        return $this;
    }
494
495
    /**
496
     * Specify that we want to use a UniformGranularity
497
     *
498
     * @return $this
499
     */
500 4
    public function uniformGranularity(): IndexTaskBuilder
    {
        // Store the class name; the granularity object itself is created when the task is built.
        $this->granularityType = UniformGranularity::class;

        return $this;
    }
506
507
    /**
508
     * Specify that we want to use a ArbitraryGranularity
509
     *
510
     * @return $this
511
     */
512 3
    public function arbitraryGranularity(): IndexTaskBuilder
    {
        // Store the class name; the granularity object itself is created when the task is built.
        $this->granularityType = ArbitraryGranularity::class;

        return $this;
    }
518
519
    /**
520
     * Creates segments as additional shards of the latest version, effectively appending to the segment set instead of
521
     * replacing it. This means that you can append new segments to any datasource regardless of its original
522
     * partitioning scheme. You must use the dynamic partitioning type for the appended segments. If you specify a
523
     * different partitioning type, the task fails with an error.
524
     *
525
     * @param bool $appendToExisting
526
     *
527
     * @return $this
528
     */
529 6
    public function appendToExisting(bool $appendToExisting = true): IndexTaskBuilder
    {
        // Passing false switches append mode off again after an earlier call enabled it.
        $this->appendToExisting = $appendToExisting;

        return $this;
    }
535
}
536