Passed
Push — master ( 7bba5d...c1c3fd )
by Fabrice
02:35
created

YaEtl::flushNodes()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 18
rs 8.8571
cc 5
eloc 10
nc 4
nop 1
1
<?php
2
3
/*
4
 * This file is part of YaEtl.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\Flows\FlowStatusInterface;
13
use fab2s\NodalFlow\NodalFlow;
14
use fab2s\NodalFlow\Nodes\AggregateNode;
15
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
16
use fab2s\NodalFlow\Nodes\BranchNode;
17
use fab2s\NodalFlow\Nodes\NodeInterface;
18
use fab2s\NodalFlow\YaEtlException;
19
use fab2s\YaEtl\Extractors\ExtractorInterface;
20
use fab2s\YaEtl\Extractors\JoinableInterface;
21
use fab2s\YaEtl\Extractors\OnClauseInterface;
22
use fab2s\YaEtl\Loaders\LoaderInterface;
23
use fab2s\YaEtl\Transformers\TransformerInterface;
24
25
/**
 * Class YaEtl
 *
 * A NodalFlow specialization where nodes are added through the
 * from() / join() / transform() / to() / branch() grammar rather
 * than through add(), which is disabled here.
 */
class YaEtl extends NodalFlow
{
    /**
     * The stats items added to NodalFlow's ones
     *
     * @var array
     */
    protected $stats = [
        'start'           => null,
        'end'             => null,
        'duration'        => null,
        'mib'             => null,
        'report'          => '',
        'num_extract'     => 0,
        'num_extractor'   => 0,
        'num_join'        => 0,
        'num_joiner'      => 0,
        'num_merge'       => 0,
        'num_records'     => 0,
        'num_transform'   => 0,
        'num_transformer' => 0,
        'num_branch'      => 0,
        'num_load'        => 0,
        'num_loader'      => 0,
        'num_flush'       => 0,
        'invocations'     => [],
        'nodes'           => [],
    ];

    /**
     * The reverse aggregate lookup table: maps the hash of every node
     * involved in an aggregate (the aggregate node itself and the
     * extractors it holds) to the index of the aggregate in $this->nodes
     *
     * @var array
     */
    protected $reverseAggregateTable = [];

    /**
     * Whether this flow should flush on its own when it is used as a branch
     *
     * @var bool
     */
    protected $forceFlush = false;

    /**
     * Adds an extractor to the Flow which may be aggregated with another one
     *
     * @param ExtractorInterface      $extractor
     * @param null|ExtractorInterface $aggregateWith Use the extractor instance you want to aggregate with
     *
     * @throws YaEtlException when the extractor instance is already in use,
     *                        or when $aggregateWith is not part of this flow
     *
     * @return $this
     */
    public function from(ExtractorInterface $extractor, ExtractorInterface $aggregateWith = null)
    {
        $this->enforceNodeInstanceUnicity($extractor);
        if ($aggregateWith !== null) {
            $this->aggregateTo($extractor, $aggregateWith);
        } else {
            parent::add($extractor);
        }

        ++$this->stats['num_extractor'];

        return $this;
    }

    /**
     * Override NodalFlow's add method to prohibit its direct usage
     *
     * @param NodeInterface $node
     *
     * @throws YaEtlException always
     */
    public function add(NodeInterface $node)
    {
        throw new YaEtlException('add() is not directly available, use YaEtl grammar from(), transform(), join() and / or to() instead');
    }

    /**
     * By default, branched flows will only see their
     * `flush()` method called when the top most parent
     * triggers its own `flush()`.
     * It makes sense most of the time to do so as
     * the most common use case for branches so far is
     * to deal with one record at a time without generating
     * records (even when left joining). In such case,
     * the `flush()` method really needs to be called by the flow
     * exactly when the top most flow one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often.
     * Also note that the branch will also be flushed at the end
     * of its top most parent.
     *
     * @param bool $forceFlush
     *
     * @return $this
     */
    public function forceFlush($forceFlush)
    {
        $this->forceFlush = (bool) $forceFlush;

        return $this;
    }

    /**
     * Adds a Joiner to a specific Extractor in the Flow
     *
     * @param JoinableInterface $extractor The joiner to add
     * @param JoinableInterface $joinFrom  The extractor to join from
     * @param OnClauseInterface $onClause  The clause registered on both sides
     *
     * @throws YaEtlException when the joiner instance is already in use
     *
     * @return $this
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause)
    {
        $this->enforceNodeInstanceUnicity($extractor);
        $joinFrom->registerJoinerOnClause($onClause);
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        parent::add($extractor);
        ++$this->stats['num_joiner'];

        return $this;
    }

    /**
     * Adds a Transformer to the Flow
     *
     * @param TransformerInterface $transformer
     *
     * @throws YaEtlException when the transformer instance is already in use
     *
     * @return $this
     */
    public function transform(TransformerInterface $transformer)
    {
        $this->enforceNodeInstanceUnicity($transformer);
        parent::add($transformer);
        ++$this->stats['num_transformer'];

        return $this;
    }

