Passed
Push — develop ( fc36e2...f9edd1 )
by nguereza
01:43
created

Etl::__construct()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 22
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
c 0
b 0
f 0
dl 0
loc 22
rs 9.9
cc 3
nc 4
nop 8

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more or different data.

There are several approaches to avoid long parameter lists:

1
<?php
2
3
/**
4
 * Platine ETL
5
 *
6
 * Platine ETL is a library to Extract-Transform-Load Data from various sources
7
 *
8
 * This content is released under the MIT License (MIT)
9
 *
10
 * Copyright (c) 2020 Platine ETL
11
 * Copyright (c) 2019 Benoit POLASZEK
12
 *
13
 * Permission is hereby granted, free of charge, to any person obtaining a copy
14
 * of this software and associated documentation files (the "Software"), to deal
15
 * in the Software without restriction, including without limitation the rights
16
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17
 * copies of the Software, and to permit persons to whom the Software is
18
 * furnished to do so, subject to the following conditions:
19
 *
20
 * The above copyright notice and this permission notice shall be included in all
21
 * copies or substantial portions of the Software.
22
 *
23
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29
 * SOFTWARE.
30
 */
31
32
declare(strict_types=1);
33
34
namespace Platine\Etl;
35
36
use EmptyIterator;
37
use Exception;
38
use Generator;
39
use Platine\Etl\Event\BaseEvent;
40
use Platine\Etl\Event\EndEvent;
41
use Platine\Etl\Event\FlushEvent;
42
use Platine\Etl\Event\ItemEvent;
43
use Platine\Etl\Event\ItemExceptionEvent;
44
use Platine\Etl\Event\RollbackEvent;
45
use Platine\Etl\Event\StartEvent;
46
use Platine\Etl\Exception\EtlException;
47
use Platine\Etl\Loader\NullLoader;
48
use Platine\Event\Dispatcher;
49
use Platine\Event\DispatcherInterface;
50
use Throwable;
51
52
/**
 * @class Etl
 * @package Platine\Etl
 *
 * Drives an Extract-Transform-Load run: items are pulled from the extractor,
 * passed through the transformer, handed to the loader and flushed (or rolled
 * back) while events are dispatched at every step.
 */
class Etl
{
    /**
     * Used to extract items from the input; when null the input itself is iterated
     * @var callable|null
     */
    protected $extract = null;

    /**
     * Used to transform data
     * @var callable|null
     */
    protected $transform = null;

    /**
     * Used to initialize loader
     * @var callable|null
     */
    protected $init = null;

    /**
     * The loader
     * @var callable
     */
    protected $load;

    /**
     * Used to flush loaded data
     * @var callable|null
     */
    protected $flush = null;

    /**
     * Used to rollback loaded data
     * @var callable|null
     */
    protected $rollback = null;

    /**
     * Total to flush
     * @var int|null
     */
    protected ?int $flushCount = null;

    /**
     * Whether to flush data
     * @var bool
     */
    protected bool $isFlush = false;

    /**
     * Whether to skip data
     * @var bool
     */
    protected bool $isSkip = false;

    /**
     * Whether to stop processing data
     * @var bool
     */
    protected bool $isStop = false;

    /**
     * Whether to rollback data
     * @var bool
     */
    protected bool $isRollback = false;

    /**
     * The event dispatcher
     * @var DispatcherInterface
     */
    protected DispatcherInterface $dispatcher;

    /**
     * Additional options
     * @var array<string, mixed>
     */
    protected array $options = [];

    /**
     * Create new instance
     * @param callable|null $extract called with ($data, $this, $options)
     * @param callable|null $transform called with ($item, $key, $this, $options),
     *                                 must return a Generator; defaults to a
     *                                 pass-through transformer
     * @param callable|null $init called with ($options) before the first load
     * @param callable|null $load called with ($data, $key, $this); defaults to NullLoader::load
     * @param callable|null $flush called with ($partial)
     * @param callable|null $rollback called without argument
     * @param int|null $flushCount flush every N loaded items (values below 1 are raised to 1),
     *                             null to disable periodic flushing
     * @param DispatcherInterface|null $dispatcher defaults to a new Dispatcher
     */
    public function __construct(
        ?callable $extract = null,
        ?callable $transform = null,
        ?callable $init = null,
        ?callable $load = null,
        ?callable $flush = null,
        ?callable $rollback = null,
        ?int $flushCount = null,
        ?DispatcherInterface $dispatcher = null
    ) {
        $this->extract = $extract;
        $this->transform = $transform ?? $this->defaultTransformer();
        $this->init = $init;
        if ($load === null) {
            $nullLoader = new NullLoader();
            $load = [$nullLoader, 'load'];
        }
        $this->load = $load;
        $this->flush = $flush;
        $this->rollback = $rollback;
        $this->flushCount = $flushCount !== null ? max(1, $flushCount) : null;
        $this->dispatcher = $dispatcher ?? new Dispatcher();
    }

