level23 /
druid-client
| 1 | <?php |
||||||
| 2 | declare(strict_types=1); |
||||||
| 3 | |||||||
| 4 | namespace Level23\Druid\Tasks; |
||||||
| 5 | |||||||
| 6 | use Closure; |
||||||
| 7 | use InvalidArgumentException; |
||||||
| 8 | use Level23\Druid\DruidClient; |
||||||
| 9 | use Level23\Druid\Types\DataType; |
||||||
| 10 | use Level23\Druid\Context\TaskContext; |
||||||
| 11 | use Level23\Druid\Concerns\HasInterval; |
||||||
| 12 | use Level23\Druid\Concerns\HasInputFormat; |
||||||
| 13 | use Level23\Druid\Concerns\HasAggregations; |
||||||
| 14 | use Level23\Druid\Concerns\HasTuningConfig; |
||||||
| 15 | use Level23\Druid\Transforms\TransformSpec; |
||||||
| 16 | use Level23\Druid\Dimensions\TimestampSpec; |
||||||
| 17 | use Level23\Druid\Types\MultiValueHandling; |
||||||
| 18 | use Level23\Druid\Transforms\TransformBuilder; |
||||||
| 19 | use Level23\Druid\Dimensions\SpatialDimension; |
||||||
| 20 | use Level23\Druid\Concerns\HasQueryGranularity; |
||||||
| 21 | use Level23\Druid\InputSources\DruidInputSource; |
||||||
| 22 | use Level23\Druid\Collections\IntervalCollection; |
||||||
| 23 | use Level23\Druid\Concerns\HasSegmentGranularity; |
||||||
| 24 | use Level23\Druid\Collections\TransformCollection; |
||||||
| 25 | use Level23\Druid\Granularities\UniformGranularity; |
||||||
| 26 | use Level23\Druid\Collections\AggregationCollection; |
||||||
| 27 | use Level23\Druid\InputSources\InputSourceInterface; |
||||||
| 28 | use Level23\Druid\InputFormats\InputFormatInterface; |
||||||
| 29 | use Level23\Druid\Granularities\ArbitraryGranularity; |
||||||
| 30 | use Level23\Druid\Collections\SpatialDimensionCollection; |
||||||
| 31 | |||||||
class IndexTaskBuilder extends TaskBuilder
{
    use HasSegmentGranularity, HasQueryGranularity, HasInterval, HasTuningConfig, HasAggregations, HasInputFormat;

    /**
     * Dimension definitions, as added via dimension() / multiValueDimension().
     *
     * @var array<array<string,string|bool>>
     */
    protected array $dimensions = [];

    /**
     * Spatial dimensions, as added via spatialDimension().
     */
    protected SpatialDimensionCollection $spatialDimensions;

    /**
     * The data source where we will write to.
     *
     * @var string
     */
    protected string $dataSource;

    /**
     * Where the data is read from. Must be set (via the constructor or inputSource()) before building the task.
     */
    protected ?InputSourceInterface $inputSource = null;

    /**
     * Whether rollup should be enabled in the granularity spec.
     */
    protected bool $rollup = false;

    /**
     * Whether this task should be executed parallel.
     *
     * @var bool
     */
    protected bool $parallel = false;

    protected ?TransformSpec $transformSpec = null;

    protected ?TimestampSpec $timestampSpec = null;

    /**
     * Here we remember which type of granularity we want.
     * By default, this is UniformGranularity.
     *
     * @var string
     */
    protected string $granularityType = UniformGranularity::class;

    protected ?InputFormatInterface $inputFormat = null;

    /**
     * Whether the task should append to existing segments instead of replacing them.
     */
    protected bool $appendToExisting = false;

    /**
     * IndexTaskBuilder constructor.
     *
     * @param DruidClient               $druidClient
     * @param string                    $toDataSource Data source where the data will be imported in.
     * @param InputSourceInterface|null $inputSource  Optional input source; can also be set later via inputSource().
     */
    public function __construct(
        DruidClient $druidClient,
        string $toDataSource,
        ?InputSourceInterface $inputSource = null
    ) {
        $this->client            = $druidClient;
        $this->dataSource        = $toDataSource;
        $this->inputSource       = $inputSource;
        $this->spatialDimensions = new SpatialDimensionCollection();
    }

