Completed
Push — master ( 1bee2e...dd3854 )
by Fabrice
05:28
created

YaEtl::replace()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 25
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 25
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 14
nc 2
nop 2
1
<?php
2
3
/*
4
 * This file is part of YaEtl.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\Flows\FlowStatusInterface;
13
use fab2s\NodalFlow\NodalFlow;
14
use fab2s\NodalFlow\NodalFlowException;
15
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
16
use fab2s\NodalFlow\Nodes\BranchNode;
17
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
18
use fab2s\NodalFlow\Nodes\NodeInterface;
19
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
20
use fab2s\NodalFlow\YaEtlException;
21
use fab2s\YaEtl\Extractors\AggregateExtractor;
22
use fab2s\YaEtl\Extractors\ExtractorInterface;
23
use fab2s\YaEtl\Extractors\JoinableInterface;
24
use fab2s\YaEtl\Extractors\OnClauseInterface;
25
use fab2s\YaEtl\Loaders\LoaderInterface;
26
use fab2s\YaEtl\Transformers\TransformerInterface;
27
28
/**
 * Class YaEtl
 *
 * ETL flow built on top of NodalFlow. Nodes are added through the YaEtl
 * grammar (from(), join(), transform(), to() and branch()); NodalFlow's
 * generic add() is disabled and throws.
 */
class YaEtl extends NodalFlow
{
    /**
     * YaEtl specific flow counters. getStats() adds each one
     * into its `<key>_total` counterpart before building the report.
     * NOTE(review): the non-zero value ('num_records' => 'num_iterate')
     * presumably aliases an existing NodalFlow counter — confirm in NodalFlow.
     *
     * @var array
     */
    protected $flowIncrements = [
        'num_extract'     => 0,
        'num_extractor'   => 0,
        'num_join'        => 0,
        'num_joiner'      => 0,
        'num_merge'       => 0,
        'num_records'     => 'num_iterate',
        'num_transform'   => 0,
        'num_transformer' => 0,
        'num_branch'      => 0,
        'num_load'        => 0,
        'num_loader'      => 0,
        'num_flush'       => 0,
    ];

    /**
     * The reverse aggregate lookup table: maps a node id to the index
     * of the slot holding it in this flow, which is either the node
     * itself or the AggregateExtractor it was merged into
     *
     * @var array
     */
    protected $reverseAggregateTable = [];

    /**
     * Whether this flow, when branched, flushes on its own
     * instead of waiting for its top most parent (see forceFlush())
     *
     * @var bool
     */
    protected $forceFlush = false;

    /**
     * Adds an extractor to the Flow which may be aggregated with another one
     *
     * @param ExtractorInterface      $extractor
     * @param null|ExtractorInterface $aggregateWith Use the extractor instance you want to aggregate with
     *
     * @throws YaEtlException
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function from(ExtractorInterface $extractor, ExtractorInterface $aggregateWith = null)
    {
        if ($aggregateWith !== null) {
            $this->aggregateTo($extractor, $aggregateWith);

            return $this;
        }

        // parent::add() is intentional: $this->add() is overridden
        // to forbid direct usage and would throw
        parent::add($extractor);
        $this->flowMap->incrementFlow('num_extractor');

        return $this;
    }

    /**
     * Override NodalFlow's add method to prohibit its direct usage
     *
     * @param NodeInterface $node
     *
     * @throws YaEtlException always
     */
    public function add(NodeInterface $node)
    {
        throw new YaEtlException('add() is not directly available, use YaEtl grammar instead');
    }

    /**
     * By default, branched flows only get their `flush()` method
     * called when the top most parent triggers its own `flush()`.
     * This makes sense most of the time, as the most common use case
     * for branches so far is to deal with one record at a time without
     * generating records (even when left joining). In such a case, the
     * branch's `flush()` method really needs to be called exactly when
     * the top most flow's one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often.
     * Also note that the branch will still be flushed at the end
     * of its top most parent flow.
     *
     * @param bool $forceFlush
     *
     * @return $this
     */
    public function forceFlush($forceFlush)
    {
        $this->forceFlush = (bool) $forceFlush;

        return $this;
    }

    /**
     * Adds a Joiner to a specific Extractor in the Flow
     *
     * @param JoinableInterface $extractor The joiner being added
     * @param JoinableInterface $joinFrom  The extractor to join from
     * @param OnClauseInterface $onClause  The join condition
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause)
    {
        $joinFrom->registerJoinerOnClause($onClause);
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        // parent::add() on purpose, $this->add() throws
        parent::add($extractor);
        $this->flowMap->incrementFlow('num_joiner');

        return $this;
    }

    /**
     * Adds a Transformer to the Flow
     *
     * @param TransformerInterface $transformer
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function transform(TransformerInterface $transformer)
    {
        // parent::add() on purpose, $this->add() throws
        parent::add($transformer);
        $this->flowMap->incrementFlow('num_transformer');

        return $this;
    }

    /**
     * Adds a Loader to the Flow
     *
     * @param LoaderInterface $loader
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function to(LoaderInterface $loader)
    {
        // parent::add() on purpose, $this->add() throws
        parent::add($loader);
        $this->flowMap->incrementFlow('num_loader');

        return $this;
    }

    /**
     * Adds a Branch (Flow) to the Flow
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function branch(YaEtl $flow, $isAReturningVal = false)
    {
        // parent::add() on purpose, $this->add() throws
        parent::add(new BranchNode($flow, $isAReturningVal));
        $this->flowMap->incrementFlow('num_branch');

        return $this;
    }

    /**
     * Triggered right after the flow stops: flushes loaders
     * and branches before handing over to NodalFlow
     *
     * @return $this
     */
    public function flowEnd()
    {
        $this->flush();

        parent::flowEnd();

        return $this;
    }

