Issues (5)

src/EtlTool.php (1 issue)

Labels
Severity
1
<?php
2
3
/**
4
 * Platine ETL
5
 *
6
 * Platine ETL is a library to Extract-Transform-Load Data from various sources
7
 *
8
 * This content is released under the MIT License (MIT)
9
 *
10
 * Copyright (c) 2020 Platine ETL
11
 * Copyright (c) 2019 Benoit POLASZEK
12
 *
13
 * Permission is hereby granted, free of charge, to any person obtaining a copy
14
 * of this software and associated documentation files (the "Software"), to deal
15
 * in the Software without restriction, including without limitation the rights
16
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17
 * copies of the Software, and to permit persons to whom the Software is
18
 * furnished to do so, subject to the following conditions:
19
 *
20
 * The above copyright notice and this permission notice shall be included in all
21
 * copies or substantial portions of the Software.
22
 *
23
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29
 * SOFTWARE.
30
 */
31
32
declare(strict_types=1);
33
34
namespace Platine\Etl;
35
36
use InvalidArgumentException;
37
use Platine\Etl\Event\BaseEvent;
38
use Platine\Etl\Extractor\ExtractorInterface;
39
use Platine\Etl\Loader\LoaderInterface;
40
use Platine\Etl\Transformer\TransformerInterface;
41
use Platine\Event\Dispatcher;
42
use Platine\Event\DispatcherInterface;
43
use Platine\Event\ListenerInterface;
0 ignored issues
show
The type Platine\Event\ListenerInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
44
use RuntimeException;
45
46
/**
47
 * @class EtlTool
48
 * @package Platine\Etl
49
 */
