Passed
Push — develop ( 8a0331...fc36e2 )
by nguereza
01:36
created

Etl::__construct()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 22
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
c 0
b 0
f 0
dl 0
loc 22
rs 9.9
cc 3
nc 4
nop 8

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
<?php
2
3
/**
4
 * Platine ETL
5
 *
6
 * Platine ETL is a library to Extract-Transform-Load Data from various sources
7
 *
8
 * This content is released under the MIT License (MIT)
9
 *
10
 * Copyright (c) 2020 Platine ETL
11
 * Copyright (c) 2019 Benoit POLASZEK
12
 *
13
 * Permission is hereby granted, free of charge, to any person obtaining a copy
14
 * of this software and associated documentation files (the "Software"), to deal
15
 * in the Software without restriction, including without limitation the rights
16
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17
 * copies of the Software, and to permit persons to whom the Software is
18
 * furnished to do so, subject to the following conditions:
19
 *
20
 * The above copyright notice and this permission notice shall be included in all
21
 * copies or substantial portions of the Software.
22
 *
23
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29
 * SOFTWARE.
30
 */
31
32
declare(strict_types=1);
33
34
namespace Platine\Etl;
35
36
use EmptyIterator;
37
use Exception;
38
use Generator;
39
use Platine\Etl\Event\BaseEvent;
40
use Platine\Etl\Event\EndEvent;
41
use Platine\Etl\Event\FlushEvent;
42
use Platine\Etl\Event\ItemEvent;
43
use Platine\Etl\Event\ItemExceptionEvent;
44
use Platine\Etl\Event\RollbackEvent;
45
use Platine\Etl\Event\StartEvent;
46
use Platine\Etl\Exception\EtlException;
47
use Platine\Etl\Loader\NullLoader;
48
use Platine\Event\Dispatcher;
49
use Platine\Event\DispatcherInterface;
50
use Throwable;
51
52
/**
53
 * @class Etl
54
 * @package Platine\Etl
55
 */