    /**
     * Return the additional options
     * @return array<string, mixed>
     */
    public function getOptions(): array
    {
        return $this->options;
    }

    /**
     * Set the additional options
     * @param array<string, mixed> $options
     * @return $this
     */
    public function setOptions(array $options): self
    {
        $this->options = $options;
        return $this;
    }

    /**
     * Run the ETL on the given input.
     * @param mixed|null $data
     * @param array<string, mixed> $options additional options; when not empty
     *                                      they replace the options already set
     * @return void
     */
    public function process($data = null, array $options = []): void
    {
        if (count($options) > 0) {
            $this->options = $options;
        }

        $flushCounter = 0;
        $totalCounter = 0;

        $this->start();
        foreach ($this->extract($data) as $key => $item) {
            // Skip/stop may have been requested by an EXTRACT listener
            if ($this->isSkip) {
                $this->skip($item, $key);
                continue;
            }

            if ($this->isStop) {
                $this->stop($item, $key);
                break;
            }

            $transformed = $this->transform($item, $key);

            // ... or by the transformer / a TRANSFORM listener
            if ($this->isSkip) {
                $this->skip($item, $key);
                continue;
            }

            if ($this->isStop) {
                $this->stop($item, $key);
                break;
            }

            $flushCounter++;
            $totalCounter++;

            // NOTE(review): load() decrements both counters when a load failure
            // is suppressed, so the loader can be initialized again if the
            // very first load fails - confirm this is intended.
            if ($totalCounter === 1) {
                $this->initLoader($item, $key);
            }

            $needFlush = $this->flushCount !== null
                    && ($totalCounter % $this->flushCount) === 0;
            $this->load($transformed(), $item, $key, $needFlush, $flushCounter, $totalCounter);
        }

        $this->end($flushCounter, $totalCounter);
    }

    /**
     * Ask the ETl to stop.
     * @param bool $isRollback if the loader should rollback instead of flushing.
     * @return void
     */
    public function stopProcess(bool $isRollback = false): void
    {
        $this->isStop = true;
        $this->isRollback = $isRollback;
    }

    /**
     * Mark the current item to be skipped.
     * @return void
     */
    public function skipCurrentItem(): void
    {
        $this->isSkip = true;
    }

    /**
     * Ask the loader to trigger flush ASAP.
     * @return void
     */
    public function triggerFlush(): void
    {
        $this->isFlush = true;
    }

    /**
     * Process item skip: clear the skip flag and dispatch the SKIP event.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function skip($item, $key): void
    {
        $this->isSkip = false;
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::SKIP, $item, $key, $this));
    }

    /**
     * Process item stop: dispatch the STOP event (the stop flag is left
     * untouched here).
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function stop($item, $key): void
    {
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::STOP, $item, $key, $this));
    }

    /**
     * Start processing: reset the state flags and dispatch the start event.
     * @return void
     */
    protected function start(): void
    {
        $this->reset();
        $this->dispatcher->dispatch(new StartEvent($this));
    }

    /**
     * Reset the flush, skip, rollback and stop flags.
     * @return void
     */
    protected function reset(): void
    {
        $this->isFlush = false;
        $this->isSkip = false;
        $this->isRollback = false;
        $this->isStop = false;
    }