50
class EtlTool
51
{
52
    /**
53
     * @var callable|null
54
     */
55
    protected $extractor = null;
56
57
    /**
58
     * Used to transform data
59
     * @var callable|null
60
     */
61
    protected $transformer = null;
62
63
    /**
64
     * Used to initialize loader
65
     * @var callable|null
66
     */
67
    protected $initLoader = null;
68
69
    /**
70
     * The loader
71
     * @var callable
72
     */
73
    protected $loader;
74
75
    /**
76
     * @var callable|null
77
     */
78
    protected $committer = null;
79
80
    /**
81
     * @var callable|null
82
     */
83
    protected $restorer = null;
84
85
    /**
86
     * Total to flush
87
     * @var int|null
88
     */
89
    protected ?int $flushCount = null;
90
91
    /**
92
     * The event dispatcher
93
     * @var DispatcherInterface
94
     */
95
    protected DispatcherInterface $dispatcher;
96
97
    /**
98
     * Create new instance
99
     * @param ExtractorInterface|callable|null $extractor
100
     * @param TransformerInterface|callable|null $transformer
101
     * @param LoaderInterface|callable|null $loader
102
     * @param DispatcherInterface|null $dispatcher
103
     */
104
    public function __construct(
105
        ExtractorInterface|callable|null $extractor = null,
106
        TransformerInterface|callable|null $transformer = null,
107
        LoaderInterface|callable|null $loader = null,
108
        ?DispatcherInterface $dispatcher = null
109
    ) {
110
        if ($extractor !== null) {
111
            $this->extractor($extractor);
112
        }
113
        if ($transformer !== null) {
114
            $this->transformer($transformer);
115
        }
116
117
        if ($loader !== null) {
118
            $this->loader($loader);
119
        }
120
121
        $this->dispatcher = $dispatcher ?? new Dispatcher();
122
    }
123
124
    /**
125
     * The extractor to be used
126
     * @param ExtractorInterface|callable|null $extractor
127
     * @return $this
128
     */
129
    public function extractor(ExtractorInterface|callable|null $extractor): self
130
    {
131
        if ($extractor instanceof ExtractorInterface) {
132
            $this->extractor = [$extractor, 'extract'];
133
134
            return $this;
135
        }
136
137
        $this->extractor = $extractor;
138
139
140
        return $this;
141
    }
142
143
    /**
144
     * The transformer to be used
145
     * @param TransformerInterface|callable|null $transformer
146
     * @return $this
147
     */
148
    public function transformer(TransformerInterface|callable|null $transformer): self
149
    {
150
        if ($transformer instanceof TransformerInterface) {
151
            $this->transformer = [$transformer, 'transform'];
152
153
            return $this;
154
        }
155
156
        $this->transformer = $transformer;
157
158
        return $this;
159
    }
160
161
    /**
162
     * The loader to be used
163
     * @param LoaderInterface|callable $loader
164
     * @return $this
165
     */
166
    public function loader(LoaderInterface|callable $loader): self
167
    {
168
        if ($loader instanceof LoaderInterface) {
169
            $this->loader = [$loader, 'load'];
170
            $this->initLoader = [$loader, 'init'];
171
            $this->committer = [$loader, 'commit'];
172
            $this->restorer = [$loader, 'rollback'];
173
174
            return $this;
175
        }
176
177
        $this->loader = $loader;
178
179
        return $this;
180
    }
181
182
    /**
183
     *
184
     * @param int|null $flushCount
185
     * @return $this
186
     */
187
    public function setFlushCount(?int $flushCount): self
188
    {
189
        $this->flushCount = $flushCount;
190
        return $this;
191
    }
192
193
    /**
194
     * Register a listener for start event
195
     *
196
     * @param ListenerInterface|callable $listener the Listener interface or any callable
197
     * @param int $priority the listener execution priority
198
     * @return $this
199
     */
200
    public function onStart(
201
        ListenerInterface|callable $listener,
202
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
203
    ): self {
204
        $this->dispatcher->addListener(BaseEvent::START, $listener, $priority);
205
206
        return $this;
207
    }
208
209
    /**
210
     * Register a listener for extract event
211
     *
212
     * @param ListenerInterface|callable $listener the Listener interface or any callable
213
     * @param int $priority the listener execution priority
214
     * @return $this
215
     */
216
    public function onExtract(
217
        ListenerInterface|callable $listener,
218
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
219
    ): self {
220
        $this->dispatcher->addListener(BaseEvent::EXTRACT, $listener, $priority);
221
222
        return $this;
223
    }
224
225
    /**
226
     * Register a listener for extract exception event
227
     *
228
     * @param ListenerInterface|callable $listener the Listener interface or any callable
229
     * @param int $priority the listener execution priority
230
     * @return $this
231
     */
232
    public function onExtractException(
233
        ListenerInterface|callable $listener,
234
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
235
    ): self {
236
        $this->dispatcher->addListener(BaseEvent::EXTRACT_EXCEPTION, $listener, $priority);
237
238
        return $this;
239
    }
240
241
    /**
242
     * Register a listener for transform event
243
     *
244
     * @param ListenerInterface|callable $listener the Listener interface or any callable
245
     * @param int $priority the listener execution priority
246
     * @return $this
247
     */
248
    public function onTransform(
249
        ListenerInterface|callable $listener,
250
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
251
    ): self {
252
        $this->dispatcher->addListener(BaseEvent::TRANSFORM, $listener, $priority);
253
254
        return $this;
255
    }
256
257
    /**
258
     * Register a listener for transform exception event
259
     *
260
     * @param ListenerInterface|callable $listener the Listener interface or any callable
261
     * @param int $priority the listener execution priority
262
     * @return $this
263
     */
264
    public function onTransformException(
265
        ListenerInterface|callable $listener,
266
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
267
    ): self {
268
        $this->dispatcher->addListener(BaseEvent::TRANSFORM_EXCEPTION, $listener, $priority);
269
270
        return $this;
271
    }
272
273
    /**
274
     * Register a listener for loader init event
275
     *
276
     * @param ListenerInterface|callable $listener the Listener interface or any callable
277
     * @param int $priority the listener execution priority
278
     * @return $this
279
     */
280
    public function onLoaderInit(
281
        ListenerInterface|callable $listener,
282
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
283
    ): self {
284
        $this->dispatcher->addListener(BaseEvent::LOADER_INIT, $listener, $priority);
285
286
        return $this;
287
    }
288
289
    /**
290
     * Register a listener for loader load event
291
     *
292
     * @param ListenerInterface|callable $listener the Listener interface or any callable
293
     * @param int $priority the listener execution priority
294
     * @return $this
295
     */
296
    public function onLoad(
297
        ListenerInterface|callable $listener,
298
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
299
    ): self {
300
        $this->dispatcher->addListener(BaseEvent::LOAD, $listener, $priority);
301
302
        return $this;
303
    }
304
305
    /**
306
     * Register a listener for loader load exception event
307
     *
308
     * @param ListenerInterface|callable $listener the Listener interface or any callable
309
     * @param int $priority the listener execution priority
310
     * @return $this
311
     */
312
    public function onLoadException(
313
        ListenerInterface|callable $listener,
314
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
315
    ): self {
316
        $this->dispatcher->addListener(BaseEvent::LOAD_EXCEPTION, $listener, $priority);
317
318
        return $this;
319
    }
320
321
    /**
322
     * Register a listener for flush event
323
     *
324
     * @param ListenerInterface|callable $listener the Listener interface or any callable
325
     * @param int $priority the listener execution priority
326
     * @return $this
327
     */
328
    public function onFlush(
329
        ListenerInterface|callable $listener,
330
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
331
    ): self {
332
        $this->dispatcher->addListener(BaseEvent::FLUSH, $listener, $priority);
333
334
        return $this;
335
    }
336
337
    /**
338
     * Register a listener for skip event
339
     *
340
     * @param ListenerInterface|callable $listener the Listener interface or any callable
341
     * @param int $priority the listener execution priority
342
     * @return $this
343
     */
344
    public function onSkip(
345
        ListenerInterface|callable $listener,
346
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
347
    ): self {
348
        $this->dispatcher->addListener(BaseEvent::SKIP, $listener, $priority);
349
350
        return $this;
351
    }
352
353
    /**
354
     * Register a listener for stop event
355
     *
356
     * @param ListenerInterface|callable $listener the Listener interface or any callable
357
     * @param int $priority the listener execution priority
358
     * @return $this
359
     */
360
    public function onStop(
361
        ListenerInterface|callable $listener,
362
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
363
    ): self {
364
        $this->dispatcher->addListener(BaseEvent::STOP, $listener, $priority);
365
366
        return $this;
367
    }
368
369
    /**
370
     * Register a listener for rollback event
371
     *
372
     * @param ListenerInterface|callable $listener the Listener interface or any callable
373
     * @param int $priority the listener execution priority
374
     * @return $this
375
     */
376
    public function onRollback(
377
        ListenerInterface|callable $listener,
378
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
379
    ): self {
380
        $this->dispatcher->addListener(BaseEvent::ROLLBACK, $listener, $priority);
381
382
        return $this;
383
    }
384
385
    /**
386
     * Register a listener for end event
387
     *
388
     * @param ListenerInterface|callable $listener the Listener interface or any callable
389
     * @param int $priority the listener execution priority
390
     * @return $this
391
     */
392
    public function onEnd(
393
        ListenerInterface|callable $listener,
394
        int $priority = DispatcherInterface::PRIORITY_DEFAULT
395
    ): self {
396
        $this->dispatcher->addListener(BaseEvent::END, $listener, $priority);
397
398
        return $this;
399
    }
400
401
    /**
402
     * Create ETL object
403
     * @return Etl
404
     */
405
    public function create(): Etl
406
    {
407
        if ($this->loader === null) {
408
            throw new RuntimeException('The loader not defined');
409
        }
410
411
        if ($this->flushCount !== null && $this->flushCount <= 0) {
412
            throw new RuntimeException('The flush count must be null or greather than zero (0)');
413
        }
414
415
        return new Etl(
416
            $this->extractor,
417
            $this->transformer,
418
            $this->initLoader,
419
            $this->loader,
420
            $this->committer,
421
            $this->restorer,
422
            $this->flushCount,
423
            $this->dispatcher
424
        );
425
    }
426
}
427