Issues (52)

src/Tasks/IndexTaskBuilder.php (5 issues)

1
<?php
2
declare(strict_types=1);
3
4
namespace Level23\Druid\Tasks;
5
6
use Closure;
7
use InvalidArgumentException;
8
use Level23\Druid\DruidClient;
9
use Level23\Druid\Types\DataType;
10
use Level23\Druid\Context\TaskContext;
11
use Level23\Druid\Concerns\HasInterval;
12
use Level23\Druid\Concerns\HasInputFormat;
13
use Level23\Druid\Concerns\HasAggregations;
14
use Level23\Druid\Concerns\HasTuningConfig;
15
use Level23\Druid\Transforms\TransformSpec;
16
use Level23\Druid\Dimensions\TimestampSpec;
17
use Level23\Druid\Types\MultiValueHandling;
18
use Level23\Druid\Transforms\TransformBuilder;
19
use Level23\Druid\Dimensions\SpatialDimension;
20
use Level23\Druid\Concerns\HasQueryGranularity;
21
use Level23\Druid\InputSources\DruidInputSource;
22
use Level23\Druid\Collections\IntervalCollection;
23
use Level23\Druid\Concerns\HasSegmentGranularity;
24
use Level23\Druid\Collections\TransformCollection;
25
use Level23\Druid\Granularities\UniformGranularity;
26
use Level23\Druid\Collections\AggregationCollection;
27
use Level23\Druid\InputSources\InputSourceInterface;
28
use Level23\Druid\InputFormats\InputFormatInterface;
29
use Level23\Druid\Granularities\ArbitraryGranularity;
30
use Level23\Druid\Collections\SpatialDimensionCollection;
31
32
class IndexTaskBuilder extends TaskBuilder
33
{
34
    use HasSegmentGranularity, HasQueryGranularity, HasInterval, HasTuningConfig, HasAggregations, HasInputFormat;
35
36
    /**
     * Regular (non-spatial) dimension definitions, in the order they were added.
     *
     * @var array<array<string,string|bool>>
     */
    protected array $dimensions = [];

    /**
     * Spatial dimensions registered through spatialDimension().
     */
    protected SpatialDimensionCollection $spatialDimensions;

    /**
     * The data source where we will write to.
     *
     * @var string
     */
    protected string $dataSource;

    /**
     * The input source to read from, or null when none has been supplied yet.
     *
     * Defaults to null so the property is never left uninitialized.
     */
    protected ?InputSourceInterface $inputSource = null;

    /**
     * Whether rollup is enabled; handed to the granularity when the task is built.
     */
    protected bool $rollup = false;

    /**
     * Whether this task should be executed parallel.
     *
     * @var bool
     */
    protected bool $parallel = false;

    /**
     * Transform spec created by transform(), or null when no transforms were defined.
     */
    protected ?TransformSpec $transformSpec = null;

    /**
     * Timestamp spec created by timestamp(); it is required when the task is built.
     */
    protected ?TimestampSpec $timestampSpec = null;

    /**
     * Here we remember which type of granularity we want.
     * By default, this is UniformGranularity.
     *
     * @var string
     */
    protected string $granularityType = UniformGranularity::class;

    /**
     * Input format handed to the IndexTask; null when not configured.
     */
    protected ?InputFormatInterface $inputFormat = null;

    /**
     * Whether the task should append to existing segments, see appendToExisting().
     */
    protected bool $appendToExisting = false;
76
77
    /**
     * Create a new index task builder.
     *
     * @param DruidClient               $druidClient  Client that is stored on this builder.
     * @param string                    $toDataSource Data source where the data will be imported in.
     * @param InputSourceInterface|null $inputSource  Optional input source; it can also be supplied
     *                                                later using inputSource().
     */
    public function __construct(
        DruidClient $druidClient,
        string $toDataSource,
        ?InputSourceInterface $inputSource = null
    ) {
        // Start with an empty collection so spatialDimension() can always add to it.
        $this->spatialDimensions = new SpatialDimensionCollection();

        $this->inputSource = $inputSource;
        $this->dataSource  = $toDataSource;
        $this->client      = $druidClient;
    }
94
95
    /**
     * Register a dimension for the task.
     *
     * @param string          $name Name of the dimension.
     * @param string|DataType $type Data type of the dimension. A string is lower-cased and
     *                              resolved using DataType::from().
     *
     * @return $this
     */
    public function dimension(string $name, string|DataType $type = DataType::STRING): IndexTaskBuilder
    {
        // Normalize string input to the DataType enum before reading its value.
        if (is_string($type)) {
            $type = DataType::from(strtolower($type));
        }

        $this->dimensions[] = ['name' => $name, 'type' => $type->value];

        return $this;
    }
112
113
    /**
     * Register a multi-value dimension for the task.
     *
     * @param string                    $name               Name of the dimension.
     * @param string|DataType           $type               Data type of the dimension. A string is
     *                                                      lower-cased and resolved using DataType::from().
     * @param string|MultiValueHandling $multiValueHandling How multiple values are handled. A string is
     *                                                      upper-cased and resolved using
     *                                                      MultiValueHandling::from().
     * @param bool                      $createBitmapIndex  Stored as the "createBitmapIndex" setting of
     *                                                      the dimension.
     *
     * @return $this
     */
    public function multiValueDimension(
        string $name,
        string|DataType $type = DataType::STRING,
        string|MultiValueHandling $multiValueHandling = MultiValueHandling::SORTED_ARRAY,
        bool $createBitmapIndex = true
    ): IndexTaskBuilder {
        // Normalize both string inputs to their enum, the type first.
        if (is_string($type)) {
            $type = DataType::from(strtolower($type));
        }

        if (is_string($multiValueHandling)) {
            $multiValueHandling = MultiValueHandling::from(strtoupper($multiValueHandling));
        }

        $dimension = [
            'name'               => $name,
            'type'               => $type->value,
            'multiValueHandling' => $multiValueHandling->value,
            'createBitmapIndex'  => $createBitmapIndex,
        ];

        $this->dimensions[] = $dimension;

        return $this;
    }
138
139
    /**
     * Register a spatial dimension for the task.
     *
     * @param string   $name Name of the dimension.
     * @param string[] $dims Field names where latitude,longitude data are read from.
     *
     * @return $this
     */
    public function spatialDimension(string $name, array $dims): IndexTaskBuilder
    {
        $dimension = new SpatialDimension($name, $dims);

        $this->spatialDimensions->add($dimension);

        return $this;
    }
153
154
    /**
     * Define how the primary timestamp is read from the input rows.
     *
     * @param string      $column       Input row field to read the primary timestamp from. Regardless of the name of
     *                                  this input field, the primary timestamp will always be stored as a column named
     *                                  __time in your Druid datasource.
     * @param string      $format       Timestamp format. Options are:
     *                                  - iso: ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
     *                                  - posix: seconds since epoch
     *                                  - millis: milliseconds since epoch
     *                                  - micro: microseconds since epoch
     *                                  - nano: nanoseconds since epoch
     *                                  - auto: automatically detects ISO (either 'T' or space separator) or millis
     *                                  format
     *                                  - any Joda DateTimeFormat string
     * @param null|string $missingValue Timestamp to use for input records that have a null or missing timestamp
     *                                  column. Should be in ISO8601 format, like "2000-01-01T01:02:03.456", even if
     *                                  you have specified something else for format. Since Druid requires a primary
     *                                  timestamp, this setting can be useful for ingesting datasets that do not have
     *                                  any per-record timestamps at all.
     *
     * @return $this
     */
    public function timestamp(string $column, string $format, ?string $missingValue = null): IndexTaskBuilder
    {
        $spec = new TimestampSpec($column, $format, $missingValue);

        // Replaces any timestamp spec that was configured earlier.
        $this->timestampSpec = $spec;

        return $this;
    }
181
182
    /**
     * Build the IndexTask from everything that was configured on this builder.
     *
     * The configuration is validated in a fixed order: queryGranularity, interval, timestamp spec,
     * segmentGranularity (only for uniform granularity) and finally the input source.
     *
     * @param \Level23\Druid\Context\TaskContext|array<string,string|int|bool> $context
     *
     * @return \Level23\Druid\Tasks\TaskInterface
     *
     * @throws \InvalidArgumentException When one of the required settings is missing.
     */
    protected function buildTask(array|TaskContext $context): TaskInterface
    {
        if (is_array($context)) {
            $context = new TaskContext($context);
        }

        if ($this->queryGranularity === null) {
            throw new InvalidArgumentException('You have to specify a queryGranularity value!');
        }

        if ($this->interval === null) {
            throw new InvalidArgumentException('You have to specify an interval!');
        }

        if ($this->timestampSpec === null) {
            throw new InvalidArgumentException('You have to specify an timestamp column!');
        }

        if ($this->granularityType === ArbitraryGranularity::class) {
            $granularity = new ArbitraryGranularity(
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        } else {
            // A segment granularity is only required for uniform granularity.
            if ($this->segmentGranularity === null) {
                throw new InvalidArgumentException('You have to specify a segmentGranularity value!');
            }

            $granularity = new UniformGranularity(
                $this->segmentGranularity,
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        }

        // Copy the nullable property into a local variable. The null check and the instanceof
        // check below then narrow its type, which does not work reliably on a property.
        // The "??" keeps the isset() semantics of the previous check.
        $inputSource = $this->inputSource ?? null;

        // No input source known?
        if ($inputSource === null) {
            throw new InvalidArgumentException(
                'No InputSource known. You have to supply an input source!.'
            );
        }

        // Do we want to read data from druid? And no interval set yet? Then fill it. We assume this is a reindex task.
        if ($inputSource instanceof DruidInputSource && $inputSource->getInterval() === null) {
            $inputSource->setInterval($this->interval);
        }

        $task = new IndexTask(
            $this->dataSource,
            $inputSource,
            $granularity,
            $this->timestampSpec,
            $this->transformSpec,
            $this->tuningConfig,
            $context,
            new AggregationCollection(... $this->aggregations),
            $this->dimensions,
            $this->taskId,
            $this->inputFormat,
            $this->spatialDimensions
        );

        // Only touch these task settings when they were explicitly enabled on the builder.
        if ($this->parallel) {
            $task->setParallel($this->parallel);
        }

        if ($this->appendToExisting) {
            $task->setAppendToExisting($this->appendToExisting);
        }

        return $task;
    }
261
262
    /**
     * Call this with a closure. Your closure will receive a TransformBuilder, which allows you to
     * specify a transform which needs to be applied when this indexing job is executed. Optionally you can
     * also specify a filter on which records this transform needs to be applied.
     *
     * Note: calling this method more than once will overwrite the previous data! When the closure
     * does not define any transforms, nothing is changed and a previously stored transform spec is kept.
     *
     * @param \Closure $transformBuilder Receives the TransformBuilder as its only argument.
     *
     * @return $this
     */
    public function transform(Closure $transformBuilder): IndexTaskBuilder
    {
        $builder = new TransformBuilder();
        $transformBuilder($builder);

        // Read the transforms once; they are needed for both the check and the collection.
        $transforms = $builder->getTransforms();

        if (!$transforms) {
            return $this;
        }

        $this->transformSpec = new TransformSpec(
            new TransformCollection(...$transforms),
            $builder->getFilter()
        );

        return $this;
    }
289
290
    /**
     * Turn on rollup mode for this task. There is no counterpart on this builder to turn it off again.
     *
     * @return $this
     */
    public function rollup(): IndexTaskBuilder
    {
        $this->rollup = true;

        return $this;
    }
301
302
    /**
     * Set the input source to read from. This replaces an input source that was set earlier,
     * including the one given to the constructor.
     *
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return $this
     */
    public function inputSource(InputSourceInterface $inputSource): IndexTaskBuilder
    {
        $this->inputSource = $inputSource;

        return $this;
    }
313
314
    /**
     * Mark this index task to be executed as a parallel batch.
     *
     * @return $this
     */
    public function parallel(): IndexTaskBuilder
    {
        $this->parallel = true;

        return $this;
    }
325
326
    /**
     * Select UniformGranularity as the granularity type for this task. This is also the default.
     *
     * @return $this
     */
    public function uniformGranularity(): IndexTaskBuilder
    {
        $this->granularityType = UniformGranularity::class;

        return $this;
    }
337
338
    /**
     * Select ArbitraryGranularity as the granularity type for this task.
     * With this type no segment granularity is required when the task is built.
     *
     * @return $this
     */
    public function arbitraryGranularity(): IndexTaskBuilder
    {
        $this->granularityType = ArbitraryGranularity::class;

        return $this;
    }
349
350
    /**
     * Creates segments as additional shards of the latest version, effectively appending to the segment set instead of
     * replacing it. This means that you can append new segments to any datasource regardless of its original
     * partitioning scheme. You must use the dynamic partitioning type for the appended segments. If you specify a
     * different partitioning type, the task fails with an error.
     *
     * @param bool $appendToExisting True (the default) to append, false to switch appending off again.
     *
     * @return $this
     */
    public function appendToExisting(bool $appendToExisting = true): IndexTaskBuilder
    {
        $this->appendToExisting = $appendToExisting;

        return $this;
    }
366
}
367