    /**
     * Adds a Loader to the Flow
     *
     * @param LoaderInterface $loader
     *
     * @throws YaEtlException when the loader instance is already in use
     *
     * @return $this
     */
    public function to(LoaderInterface $loader)
    {
        $this->enforceNodeInstanceUnicity($loader);
        parent::add($loader);
        ++$this->stats['num_loader'];

        return $this;
    }

    /**
     * Adds a Branch (Flow) to the Flow
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws YaEtlException when the flow instance was already registered
     *
     * @return $this
     */
    public function branch(YaEtl $flow, $isAReturningVal = false)
    {
        // Registry of the flow hashes seen so far. As a method static it
        // is shared by every instance, and it is only seeded with the
        // hash of the first flow that happens to call branch().
        static $flowHashes;
        if (!isset($flowHashes)) {
            $flowHashes = [
                $this->objectHash($this) => 1,
            ];
        }

        $flowHash = $this->objectHash($flow);
        if (isset($flowHashes[$flowHash])) {
            throw new YaEtlException('An instance of ' . \get_class($flow) . ' appears to be already in use in this flow. Please clone / re new before reuse');
        }

        $flowHashes[$flowHash] = 1;

        parent::add(new BranchNode($flow, $isAReturningVal));
        ++$this->stats['num_branch'];

        return $this;
    }

    /**
     * Triggered right after the flow stops
     *
     * Flushes first, then lets NodalFlow run its own end logic.
     *
     * @return $this
     */
    public function flowEnd()
    {
        $this->flush();

        parent::flowEnd();

        return $this;
    }

    /**
     * KISS method to expose basic stats
     *
     * The returned array is NodalFlow's stats completed with the per node
     * stats under 'nodes', the totals computed by collectNodeStats(), the
     * overall 'duration' and a human readable 'report' string.
     *
     * @return array
     */
    public function getStats()
    {
        $stats          = $this->processStats(parent::getStats());
        $stats['nodes'] = $this->getNodeStats();

        $this->collectNodeStats($stats);

        $stats['duration'] = $stats['end'] - $stats['start'];
        // duration() is expected to provide at least the 'durationStr' key used below
        $stats             = \array_replace($stats, $this->duration($stats['duration']));
        $stats['report']   = \sprintf(
            '[YaEtl](%s) %s Extractor - %s Extract - %s Record (%s Iterations)
[YaEtl] %s Joiner - %s Join - %s Continue - %s Break - %s Branch
[YaEtl] %s Transformer - %s Transform - %s Loader - %s Load - %s Flush
[YaEtl] Time : %s - Memory: %4.2fMiB',
            $this->flowStatus,
            \number_format($stats['num_extractor'], 0, '.', ' '),
            \number_format($stats['num_extract'], 0, '.', ' '),
            \number_format($stats['num_records'], 0, '.', ' '),
            \number_format($this->numIterate, 0, '.', ' '),
            \number_format($stats['num_joiner'], 0, '.', ' '),
            \number_format($stats['num_join'], 0, '.', ' '),
            \number_format($this->numContinue, 0, '.', ' '),
            \number_format($this->numBreak, 0, '.', ' '),
            \number_format($stats['num_branch'], 0, '.', ' '),
            \number_format($stats['num_transformer'], 0, '.', ' '),
            \number_format($stats['num_transform'], 0, '.', ' '),
            \number_format($stats['num_loader'], 0, '.', ' '),
            \number_format($stats['num_load'], 0, '.', ' '),
            \number_format($stats['num_flush'], 0, '.', ' '),
            $stats['durationStr'],
            $stats['mib']
        );

        return $stats;
    }

    /**
     * Tells if the flow is set to force flush
     * Only used when branched (to tell the parent)
     *
     * @return bool
     */
    public function isForceFlush()
    {
        return !empty($this->forceFlush);
    }