56
class Etl
57
{
58
    /**
59
     * @var callable|null
60
     */
61
    protected $extract = null;
62
63
    /**
64
     * Used to transform data
65
     * @var callable|null
66
     */
67
    protected $transform = null;
68
69
    /**
70
     * Used to initialize loader
71
     * @var callable|null
72
     */
73
    protected $init = null;
74
75
    /**
76
     * The loader
77
     * @var callable
78
     */
79
    protected $load;
80
81
    /**
82
     * @var callable|null
83
     */
84
    protected $flush = null;
85
86
    /**
87
     * @var callable|null
88
     */
89
    protected $rollback = null;
90
91
    /**
92
     * Total to flush
93
     * @var int|null
94
     */
95
    protected ?int $flushCount = null;
96
97
    /**
98
     * Whether to flush data
99
     * @var bool
100
     */
101
    protected bool $isFlush = false;
102
103
    /**
104
     * Whether to skip data
105
     * @var bool
106
     */
107
    protected bool $isSkip = false;
108
109
    /**
110
     * Whether to stop processing data
111
     * @var bool
112
     */
113
    protected bool $isStop = false;
114
115
    /**
116
     * Whether to rollback data
117
     * @var bool
118
     */
119
    protected bool $isRollback = false;
120
121
    /**
122
     * The event dispatcher
123
     * @var DispatcherInterface
124
     */
125
    protected DispatcherInterface $dispatcher;
126
127
    /**
128
     * Additional options
129
     * @var array<string, mixed>
130
     */
131
    protected array $options = [];
132
133
    /**
134
     * Create new instance
135
     * @param callable|null $extract
136
     * @param callable|null $transform
137
     * @param callable|null $init
138
     * @param callable|null $load
139
     * @param callable|null $flush
140
     * @param callable|null $rollback
141
     * @param int|null $flushCount
142
     * @param DispatcherInterface|null $dispatcher
143
     */
144
    public function __construct(
145
        ?callable $extract = null,
146
        ?callable $transform = null,
147
        ?callable $init = null,
148
        ?callable $load = null,
149
        ?callable $flush = null,
150
        ?callable $rollback = null,
151
        ?int $flushCount = null,
152
        ?DispatcherInterface $dispatcher = null
153
    ) {
154
        $this->extract = $extract;
155
        $this->transform = $transform ?? $this->defaultTransformer();
156
        $this->init = $init;
157
        if ($load === null) {
158
            $nullLoader = new NullLoader();
159
            $load = [$nullLoader, 'load'];
160
        }
161
        $this->load = $load;
162
        $this->flush = $flush;
163
        $this->rollback = $rollback;
164
        $this->flushCount = $flushCount !== null ? max(1, $flushCount) : null;
165
        $this->dispatcher = $dispatcher ?? new Dispatcher();
166
    }
167
168
    /**
169
     * Return the additional options
170
     * @return array<string, mixed>
171
     */
172
    public function getOptions(): array
173
    {
174
        return $this->options;
175
    }
176
177
    /**
178
     * Set the additional options
179
     * @param array<string, mixed> $options
180
     * @return $this
181
     */
182
    public function setOptions(array $options): self
183
    {
184
        $this->options = $options;
185
        return $this;
186
    }
187
188
189
    /**
190
     * Run the ETL on the given input.
191
     * @param mixed|null $data
192
     * @param array<string, mixed> $options additional options
193
     * @return void
194
     */
195
    public function process($data = null, array $options = []): void
196
    {
197
        if (count($options) > 0) {
198
            $this->options = $options;
199
        }
200
201
        $flushCounter = 0;
202
        $totalCounter = 0;
203
204
        $this->start();
205
        foreach ($this->extract($data) as $key => $item) {
206
            if ($this->isSkip) {
207
                $this->skip($item, $key);
208
                continue;
209
            }
210
211
            if ($this->isStop) {
212
                $this->stop($item, $key);
213
                break;
214
            }
215
216
            $transformed = $this->transform($item, $key);
217
218
            if ($this->isSkip) {
219
                $this->skip($item, $key);
220
                continue;
221
            }
222
223
            if ($this->isStop) {
224
                $this->stop($item, $key);
225
                break;
226
            }
227
228
            $flushCounter++;
229
            $totalCounter++;
230
231
            if ($totalCounter === 1) {
232
                $this->initLoader($item, $key);
233
            }
234
235
            $needFlush = ($this->flushCount === null ? false : (($totalCounter % $this->flushCount) === 0));
236
            $this->load($transformed(), $item, $key, $needFlush, $flushCounter, $totalCounter);
237
        }
238
239
        $this->end($flushCounter, $totalCounter);
240
    }
241
242
    /**
243
     * Ask the ETl to stop.
244
     * @param bool $isRollback if the loader should rollback instead of flushing.
245
     * @return void
246
     */
247
    public function stopProcess(bool $isRollback = false): void
248
    {
249
        $this->isStop = true;
250
        $this->isRollback = $isRollback;
251
    }
252
253
    /**
254
     * Mark the current item to be skipped.
255
     * @return void
256
     */
257
    public function skipCurrentItem(): void
258
    {
259
        $this->isSkip = true;
260
    }
261
262
    /**
263
     * Ask the loader to trigger flush ASAP.
264
     * @return void
265
     */
266
    public function triggerFlush(): void
267
    {
268
        $this->isFlush = true;
269
    }
270
271
    /**
272
     * Process item skip.
273
     * @param mixed $item
274
     * @param int|string $key
275
     * @return void
276
     */
277
    protected function skip($item, $key): void
278
    {
279
        $this->isSkip = false;
280
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::SKIP, $item, $key, $this));
281
    }
282
283
    /**
284
     * @param mixed $item
285
     * @param int|string $key
286
     * @return void
287
     */
