Passed
Push — etls ( 3c161b )
by Fabrice
11:34
created

YaEtl::getStats()   A

Complexity

Conditions 6
Paths 8

Size

Total Lines 31
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 1
Metric Value
cc 6
eloc 14
c 4
b 0
f 1
nc 8
nop 0
dl 0
loc 31
rs 9.2222
1
<?php
2
3
/*
4
 * This file is part of YaEtl
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\Flows\FlowEventAbstract;
13
use fab2s\NodalFlow\Flows\FlowInterface;
14
use fab2s\NodalFlow\Flows\FlowStatusInterface;
15
use fab2s\NodalFlow\NodalFlow;
16
use fab2s\NodalFlow\NodalFlowException;
17
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
18
use fab2s\NodalFlow\Nodes\BranchNode;
19
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
20
use fab2s\NodalFlow\Nodes\NodeInterface;
21
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
22
use fab2s\YaEtl\Events\YaEtlEvent;
23
use fab2s\YaEtl\Extractors\AggregateExtractor;
24
use fab2s\YaEtl\Extractors\ExtractorInterface;
25
use fab2s\YaEtl\Extractors\ExtractorLimitInterface;
26
use fab2s\YaEtl\Extractors\JoinableInterface;
27
use fab2s\YaEtl\Extractors\OnClauseInterface;
28
use fab2s\YaEtl\Loaders\LoaderInterface;
29
use fab2s\YaEtl\Qualifiers\QualifierInterface;
30
use fab2s\YaEtl\Transformers\TransformerInterface;
31
32
/**
 * Class YaEtl
 *
 * Extract-Transform-Load flow built on top of NodalFlow.
 * Nodes must be added through the YaEtl grammar (from(), join(), qualify(),
 * transform(), to(), branch()) which keeps the YaEtl counters up to date;
 * the generic add() inherited from NodalFlow is disabled.
 */
class YaEtl extends NodalFlow
{
    /**
     * YaEtl specific flow counters.
     * The string value of num_records presumably declares it as an alias
     * of NodalFlow's num_iterate counter (verify against FlowMap).
     *
     * @var array<string, int|string>
     */
    protected $flowIncrements = [
        'num_extract'     => 0,
        'num_extractor'   => 0,
        'num_join'        => 0,
        'num_joiner'      => 0,
        'num_merge'       => 0,
        'num_records'     => 'num_iterate',
        'num_transform'   => 0,
        'num_transformer' => 0,
        'num_qualifier'   => 0,
        'num_qualify'     => 0,
        'num_branch'      => 0,
        'num_load'        => 0,
        'num_loader'      => 0,
        'num_flush'       => 0,
    ];

    /**
     * The reverse aggregate lookup table: maps the id of every Extractor
     * buried inside an AggregateExtractor to that aggregate node
     *
     * @var AggregateNodeInterface[]
     */
    protected $reverseAggregateTable = [];

    /**
     * @var bool
     */
    protected $forceFlush = false;

    /**
     * Adds an extractor to the Flow which may be aggregated with another one
     *
     * @param ExtractorInterface      $extractor
     * @param null|ExtractorInterface $aggregateWith Use the extractor instance you want to aggregate with,
     *                                               it may itself already be part of an aggregate
     *
     * @throws YaEtlException
     * @throws NodalFlowException
     *
     * @return static
     */
    public function from(ExtractorInterface $extractor, ?ExtractorInterface $aggregateWith = null): self
    {
        if ($aggregateWith !== null) {
            $this->aggregateTo($extractor, $aggregateWith);
        } else {
            parent::add($extractor);
            $this->flowMap->incrementFlow('num_extractor');
        }

        return $this;
    }

    /**
     * Adds a Qualifier to the Flow
     *
     * @param QualifierInterface $qualifier
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function qualify(QualifierInterface $qualifier): self
    {
        parent::add($qualifier);
        $this->flowMap->incrementFlow('num_qualifier');

        return $this;
    }

    /**
     * Override NodalFlow's add method to prohibit its direct usage
     *
     * @param NodeInterface $node
     *
     * @throws YaEtlException always, use the YaEtl grammar instead
     *
     * @return FlowInterface
     */
    public function add(NodeInterface $node): FlowInterface
    {
        throw new YaEtlException('add() is not directly available, use YaEtl grammar instead');
    }

