1 | <?php |
||||||
2 | declare(strict_types=1); |
||||||
3 | |||||||
4 | namespace Level23\Druid\Tasks; |
||||||
5 | |||||||
6 | use Closure; |
||||||
7 | use InvalidArgumentException; |
||||||
8 | use Level23\Druid\DruidClient; |
||||||
9 | use Level23\Druid\Types\DataType; |
||||||
10 | use Level23\Druid\Context\TaskContext; |
||||||
11 | use Level23\Druid\Concerns\HasInterval; |
||||||
12 | use Level23\Druid\Concerns\HasInputFormat; |
||||||
13 | use Level23\Druid\Concerns\HasAggregations; |
||||||
14 | use Level23\Druid\Concerns\HasTuningConfig; |
||||||
15 | use Level23\Druid\Transforms\TransformSpec; |
||||||
16 | use Level23\Druid\Dimensions\TimestampSpec; |
||||||
17 | use Level23\Druid\Types\MultiValueHandling; |
||||||
18 | use Level23\Druid\Transforms\TransformBuilder; |
||||||
19 | use Level23\Druid\Dimensions\SpatialDimension; |
||||||
20 | use Level23\Druid\Concerns\HasQueryGranularity; |
||||||
21 | use Level23\Druid\InputSources\DruidInputSource; |
||||||
22 | use Level23\Druid\Collections\IntervalCollection; |
||||||
23 | use Level23\Druid\Concerns\HasSegmentGranularity; |
||||||
24 | use Level23\Druid\Collections\TransformCollection; |
||||||
25 | use Level23\Druid\Granularities\UniformGranularity; |
||||||
26 | use Level23\Druid\Collections\AggregationCollection; |
||||||
27 | use Level23\Druid\InputSources\InputSourceInterface; |
||||||
28 | use Level23\Druid\InputFormats\InputFormatInterface; |
||||||
29 | use Level23\Druid\Granularities\ArbitraryGranularity; |
||||||
30 | use Level23\Druid\Collections\SpatialDimensionCollection; |
||||||
31 | |||||||
/**
 * Fluent builder which collects all settings for a Druid native batch index task and turns them
 * into an IndexTask instance (see buildTask()).
 *
 * Granularities, interval, tuning config, aggregations and input format are collected by the
 * traits used below; dimensions, timestamp, transforms, input source and the task flags are
 * collected by the methods in this class.
 */
class IndexTaskBuilder extends TaskBuilder
{
    use HasSegmentGranularity, HasQueryGranularity, HasInterval, HasTuningConfig, HasAggregations, HasInputFormat;

    /**
     * The (multi-value) dimensions which were added, as plain arrays which are passed to the task.
     *
     * @var array<array<string,string|bool>>
     */
    protected array $dimensions = [];

    /**
     * The spatial dimensions which were added using spatialDimension().
     */
    protected SpatialDimensionCollection $spatialDimensions;

    /**
     * The data source where we will write to.
     *
     * @var string
     */
    protected string $dataSource;

    /**
     * The source where the data is read from. Required when the task is built.
     */
    protected ?InputSourceInterface $inputSource;

    /**
     * Whether rollup is enabled. This value is passed to the granularity spec.
     */
    protected bool $rollup = false;

    /**
     * Whether this task should be executed parallel.
     *
     * @var bool
     */
    protected bool $parallel = false;

    protected ?TransformSpec $transformSpec = null;

    protected ?TimestampSpec $timestampSpec = null;

    /**
     * Here we remember which type of granularity we want.
     * By default, this is UniformGranularity.
     *
     * @var string
     */
    protected string $granularityType = UniformGranularity::class;

    protected ?InputFormatInterface $inputFormat = null;

    /**
     * Whether the created segments should be appended to the existing segment set.
     */
    protected bool $appendToExisting = false;

    /**
     * IndexTaskBuilder constructor.
     *
     * @param DruidClient               $druidClient
     * @param string                    $toDataSource Data source where the data will be imported in.
     * @param InputSourceInterface|null $inputSource  Source to read from. When omitted, it has to be supplied
     *                                                using inputSource() before the task is built.
     */
    public function __construct(
        DruidClient $druidClient,
        string $toDataSource,
        ?InputSourceInterface $inputSource = null
    ) {
        $this->client            = $druidClient;
        $this->dataSource        = $toDataSource;
        $this->inputSource       = $inputSource;
        $this->spatialDimensions = new SpatialDimensionCollection();
    }

