YaEtl::getStats()   A
last analyzed

Complexity

Conditions 6
Paths 8

Size

Total Lines 31
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 1
Metric Value
cc 6
eloc 14
c 4
b 0
f 1
nc 8
nop 0
dl 0
loc 31
rs 9.2222
1
<?php
2
3
/*
4
 * This file is part of YaEtl
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\Flows\FlowEventAbstract;
13
use fab2s\NodalFlow\Flows\FlowInterface;
14
use fab2s\NodalFlow\Flows\FlowStatusInterface;
15
use fab2s\NodalFlow\NodalFlow;
16
use fab2s\NodalFlow\NodalFlowException;
17
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
18
use fab2s\NodalFlow\Nodes\BranchNode;
19
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
20
use fab2s\NodalFlow\Nodes\NodeInterface;
21
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
22
use fab2s\YaEtl\Events\YaEtlEvent;
23
use fab2s\YaEtl\Extractors\AggregateExtractor;
24
use fab2s\YaEtl\Extractors\ExtractorInterface;
25
use fab2s\YaEtl\Extractors\JoinableInterface;
26
use fab2s\YaEtl\Extractors\OnClauseInterface;
27
use fab2s\YaEtl\Loaders\LoaderInterface;
28
use fab2s\YaEtl\Qualifiers\QualifierInterface;
29
use fab2s\YaEtl\Transformers\TransformerInterface;
30
31
/**
 * Class YaEtl
 *
 * A NodalFlow specialisation exposing an ETL grammar: Nodes are added with
 * from() / join() / qualify() / transform() / to() / branch() instead of
 * add(), which is deliberately disabled here.
 */
class YaEtl extends NodalFlow
{
    /**
     * Counter keys used by this flow. getStats() folds each of these keys
     * into its matching `<key>_total` entry.
     *
     * NOTE(review): `num_records` holds a string instead of an initial count;
     * presumably it aliases the `num_iterate` counter - confirm against NodalFlow.
     *
     * @var array
     */
    protected $flowIncrements = [
        'num_extract'     => 0,
        'num_extractor'   => 0,
        'num_join'        => 0,
        'num_joiner'      => 0,
        'num_merge'       => 0,
        'num_records'     => 'num_iterate',
        'num_transform'   => 0,
        'num_transformer' => 0,
        'num_qualifier'   => 0,
        'num_qualify'     => 0,
        'num_branch'      => 0,
        'num_load'        => 0,
        'num_loader'      => 0,
        'num_flush'       => 0,
    ];

    /**
     * The reverse aggregate lookup table: maps the id of the Extractor that
     * was first aggregated with to the AggregateExtractor that took its slot
     *
     * @var array
     */
    protected $reverseAggregateTable = [];

    /**
     * @var bool
     */
    protected $forceFlush = false;

    /**
     * Adds an extractor to the Flow which may be aggregated with another one
     *
     * Without $aggregateWith the extractor is added as a regular Node and the
     * `num_extractor` counter is incremented; otherwise the work is delegated
     * to aggregateTo().
     *
     * @param ExtractorInterface      $extractor
     * @param null|ExtractorInterface $aggregateWith Use the extractor instance you want to aggregate with
     *
     * @throws YaEtlException
     * @throws NodalFlowException
     *
     * @return static
     */
    public function from(ExtractorInterface $extractor, ?ExtractorInterface $aggregateWith = null): self
    {
        if ($aggregateWith !== null) {
            $this->aggregateTo($extractor, $aggregateWith);
        } else {
            // add() is overridden to throw in this class, hence the explicit parent call
            parent::add($extractor);
            $this->flowMap->incrementFlow('num_extractor');
        }

        return $this;
    }

    /**
     * Adds a Qualifier to the Flow
     *
     * @param QualifierInterface $qualifier
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function qualify(QualifierInterface $qualifier): self
    {
        parent::add($qualifier);
        $this->flowMap->incrementFlow('num_qualifier');

        return $this;
    }

    /**
     * Override NodalFlow's add method to prohibit its direct usage
     *
     * Always throws; use from(), join(), qualify(), transform(), to()
     * or branch() instead.
     *
     * @param NodeInterface $node
     *
     * @throws YaEtlException
     *
     * @return FlowInterface
     */
    public function add(NodeInterface $node): FlowInterface
    {
        throw new YaEtlException('add() is not directly available, use YaEtl grammar instead');
    }