288
    protected function stop($item, $key): void
289
    {
290
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::STOP, $item, $key, $this));
291
    }
292
293
    /**
294
     * Start processing
295
     * @return void
296
     */
297
    protected function start(): void
298
    {
299
        $this->reset();
300
        $this->dispatcher->dispatch(new StartEvent($this));
301
    }
302
303
    /**
304
     * reset ETL
305
     * @return void
306
     */
307
    protected function reset(): void
308
    {
309
        $this->isFlush = false;
310
        $this->isSkip = false;
311
        $this->isRollback = false;
312
        $this->isStop = false;
313
    }
314
315
    /**
316
     * Extract data.
317
     * @param mixed $data
318
     * @return iterable<int|string, mixed>
319
     */
320
    protected function extract($data): iterable
321
    {
322
        $options = $this->options;
323
        if (isset($options['extract'])) {
324
            $options = array_merge($options, $options['extract']);
325
            unset($options['extract']);
326
        }
327
328
        $items = $this->extract === null ? $data : ($this->extract)($data, $this, $options);
329
        if ($items === null) {
330
            $items = new EmptyIterator();
331
        }
332
333
        if (is_iterable($items) === false) {
334
            throw new EtlException('Could not extract data');
335
        }
336
337
        try {
338
            foreach ($items as $key => $item) {
339
                try {
340
                    $this->isSkip = false;
341
                    $this->dispatcher->dispatch(new ItemEvent(BaseEvent::EXTRACT, $item, $key, $this));
342
                    yield $key => $item;
343
                } catch (Exception $e) {
344
                    continue;
345
                }
346
            }
347
        } catch (Throwable $e) {
348
            /** @var ItemExceptionEvent $event */
349
            $event = $this->dispatcher->dispatch(
350
                new ItemExceptionEvent(BaseEvent::EXTRACT_EXCEPTION, $item ?? null, $key ?? null, $this, $e)
351
            );
352
353
            if ($event->shouldThrowException()) {
354
                throw $e;
355
            }
356
        }
357
    }
358
359
    /**
360
     * Transform data.
361
     * @param mixed $item
362
     * @param int|string $key
363
     * @return callable
364
     */
365
    protected function transform($item, $key): callable
366
    {
367
        $options = $this->options;
368
        if (isset($options['transform'])) {
369
            $options = array_merge($options, $options['transform']);
370
            unset($options['transform']);
371
        }
372
373
        $tranformed = ($this->transform)($item, $key, $this, $options);
374
        if (!$tranformed instanceof Generator) {
375
            throw new EtlException('The transformer must return a generator');
376
        }
377
378
        $output = [];
379
        try {
380
            foreach ($tranformed as $key => $value) {
0 ignored issues
show
introduced by
$key is overwriting one of the parameters of this function.
Loading history...
381
                $output[] = [$key, $value];
382
            }
383
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::TRANSFORM, $item, $key, $this));
0 ignored issues
show
Bug introduced by
It seems like $key can also be of type true; however, parameter $key of Platine\Etl\Event\ItemEvent::__construct() does only seem to accept integer|string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

383
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::TRANSFORM, $item, /** @scrutinizer ignore-type */ $key, $this));
Loading history...
384
        } catch (Exception $e) {
385
           /** @var ItemExceptionEvent $event */
386
            $event = $this->dispatcher->dispatch(
387
                new ItemExceptionEvent(BaseEvent::TRANSFORM_EXCEPTION, $item ?? null, $key ?? null, $this, $e)
0 ignored issues
show
Bug introduced by
It seems like $key ?? null can also be of type true; however, parameter $key of Platine\Etl\Event\ItemEx...ionEvent::__construct() does only seem to accept integer|null|string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

387
                new ItemExceptionEvent(BaseEvent::TRANSFORM_EXCEPTION, $item ?? null, /** @scrutinizer ignore-type */ $key ?? null, $this, $e)
Loading history...
388
            );
389
390
            if ($event->shouldThrowException()) {
391
                throw $e;
392
            }
393
        }