    /**
     * By default, a branched flow only gets its `flush()` method
     * called when its top most parent triggers its own `flush()`.
     * This makes sense most of the time, as the most common use
     * case for branches so far is to deal with one record at a time
     * without generating records (even when left joining). In such
     * cases, the branch's `flush()` really needs to be called exactly
     * when the top most flow's one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often.
     * Also note that the branch will still be flushed at the end
     * of its top most parent flow.
     *
     * @param bool $forceFlush
     *
     * @return static
     */
    public function forceFlush(bool $forceFlush): self
    {
        $this->forceFlush = $forceFlush;

        return $this;
    }

    /**
     * Adds a Joiner to the Flow, joined to a specific Extractor of the Flow
     *
     * @param JoinableInterface $extractor The joiner to add
     * @param JoinableInterface $joinFrom  The extractor it joins from
     * @param OnClauseInterface $onClause  The join clause, registered on both ends
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause): self
    {
        $joinFrom->registerJoinerOnClause($onClause);
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        parent::add($extractor);
        $this->flowMap->incrementFlow('num_joiner');

        return $this;
    }

    /**
     * Adds a Transformer to the Flow
     *
     * @param TransformerInterface $transformer
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function transform(TransformerInterface $transformer): self
    {
        parent::add($transformer);
        $this->flowMap->incrementFlow('num_transformer');

        return $this;
    }

    /**
     * Adds a Loader to the Flow
     *
     * @param LoaderInterface $loader
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function to(LoaderInterface $loader): self
    {
        parent::add($loader);
        $this->flowMap->incrementFlow('num_loader');

        return $this;
    }

    /**
     * Adds a Branch (Flow) to the Flow
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function branch(self $flow, $isAReturningVal = false): self
    {
        parent::add(new BranchNode($flow, $isAReturningVal));
        $this->flowMap->incrementFlow('num_branch');

        return $this;
    }

    /**
     * KISS method to expose basic stats
     *
     * Adds this flow's own YaEtl counters to their `*_total` entries and
     * renders a human readable summary under the `report` key.
     *
     * @return array<string, mixed>
     */
    public function getStats(): array
    {
        $stats = parent::getStats();

        $tpl = '[YaEtl]({FLOW_STATUS}) {NUM_EXTRACTOR_TOTAL} Extractor - {NUM_EXTRACT_TOTAL} Extract - {NUM_QUALIFIER_TOTAL} Qualifier - {NUM_QUALIFY_TOTAL} Qualify
[YaEtl] {NUM_TRANSFORMER_TOTAL} Transformer - {NUM_TRANSFORM_TOTAL} Transform - {NUM_LOADER_TOTAL} Loader - {NUM_LOAD_TOTAL} Load - {NUM_FLUSH_TOTAL} Flush
[YaEtl] {NUM_RECORDS_TOTAL} Record - {NUM_ITERATE_TOTAL} Iterations - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break
[YaEtl] {NUM_BRANCH_TOTAL} Branch - {NUM_JOINER_TOTAL} Joiner - {NUM_JOIN_TOTAL} Join
[YaEtl] Time : {DURATION} - Memory: {MIB} MiB';

        // fold this flow's own YaEtl counters into their totals
        foreach (array_keys($this->flowIncrements) as $key) {
            $stats[$key . '_total'] += $stats[$key];
        }

        $vars = [];
        foreach ($stats as $varName => $value) {
            // nested stats (e.g. branches) are not part of the report
            if (is_array($value)) {
                continue;
            }

            $placeholder = '{' . strtoupper($varName) . '}';
            if (is_numeric($value)) {
                // integers without decimals, anything else with two
                $vars[$placeholder] = \number_format($value, is_int($value) ? 0 : 2, '.', ' ');
                continue;
            }

            $vars[$placeholder] = $value;
        }

        $stats['report'] = str_replace(array_keys($vars), array_values($vars), $tpl);

        return $stats;
    }

    /**
     * Tells if the flow is set to force flush
     * Only consulted when this flow has a parent (i.e. is branched)
     *
     * @return bool
     */
    public function isForceFlush(): bool
    {
        return !empty($this->forceFlush);
    }