    /**
     * Add a dimension.
     *
     * @param string          $name
     * @param string|DataType $type A DataType, or its (case-insensitive) string value.
     *
     * @return $this
     */
    public function dimension(string $name, string|DataType $type = DataType::STRING): IndexTaskBuilder
    {
        $this->dimensions[] = [
            'name' => $name,
            'type' => (is_string($type) ? DataType::from(strtolower($type)) : $type)->value,
        ];

        return $this;
    }

    /**
     * Add a multi-value dimension.
     *
     * @param string                    $name
     * @param string|DataType           $type               A DataType, or its (case-insensitive) string value.
     * @param string|MultiValueHandling $multiValueHandling A MultiValueHandling, or its (case-insensitive) string
     *                                                      value.
     * @param bool                      $createBitmapIndex
     *
     * @return $this
     */
    public function multiValueDimension(
        string $name,
        string|DataType $type = DataType::STRING,
        string|MultiValueHandling $multiValueHandling = MultiValueHandling::SORTED_ARRAY,
        bool $createBitmapIndex = true
    ): IndexTaskBuilder {
        $this->dimensions[] = [
            'name'               => $name,
            'type'               => (is_string($type) ? DataType::from(strtolower($type)) : $type)->value,
            'multiValueHandling' => (is_string($multiValueHandling)
                ? MultiValueHandling::from(strtoupper($multiValueHandling))
                : $multiValueHandling)->value,
            'createBitmapIndex'  => $createBitmapIndex,
        ];

        return $this;
    }

    /**
     * Add a spatial dimension.
     *
     * @param string   $name Name of the dimension.
     * @param string[] $dims Field names where latitude,longitude data are read from.
     *
     * @return $this
     */
    public function spatialDimension(string $name, array $dims): IndexTaskBuilder
    {
        $this->spatialDimensions->add(new SpatialDimension($name, $dims));

        return $this;
    }

    /**
     * @param string      $column       Input row field to read the primary timestamp from. Regardless of the name of
     *                                  this input field, the primary timestamp will always be stored as a column named
     *                                  __time in your Druid datasource.
     * @param string      $format       Timestamp format. Options are:
     *                                  - iso: ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"
     *                                  - posix: seconds since epoch
     *                                  - millis: milliseconds since epoch
     *                                  - micro: microseconds since epoch
     *                                  - nano: nanoseconds since epoch
     *                                  - auto: automatically detects ISO (either 'T' or space separator) or millis
     *                                  format
     *                                  - any Joda DateTimeFormat string
     * @param null|string $missingValue Timestamp to use for input records that have a null or missing timestamp
     *                                  column. Should be in ISO8601 format, like "2000-01-01T01:02:03.456", even if
     *                                  you have specified something else for format. Since Druid requires a primary
     *                                  timestamp, this setting can be useful for ingesting datasets that do not have
     *                                  any per-record timestamps at all.
     *
     * @return $this
     */
    public function timestamp(string $column, string $format, ?string $missingValue = null): IndexTaskBuilder
    {
        $this->timestampSpec = new TimestampSpec($column, $format, $missingValue);

        return $this;
    }