    /**
     * Used internally to aggregate Extractors
     *
     * When the target slot already holds an aggregate node, the extractor
     * is appended to it. Otherwise a new AggregateNode holding both the
     * target and the new extractor takes over the target's slot.
     *
     * @param ExtractorInterface $extractor
     * @param ExtractorInterface $aggregateWith
     *
     * @throws YaEtlException when $aggregateWith is neither a node of this
     *                        flow nor part of one of its aggregates
     *
     * @return $this
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith)
    {
        // aggregate with target Node
        $nodeHash = $aggregateWith->getNodeHash();
        if (!isset($this->nodeMap[$nodeHash]) && !isset($this->reverseAggregateTable[$nodeHash])) {
            throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
        }

        $aggregateWithIdx = isset($this->nodeMap[$nodeHash]) ? $this->nodeMap[$nodeHash]['index'] : $this->reverseAggregateTable[$nodeHash];
        if ($this->nodes[$aggregateWithIdx] instanceof AggregateNodeInterface) {
            $this->nodes[$aggregateWithIdx]->addTraversable($extractor);
            // aggregate node did take care of setting carrier and hash
            $this->reverseAggregateTable[$extractor->getNodeHash()] = $aggregateWithIdx;

            return $this;
        }

        $aggregateNode = new AggregateNode(true);
        $aggregateNode->addTraversable($this->nodes[$aggregateWithIdx])
                ->addTraversable($extractor);
        // keep track of this extractor before we bury it in the aggregate
        $this->reverseAggregateTable[$this->nodes[$aggregateWithIdx]->getNodeHash()] = $aggregateWithIdx;
        // now replace its slot in the main tree
        $this->replace($aggregateWithIdx, $aggregateNode);
        // aggregate node did take care of setting carrier and hash
        $this->reverseAggregateTable[$aggregateNode->getNodeHash()]                  = $aggregateWithIdx;
        $this->reverseAggregateTable[$extractor->getNodeHash()]                      = $aggregateWithIdx;

        return $this;
    }

    /**
     * Collect Nodes stats
     *
     * Fills the YaEtl specific counters of each entry in $this->nodeStats
     * and adds them up into $stats. The first matching branch wins, so a
     * joiner (a joinable with an on clause) is not counted as an extractor.
     *
     * @param array $stats Updated in place
     *
     * @return $this
     */
    protected function collectNodeStats(array &$stats)
    {
        $stats = \array_replace($this->statsDefault, $stats);
        foreach ($this->nodes as $nodeIdx => $node) {
            if (($node instanceof JoinableInterface) && $node->getOnClause()) {
                $this->nodeStats[$nodeIdx]['num_join'] = $node->getNumRecords();
                $stats['num_join'] += $this->nodeStats[$nodeIdx]['num_join'];
            } elseif ($node instanceof ExtractorInterface) {
                $this->nodeStats[$nodeIdx]['num_records'] = $this->nodeStats[$nodeIdx]['num_iterate'];
                $this->nodeStats[$nodeIdx]['num_extract'] = $node->getNumExtract();
                $stats['num_records'] += $this->nodeStats[$nodeIdx]['num_iterate'];
                $stats['num_extract'] += $this->nodeStats[$nodeIdx]['num_extract'];
            } elseif ($node instanceof TransformerInterface) {
                $this->nodeStats[$nodeIdx]['num_transform'] = $this->nodeStats[$nodeIdx]['num_exec'];
                $stats['num_transform'] += $this->nodeStats[$nodeIdx]['num_transform'];
            } elseif ($node instanceof LoaderInterface) {
                $this->nodeStats[$nodeIdx]['num_load'] = $this->nodeStats[$nodeIdx]['num_exec'];
                $stats['num_load'] += $this->nodeStats[$nodeIdx]['num_load'];
            } elseif ($node instanceof AggregateNodeInterface) {
                $this->nodeStats[$nodeIdx]['num_records'] = $this->nodeStats[$nodeIdx]['num_iterate'];
                $stats['num_records'] += $this->nodeStats[$nodeIdx]['num_iterate'];
                // an aggregate extracts as much as the sum of its extractors
                $this->nodeStats[$nodeIdx]['num_extract'] = 0;
                foreach ($node->getNodeCollection() as $extractorNode) {
                    $this->nodeStats[$nodeIdx]['num_extract'] += $extractorNode->getNumExtract();
                }

                $stats['num_extract'] += $this->nodeStats[$nodeIdx]['num_extract'];
            }
        }

        return $this;
    }