    /**
     * Extract data.
     *
     * This method is a generator: nothing below runs (including the
     * iterable check) until the result is iterated.
     *
     * @param mixed $data
     * @return iterable<int|string, mixed>
     * @throws EtlException if the extracted value is neither null nor iterable
     * @throws Throwable if iteration fails and the EXTRACT_EXCEPTION event
     *                   asks for the error to be thrown
     */
    protected function extract($data): iterable
    {
        $items = $this->extract === null
                ? $data
                : ($this->extract)($data, $this, $this->options);

        if ($items === null) {
            $items = new EmptyIterator();
        }

        if (is_iterable($items) === false) {
            throw new EtlException('Could not extract data');
        }

        try {
            foreach ($items as $key => $item) {
                try {
                    // Every item starts with a clean skip flag
                    $this->isSkip = false;
                    $this->dispatcher->dispatch(new ItemEvent(BaseEvent::EXTRACT, $item, $key, $this));
                    yield $key => $item;
                } catch (Exception $e) {
                    // Best effort: an Exception raised for one item only
                    // drops that item, extraction goes on with the next one.
                    continue;
                }
            }
        } catch (Throwable $e) {
            // $item/$key are undefined if the failure happened before the
            // first element was produced, hence the null fallback.
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(BaseEvent::EXTRACT_EXCEPTION, $item ?? null, $key ?? null, $this, $e)
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }
        }
    }

    /**
     * Transform data.
     *
     * The transformer generator is fully consumed here and its output is
     * buffered; the returned callable replays the buffered key/value pairs.
     *
     * @param mixed $item
     * @param int|string $key
     * @return callable a function returning a generator of the transformed pairs
     * @throws EtlException if the transformer does not return a Generator
     * @throws Exception if the transformation fails and the
     *                   TRANSFORM_EXCEPTION event asks for the error to be thrown
     */
    protected function transform($item, $key): callable
    {
        $transformed = ($this->transform)($item, $key, $this, $this->options);
        if (!$transformed instanceof Generator) {
            throw new EtlException('The transformer must return a generator');
        }

        $output = [];
        try {
            // Use dedicated loop variables: the keys yielded by the transformer
            // must not replace the key of the item being processed, which is
            // the one reported in the events below.
            foreach ($transformed as $outputKey => $outputValue) {
                $output[] = [$outputKey, $outputValue];
            }
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::TRANSFORM, $item, $key, $this));
        } catch (Exception $e) {
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(BaseEvent::TRANSFORM_EXCEPTION, $item, $key, $this, $e)
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }
        }

        // When the error is suppressed, whatever was buffered before the
        // failure is still returned.
        return static function () use ($output) {
            foreach ($output as [$outputKey, $outputValue]) {
                yield $outputKey => $outputValue;
            }
        };
    }

    /**
     * Init the loader on the 1st item: dispatch the LOADER_INIT event then
     * call the init callable (if any) with the options.
     * @param mixed $item
     * @param int|string $key
     * @return void
     */
    protected function initLoader($item, $key): void
    {
        $this->dispatcher->dispatch(new ItemEvent(BaseEvent::LOADER_INIT, $item, $key, $this));
        if ($this->init === null) {
            return;
        }

        ($this->init)($this->options);
    }

    /**
     * Load data.
     * @param iterable<int|string, mixed> $data the transformed data
     * @param mixed $item the original (extracted) item
     * @param int|string $key
     * @param bool $flush whether a flush is due after this load
     * @param int $flushCounter passed by reference, decremented when a load
     *                          failure is suppressed, zeroed on flush
     * @param int $totalCounter passed by reference, decremented when a load
     *                          failure is suppressed
     * @return void
     * @throws Throwable if the load fails and the LOAD_EXCEPTION event asks
     *                   for the error to be thrown
     */
    protected function load(
        iterable $data,
        $item,
        $key,
        bool $flush,
        int &$flushCounter,
        int &$totalCounter
    ): void {
        try {
            ($this->load)($data, $key, $this);
            $this->dispatcher->dispatch(new ItemEvent(BaseEvent::LOAD, $item, $key, $this));
        } catch (Throwable $e) {
            /** @var ItemExceptionEvent $event */
            $event = $this->dispatcher->dispatch(
                new ItemExceptionEvent(BaseEvent::LOAD_EXCEPTION, $item, $key, $this, $e)
            );

            if ($event->shouldThrowException()) {
                throw $e;
            }

            // The item was not loaded: do not count it
            $flushCounter--;
            $totalCounter--;
        }

        // Flush when due or when explicitly requested via triggerFlush()
        if ($this->isFlush || $flush) {
            $this->flush($flushCounter, true);
        }
    }

    /**
     * Flush element
     *
     * Does nothing (counter and flush flag included) when no flush
     * callable is set.
     *
     * @param int $flushCounter passed by reference, reset to 0 after the flush
     * @param bool $partial value handed to the flush callable and to the flush event
     * @return void
     */
    protected function flush(int &$flushCounter, bool $partial): void
    {
        if ($this->flush === null) {
            return;
        }

        ($this->flush)($partial);
        $this->dispatcher->dispatch(new FlushEvent($this, $flushCounter, $partial));
        $flushCounter = 0;
        $this->isFlush = false;
    }

    /**
     * Restore loader's initial state.
     *
     * Does nothing when no rollback callable is set.
     *
     * @param int $flushCounter passed by reference, reset to 0 after the rollback
     * @return void
     */
    protected function rollback(int &$flushCounter): void
    {
        if ($this->rollback === null) {
            return;
        }

        ($this->rollback)();
        $this->dispatcher->dispatch(new RollbackEvent($this, $flushCounter));
        $flushCounter = 0;
    }

    /**
     * Process the end of the ETL.
     * @param int $flushCounter number of items loaded since the last flush
     * @param int $totalCounter number of items loaded so far
     * @return void
     */
    protected function end(int $flushCounter, int $totalCounter): void
    {
        if ($this->isRollback) {
            // rollback() receives the counter by reference and zeroes it, so
            // the number of discarded items must be captured beforehand;
            // otherwise nothing would be subtracted from the total.
            $pending = $flushCounter;
            $this->rollback($flushCounter);
            $totalCounter = max(0, $totalCounter - $pending);
        } else {
            $this->flush($flushCounter, false);
        }

        $this->dispatcher->dispatch(new EndEvent($this, $totalCounter));
        $this->reset();
    }

    /**
     * The default transformer to use if none is set: it yields the item
     * unchanged under its own key.
     * @return callable
     */
    protected function defaultTransformer(): callable
    {
        // static: the closure does not need the current instance
        return static function ($item, $key): Generator {
            yield $key => $item;
        };
    }
}
508