Completed
Push — master ( 5fe880...6d053f )
by Fabrice
03:04
created

YaEtl::add()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
/*
4
 * This file is part of YaEtl.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\NodalFlow;
13
use fab2s\NodalFlow\Nodes\AggregateNode;
14
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
15
use fab2s\NodalFlow\Nodes\BranchNode;
16
use fab2s\NodalFlow\Nodes\NodeInterface;
17
use fab2s\YaEtl\Extractors\ExtractorInterface;
18
use fab2s\YaEtl\Extractors\JoinableInterface;
19
use fab2s\YaEtl\Extractors\OnClauseInterface;
20
use fab2s\YaEtl\Loaders\LoaderInterface;
21
use fab2s\YaEtl\Transformers\TransformerInterface;
22
23
/**
 * Class YaEtl
 *
 * NodalFlow specialization exposing an ETL oriented grammar:
 * from(), join(), transform(), to() and branch(). The generic
 * add() entry point is deliberately disabled in favor of these.
 */
class YaEtl extends NodalFlow
{
    /**
     * The total amount of records to fetch, in case
     * there is a limit set.
     * Only written by setExtractLimit() within this class.
     *
     * @var int
     */
    protected $extractLimit;

    /**
     * Not read nor written by the methods of this class
     *
     * @var array
     */
    protected $aggregateNodes = [];

    /**
     * Not read nor written by the methods of this class
     *
     * @var int
     */
    protected $aggregateNodeIdx = 0;

    /**
     * Flow level counters. The num_extractor, num_joiner, num_transformer,
     * num_loader, num_branch and num_flush entries are incremented by the
     * grammar methods and by flush()
     *
     * @var array
     */
    protected $stats = [
        'start'           => null,
        'end'             => null,
        'duration'        => null,
        'mib'             => null,
        'report'          => '',
        'num_extract'     => 0,
        'num_extractor'   => 0,
        'num_join'        => 0,
        'num_joiner'      => 0,
        'num_merge'       => 0,
        'num_records'     => 0,
        'num_transform'   => 0,
        'num_transformer' => 0,
        'num_branch'      => 0,
        'num_load'        => 0,
        'num_loader'      => 0,
        'num_flush'       => 0,
        'invocations'     => [],
        'nodes'           => [],
    ];

    /**
     * Maps the node hash of every aggregated extractor (and of the
     * aggregate node itself) to the index of the aggregate in $this->nodes
     *
     * @var array
     */
    protected $reverseAggregateTable = [];

    /**
     * Set the extract limit. The value is cast to int and
     * anything below 1 is raised to 1.
     *
     * @param int $recordLimit
     *
     * @return $this
     */
    public function setExtractLimit($recordLimit)
    {
        $this->extractLimit = max(1, (int) $recordLimit);

        return $this;
    }

    /**
     * Register an Extractor, either as a new node or aggregated
     * with an Extractor that is already part of this flow
     *
     * @param ExtractorInterface      $extractor
     * @param null|ExtractorInterface $aggregateWith Use the extractor instance you want to aggregate with
     *
     * @throws \Exception when the instance is already used in this flow
     *                    or when $aggregateWith is unknown to this flow
     *
     * @return $this
     */
    public function from(ExtractorInterface $extractor, ExtractorInterface $aggregateWith = null)
    {
        $this->enforceNodeInstanceUnicity($extractor);
        if ($aggregateWith !== null) {
            $this->aggregateTo($extractor, $aggregateWith);
        } else {
            parent::add($extractor);
        }

        ++$this->stats['num_extractor'];

        return $this;
    }

    /**
     * Disabled on purpose: nodes must be registered through
     * from(), transform(), join() and / or to()
     *
     * @param NodeInterface $node
     *
     * @throws \Exception always
     */
    public function add(NodeInterface $node)
    {
        throw new \Exception('[YaEtl] add() is not directly available, use YaEtl grammar from(), transform(), join() and / or to() instead');
    }

