Passed
Push — develop ( 8a0331...fc36e2 )
by nguereza
01:36
created

Etl::transform()   B

Complexity

Conditions 7
Paths 18

Size

Total Lines 33
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 20
c 0
b 0
f 0
dl 0
loc 33
rs 8.6666
cc 7
nc 18
nop 2
1
<?php
2
3
/**
4
 * Platine ETL
5
 *
6
 * Platine ETL is a library to Extract-Transform-Load Data from various sources
7
 *
8
 * This content is released under the MIT License (MIT)
9
 *
10
 * Copyright (c) 2020 Platine ETL
11
 * Copyright (c) 2019 Benoit POLASZEK
12
 *
13
 * Permission is hereby granted, free of charge, to any person obtaining a copy
14
 * of this software and associated documentation files (the "Software"), to deal
15
 * in the Software without restriction, including without limitation the rights
16
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17
 * copies of the Software, and to permit persons to whom the Software is
18
 * furnished to do so, subject to the following conditions:
19
 *
20
 * The above copyright notice and this permission notice shall be included in all
21
 * copies or substantial portions of the Software.
22
 *
23
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29
 * SOFTWARE.
30
 */
31
32
declare(strict_types=1);
33
34
namespace Platine\Etl;
35
36
use EmptyIterator;
37
use Exception;
38
use Generator;
39
use Platine\Etl\Event\BaseEvent;
40
use Platine\Etl\Event\EndEvent;
41
use Platine\Etl\Event\FlushEvent;
42
use Platine\Etl\Event\ItemEvent;
43
use Platine\Etl\Event\ItemExceptionEvent;
44
use Platine\Etl\Event\RollbackEvent;
45
use Platine\Etl\Event\StartEvent;
46
use Platine\Etl\Exception\EtlException;
47
use Platine\Etl\Loader\NullLoader;
48
use Platine\Event\Dispatcher;
49
use Platine\Event\DispatcherInterface;
50
use Throwable;
51
52
/**
 * Extract-Transform-Load pipeline.
 *
 * Items are pulled from the extractor, passed one by one through the
 * transformer and handed to the loader. The optional init, flush and
 * rollback callables let the loader prepare, commit and undo its work.
 * Progress is reported through events dispatched on the configured
 * dispatcher.
 *
 * @class Etl
 * @package Platine\Etl
 */
class Etl
{
    /**
     * Used to extract data; when null the input given to process()
     * is iterated directly
     * @var callable|null
     */
    protected $extract = null;

    /**
     * Used to transform data; must return a Generator
     * @var callable|null
     */
    protected $transform = null;

    /**
     * Used to initialize loader, called once before the first load
     * @var callable|null
     */
    protected $init = null;

    /**
     * The loader
     * @var callable
     */
    protected $load;

    /**
     * Used to flush the loaded data, receives a "partial" flag
     * @var callable|null
     */
    protected $flush = null;

    /**
     * Used to rollback the data loaded since the last flush
     * @var callable|null
     */
    protected $rollback = null;

    /**
     * Total to flush: a partial flush is triggered every N loaded items
     * @var int|null
     */
    protected ?int $flushCount = null;

    /**
     * Whether to flush data
     * @var bool
     */
    protected bool $isFlush = false;

    /**
     * Whether to skip data
     * @var bool
     */
    protected bool $isSkip = false;

    /**
     * Whether to stop processing data
     * @var bool
     */
    protected bool $isStop = false;

    /**
     * Whether to rollback data
     * @var bool
     */
    protected bool $isRollback = false;

    /**
     * The event dispatcher
     * @var DispatcherInterface
     */
    protected DispatcherInterface $dispatcher;

    /**
     * Additional options
     * @var array<string, mixed>
     */
    protected array $options = [];

    /**
     * Create new instance
     * @param callable|null $extract called as ($data, $etl, $options)
     * @param callable|null $transform called as ($item, $key, $etl, $options);
     * defaults to a transformer yielding the item unchanged
     * @param callable|null $init called as ($options) before the first load
     * @param callable|null $load called as ($data, $key, $etl); defaults to
     * a NullLoader
     * @param callable|null $flush called as ($partial)
     * @param callable|null $rollback called without arguments
     * @param int|null $flushCount flush every N items (values below 1 become 1)
     * @param DispatcherInterface|null $dispatcher defaults to a new Dispatcher
     */
    public function __construct(
        ?callable $extract = null,
        ?callable $transform = null,
        ?callable $init = null,
        ?callable $load = null,
        ?callable $flush = null,
        ?callable $rollback = null,
        ?int $flushCount = null,
        ?DispatcherInterface $dispatcher = null
    ) {
        $this->extract = $extract;
        $this->transform = $transform ?? $this->defaultTransformer();
        $this->init = $init;
        if ($load === null) {
            $nullLoader = new NullLoader();
            $load = [$nullLoader, 'load'];
        }
        $this->load = $load;
        $this->flush = $flush;
        $this->rollback = $rollback;
        $this->flushCount = $flushCount !== null ? max(1, $flushCount) : null;
        $this->dispatcher = $dispatcher ?? new Dispatcher();
    }