    /**
     * Replaces a node with another one
     *
     * The node map entry of the replaced node is dropped and a fresh one
     * is registered for the new node at the same index.
     *
     * @param int|string    $nodeIdx An existing key of $this->nodes
     * @param NodeInterface $node
     *
     * @throws YaEtlException when $nodeIdx is not a key of $this->nodes
     *
     * @return $this
     */
    protected function replace($nodeIdx, NodeInterface $node)
    {
        if (!isset($this->nodes[$nodeIdx])) {
            throw new YaEtlException('Argument 1 should be a valid index in nodes, got:' . \gettype($nodeIdx));
        }

        unset($this->nodeMap[$this->nodeStats[$nodeIdx]['hash']], $this->nodeStats[$nodeIdx]);
        $nodeHash = $this->objectHash($node);

        $node->setCarrier($this)->setNodeHash($nodeHash);

        $this->nodes[$nodeIdx]    = $node;
        $this->nodeMap[$nodeHash] = \array_replace($this->nodeMapDefault, [
            'class'    => \get_class($node),
            'branchId' => $this->flowId,
            'hash'     => $nodeHash,
            'index'    => $nodeIdx,
        ]);

        // register references to nodeStats to increment faster
        // nodeStats can also be used as reverse lookup table
        $this->nodeStats[$nodeIdx] = &$this->nodeMap[$nodeHash];

        return $this;
    }

    /**
     * Compute final stats
     *
     * Adds the formatted duration and a 'report' line to each entry
     * found under the 'invocations' key.
     *
     * @param array $stats
     *
     * @return array
     */
    protected function processStats($stats)
    {
        if (!empty($stats['nodes'])) {
            $stats['nodes'] = $this->processStats($stats['nodes']);
        }

        if (empty($stats['invocations'])) {
            return $stats;
        }

        foreach ($stats['invocations'] as &$value) {
            $value           = \array_replace($value, $this->duration($value['duration']));
            $value['report'] = \sprintf('[YaEtl] Time : %s - Memory: %4.2fMiB',
                $value['durationStr'],
                $value['mib']
            );
        }

        // break the reference so the last invocation cannot be
        // overwritten through a later use of $value
        unset($value);

        return $stats;
    }

    /**
     * It could lead to really tricky situation if we were to
     * allow multiple instances of the same node. It's obviously
     * wrong with an Extractor, but even a Transformer could
     * create dark corner cases.
     *
     * @param NodeInterface $node
     *
     * @throws YaEtlException when the node hash is already in the node map
     *
     * @return $this
     */
    protected function enforceNodeInstanceUnicity(NodeInterface $node)
    {
        if ($this->findNodeHashInMap($this->objectHash($node), $this->getNodeMap())) {
            throw new YaEtlException('This instance of ' . \get_class($node) . ' appears to be already in use in this flow. Please deep clone / re new before reuse');
        }

        return $this;
    }

    /**
     * Find a Node by its hash in a nodemap, used to enforce Node instance unicity
     *
     * The lookup recurses into the 'nodes' entry of each map element.
     *
     * @param string $hash
     * @param array  $nodeMap
     *
     * @return bool
     */
    protected function findNodeHashInMap($hash, $nodeMap)
    {
        if (isset($nodeMap[$hash])) {
            return true;
        }

        foreach ($nodeMap as $mapData) {
            if (
                !empty($mapData['nodes']) &&
                $this->findNodeHashInMap($hash, $mapData['nodes'])
            ) {
                return true;
            }
        }

        return false;
    }

    /**
     * Calls each WorkFlow's loaders and branch flush method
     *
     * Called without a status (as flowEnd() does), a flow that has a parent
     * and is not set to force flush does nothing and waits for its parent
     * to call it with a status.
     *
     * @param null|FlowStatusInterface $flowStatus The parent's status when flushed
     *                                             by a parent, null to use this
     *                                             flow's own status
     *
     * @return $this
     */
    protected function flush(FlowStatusInterface $flowStatus = null)
    {
        if ($flowStatus === null) {
            if ($this->hasParent() && !$this->isForceFlush()) {
                // we'll get another chance at this
                return $this;
            }

            // use current status
            return $this->flushNodes($this->flowStatus);
        }

        // use parent's status
        return $this->flushNodes($flowStatus);
    }

    /**
     * Actually flush nodes
     *
     * Every Loader and every branch carrying a YaEtl flow is flushed with
     * the given status, and each flush increments the 'num_flush' stat.
     *
     * @param FlowStatusInterface $flowStatus
     *
     * @return $this
     */
    protected function flushNodes(FlowStatusInterface $flowStatus)
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                ++$this->stats['num_flush'];
                continue;
            }

            if (!($node instanceof BranchNode)) {
                continue;
            }

            // start with only flushing YaEtl and extends. The check is
            // made against `self` and not the late bound `static`: when
            // this flow is a subclass of YaEtl, a plain YaEtl branch must
            // still be flushed here, as its own flush() call skips itself
            // whenever it has a parent and is not set to force flush.
            $flow = $node->getPayload();
            if ($flow instanceof self) {
                $flow->flush($flowStatus);
                ++$this->stats['num_flush'];
            }
        }

        return $this;
    }
}
524