    /**
     * Sets the extraction limit on every Extractor of this flow supporting it
     *
     * @param int|null $limit     The limit to set, passed as is to each Extractor
     * @param bool     $recursive Also apply the limit to YaEtl branches (and their own branches)
     *
     * @return static
     */
    public function limit(?int $limit, bool $recursive = false): self
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof ExtractorLimitInterface) {
                $node->setLimit($limit);
                continue;
            }

            if ($recursive && $node instanceof BranchNodeInterface) {
                $flow = $node->getPayload();
                // any YaEtl (or extend), regardless of this instance's own class
                if ($flow instanceof self) {
                    $flow->limit($limit, $recursive);
                }
            }
        }

        return $this;
    }

    /**
     * Triggered right after the flow stops
     *
     * @return static
     */
    protected function flowEnd(): NodalFlow
    {
        $this->flush();

        parent::flowEnd();

        return $this;
    }

    /**
     * Swaps the event instance prepared by NodalFlow for a YaEtlEvent
     *
     * @param string $class
     *
     * @throws \ReflectionException
     *
     * @return static
     */
    protected function initDispatchArgs(string $class): FlowEventAbstract
    {
        parent::initDispatchArgs($class);
        $this->dispatchArgs[$this->eventInstanceKey] = new YaEtlEvent($this);

        return $this;
    }

    /**
     * Used internally to aggregate Extractors
     *
     * @param ExtractorInterface $extractor     The extractor to aggregate
     * @param ExtractorInterface $aggregateWith An extractor of this flow, either still
     *                                          in the main tree or already aggregated
     *
     * @throws YaEtlException when $aggregateWith is not part of this flow
     * @throws NodalFlowException
     *
     * @return static
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith): self
    {
        $aggregateWithNodeId = $aggregateWith->getId();

        // $aggregateWith is already buried in an aggregate: just append to it
        if (isset($this->reverseAggregateTable[$aggregateWithNodeId])) {
            /** @var AggregateNodeInterface $aggregateNode */
            $aggregateNode = $this->reverseAggregateTable[$aggregateWithNodeId];
            $aggregateNode->addTraversable($extractor);
            // allow later aggregations targeting $extractor itself
            $this->reverseAggregateTable[$extractor->getId()] = $aggregateNode;

            return $this;
        }

        $aggregateWithIdx = $this->flowMap->getNodeIndex($aggregateWithNodeId);
        if ($aggregateWithIdx === null) {
            throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
        }

        $aggregateNode = new AggregateExtractor(true);
        // keep track of this extractor before we bury it in the aggregate
        $this->reverseAggregateTable[$aggregateWithNodeId] = $aggregateNode;
        // now replace its slot in the main tree
        $this->replace($aggregateWithIdx, $aggregateNode);
        $aggregateNode->addTraversable($aggregateWith)
            ->addTraversable($extractor);
        // allow later aggregations targeting $extractor itself
        $this->reverseAggregateTable[$extractor->getId()] = $aggregateNode;

        // adjust counters as we did remove the $aggregateWith Extractor from this flow
        $reg = &$this->registry->get($this->getId());
        --$reg['flowStats']['num_extractor'];

        return $this;
    }

    /**
     * Calls each WorkFlow's loaders and branch flush method
     *
     * Without a status, a branched flow only flushes when it is set
     * to force flush; otherwise its top most parent will do it.
     *
     * @param FlowStatusInterface|null $flowStatus The parent's status, null to use this flow's own
     *
     * @return static
     */
    protected function flush(?FlowStatusInterface $flowStatus = null): self
    {
        if ($flowStatus !== null) {
            // use parent's status
            return $this->flushNodes($flowStatus);
        }

        if ($this->hasParent() && !$this->isForceFlush()) {
            // we'll get another chance at this
            return $this;
        }

        // use current status
        return $this->flushNodes($this->flowStatus);
    }

    /**
     * Actually flush nodes: every Loader of this flow,
     * then recursively every YaEtl branch
     *
     * @param FlowStatusInterface $flowStatus
     *
     * @return static
     */
    protected function flushNodes(FlowStatusInterface $flowStatus): self
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                $this->flowMap->incrementFlow('num_flush');
                $this->triggerEvent(YaEtlEvent::FLOW_FLUSH, $node);
                continue;
            }

            // start with only flushing YaEtl and extends, regardless
            // of this instance's own (possibly extended) class
            if ($node instanceof BranchNodeInterface) {
                $flow = $node->getPayload();
                if ($flow instanceof self) {
                    $flow->flush($flowStatus);
                }
            }
        }

        return $this;
    }
}