    /**
     * KISS method to expose basic stats
     *
     * Returns the parent's stats, with each YaEtl counter added into
     * its `_total` counterpart, plus a human readable `report` entry.
     *
     * @return array
     */
    public function getStats()
    {
        $stats = parent::getStats();

        $tpl = '[YaEtl]({FLOW_STATUS}) {NUM_EXTRACTOR_TOTAL} Extractor - {NUM_EXTRACT_TOTAL} Extract - {NUM_RECORDS_TOTAL} Record ({NUM_ITERATE_TOTAL} Iterations)
[YaEtl] {NUM_JOINER_TOTAL} Joiner - {NUM_JOIN_TOTAL} Join - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_BRANCH_TOTAL} Branch
[YaEtl] {NUM_TRANSFORMER_TOTAL} Transformer - {NUM_TRANSFORM_TOTAL} Transform - {NUM_LOADER_TOTAL} Loader - {NUM_LOAD_TOTAL} Load - {NUM_FLUSH_TOTAL} Flush
[YaEtl] Time : {DURATION} - Memory: {MIB} MiB';

        foreach (array_keys($this->flowIncrements) as $key) {
            $stats[$key . '_total'] += $stats[$key];
        }

        // build the {PLACEHOLDER} => value map from every scalar stat
        $vars = [];
        foreach ($stats as $varName => $value) {
            if (is_array($value)) {
                continue;
            }

            $placeholder = '{' . strtoupper($varName) . '}';
            if (is_numeric($value)) {
                // integers without decimals, anything else with two
                $vars[$placeholder] = \number_format($value, is_int($value) ? 0 : 2, '.', ' ');
                continue;
            }

            $vars[$placeholder] = $value;
        }

        $stats['report'] = str_replace(array_keys($vars), array_values($vars), $tpl);

        return $stats;
    }

    /**
     * Tells if the flow is set to force flush
     * Only used when branched (to tell the parent)
     *
     * @return bool
     */
    public function isForceFlush()
    {
        return !empty($this->forceFlush);
    }

    /**
     * Used internally to aggregate Extractors
     *
     * If the target already sits in an aggregate node, the extractor is
     * added to it; otherwise the target's slot is replaced by a new
     * AggregateExtractor holding both the target and the extractor.
     *
     * @param ExtractorInterface $extractor
     * @param ExtractorInterface $aggregateWith
     *
     * @throws YaEtlException     when the target is not part of this flow
     * @throws NodalFlowException
     *
     * @return $this
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith)
    {
        // aggregate with target Node
        $aggregateWithNodeId = $aggregateWith->getId();
        $aggregateWithIdx    = $this->flowMap->getNodeIndex($aggregateWithNodeId);
        if ($aggregateWithIdx === null) {
            if (!isset($this->reverseAggregateTable[$aggregateWithNodeId])) {
                throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
            }

            // the target was already buried in an aggregate:
            // use the slot that aggregate occupies in this flow
            $aggregateWithIdx = $this->reverseAggregateTable[$aggregateWithNodeId];
        }

        /** @var TraversableNodeInterface $aggregateWithNode */
        $aggregateWithNode = $this->nodes[$aggregateWithIdx];
        if ($aggregateWithNode instanceof AggregateNodeInterface) {
            $aggregateWithNode->addTraversable($extractor);
            $this->reverseAggregateTable[$extractor->getId()] = $aggregateWithIdx;

            return $this;
        }

        $aggregateNode = new AggregateExtractor(true);
        // keep track of this extractor before we bury it in the aggregate
        $this->reverseAggregateTable[$aggregateWithNode->getId()] = $aggregateWithIdx;
        // now replace its slot in the main tree
        $this->replace($aggregateWithIdx, $aggregateNode);
        $aggregateNode->addTraversable($aggregateWithNode)
            ->addTraversable($extractor);

        // adjust counters as we did remove the $aggregateWith Extractor from this flow
        $reg = &$this->registry->get($this->getId());
        --$reg['flowStats']['num_extractor'];

        // aggregate node did take care of setting carrier
        $this->reverseAggregateTable[$aggregateNode->getId()] = $aggregateWithIdx;
        $this->reverseAggregateTable[$extractor->getId()]     = $aggregateWithIdx;

        return $this;
    }

    /**
     * Calls each WorkFlow's loaders and branch flush method
     *
     * Without a status, a branched flow only flushes when force flush is
     * set (it will otherwise be flushed by its parent). With a status,
     * the flush is driven by the parent and always happens.
     *
     * @param FlowStatusInterface|null $flowStatus The parent's status, if any
     *
     * @return $this
     */
    protected function flush(FlowStatusInterface $flowStatus = null)
    {
        if ($flowStatus !== null) {
            // use parent's status
            return $this->flushNodes($flowStatus);
        }

        if ($this->hasParent() && !$this->isForceFlush()) {
            // we'll get another chance at this
            return $this;
        }

        // use current status
        return $this->flushNodes($this->flowStatus);
    }

    /**
     * Actually flush nodes: every Loader, and every YaEtl branch (recursively)
     *
     * @param FlowStatusInterface $flowStatus
     *
     * @return $this
     */
    protected function flushNodes(FlowStatusInterface $flowStatus)
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                $this->flowMap->incrementFlow('num_flush');
                continue;
            }

            // only flush YaEtl flows and extends; `self` rather than
            // `static::class` so a subclass still flushes plain YaEtl branches
            if ($node instanceof BranchNodeInterface) {
                $flow = $node->getPayload();
                if ($flow instanceof self) {
                    $flow->flush($flowStatus);
                }
            }
        }

        return $this;
    }
}
364