Passed
Push — master ( 578412...e442d2 )
by Teye
05:30
created

IndexTaskBuilder::tsvFormat()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 16
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 7
c 0
b 0
f 0
dl 0
loc 16
ccs 3
cts 3
cp 1
rs 10
cc 1
nc 1
nop 5
crap 1
1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid\Tasks;
5
6
use Closure;
7
use InvalidArgumentException;
8
use Level23\Druid\DruidClient;
9
use Level23\Druid\Types\DataType;
10
use Level23\Druid\Context\TaskContext;
11
use Level23\Druid\Concerns\HasInterval;
12
use Level23\Druid\Concerns\HasInputFormat;
13
use Level23\Druid\Concerns\HasAggregations;
14
use Level23\Druid\Concerns\HasTuningConfig;
15
use Level23\Druid\Transforms\TransformSpec;
16
use Level23\Druid\Dimensions\TimestampSpec;
17
use Level23\Druid\Types\MultiValueHandling;
18
use Level23\Druid\Transforms\TransformBuilder;
19
use Level23\Druid\Dimensions\SpatialDimension;
20
use Level23\Druid\Concerns\HasQueryGranularity;
21
use Level23\Druid\InputSources\DruidInputSource;
22
use Level23\Druid\Collections\IntervalCollection;
23
use Level23\Druid\Concerns\HasSegmentGranularity;
24
use Level23\Druid\Collections\TransformCollection;
25
use Level23\Druid\Granularities\UniformGranularity;
26
use Level23\Druid\Collections\AggregationCollection;
27
use Level23\Druid\InputSources\InputSourceInterface;
28
use Level23\Druid\InputFormats\InputFormatInterface;
29
use Level23\Druid\Granularities\ArbitraryGranularity;
30
use Level23\Druid\Collections\SpatialDimensionCollection;
31
32
class IndexTaskBuilder extends TaskBuilder
{
    use HasSegmentGranularity, HasQueryGranularity, HasInterval, HasTuningConfig, HasAggregations, HasInputFormat;

    /**
     * Dimension specs which will be passed to the IndexTask. Each entry contains at least a "name" and a
     * "type" key; multi-value dimensions also contain "multiValueHandling" and "createBitmapIndex".
     *
     * @var array<array<string,string|bool>>
     */
    protected array $dimensions = [];

    /**
     * Spatial dimensions which will be passed to the IndexTask.
     */
    protected SpatialDimensionCollection $spatialDimensions;

    /**
     * The data source where we will write to.
     */
    protected string $dataSource;

    /**
     * The source where the data is read from. This must be set (via the constructor or inputSource())
     * before the task is built.
     */
    protected ?InputSourceInterface $inputSource;

    /**
     * Whether rollup is enabled in the granularity spec.
     */
    protected bool $rollup = false;

    /**
     * Whether this task should be executed parallel.
     */
    protected bool $parallel = false;

    /**
     * Optional transforms (and filter) applied while ingesting; configured via transform().
     */
    protected ?TransformSpec $transformSpec = null;

    /**
     * Describes where/how the primary timestamp is read; configured via timestamp(). Required.
     */
    protected ?TimestampSpec $timestampSpec = null;

    /**
     * Here we remember which type of granularity we want.
     * By default, this is UniformGranularity.
     *
     * @var class-string
     */
    protected string $granularityType = UniformGranularity::class;

    protected ?InputFormatInterface $inputFormat = null;

    /**
     * Whether the new segments are appended to the existing segment set instead of replacing it.
     */
    protected bool $appendToExisting = false;

    /**
     * IndexTaskBuilder constructor.
     *
     * @param DruidClient               $druidClient
     * @param string                    $toDataSource Data source where the data will be imported in.
     * @param InputSourceInterface|null $inputSource  Source to read from. Can also be set later via inputSource().
     */
    public function __construct(
        DruidClient $druidClient,
        string $toDataSource,
        ?InputSourceInterface $inputSource = null
    ) {
        $this->client            = $druidClient;
        $this->dataSource        = $toDataSource;
        $this->inputSource       = $inputSource;
        $this->spatialDimensions = new SpatialDimensionCollection();
    }