    /**
     * Validate the collected settings and build the IndexTask.
     *
     * @param \Level23\Druid\Context\TaskContext|array<string,string|int|bool> $context
     *
     * @return \Level23\Druid\Tasks\TaskInterface
     * @throws InvalidArgumentException When a required setting (query granularity, interval, timestamp,
     *                                  segment granularity for uniform granularity, or input source) is missing.
     */
    protected function buildTask(array|TaskContext $context): TaskInterface
    {
        if (is_array($context)) {
            $context = new TaskContext($context);
        }

        if ($this->queryGranularity === null) {
            throw new InvalidArgumentException('You have to specify a queryGranularity value!');
        }

        if ($this->interval === null) {
            throw new InvalidArgumentException('You have to specify an interval!');
        }

        if ($this->timestampSpec === null) {
            throw new InvalidArgumentException('You have to specify an timestamp column!');
        }

        if ($this->granularityType === ArbitraryGranularity::class) {
            $granularity = new ArbitraryGranularity(
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        } else {
            if ($this->segmentGranularity === null) {
                throw new InvalidArgumentException('You have to specify a segmentGranularity value!');
            }

            $granularity = new UniformGranularity(
                $this->segmentGranularity,
                $this->queryGranularity,
                $this->rollup,
                new IntervalCollection($this->interval)
            );
        }

        // Work on a local copy so the null / instanceof checks below narrow its type for static analysis.
        // `??` has isset() semantics, so this matches the previous isset($this->inputSource) check.
        $inputSource = $this->inputSource ?? null;

        // No input source known?
        if ($inputSource === null) {
            throw new InvalidArgumentException(
                'No InputSource known. You have to supply an input source!.'
            );
        }

        // Do we want to read data from druid? And no interval set yet? Then fill it. We assume this is a reindex task.
        // Note: this mutates the input source object that was handed to this builder.
        if ($inputSource instanceof DruidInputSource && $inputSource->getInterval() === null) {
            $inputSource->setInterval($this->interval);
        }

        $task = new IndexTask(
            $this->dataSource,
            $inputSource,
            $granularity,
            $this->timestampSpec,
            $this->transformSpec,
            $this->tuningConfig,
            $context,
            new AggregationCollection(...$this->aggregations),
            $this->dimensions,
            $this->taskId,
            $this->inputFormat,
            $this->spatialDimensions
        );

        if ($this->parallel) {
            $task->setParallel($this->parallel);
        }

        if ($this->appendToExisting) {
            $task->setAppendToExisting($this->appendToExisting);
        }

        return $task;
    }

    /**
     * Call this with a closure. Your closure will receive a TransformBuilder, which allows you to
     * specify a transform which needs to be applied when this indexing job is executed. Optionally you can
     * also specify a filter on which records this transform needs to be applied.
     *
     * Note: calling this method more than once will overwrite the previous data!
     * If the closure adds neither transforms nor a filter, any previously configured transform spec is kept.
     *
     * @param \Closure $transformBuilder
     *
     * @return $this
     */
    public function transform(Closure $transformBuilder): IndexTaskBuilder
    {
        $builder = new TransformBuilder();
        $transformBuilder($builder);

        if (!$builder->getTransforms() && !$builder->getFilter()) {
            return $this;
        }

        $this->transformSpec = new TransformSpec(
            new TransformCollection(...$builder->getTransforms()),
            $builder->getFilter()
        );

        return $this;
    }

    /**
     * Enable (or, with `false`, disable) rollup mode.
     *
     * @param bool $rollup
     *
     * @return $this
     */
    public function rollup(bool $rollup = true): IndexTaskBuilder
    {
        $this->rollup = $rollup;

        return $this;
    }

    /**
     * Set the input source where the data is read from.
     *
     * @param \Level23\Druid\InputSources\InputSourceInterface $inputSource
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function inputSource(InputSourceInterface $inputSource): IndexTaskBuilder
    {
        $this->inputSource = $inputSource;

        return $this;
    }

    /**
     * Execute this index task as parallel batch (pass `false` to switch it off again).
     *
     * @param bool $parallel
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function parallel(bool $parallel = true): IndexTaskBuilder
    {
        $this->parallel = $parallel;

        return $this;
    }

    /**
     * Specify that we want to use a UniformGranularity
     *
     * @return $this
     */
    public function uniformGranularity(): IndexTaskBuilder
    {
        $this->granularityType = UniformGranularity::class;

        return $this;
    }

    /**
     * Specify that we want to use an ArbitraryGranularity
     *
     * @return $this
     */
    public function arbitraryGranularity(): IndexTaskBuilder
    {
        $this->granularityType = ArbitraryGranularity::class;

        return $this;
    }

    /**
     * Creates segments as additional shards of the latest version, effectively appending to the segment set instead of
     * replacing it. This means that you can append new segments to any datasource regardless of its original
     * partitioning scheme. You must use the dynamic partitioning type for the appended segments. If you specify a
     * different partitioning type, the task fails with an error.
     *
     * @param bool $appendToExisting
     *
     * @return \Level23\Druid\Tasks\IndexTaskBuilder
     */
    public function appendToExisting(bool $appendToExisting = true): IndexTaskBuilder
    {
        $this->appendToExisting = $appendToExisting;

        return $this;
    }
}
||||||
| 367 |