394
395
        return static function () use ($output) {
396
            foreach ($output as [$key, $value]) {
397
                yield $key => $value;
398
            }
399
        };
400
    }
401
402
    /**
403
     * Init the loader on the 1st item.
404
     * @param mixed $item
405
     * @param int|string $key
406
     * @return void
407
     */
408
    protected function initLoader($item, $key): void
409
    {
410
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::LOADER_INIT, $item, $key, $this));
411
        if ($this->init === null) {
412
            return;
413
        }
414
415
        $options = $this->options;
416
        if (isset($options['loader'])) {
417
            $options = array_merge($options, $options['loader']);
418
            unset($options['loader']);
419
        }
420
421
        ($this->init)($options);
422
    }
423
424
    /**
425
     * Load data.
426
     * @param iterable<int|string, mixed> $data
427
     * @param mixed $item
428
     * @param int|string $key
429
     * @param bool $flush
430
     * @param int $flushCounter
431
     * @param int $totalCounter
432
     * @return void
433
     */
434
    protected function load(
435
        iterable $data,
436
        $item,
437
        $key,
438
        bool $flush,
439
        int &$flushCounter,
440
        int &$totalCounter
441
    ): void {
442
        try {
443
            ($this->load)($data, $key, $this);
444
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::LOAD, $item, $key, $this));
445
        } catch (Throwable $e) {
446
            /** @var ItemExceptionEvent $event */
447
            $event = $this->dispatcher->dispatch(
448
                new ItemExceptionEvent(BaseEvent::LOAD_EXCEPTION, $item ?? null, $key, $this, $e)
449
            );
450
451
            if ($event->shouldThrowException()) {
452
                throw $e;
453
            }
454
455
            $flushCounter--;
456
            $totalCounter--;
457
        }
458
459
        $needFlush = $this->isFlush || $flush;
460
        if ($needFlush) {
461
            $this->flush($flushCounter, true);
462
        }
463
    }
464
465
    /**
466
     * Flush element
467
     * @param int $flushCounter
468
     * @param bool $partial
469
     * @return void
470
     */
471
    protected function flush(int &$flushCounter, bool $partial): void
472
    {
473
        if ($this->flush === null) {
474
            return;
475
        }
476
        ($this->flush)($partial);
477
        $this->dispatcher->dispatch(new FlushEvent($this, $flushCounter, $partial));
478
        $flushCounter = 0;
479
        $this->isFlush = false;
480
    }
481
482
    /**
483
     * Restore loader's initial state.
484
     * @param int $flushCounter
485
     * @return void
486
     */
487
    protected function rollback(int &$flushCounter): void
488
    {
489
        if ($this->rollback === null) {
490
            return;
491
        }
492
        ($this->rollback)();
493
        $this->dispatcher->dispatch(new RollbackEvent($this, $flushCounter));
494
        $flushCounter = 0;
495
    }
496
497
    /**
498
     * Process the end of the ETL.
499
     * @param int $flushCounter
500
     * @param int $totalCounter
501
     * @return void
502
     */
503
    protected function end(int $flushCounter, int $totalCounter): void
504
    {
505
        if ($this->isRollback) {
506
            $this->rollback($flushCounter);
507
            $totalCounter = max(0, $totalCounter - $flushCounter);
508
        } else {
509
            $this->flush($flushCounter, false);
510
        }
511
        $this->dispatcher->dispatch(new EndEvent($this, (int) $totalCounter));
512
        $this->reset();
513
    }
514
515
    /**
516
     * The default transformer to use if none is set
517
     * @return callable
518
     */
519
    protected function defaultTransformer(): callable
520
    {
521
        return function ($item, $key): Generator {
522
            yield $key => $item;
523
        };
524
    }
525
}
526