    /**
     * Add a dimension.
     *
     * @param string $name Name of the dimension.
     * @param string $type Data type of the dimension; validated by DataType::validate().
     *
     * @return $this
     */
    public function dimension(string $name, string $type = DataType::STRING): IndexTaskBuilder
    {
        $this->dimensions[] = ['name' => $name, 'type' => DataType::validate($type)];

        return $this;
    }

    /**
     * Add a multi-value dimension.
     *
     * @param string $name               Name of the dimension.
     * @param string $type               Data type of the dimension; validated by DataType::validate().
     * @param string $multiValueHandling How multiple values are handled; validated by MultiValueHandling::validate().
     * @param bool   $createBitmapIndex  Whether a bitmap index should be created for this dimension.
     *
     * @return $this
     */
    public function multiValueDimension(
        string $name,
        string $type = DataType::STRING,
        string $multiValueHandling = MultiValueHandling::SORTED_ARRAY,
        bool $createBitmapIndex = true
    ): IndexTaskBuilder {
        $this->dimensions[] = [
            'name'               => $name,
            'type'               => DataType::validate($type),
            'multiValueHandling' => MultiValueHandling::validate($multiValueHandling),
            'createBitmapIndex'  => $createBitmapIndex,
        ];

        return $this;
    }

    /**
     * Add a spatial dimension.
     *
     * @param string   $name Name of the dimension.
     * @param string[] $dims Field names where latitude,longitude data are read from.
     *
     * @return $this
     */
    public function spatialDimension(string $name, array $dims): IndexTaskBuilder
    {
        $this->spatialDimensions->add(new SpatialDimension($name, $dims));

        return $this;
    }

    /**
     * Enable append mode. When this is set, the data read from the input source is added to the existing
     * segments, instead of overwriting the data in the segments.
     *
     * @return $this
     * @deprecated Use appendToExisting() instead.
     */
    public function append(): IndexTaskBuilder
    {
        $this->appendToExisting();

        return $this;
    }

    /**
     * @param string      $column       Input row field to read the primary timestamp from. Regardless of the name of
     *                                  this input field, the primary timestamp will always be stored as a column named
     *                                  __time in your Druid datasource.
     * @param string      $format       Timestamp format. Options are:
     *                                  - iso: ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
     *                                  - posix: seconds since epoch
     *                                  - millis: milliseconds since epoch
     *                                  - micro: microseconds since epoch
     *                                  - nano: nanoseconds since epoch
     *                                  - auto: automatically detects ISO (either 'T' or space separator) or millis
     *                                  format
     *                                  - any Joda DateTimeFormat string
     * @param null|string $missingValue Timestamp to use for input records that have a null or missing timestamp
     *                                  column. Should be in ISO8601 format, like "2000-01-01T01:02:03.456", even if
     *                                  you have specified something else for format. Since Druid requires a primary
     *                                  timestamp, this setting can be useful for ingesting datasets that do not have
     *                                  any per-record timestamps at all.
     *
     * @return $this
     */
    public function timestamp(string $column, string $format, ?string $missingValue = null): IndexTaskBuilder
    {
        $this->timestampSpec = new TimestampSpec($column, $format, $missingValue);

        return $this;
    }

