Etl::flush()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
c 0
b 0
f 0
dl 0
loc 13
rs 9.9666
cc 2
nc 2
nop 2
1
<?php
2
3
/**
4
 * Platine ETL
5
 *
6
 * Platine ETL is a library to Extract-Transform-Load Data from various sources
7
 *
8
 * This content is released under the MIT License (MIT)
9
 *
10
 * Copyright (c) 2020 Platine ETL
11
 * Copyright (c) 2019 Benoit POLASZEK
12
 *
13
 * Permission is hereby granted, free of charge, to any person obtaining a copy
14
 * of this software and associated documentation files (the "Software"), to deal
15
 * in the Software without restriction, including without limitation the rights
16
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17
 * copies of the Software, and to permit persons to whom the Software is
18
 * furnished to do so, subject to the following conditions:
19
 *
20
 * The above copyright notice and this permission notice shall be included in all
21
 * copies or substantial portions of the Software.
22
 *
23
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29
 * SOFTWARE.
30
 */
31
32
declare(strict_types=1);
33
34
namespace Platine\Etl;
35
36
use EmptyIterator;
37
use Exception;
38
use Generator;
39
use Platine\Etl\Event\BaseEvent;
40
use Platine\Etl\Event\EndEvent;
41
use Platine\Etl\Event\FlushEvent;
42
use Platine\Etl\Event\ItemEvent;
43
use Platine\Etl\Event\ItemExceptionEvent;
44
use Platine\Etl\Event\RollbackEvent;
45
use Platine\Etl\Event\StartEvent;
46
use Platine\Etl\Exception\EtlException;
47
use Platine\Etl\Loader\NullLoader;
48
use Platine\Event\Dispatcher;
49
use Platine\Event\DispatcherInterface;
50
use Throwable;
51
52
/**
 * @class Etl
 * @package Platine\Etl
 *
 * Runs an Extract-Transform-Load pipeline made of user supplied callables
 * (extractor, transformer, loader initializer, loader, flush and rollback)
 * and dispatches an event for each step through the event dispatcher.
 */
class Etl
{
    /**
     * Used to extract data. Called as ($data, Etl $etl, array $options) and
     * must return an iterable or null. When null, the input given
     * to process() is iterated directly.
     * @var callable|null
     */
    protected $extract = null;

    /**
     * Used to transform data. Called as ($item, $key, Etl $etl, array $options)
     * and must return a Generator.
     * @var callable|null
     */
    protected $transform = null;

    /**
     * Used to initialize loader. Called as (array $options) right before
     * the first item is loaded.
     * @var callable|null
     */
    protected $init = null;

    /**
     * The loader. Called as (iterable $data, $key, Etl $etl).
     * @var callable
     */
    protected $load;

    /**
     * Used to flush the loader. Called as (bool $partial).
     * @var callable|null
     */
    protected $flush = null;

    /**
     * Used to rollback the loader. Called without argument.
     * @var callable|null
     */
    protected $rollback = null;

    /**
     * Number of loaded items after which a partial flush is requested,
     * null to disable the periodic flush.
     * @var int|null
     */
    protected ?int $flushCount = null;

    /**
     * Whether to flush data
     * @var bool
     */
    protected bool $isFlush = false;

    /**
     * Whether to skip data
     * @var bool
     */
    protected bool $isSkip = false;

    /**
     * Whether to stop processing data
     * @var bool
     */
    protected bool $isStop = false;

    /**
     * Whether to rollback data
     * @var bool
     */
    protected bool $isRollback = false;

    /**
     * The event dispatcher
     * @var DispatcherInterface
     */
    protected DispatcherInterface $dispatcher;

    /**
     * Additional options
     * @var array<string, mixed>
     */
    protected array $options = [];

    /**
     * Create new instance
     * @param callable|null $extract the extractor, if null the data passed to process() is used as is
     * @param callable|null $transform the transformer, if null the item is yielded unchanged
     * @param callable|null $init the loader initializer
     * @param callable|null $load the loader, if null a NullLoader is used
     * @param callable|null $flush the flush handler
     * @param callable|null $rollback the rollback handler
     * @param int|null $flushCount flush every N loaded items (values below 1 are raised to 1)
     * @param DispatcherInterface|null $dispatcher the dispatcher, if null a new Dispatcher is created
     */
    public function __construct(
        ?callable $extract = null,
        ?callable $transform = null,
        ?callable $init = null,
        ?callable $load = null,
        ?callable $flush = null,
        ?callable $rollback = null,
        ?int $flushCount = null,
        ?DispatcherInterface $dispatcher = null
    ) {
        $this->extract = $extract;
        $this->transform = $transform ?? $this->defaultTransformer();
        $this->init = $init;
        if ($load === null) {
            $nullLoader = new NullLoader();
            $load = [$nullLoader, 'load'];
        }
        $this->load = $load;
        $this->flush = $flush;
        $this->rollback = $rollback;
        $this->flushCount = $flushCount !== null ? max(1, $flushCount) : null;
        $this->dispatcher = $dispatcher ?? new Dispatcher();
    }