    /**
     * Register a joining Extractor: the OnClause is registered on
     * $joinFrom and both $joinFrom and the OnClause are set on
     * $extractor before it is added to the flow
     *
     * @param JoinableInterface $extractor
     * @param JoinableInterface $joinFrom
     * @param OnClauseInterface $onClause
     *
     * @throws \Exception when the instance is already used in this flow
     *
     * @return $this
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause)
    {
        $this->enforceNodeInstanceUnicity($extractor);
        $joinFrom->registerJoinerOnClause($onClause);
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        parent::add($extractor);
        ++$this->stats['num_joiner'];

        return $this;
    }

    /**
     * Register a Transformer
     *
     * @param TransformerInterface $transformer
     *
     * @throws \Exception when the instance is already used in this flow
     *
     * @return $this
     */
    public function transform(TransformerInterface $transformer)
    {
        $this->enforceNodeInstanceUnicity($transformer);
        parent::add($transformer);
        ++$this->stats['num_transformer'];

        return $this;
    }

    /**
     * Register a Loader
     *
     * @param LoaderInterface $loader
     *
     * @throws \Exception when the instance is already used in this flow
     *
     * @return $this
     */
    public function to(LoaderInterface $loader)
    {
        $this->enforceNodeInstanceUnicity($loader);
        parent::add($loader);
        ++$this->stats['num_loader'];

        return $this;
    }

    /**
     * Register another YaEtl flow as a branch, wrapped in a BranchNode
     *
     * The registry of already used flows is a method level static,
     * hence it is shared by every instance for the whole process.
     *
     * @param YaEtl $flow
     * @param bool  $isAReturningVal forwarded as is to the BranchNode constructor
     *
     * @throws \Exception when $flow is this very flow or was already registered
     *
     * @return $this
     */
    public function branch(YaEtl $flow, $isAReturningVal = false)
    {
        static $flowHashes;
        if (!isset($flowHashes)) {
            $flowHashes = [
                $this->objectHash($this) => 1,
            ];
        }

        $flowHash = $this->objectHash($flow);
        // the static registry is only seeded with the very first flow that
        // ever calls branch(), so self branching must be checked explicitly
        if ($flow === $this || isset($flowHashes[$flowHash])) {
            throw new \Exception('[YaEtl] An instance of ' . \get_class($flow) . ' appears to be already in use in this flow. Please clone / re new before reuse');
        }

        $flowHashes[$flowHash] = 1;

        parent::add(new BranchNode($flow, $isAReturningVal));
        ++$this->stats['num_branch'];

        return $this;
    }

    /**
     * Triggered right after the flow stops: flushes
     * before delegating to the parent implementation
     *
     * @return $this
     */
    public function flowEnd()
    {
        $this->flush();

        parent::flowEnd();

        return $this;
    }

    /**
     * kiss method to expose basic stats
     *
     * @return array the parent stats completed with per node type counters,
     *               the computed duration and a human readable 'report'
     */
    public function getStats()
    {
        $stats          = $this->processStats(parent::getStats());
        $stats['nodes'] = $this->getNodeStats();

        $this->collectNodeStats($stats);

        $stats['duration'] = $stats['end'] - $stats['start'];
        $stats             = \array_replace($stats, $this->duration($stats['duration']));
        // the format string spans several lines on purpose, line breaks are part of the report
        $stats['report']   = \sprintf(
            '[YaEtl](%s) %s Extractor - %s Extract - %s Record (%s Iterations)
[YaEtl] %s Joiner - %s Join - %s Branch
[YaEtl] %s Transformer - %s Transform - %s Loader - %s Load - %s Flush
[YaEtl] Time : %s - Memory: %4.2fMiB',
            $this->flowStatus,
            \number_format($stats['num_extractor'], 0, '.', ' '),
            \number_format($stats['num_extract'], 0, '.', ' '),
            \number_format($stats['num_records'], 0, '.', ' '),
            \number_format($this->numIterate, 0, '.', ' '),
            \number_format($stats['num_joiner'], 0, '.', ' '),
            \number_format($stats['num_join'], 0, '.', ' '),
            \number_format($stats['num_branch'], 0, '.', ' '),
            \number_format($stats['num_transformer'], 0, '.', ' '),
            \number_format($stats['num_transform'], 0, '.', ' '),
            \number_format($stats['num_loader'], 0, '.', ' '),
            \number_format($stats['num_load'], 0, '.', ' '),
            \number_format($stats['num_flush'], 0, '.', ' '),
            $stats['durationStr'],
            $stats['mib']
        );

        return $stats;
    }