    /**
     * By default, branched flows will only see their
     * `flush()` method called when the top most parent
     * triggers its own `flush()`.
     * It makes sense most of the time to do so as
     * the most common use case for branches so far is
     * to deal with one record at a time without generating
     * records (even when left joining). In such case,
     * the `flush()` method really needs to be called by the flow
     * exactly when the top most flow one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often
     * Also note that the branch will also be flushed at the end
     * of its top most parent flow.
     *
     * @param bool $forceFlush
     *
     * @return static
     */
    public function forceFlush(bool $forceFlush): self
    {
        $this->forceFlush = $forceFlush;

        return $this;
    }

    /**
     * Adds a Joiner to a specific Extractor in the Flow
     *
     * The on clause is registered on $joinFrom, then $extractor is told both
     * what it joins from and with which clause before being added as a Node.
     *
     * @param JoinableInterface $extractor
     * @param JoinableInterface $joinFrom
     * @param OnClauseInterface $onClause
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause): self
    {
        $joinFrom->registerJoinerOnClause($onClause);
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        parent::add($extractor);
        $this->flowMap->incrementFlow('num_joiner');

        return $this;
    }

    /**
     * Adds a Transformer to the Flow
     *
     * @param TransformerInterface $transformer
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function transform(TransformerInterface $transformer): self
    {
        parent::add($transformer);
        $this->flowMap->incrementFlow('num_transformer');

        return $this;
    }

    /**
     * Adds a Loader to the Flow
     *
     * @param LoaderInterface $loader
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function to(LoaderInterface $loader): self
    {
        parent::add($loader);
        $this->flowMap->incrementFlow('num_loader');

        return $this;
    }

    /**
     * Adds a Branch (Flow) to the Flow
     *
     * The given flow is wrapped in a BranchNode before being added.
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function branch(self $flow, $isAReturningVal = false): self
    {
        parent::add(new BranchNode($flow, $isAReturningVal));
        $this->flowMap->incrementFlow('num_branch');

        return $this;
    }

    /**
     * Triggered right after the flow stops
     *
     * Flushes first (see flush() for when that is a no-op), then lets the
     * parent implementation run.
     *
     * @return static
     */
    public function flowEnd(): NodalFlow
    {
        $this->flush();

        parent::flowEnd();

        return $this;
    }