    /**
     * Return the additional options
     * @return array<string, mixed>
     */
    public function getOptions(): array
    {
        return $this->options;
    }

    /**
     * Set the additional options
     * @param array<string, mixed> $options
     * @return $this
     */
    public function setOptions(array $options): self
    {
        $this->options = $options;
        return $this;
    }

    /**
     * Run the ETL on the given input.
     * @param mixed|null $data
     * @param array<string, mixed> $options additional options; when not empty
     * they replace the currently configured options
     * @return void
     */
    public function process($data = null, array $options = []): void
    {
        if (count($options) > 0) {
            $this->options = $options;
        }

        $flushCounter = 0;
        $totalCounter = 0;
        // Tracked explicitly: load() decrements $totalCounter when an item
        // fails to load, so "$totalCounter === 1" may hold for several items
        // and would initialize the loader more than once.
        $loaderInitialized = false;

        $this->start();
        foreach ($this->extract($data) as $key => $item) {
            if ($this->isSkip) {
                $this->skip($item, $key);
                continue;
            }

            if ($this->isStop) {
                $this->stop($item, $key);
                break;
            }

            $transformed = $this->transform($item, $key);

            // The transformer (or a listener) may have asked to skip/stop
            if ($this->isSkip) {
                $this->skip($item, $key);
                continue;
            }

            if ($this->isStop) {
                $this->stop($item, $key);
                break;
            }

            $flushCounter++;
            $totalCounter++;

            if ($loaderInitialized === false) {
                $this->initLoader($item, $key);
                $loaderInitialized = true;
            }

            $needFlush = $this->flushCount !== null
                && ($totalCounter % $this->flushCount) === 0;
            $this->load($transformed(), $item, $key, $needFlush, $flushCounter, $totalCounter);
        }

        $this->end($flushCounter, $totalCounter);
    }

    /**
     * Ask the ETL to stop.
     * @param bool $isRollback if the loader should rollback instead of flushing.
     * @return void
     */
    public function stopProcess(bool $isRollback = false): void
    {
        $this->isStop = true;
        $this->isRollback = $isRollback;
    }

    /**
     * Mark the current item to be skipped.
     * @return void
     */
    public function skipCurrentItem(): void
    {
        $this->isSkip = true;
    }

    /**
     * Ask the loader to trigger flush ASAP.
     * @return void
     */
    public function triggerFlush(): void
    {
        $this->isFlush = true;
    }

    /**
     * Process item skip: clear the skip flag and dispatch the SKIP event.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function skip($item, $key): void
    {
        $this->isSkip = false;
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::SKIP, $item, $key, $this));
    }

    /**
     * Dispatch the STOP event for the item on which processing stopped.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function stop($item, $key): void
    {
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::STOP, $item, $key, $this));
    }

    /**
     * Start processing: reset the state flags and dispatch the start event.
     * @return void
     */
    protected function start(): void
    {
        $this->reset();
        $this->dispatcher->dispatch(new StartEvent($this));
    }

    /**
     * Reset the flush/skip/rollback/stop flags.
     * @return void
     */
    protected function reset(): void
    {
        $this->isFlush = false;
        $this->isSkip = false;
        $this->isRollback = false;
        $this->isStop = false;
    }