    /**
     * Aggregate $extractor with $aggregateWith, creating the
     * AggregateNode in place of $aggregateWith when there is none yet
     *
     * @param ExtractorInterface $extractor
     * @param ExtractorInterface $aggregateWith
     *
     * @throws \Exception when $aggregateWith is neither a node of this flow
     *                    nor part of one of its aggregates
     *
     * @return $this
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith)
    {
        // aggregate with target Node
        $nodeHash = $aggregateWith->getNodeHash();
        if (!isset($this->nodeMap[$nodeHash]) && !isset($this->reverseAggregateTable[$nodeHash])) {
            throw new \Exception('[YaEtl] Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
        }

        // the target is either a direct node or already buried in an aggregate
        $aggregateWithIdx = isset($this->nodeMap[$nodeHash]) ? $this->nodeMap[$nodeHash]['index'] : $this->reverseAggregateTable[$nodeHash];
        if ($this->nodes[$aggregateWithIdx] instanceof AggregateNodeInterface) {
            $this->nodes[$aggregateWithIdx]->addTraversable($extractor);
            // aggregate node did take care of setting carrier and hash
            $this->reverseAggregateTable[$extractor->getNodeHash()] = $aggregateWithIdx;
        } else {
            $aggregateNode = new AggregateNode(true);
            $aggregateNode->addTraversable($this->nodes[$aggregateWithIdx])
                    ->addTraversable($extractor);
            // keep track of this extractor before we bury it in the aggregate
            $this->reverseAggregateTable[$this->nodes[$aggregateWithIdx]->getNodeHash()] = $aggregateWithIdx;
            // now replace its slot in the main tree
            $this->replace($aggregateWithIdx, $aggregateNode);
            // aggregate node did take care of setting carrier and hash
            $this->reverseAggregateTable[$aggregateNode->getNodeHash()]                  = $aggregateWithIdx;
            $this->reverseAggregateTable[$extractor->getNodeHash()]                      = $aggregateWithIdx;
        }

        return $this;
    }

    /**
     * Complete $stats (by reference) and $this->nodeStats with
     * the counters specific to each type of node
     *
     * @param array $stats completed in place, missing keys are
     *                     first filled from $this->statsDefault
     *
     * @return $this
     */
    protected function collectNodeStats(array &$stats)
    {
        $stats = \array_replace($this->statsDefault, $stats);
        foreach ($this->nodes as $nodeIdx => $node) {
            if (($node instanceof JoinableInterface) && $node->getOnClause()) {
                // a Joinable carrying an OnClause is counted as a joiner
                $this->nodeStats[$nodeIdx]['num_join'] = $node->getNumRecords();
                $stats['num_join'] += $this->nodeStats[$nodeIdx]['num_join'];
            } elseif ($node instanceof ExtractorInterface) {
                $this->nodeStats[$nodeIdx]['num_records'] = $this->nodeStats[$nodeIdx]['num_iterate'];
                $this->nodeStats[$nodeIdx]['num_extract'] = $node->getNumExtract();
                $stats['num_records'] += $this->nodeStats[$nodeIdx]['num_iterate'];
                $stats['num_extract'] += $this->nodeStats[$nodeIdx]['num_extract'];
            } elseif ($node instanceof TransformerInterface) {
                $this->nodeStats[$nodeIdx]['num_transform'] = $this->nodeStats[$nodeIdx]['num_exec'];
                $stats['num_transform'] += $this->nodeStats[$nodeIdx]['num_transform'];
            } elseif ($node instanceof LoaderInterface) {
                $this->nodeStats[$nodeIdx]['num_load'] = $this->nodeStats[$nodeIdx]['num_exec'];
                $stats['num_load'] += $this->nodeStats[$nodeIdx]['num_load'];
            } elseif ($node instanceof AggregateNodeInterface) {
                $this->nodeStats[$nodeIdx]['num_records'] = $this->nodeStats[$nodeIdx]['num_iterate'];
                $stats['num_records'] += $this->nodeStats[$nodeIdx]['num_iterate'];
                // extracts are summed over every extractor held by the aggregate
                $this->nodeStats[$nodeIdx]['num_extract'] = 0;
                foreach ($node->getNodeCollection() as $extractorNode) {
                    $this->nodeStats[$nodeIdx]['num_extract'] += $extractorNode->getNumExtract();
                }

                $stats['num_extract'] += $this->nodeStats[$nodeIdx]['num_extract'];
            }
        }

        return $this;
    }