    /**
     * Add a dimension.
     *
     * @param string          $name
     * @param string|DataType $type Data type of the dimension. A string is lower-cased and converted
     *                              using DataType::from(), so it has to match one of the DataType values.
     *
     * @return $this
     */
    public function dimension(string $name, string|DataType $type = DataType::STRING): IndexTaskBuilder
    {
        $this->dimensions[] = [
            'name' => $name,
            'type' => (is_string($type) ? DataType::from(strtolower($type)) : $type)->value,
        ];

        return $this;
    }

    /**
     * Add a multi-value dimension.
     *
     * @param string                    $name
     * @param string|DataType           $type               Data type of the dimension. A string is lower-cased
     *                                                      and converted using DataType::from().
     * @param string|MultiValueHandling $multiValueHandling How multiple values are handled. A string is
     *                                                      upper-cased and converted using
     *                                                      MultiValueHandling::from().
     * @param bool                      $createBitmapIndex  Passed as-is to the dimension specification.
     *
     * @return $this
     */
    public function multiValueDimension(
        string $name,
        string|DataType $type = DataType::STRING,
        string|MultiValueHandling $multiValueHandling = MultiValueHandling::SORTED_ARRAY,
        bool $createBitmapIndex = true
    ): IndexTaskBuilder {
        $this->dimensions[] = [
            'name'               => $name,
            'type'               => (is_string($type) ? DataType::from(strtolower($type)) : $type)->value,
            'multiValueHandling' => (is_string($multiValueHandling) ? MultiValueHandling::from(strtoupper($multiValueHandling)) : $multiValueHandling)->value,
            'createBitmapIndex'  => $createBitmapIndex,
        ];

        return $this;
    }

    /**
     * Add a spatial dimension.
     *
     * @param string   $name Name of the dimension.
     * @param string[] $dims Field names where latitude,longitude data are read from.
     *
     * @return $this
     */
    public function spatialDimension(string $name, array $dims): IndexTaskBuilder
    {
        $this->spatialDimensions->add(new SpatialDimension($name, $dims));

        return $this;
    }

    /**
     * Specify where the primary timestamp is read from. This is required before the task can be built.
     *
     * @param string      $column       Input row field to read the primary timestamp from. Regardless of the name of
     *                                  this input field, the primary timestamp will always be stored as a column named
     *                                  __time in your Druid datasource.
     * @param string      $format       Timestamp format. Options are:
     *                                  - iso: ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
     *                                  - posix: seconds since epoch
     *                                  - millis: milliseconds since epoch
     *                                  - micro: microseconds since epoch
     *                                  - nano: nanoseconds since epoch
     *                                  - auto: automatically detects ISO (either 'T' or space separator) or millis
     *                                  format
     *                                  - any Joda DateTimeFormat string
     * @param null|string $missingValue Timestamp to use for input records that have a null or missing timestamp
     *                                  column. Should be in ISO8601 format, like "2000-01-01T01:02:03.456", even if
     *                                  you have specified something else for format. Since Druid requires a primary
     *                                  timestamp, this setting can be useful for ingesting datasets that do not have
     *                                  any per-record timestamps at all.
     *
     * @return $this
     */
    public function timestamp(string $column, string $format, ?string $missingValue = null): IndexTaskBuilder
    {
        $this->timestampSpec = new TimestampSpec($column, $format, $missingValue);

        return $this;
    }