    /**
     * KISS method to expose basic stats
     *
     * Takes the parent's stats, adds this flow's own counters to their
     * `<key>_total` entries and stores a human readable summary under
     * the `report` key.
     *
     * @return array<string,integer|string>
     */
    public function getStats(): array
    {
        $stats = parent::getStats();

        $tpl = '[YaEtl]({FLOW_STATUS}) {NUM_EXTRACTOR_TOTAL} Extractor - {NUM_EXTRACT_TOTAL} Extract - {NUM_RECORDS_TOTAL} Record ({NUM_ITERATE_TOTAL} Iterations)
[YaEtl] {NUM_JOINER_TOTAL} Joiner - {NUM_JOIN_TOTAL} Join - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_QUALIFIER_TOTAL} Qualifier - {NUM_QUALIFY_TOTAL} Qualify
[YaEtl] {NUM_TRANSFORMER_TOTAL} Transformer - {NUM_TRANSFORM_TOTAL} Transform - {NUM_LOADER_TOTAL} Loader - {NUM_LOAD_TOTAL} Load
[YaEtl] {NUM_BRANCH_TOTAL} Branch - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_FLUSH_TOTAL} Flush
[YaEtl] Time : {DURATION} - Memory: {MIB} MiB';

        foreach (array_keys($this->flowIncrements) as $key) {
            $totalKey = $key . '_total';
            // a missing entry counts as 0, which yields the same sum as a bare
            // `+=` would, minus the undefined index notice
            $stats[$totalKey] = ($stats[$totalKey] ?? 0) + ($stats[$key] ?? 0);
        }

        $vars = [];
        foreach ($stats as $varName => $value) {
            if (is_array($value)) {
                // nested structures have no placeholder in the template
                continue;
            }

            $placeholder = '{' . strtoupper($varName) . '}';
            if (is_numeric($value)) {
                // integers are rendered without decimals, any other numeric with two
                $vars[$placeholder] = \number_format($value, is_int($value) ? 0 : 2, '.', ' ');
                continue;
            }

            $vars[$placeholder] = $value;
        }

        $stats['report'] = str_replace(array_keys($vars), array_values($vars), $tpl);

        return $stats;
    }

    /**
     * Tells if the flow is set to force flush
     * Only used when branched (to tell the parent)
     *
     * @return bool
     */
    public function isForceFlush(): bool
    {
        return !empty($this->forceFlush);
    }

    /**
     * Lets the parent prepare the dispatch arguments, then stores a
     * YaEtlEvent bound to this flow under the event instance key
     *
     * @param string $class
     *
     * @throws \ReflectionException
     *
     * @return static
     */
    protected function initDispatchArgs(string $class): FlowEventAbstract
    {
        parent::initDispatchArgs($class);
        $this->dispatchArgs[$this->eventInstanceKey] = new YaEtlEvent($this);

        return $this;
    }

    /**
     * Used internally to aggregate Extractors
     *
     * When $aggregateWith was already aggregated, $extractor simply joins the
     * existing aggregate. Otherwise a new AggregateExtractor takes the slot of
     * $aggregateWith in the flow and receives both extractors.
     *
     * @param ExtractorInterface $extractor
     * @param ExtractorInterface $aggregateWith
     *
     * @throws YaEtlException     When $aggregateWith is neither a Node of this flow nor already aggregated
     * @throws NodalFlowException
     *
     * @return static
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith): self
    {
        // aggregate with target Node
        $aggregateWithNodeId = $aggregateWith->getId();
        $aggregateWithIdx    = $this->flowMap->getNodeIndex($aggregateWithNodeId);

        if (isset($this->reverseAggregateTable[$aggregateWithNodeId])) {
            /** @var AggregateNodeInterface $aggregateWithNode */
            $aggregateWithNode = $this->reverseAggregateTable[$aggregateWithNodeId];
            $aggregateWithNode->addTraversable($extractor);

            return $this;
        }

        if ($aggregateWithIdx === null) {
            throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
        }

        $aggregateNode = new AggregateExtractor(true);
        // keep track of this extractor before we bury it in the aggregate
        $this->reverseAggregateTable[$aggregateWithNodeId] = $aggregateNode;
        // now replace its slot in the main tree, the index is known to be set at this point
        $this->replace($aggregateWithIdx, $aggregateNode);
        $aggregateNode->addTraversable($aggregateWith)
            ->addTraversable($extractor);

        // adjust counters as we did remove the $aggregateWith Extractor from this flow
        $reg = &$this->registry->get($this->getId());
        --$reg['flowStats']['num_extractor'];

        return $this;
    }

    /**
     * Calls each WorkFlow's loaders and branch flush method
     *
     * Called without a status, a flow that has a parent and is not set to
     * force flush does nothing, otherwise Nodes are flushed with the flow's
     * own status. Called with a status, Nodes are always flushed with it.
     *
     * @param FlowStatusInterface|null $flowStatus
     *
     * @return static
     */
    protected function flush(?FlowStatusInterface $flowStatus = null): self
    {
        if ($flowStatus === null) {
            if ($this->hasParent() && !$this->isForceFlush()) {
                // we'll get another chance at this
                return $this;
            }

            // use current status
            return $this->flushNodes($this->flowStatus);
        }

        // use parent's status
        return $this->flushNodes($flowStatus);
    }

    /**
     * Actually flush nodes
     *
     * Loaders are flushed, counted in `num_flush` and followed by a
     * FLOW_FLUSH event; branch payloads matching the current class are
     * flushed with the same status.
     *
     * @param FlowStatusInterface $flowStatus
     *
     * @return static
     */
    protected function flushNodes(FlowStatusInterface $flowStatus): self
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                $this->flowMap->incrementFlow('num_flush');
                $this->triggerEvent(YaEtlEvent::FLOW_FLUSH, $node);
                continue;
            }

            // start with only flushing YaEtl and extends
            if ($node instanceof BranchNodeInterface) {
                $flow = $node->getPayload();
                if (is_a($flow, static::class)) {
                    /* @var YaEtl $flow */
                    $flow->flush($flowStatus);
                }
            }
        }

        return $this;
    }
}
398