    /**
     * Return the additional options
     * @return array<string, mixed>
     */
    public function getOptions(): array
    {
        return $this->options;
    }

    /**
     * Set the additional options
     * @param array<string, mixed> $options
     * @return $this
     */
    public function setOptions(array $options): self
    {
        $this->options = $options;
        return $this;
    }

    /**
     * Run the ETL on the given input.
     * @param mixed $data the input passed to the extractor (or iterated directly when no extractor is set)
     * @param array<string, mixed> $options additional options, replaces the current ones only when not empty
     * @return void
     * @throws EtlException if the data can not be extracted or the transformer does not return a generator
     * @throws Throwable if a step fails and the related exception event asks to throw
     */
    public function process(mixed $data = null, array $options = []): void
    {
        if (count($options) > 0) {
            $this->options = $options;
        }

        $flushCounter = 0;
        $totalCounter = 0;

        $this->start();
        foreach ($this->extract($data) as $key => $item) {
            // The flags may have been changed by the extractor or by an EXTRACT listener
            if ($this->isSkip) {
                $this->skip($item, $key);
                continue;
            }

            if ($this->isStop) {
                $this->stop($item, $key);
                break;
            }

            $transformed = $this->transform($item, $key);

            // The flags may have been changed by the transformer or by a TRANSFORM listener
            if ($this->isSkip) {
                $this->skip($item, $key);
                continue;
            }

            if ($this->isStop) {
                $this->stop($item, $key);
                break;
            }

            $flushCounter++;
            $totalCounter++;

            if ($totalCounter === 1) {
                $this->initLoader($item, $key);
            }

            $needFlush = $this->flushCount !== null
                    && ($totalCounter % $this->flushCount) === 0;
            // load() receives both counters by reference and decrements them on a tolerated failure
            $this->load($transformed(), $item, $key, $needFlush, $flushCounter, $totalCounter);
        }

        $this->end($flushCounter, $totalCounter);
    }

    /**
     * Ask the ETl to stop.
     * @param bool $isRollback if the loader should rollback instead of flushing.
     * @return void
     */
    public function stopProcess(bool $isRollback = false): void
    {
        $this->isStop = true;
        $this->isRollback = $isRollback;
    }

    /**
     * Mark the current item to be skipped.
     * @return void
     */
    public function skipCurrentItem(): void
    {
        $this->isSkip = true;
    }

    /**
     * Ask the loader to trigger flush ASAP.
     * The flush is done by load() right after the current item is loaded.
     * @return void
     */
    public function triggerFlush(): void
    {
        $this->isFlush = true;
    }

    /**
     * Process item skip: clear the skip flag and dispatch the SKIP event.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function skip(mixed $item, int|string $key): void
    {
        $this->isSkip = false;
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::SKIP, $item, $key, $this));
    }

    /**
     * Process the stop request: dispatch the STOP event for the current item.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function stop(mixed $item, int|string $key): void
    {
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::STOP, $item, $key, $this));
    }

    /**
     * Start processing: reset the state flags and dispatch the start event.
     * @return void
     */
    protected function start(): void
    {
        $this->reset();
        $this->dispatcher->dispatch(new StartEvent($this));
    }

    /**
     * Reset the ETL state flags (flush, skip, rollback and stop).
     * @return void
     */
    protected function reset(): void
    {
        $this->isFlush = false;
        $this->isSkip = false;
        $this->isRollback = false;
        $this->isStop = false;
    }