    /**
     * Validate the collected settings and build the index task.
     *
     * The checks are executed in a fixed order: query granularity, interval, timestamp, segment
     * granularity (only for uniform granularity) and finally the input source.
     *
     * @param \Level23\Druid\Context\TaskContext|array<string,string|int|bool> $context
     *
     * @return \Level23\Druid\Tasks\TaskInterface
     * @throws \InvalidArgumentException When one of the required settings is missing.
     */
    protected function buildTask(array|TaskContext $context): TaskInterface
    {
        if (is_array($context)) {
            $context = new TaskContext($context);
        }

        if ($this->queryGranularity === null) {
            throw new InvalidArgumentException('You have to specify a queryGranularity value!');
        }

        if ($this->interval === null) {
            throw new InvalidArgumentException('You have to specify an interval!');
        }

        if ($this->timestampSpec === null) {
            throw new InvalidArgumentException('You have to specify an timestamp column!');
        }

        if ($this->granularityType === ArbitraryGranularity::class) {
            $granularity = new ArbitraryGranularity(
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        } else {
            // A segment granularity is only required for the uniform granularity.
            if ($this->segmentGranularity === null) {
                throw new InvalidArgumentException('You have to specify a segmentGranularity value!');
            }

            $granularity = new UniformGranularity(
                $this->segmentGranularity,
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        }

        // Read the property once into a local variable. "?? null" behaves like isset() (it is also safe when
        // the typed property was never initialized), and the local can be narrowed to a non-null type below.
        $inputSource = $this->inputSource ?? null;

        // No input source known?
        if ($inputSource === null) {
            throw new InvalidArgumentException(
                'No InputSource known. You have to supply an input source!.'
            );
        }

        // Do we want to read data from druid? And no interval set yet? Then fill it. We assume this is a reindex task.
        if ($inputSource instanceof DruidInputSource && $inputSource->getInterval() === null) {
            $inputSource->setInterval($this->interval);
        }

        $task = new IndexTask(
            $this->dataSource,
            $inputSource,
            $granularity,
            $this->timestampSpec,
            $this->transformSpec,
            $this->tuningConfig,
            $context,
            new AggregationCollection(...$this->aggregations),
            $this->dimensions,
            $this->taskId,
            $this->inputFormat,
            $this->spatialDimensions
        );

        // Only touch these flags on the task when they were enabled on this builder.
        if ($this->parallel) {
            $task->setParallel($this->parallel);
        }

        if ($this->appendToExisting) {
            $task->setAppendToExisting($this->appendToExisting);
        }

        return $task;
    }

    /**
     * Call this with a closure. Your closure will receive a TransformBuilder, which allows you to
     * specify a transform which needs to be applied when this indexing job is executed. Optionally you can
     * also specify a filter on which records this transform needs to be applied.
     *
     * Note: calling this method more than once will overwrite the previous data! When the closure does not
     * define any transforms, nothing is changed (a previously defined transform spec is kept).
     *
     * @param \Closure $transformBuilder
     *
     * @return $this
     */
    public function transform(Closure $transformBuilder): IndexTaskBuilder
    {
        $builder = new TransformBuilder();
        $transformBuilder($builder);

        $transforms = $builder->getTransforms();

        // Nothing defined by the closure? Then there is nothing to store.
        if (!$transforms) {
            return $this;
        }

        $this->transformSpec = new TransformSpec(
            new TransformCollection(...$transforms),
            $builder->getFilter()
        );

        return $this;
    }

    /**
     * Enable rollup mode
     *
     * @return $this
     */
    public function rollup(): IndexTaskBuilder
    {
        $this->rollup = true;

        return $this;
    }

    /**
     * Set the input source where the data is read from. This replaces the input source
     * which was given in the constructor (if any).
     *
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return $this
     */
    public function inputSource(InputSourceInterface $inputSource): IndexTaskBuilder
    {
        $this->inputSource = $inputSource;

        return $this;
    }

    /**
     * Execute this index task as parallel batch.
     *
     * @return $this
     */
    public function parallel(): IndexTaskBuilder
    {
        $this->parallel = true;

        return $this;
    }

    /**
     * Specify that we want to use a UniformGranularity
     *
     * @return $this
     */
    public function uniformGranularity(): IndexTaskBuilder
    {
        $this->granularityType = UniformGranularity::class;

        return $this;
    }

    /**
     * Specify that we want to use a ArbitraryGranularity
     *
     * @return $this
     */
    public function arbitraryGranularity(): IndexTaskBuilder
    {
        $this->granularityType = ArbitraryGranularity::class;

        return $this;
    }

    /**
     * Creates segments as additional shards of the latest version, effectively appending to the segment set instead of
     * replacing it. This means that you can append new segments to any datasource regardless of its original
     * partitioning scheme. You must use the dynamic partitioning type for the appended segments. If you specify a
     * different partitioning type, the task fails with an error.
     *
     * @param bool $appendToExisting
     *
     * @return $this
     */
    public function appendToExisting(bool $appendToExisting = true): IndexTaskBuilder
    {
        $this->appendToExisting = $appendToExisting;

        return $this;
    }
}
||||||
367 |