    /**
     * Replaces a node with another one, keeping
     * nodeMap and nodeStats in sync
     *
     * @param int           $nodeIdx
     * @param NodeInterface $node
     *
     * @throws \InvalidArgumentException when $nodeIdx is not set in $this->nodes
     *
     * @return $this
     */
    protected function replace($nodeIdx, NodeInterface $node)
    {
        if (!isset($this->nodes[$nodeIdx])) {
            throw new \InvalidArgumentException('Argument 1 should be a valid index in nodes, got:' . \gettype($nodeIdx));
        }

        // forget the node currently holding the slot
        unset($this->nodeMap[$this->nodeStats[$nodeIdx]['hash']], $this->nodeStats[$nodeIdx]);
        $nodeHash = $this->objectHash($node);

        $node->setCarrier($this)->setNodeHash($nodeHash);

        $this->nodes[$nodeIdx]       = $node;
        $this->nodeMap[$nodeHash]    = \array_replace($this->nodeMapDefault, [
            'class'    => \get_class($node),
            'branchId' => $this->flowId,
            'hash'     => $nodeHash,
            'index'    => $nodeIdx,
        ]);

        // register references to nodeStats to increment faster
        // nodeStats can also be used as reverse lookup table
        $this->nodeStats[$nodeIdx] = &$this->nodeMap[$nodeHash];

        return $this;
    }

    /**
     * Merge the result of duration() and a 'report'
     * string into each entry of $stats['invocations']
     *
     * @param array $stats
     *
     * @return array
     */
    protected function processStats($stats)
    {
        if (!empty($stats['nodes'])) {
            $stats['nodes'] = $this->processStats($stats['nodes']);
        }

        // the recursion above may hand over arrays without any 'invocations'
        // entry, iterating it by reference would warn and create the key
        if (empty($stats['invocations']) || !\is_array($stats['invocations'])) {
            return $stats;
        }

        foreach ($stats['invocations'] as &$value) {
            $value           = \array_replace($value, $this->duration($value['duration']));

            $value['report'] = \sprintf('[YaEtl] Time : %s - Memory: %4.2fMiB',
                $value['durationStr'],
                $value['mib']
            );
        }

        // break the reference left over by the foreach
        unset($value);

        return $stats;
    }

    /**
     * It could lead to really tricky situation if we were to
     * allow multiple instances of the same node. It's obviously
     * wrong with an Extractor, but even a Transformer could
     * create dark corner cases.
     *
     * @param NodeInterface $node
     *
     * @throws \Exception when the node hash is found in the node map
     *
     * @return $this
     */
    protected function enforceNodeInstanceUnicity(NodeInterface $node)
    {
        if ($this->findNodeHashInMap($this->objectHash($node), $this->getNodeMap())) {
            throw new \Exception('[YaEtl] This instance of ' . \get_class($node) . ' appears to be already in use in this flow. Please deep clone / re new before reuse');
        }

        return $this;
    }

    /**
     * Look for $hash among the keys of $nodeMap, then
     * recursively in the 'nodes' entry of each of its values
     *
     * @param string $hash
     * @param array  $nodeMap
     *
     * @return bool
     */
    protected function findNodeHashInMap($hash, $nodeMap)
    {
        if (isset($nodeMap[$hash])) {
            return true;
        }

        foreach ($nodeMap as $mapData) {
            if (
                !empty($mapData['nodes']) &&
                $this->findNodeHashInMap($hash, $mapData['nodes'])
            ) {
                return true;
            }
        }

        return false;
    }

    /**
     * Calls flush(), with the current flow status, on each node being
     * either a Loader or an instance of the current class, and counts
     * every call in the num_flush stat
     *
     * @return $this
     */
    protected function flush()
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface || \is_a($node, static::class)) {
                $node->flush($this->flowStatus);
                ++$this->stats['num_flush'];
            }
        }

        return $this;
    }
}
447