    /**
     * Extract data.
     *
     * This method is a generator, so nothing is executed (including the
     * iterable check) until the result is iterated.
     *
     * @param mixed $data
     * @return iterable<int|string, mixed>
     * @throws EtlException if the extracted value is neither null nor iterable
     * @throws Throwable if the extraction fails and the exception event asks to throw
     */
    protected function extract(mixed $data): iterable
    {
        $items = $this->extract === null
                ? $data
                : ($this->extract)($data, $this, $this->options);
        if ($items === null) {
            $items = new EmptyIterator();
        }

        if (is_iterable($items) === false) {
            throw new EtlException('Could not extract data');
        }

        try {
            foreach ($items as $key => $item) {
                try {
                    // Each item starts with a clean skip flag
                    $this->isSkip = false;
                    $this->dispatcher->dispatch(new ItemEvent(
                        BaseEvent::EXTRACT,
                        $item,
                        $key,
                        $this
                    ));
                    yield $key => $item;
                } catch (Exception $e) {
                    // NOTE(review): an Exception raised here makes the item to be
                    // silently ignored, no event is dispatched for it
                    continue;
                }
            }
        } catch (Throwable $e) {
            // $item and $key are not defined if the failure happens before the first iteration
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(
                    BaseEvent::EXTRACT_EXCEPTION,
                    $item ?? null,
                    $key ?? null,
                    $this,
                    $e
                )
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }
        }
    }

    /**
     * Transform data.
     *
     * The generator returned by the transformer is fully consumed here, the
     * returned callable only replays the collected key/value pairs.
     *
     * @param mixed $item
     * @param int|string $key
     * @return callable a callable returning a generator of the transformed key/value pairs
     * @throws EtlException if the transformer does not return a generator
     * @throws Exception if the transformation fails and the exception event asks to throw
     */
    protected function transform(mixed $item, int|string $key): callable
    {
        $transformed = ($this->transform)($item, $key, $this, $this->options);
        if (!$transformed instanceof Generator) {
            throw new EtlException('The transformer must return a generator');
        }

        $output = [];
        try {
            // Dedicated loop variables are used in order to not overwrite $key:
            // the events below must receive the key of the source item and
            // generator keys can be of any type
            foreach ($transformed as $outputKey => $outputValue) {
                $output[] = [$outputKey, $outputValue];
            }
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::TRANSFORM, $item, $key, $this));
        } catch (Exception $e) {
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(BaseEvent::TRANSFORM_EXCEPTION, $item, $key, $this, $e)
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }
        }

        return static function () use ($output): Generator {
            foreach ($output as [$outputKey, $outputValue]) {
                yield $outputKey => $outputValue;
            }
        };
    }

    /**
     * Init the loader on the 1st item.
     * The LOADER_INIT event is dispatched even if no initializer is set.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function initLoader(mixed $item, int|string $key): void
    {
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::LOADER_INIT, $item, $key, $this));
        if ($this->init === null) {
            return;
        }

        ($this->init)($this->options);
    }

    /**
     * Load data.
     * @param iterable<int|string, mixed> $data the transformed data
     * @param mixed $item the source item
     * @param int|string $key the source item key
     * @param bool $flush whether a flush must be done after the load
     * @param int $flushCounter number of items waiting for flush (updated by reference)
     * @param int $totalCounter number of items loaded so far (updated by reference)
     * @return void
     * @throws Throwable if the load fails and the exception event asks to throw
     */
    protected function load(
        iterable $data,
        mixed $item,
        int|string $key,
        bool $flush,
        int &$flushCounter,
        int &$totalCounter
    ): void {
        try {
            ($this->load)($data, $key, $this);
            $this->dispatcher->dispatch(new ItemEvent(
                BaseEvent::LOAD,
                $item,
                $key,
                $this
            ));
        } catch (Throwable $e) {
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(
                    BaseEvent::LOAD_EXCEPTION,
                    $item,
                    $key,
                    $this,
                    $e
                )
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }

            // The failure is tolerated: the item must not be counted as loaded
            $flushCounter--;
            $totalCounter--;
        }

        if ($this->isFlush || $flush) {
            $this->flush($flushCounter, true);
        }
    }

    /**
     * Flush element.
     * Nothing is done (no event, no counter reset) if no flush handler is set.
     * @param int $flushCounter number of items waiting for flush, reset to 0 after the flush
     * @param bool $partial whether this is an intermediate flush (true) or the final one (false)
     * @return void
     */
    protected function flush(int &$flushCounter, bool $partial): void
    {
        if ($this->flush === null) {
            return;
        }
        ($this->flush)($partial);
        $this->dispatcher->dispatch(new FlushEvent(
            $this,
            $flushCounter,
            $partial
        ));
        $flushCounter = 0;
        $this->isFlush = false;
    }

    /**
     * Restore loader's initial state.
     * Nothing is done (no event, no counter reset) if no rollback handler is set.
     * @param int $flushCounter number of items waiting for flush, reset to 0 after the rollback
     * @return void
     */
    protected function rollback(int &$flushCounter): void
    {
        if ($this->rollback === null) {
            return;
        }
        ($this->rollback)();
        $this->dispatcher->dispatch(new RollbackEvent($this, $flushCounter));
        $flushCounter = 0;
    }

    /**
     * Process the end of the ETL: rollback or do the final flush, dispatch
     * the end event and reset the state flags.
     * @param int $flushCounter number of items waiting for flush
     * @param int $totalCounter number of items loaded
     * @return void
     */
    protected function end(int $flushCounter, int $totalCounter): void
    {
        if ($this->isRollback) {
            // rollback() resets $flushCounter by reference, so the number of
            // pending items must be captured before in order to remove them
            // from the total
            $pendingCount = $flushCounter;
            $this->rollback($flushCounter);
            $totalCounter = max(0, $totalCounter - $pendingCount);
        } else {
            $this->flush($flushCounter, false);
        }
        $this->dispatcher->dispatch(new EndEvent($this, $totalCounter));
        $this->reset();
    }

    /**
     * The default transformer to use if none is set.
     * It yields the item unchanged under its own key.
     * @return callable
     */
    protected function defaultTransformer(): callable
    {
        return function ($item, $key): Generator {
            yield $key => $item;
        };
    }
}
541