    /**
     * Build the IndexTask from the configured settings.
     *
     * Note: when the input source is a DruidInputSource without an interval, the task interval is written
     * into that input source object (it is mutated in place).
     *
     * @param \Level23\Druid\Context\TaskContext|array<string,string|int|bool> $context
     *
     * @return \Level23\Druid\Tasks\TaskInterface
     * @throws InvalidArgumentException When a required setting (query granularity, interval, timestamp,
     *                                  segment granularity for uniform granularity, input source) is missing.
     */
    protected function buildTask($context): TaskInterface
    {
        if (is_array($context)) {
            $context = new TaskContext($context);
        }

        if ($this->queryGranularity === null) {
            throw new InvalidArgumentException('You have to specify a queryGranularity value!');
        }

        if ($this->interval === null) {
            throw new InvalidArgumentException('You have to specify an interval!');
        }

        if ($this->timestampSpec === null) {
            throw new InvalidArgumentException('You have to specify an timestamp column!');
        }

        if ($this->granularityType === ArbitraryGranularity::class) {
            $granularity = new ArbitraryGranularity(
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        } else {
            if ($this->segmentGranularity === null) {
                throw new InvalidArgumentException('You have to specify a segmentGranularity value!');
            }

            $granularity = new UniformGranularity(
                $this->segmentGranularity,
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        }

        // Work on a local copy so the null / instanceof checks below narrow its type for static analysers
        // (they cannot narrow a property). "?? null" keeps isset() semantics, so an uninitialized property
        // is treated as "no input source" instead of raising an Error.
        $inputSource = $this->inputSource ?? null;

        // An input source is mandatory; there is no fallback.
        if ($inputSource === null) {
            throw new InvalidArgumentException(
                'No InputSource known. You have to supply an input source!.'
            );
        }

        // Reading data from druid without an interval? Then we assume this is a re-index task and read the
        // same interval as the one we are writing.
        if ($inputSource instanceof DruidInputSource && $inputSource->getInterval() === null) {
            $inputSource->setInterval($this->interval);
        }

        $task = new IndexTask(
            $this->dataSource,
            $inputSource,
            $granularity,
            $this->timestampSpec,
            $this->transformSpec,
            $this->tuningConfig,
            $context,
            new AggregationCollection(... $this->aggregations),
            $this->dimensions,
            $this->taskId,
            $this->inputFormat,
            $this->spatialDimensions
        );

        if ($this->parallel) {
            $task->setParallel($this->parallel);
        }

        if ($this->appendToExisting) {
            $task->setAppendToExisting($this->appendToExisting);
        }

        return $task;
    }

    /**
     * Call this with a closure. Your closure will receive a TransformBuilder, which allows you to
     * specify a transform which needs to be applied when this indexing job is executed. Optionally you can
     * also specify a filter on which records this transform needs to be applied.
     *
     * Note: calling this method more than once will overwrite the previous data! If the closure does not
     * add any transform, the previously configured transform spec (if any) is kept.
     *
     * @param \Closure $transformBuilder
     *
     * @return $this
     */
    public function transform(Closure $transformBuilder): IndexTaskBuilder
    {
        $builder = new TransformBuilder();
        $transformBuilder($builder);

        $transforms = $builder->getTransforms();

        if (!$transforms) {
            return $this;
        }

        $this->transformSpec = new TransformSpec(
            new TransformCollection(...$transforms),
            $builder->getFilter()
        );

        return $this;
    }

    /**
     * Enable rollup mode
     *
     * @return $this
     */
    public function rollup(): IndexTaskBuilder
    {
        $this->rollup = true;

        return $this;
    }

    /**
     * Set the source where the data is read from. Replaces any input source given to the constructor.
     *
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function inputSource(InputSourceInterface $inputSource): IndexTaskBuilder
    {
        $this->inputSource = $inputSource;

        return $this;
    }

    /**
     * Execute this index task as parallel batch.
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function parallel(): IndexTaskBuilder
    {
        $this->parallel = true;

        return $this;
    }

    /**
     * Specify that we want to use a UniformGranularity (the default). This requires a segment granularity.
     *
     * @return $this
     */
    public function uniformGranularity(): IndexTaskBuilder
    {
        $this->granularityType = UniformGranularity::class;

        return $this;
    }

    /**
     * Specify that we want to use an ArbitraryGranularity.
     *
     * @return $this
     */
    public function arbitraryGranularity(): IndexTaskBuilder
    {
        $this->granularityType = ArbitraryGranularity::class;

        return $this;
    }

    /**
     * Creates segments as additional shards of the latest version, effectively appending to the segment set instead of
     * replacing it. This means that you can append new segments to any datasource regardless of its original
     * partitioning scheme. You must use the dynamic partitioning type for the appended segments. If you specify a
     * different partitioning type, the task fails with an error.
     *
     * @param bool $appendToExisting
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function appendToExisting(bool $appendToExisting = true): IndexTaskBuilder
    {
        $this->appendToExisting = $appendToExisting;

        return $this;
    }
}
378