    /**
     * Extract data.
     *
     * When no extractor is set the given data is iterated as-is; a null
     * result is treated as an empty set.
     * @param mixed $data
     * @return iterable<int|string, mixed>
     * @throws EtlException when the extracted data is not iterable
     */
    protected function extract($data): iterable
    {
        $options = $this->getStepOptions('extract');

        $items = $this->extract === null ? $data : ($this->extract)($data, $this, $options);
        if ($items === null) {
            $items = new EmptyIterator();
        }

        if (is_iterable($items) === false) {
            throw new EtlException('Could not extract data');
        }

        try {
            foreach ($items as $key => $item) {
                try {
                    // A new item never inherits the skip flag of the previous one
                    $this->isSkip = false;
                    $this->dispatcher->dispatch(new ItemEvent(BaseEvent::EXTRACT, $item, $key, $this));
                    yield $key => $item;
                } catch (Exception $e) {
                    // Best effort: an Exception raised here (e.g. by an EXTRACT
                    // listener) drops this item and extraction goes on.
                    continue;
                }
            }
        } catch (Throwable $e) {
            // $item/$key may be undefined if the iterator failed on its first fetch
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(BaseEvent::EXTRACT_EXCEPTION, $item ?? null, $key ?? null, $this, $e)
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }
        }
    }

    /**
     * Transform data.
     *
     * The transformer output is fully consumed here, so that transformation
     * errors surface (and are reported) before loading starts.
     * @param mixed $item
     * @param int|string $key
     * @return callable a generator function replaying the transformed data
     * @throws EtlException when the transformer does not return a Generator
     */
    protected function transform($item, $key): callable
    {
        $options = $this->getStepOptions('transform');

        $transformed = ($this->transform)($item, $key, $this, $options);
        if (!$transformed instanceof Generator) {
            throw new EtlException('The transformer must return a generator');
        }

        // Kept as [key, value] pairs instead of an associative array so that
        // duplicate keys yielded by the transformer are all preserved.
        $output = [];
        try {
            // Separate loop variable: the events below must carry the key of
            // the extracted item, not the last key yielded by the transformer.
            foreach ($transformed as $outputKey => $value) {
                $output[] = [$outputKey, $value];
            }
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::TRANSFORM, $item, $key, $this));
        } catch (Throwable $e) {
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(BaseEvent::TRANSFORM_EXCEPTION, $item, $key, $this, $e)
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }
        }

        return static function () use ($output): Generator {
            foreach ($output as [$outputKey, $value]) {
                yield $outputKey => $value;
            }
        };
    }

    /**
     * Init the loader on the 1st item.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function initLoader($item, $key): void
    {
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::LOADER_INIT, $item, $key, $this));
        if ($this->init === null) {
            return;
        }

        ($this->init)($this->getStepOptions('loader'));
    }

    /**
     * Load data.
     * @param iterable<int|string, mixed> $data the transformed data
     * @param mixed $item the original extracted item
     * @param int|string $key
     * @param bool $flush whether the flush threshold has been reached
     * @param int $flushCounter decremented when the load fails and the
     * exception is not rethrown
     * @param int $totalCounter decremented when the load fails and the
     * exception is not rethrown
     * @return void
     */
    protected function load(
        iterable $data,
        $item,
        $key,
        bool $flush,
        int &$flushCounter,
        int &$totalCounter
    ): void {
        try {
            ($this->load)($data, $key, $this);
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::LOAD, $item, $key, $this));
        } catch (Throwable $e) {
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(BaseEvent::LOAD_EXCEPTION, $item, $key, $this, $e)
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }

            // The item was not loaded, so it must not be counted
            $flushCounter--;
            $totalCounter--;
        }

        if ($this->isFlush || $flush) {
            $this->flush($flushCounter, true);
        }
    }

    /**
     * Flush element
     * @param int $flushCounter reset to 0 once flushed
     * @param bool $partial
     * @return void
     */
    protected function flush(int &$flushCounter, bool $partial): void
    {
        if ($this->flush === null) {
            return;
        }
        ($this->flush)($partial);
        $this->dispatcher->dispatch(new FlushEvent($this, $flushCounter, $partial));
        $flushCounter = 0;
        $this->isFlush = false;
    }

    /**
     * Restore loader's initial state.
     * @param int $flushCounter reset to 0 once rolled back
     * @return void
     */
    protected function rollback(int &$flushCounter): void
    {
        if ($this->rollback === null) {
            return;
        }
        ($this->rollback)();
        $this->dispatcher->dispatch(new RollbackEvent($this, $flushCounter));
        $flushCounter = 0;
    }

    /**
     * Process the end of the ETL.
     *
     * When a rollback was requested, the items pending since the last flush
     * are rolled back and removed from the total; otherwise a final
     * (non-partial) flush is performed.
     * @param int $flushCounter
     * @param int $totalCounter
     * @return void
     */
    protected function end(int $flushCounter, int $totalCounter): void
    {
        if ($this->isRollback) {
            $this->rollback($flushCounter);
            $totalCounter = max(0, $totalCounter - $flushCounter);
        } else {
            $this->flush($flushCounter, false);
        }
        $this->dispatcher->dispatch(new EndEvent($this, $totalCounter));
        $this->reset();
    }

    /**
     * The default transformer to use if none is set: yields the item unchanged
     * @return callable
     */
    protected function defaultTransformer(): callable
    {
        return static function ($item, $key): Generator {
            yield $key => $item;
        };
    }

    /**
     * Return the options for the given step: the global options with the
     * step-specific sub-array (if any) merged on top of them and removed.
     * @param string $step the step option key ("extract", "transform", "loader")
     * @return array<string, mixed>
     */
    private function getStepOptions(string $step): array
    {
        $options = $this->options;
        if (isset($options[$step])) {
            $options = array_merge($options, $options[$step]);
            unset($options[$step]);
        }

        return $options;
    }
}
526