Passed
Pull Request — master (#7)
by Fabrice
05:33 queued 02:26
created

YaEtl::forceFlush()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 5
rs 10
1
<?php
2
3
/*
4
 * This file is part of YaEtl
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\Flows\FlowEventAbstract;
13
use fab2s\NodalFlow\Flows\FlowInterface;
14
use fab2s\NodalFlow\Flows\FlowStatusInterface;
15
use fab2s\NodalFlow\NodalFlow;
16
use fab2s\NodalFlow\NodalFlowException;
17
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
18
use fab2s\NodalFlow\Nodes\BranchNode;
19
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
20
use fab2s\NodalFlow\Nodes\NodeInterface;
21
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
22
use fab2s\NodalFlow\YaEtlException;
23
use fab2s\YaEtl\Events\YaEtlEvent;
24
use fab2s\YaEtl\Extractors\AggregateExtractor;
25
use fab2s\YaEtl\Extractors\ExtractorInterface;
26
use fab2s\YaEtl\Extractors\JoinableInterface;
27
use fab2s\YaEtl\Extractors\OnClauseInterface;
28
use fab2s\YaEtl\Loaders\LoaderInterface;
29
use fab2s\YaEtl\Qualifiers\QualifierInterface;
30
use fab2s\YaEtl\Transformers\TransformerInterface;
31
32
/**
33
 * Class YaEtl
34
 */
35
class YaEtl extends NodalFlow
36
{
37
    /**
     * Flow-level counters declared by YaEtl, keyed by stat name.
     *
     * Every entry starts at 0 except 'num_records', whose value is the
     * name of another counter ('num_iterate') rather than a number
     * (presumably an alias of that counter - confirm against NodalFlow).
     *
     * @var array
     */
    protected $flowIncrements = [
        'num_extract'     => 0,
        'num_extractor'   => 0,
        'num_join'        => 0,
        'num_joiner'      => 0,
        'num_merge'       => 0,
        'num_records'     => 'num_iterate',
        'num_transform'   => 0,
        'num_transformer' => 0,
        'num_qualifier'   => 0,
        'num_qualify'     => 0,
        'num_branch'      => 0,
        'num_load'        => 0,
        'num_loader'      => 0,
        'num_flush'       => 0,
    ];

    /**
     * The reverse aggregate lookup table
     *
     * Maps the id of a Node involved in an aggregate to the index of
     * the slot holding that aggregate in this flow's nodes.
     *
     * @var array
     */
    protected $reverseAggregateTable = [];

    /**
     * Whether this flow flushes on its own even when it has a parent
     *
     * @var bool
     */
    protected $forceFlush = false;
68
69
    /**
     * Registers an Extractor, either as a Node of its own or
     * aggregated with an Extractor that was previously added
     *
     * @param ExtractorInterface      $extractor     The Extractor to register
     * @param null|ExtractorInterface $aggregateWith The Extractor instance to aggregate with,
     *                                               null to add $extractor as a regular Node
     *
     * @throws YaEtlException
     * @throws NodalFlowException
     *
     * @return static
     */
    public function from(ExtractorInterface $extractor, ExtractorInterface $aggregateWith = null): self
    {
        if ($aggregateWith === null) {
            // plain extractor: goes straight into the flow and is counted
            parent::add($extractor);
            $this->flowMap->incrementFlow('num_extractor');

            return $this;
        }

        // aggregated extractors are handled (and accounted for) separately
        $this->aggregateTo($extractor, $aggregateWith);

        return $this;
    }
91
92
    /**
     * Adds a Qualifier to the Flow
     *
     * @param QualifierInterface $qualifier
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function qualify(QualifierInterface $qualifier): self
    {
        // go through the parent as this class' own add() always throws
        parent::add($qualifier);

        $this->flowMap->incrementFlow('num_qualifier');

        return $this;
    }
106
107
    /**
     * Disables NodalFlow's generic add(): this override always throws
     * so that Nodes can only be added through the YaEtl grammar methods
     *
     * @param NodeInterface $node Never used
     *
     * @throws YaEtlException Always
     *
     * @return FlowInterface Never actually returned
     */
    public function add(NodeInterface $node): FlowInterface
    {
        $message = 'add() is not directly available, use YaEtl grammar instead';

        throw new YaEtlException($message);
    }
120
121
    /**
     * Sets the force flush flag of this flow
     *
     * By default, a branched flow only sees its `flush()` method
     * called when its top most parent triggers its own `flush()`.
     * It makes sense most of the time to do so, as the most common
     * use case for branches so far is to deal with one record at a
     * time without generating records (even when left joining).
     * In such a case, the `flush()` method really needs to be called
     * exactly when the top most flow's one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often.
     * Also note that the branch will also be flushed at the end
     * of its top most parent flow.
     *
     * @param bool $forceFlush
     *
     * @return static
     */
    public function forceFlush(bool $forceFlush): self
    {
        $this->forceFlush = $forceFlush;

        return $this;
    }
147
148
    /**
     * Adds a Joiner to the Flow, joined against a specific Extractor
     *
     * @param JoinableInterface $extractor The joiner being added
     * @param JoinableInterface $joinFrom  The extractor to join from
     * @param OnClauseInterface $onClause  The clause registered on both sides
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause): self
    {
        // wire both ends of the join before the joiner enters the flow
        $joinFrom->registerJoinerOnClause($onClause);
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        // go through the parent as this class' own add() always throws
        parent::add($extractor);

        $this->flowMap->incrementFlow('num_joiner');

        return $this;
    }
170
171
    /**
     * Registers a Transformer as the next Node of the Flow
     *
     * @param TransformerInterface $transformer
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function transform(TransformerInterface $transformer): self
    {
        // go through the parent as this class' own add() always throws
        parent::add($transformer);

        $this->flowMap->incrementFlow('num_transformer');

        return $this;
    }
187
188
    /**
     * Registers a Loader as the next Node of the Flow
     *
     * @param LoaderInterface $loader
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function to(LoaderInterface $loader): self
    {
        // go through the parent as this class' own add() always throws
        parent::add($loader);

        $this->flowMap->incrementFlow('num_loader');

        return $this;
    }
204
205
    /**
     * Wraps a YaEtl Flow into a BranchNode and adds it to this Flow
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function branch(self $flow, $isAReturningVal = false): self
    {
        $branchNode = new BranchNode($flow, $isAReturningVal);

        // go through the parent as this class' own add() always throws
        parent::add($branchNode);

        $this->flowMap->incrementFlow('num_branch');

        return $this;
    }
223
224
    /**
     * End of flow hook: flushes this flow first, then
     * lets the parent implementation do its own work
     *
     * @return static
     */
    public function flowEnd(): NodalFlow
    {
        // no status passed: flush() decides by itself whether to act now
        $this->flush();

        parent::flowEnd();

        return $this;
    }
237
238
    /**
     * KISS method to expose basic stats
     *
     * Starts from the parent's stats, adds each YaEtl counter declared
     * in $flowIncrements to its "<counter>_total" entry, then renders a
     * human readable report stored under the 'report' key.
     *
     * @return array<string,integer|string>
     */
    public function getStats(): array
    {
        $stats = parent::getStats();

        $tpl = '[YaEtl]({FLOW_STATUS}) {NUM_EXTRACTOR_TOTAL} Extractor - {NUM_EXTRACT_TOTAL} Extract - {NUM_RECORDS_TOTAL} Record ({NUM_ITERATE_TOTAL} Iterations)
[YaEtl] {NUM_JOINER_TOTAL} Joiner - {NUM_JOIN_TOTAL} Join - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_QUALIFIER_TOTAL} Qualifier - {NUM_QUALIFY_TOTAL} Qualify
[YaEtl] {NUM_TRANSFORMER_TOTAL} Transformer - {NUM_TRANSFORM_TOTAL} Transform - {NUM_LOADER_TOTAL} Loader - {NUM_LOAD_TOTAL} Load
[YaEtl] {NUM_BRANCH_TOTAL} Branch - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_FLUSH_TOTAL} Flush
[YaEtl] Time : {DURATION} - Memory: {MIB} MiB';

        foreach (array_keys($this->flowIncrements) as $key) {
            $totalKey = $key . '_total';
            // a missing counter or total counts as 0 instead of
            // triggering an undefined key notice / warning
            $stats[$totalKey] = ($stats[$totalKey] ?? 0) + ($stats[$key] ?? 0);
        }

        $vars = [];
        foreach ($stats as $varName => $value) {
            if (is_array($value)) {
                // nested structures have no placeholder in the template
                continue;
            }

            $placeholder = '{' . strtoupper($varName) . '}';
            if (is_numeric($value)) {
                // integers are shown without decimals, anything else numeric with two
                $vars[$placeholder] = \number_format($value, is_int($value) ? 0 : 2, '.', ' ');
                continue;
            }

            $vars[$placeholder] = $value;
        }

        $stats['report'] = str_replace(array_keys($vars), array_values($vars), $tpl);

        return $stats;
    }
275
276
    /**
     * Tells if the flow is set to force flush
     * Only used when branched (to tell the parent)
     *
     * @return bool true when the force flush flag holds a truthy value
     */
    public function isForceFlush(): bool
    {
        return (bool) $this->forceFlush;
    }
286
287
    /**
     * Runs the parent initialization, then replaces the event
     * instance dispatch argument with a YaEtlEvent bound to this flow
     *
     * @param string $class
     *
     * @throws \ReflectionException
     *
     * @return static
     */
    protected function initDispatchArgs(string $class): FlowEventAbstract
    {
        parent::initDispatchArgs($class);

        $yaEtlEvent                                  = new YaEtlEvent($this);
        $this->dispatchArgs[$this->eventInstanceKey] = $yaEtlEvent;

        return $this;
    }
301
302
    /**
     * Used internally to aggregate Extractors
     *
     * When $aggregateWith already sits in an aggregate Node, $extractor
     * is simply added to it. Otherwise a new AggregateExtractor takes
     * over $aggregateWith's slot in the flow and receives both
     * extractors. Either way, the reverse aggregate table is updated
     * so later calls can find the slot again.
     *
     * @param ExtractorInterface $extractor     The Extractor to aggregate
     * @param ExtractorInterface $aggregateWith The Extractor to aggregate with
     *
     * @throws YaEtlException     When $aggregateWith is neither a Node of this
     *                            flow nor known to the reverse aggregate table
     * @throws NodalFlowException
     *
     * @return static
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith): self
    {
        // aggregate with target Node
        $aggregateWithNodeId = $aggregateWith->getId();
        $aggregateWithIdx    = $this->flowMap->getNodeIndex($aggregateWithNodeId);
        if ($aggregateWithIdx === null) {
            if (!isset($this->reverseAggregateTable[$aggregateWithNodeId])) {
                throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
            }

            // not a direct Node of this flow but known to the reverse
            // table: use the slot index recorded for it instead of null
            $aggregateWithIdx = $this->reverseAggregateTable[$aggregateWithNodeId];
        }

        /** @var TraversableNodeInterface $aggregateWithNode */
        $aggregateWithNode = $this->nodes[$aggregateWithIdx];
        if ($aggregateWithNode instanceof AggregateNodeInterface) {
            // the slot already holds an aggregate, just join it
            $aggregateWithNode->addTraversable($extractor);
            $this->reverseAggregateTable[$extractor->getId()] = $aggregateWithIdx;

            return $this;
        }

        $aggregateNode = new AggregateExtractor(true);
        // keep track of this extractor before we bury it in the aggregate
        $this->reverseAggregateTable[$aggregateWithNode->getId()] = $aggregateWithIdx;
        // now replace its slot in the main tree
        $this->replace($aggregateWithIdx, $aggregateNode);
        $aggregateNode->addTraversable($aggregateWithNode)
            ->addTraversable($extractor);

        // adjust counters as we did remove the $aggregateWith Extractor from this flow
        $reg = &$this->registry->get($this->getId());
        --$reg['flowStats']['num_extractor'];

        // aggregate node did take care of setting carrier
        $this->reverseAggregateTable[$aggregateNode->getId()] = $aggregateWithIdx;
        $this->reverseAggregateTable[$extractor->getId()]     = $aggregateWithIdx;

        return $this;
    }
349
350
    /**
     * Flushes this flow's loaders and branches, unless the flush
     * is left to a parent flow
     *
     * With a status provided, nodes are flushed with that status.
     * Without one, a flow that has a parent and is not set to force
     * flush does nothing; otherwise nodes are flushed with this
     * flow's own status.
     *
     * @param FlowStatusInterface|null $flowStatus
     *
     * @return static
     */
    protected function flush(FlowStatusInterface $flowStatus = null): self
    {
        if ($flowStatus !== null) {
            // use parent's status
            return $this->flushNodes($flowStatus);
        }

        $leaveToParent = $this->hasParent() && !$this->isForceFlush();
        if ($leaveToParent) {
            // we'll get another chance at this
            return $this;
        }

        // use current status
        return $this->flushNodes($this->flowStatus);
    }
372
373
    /**
     * Actually flush nodes
     *
     * Each Loader is flushed with the provided status, counted in
     * 'num_flush' and followed by a FLOW_FLUSH event. Each branch whose
     * payload is an instance of the current class has its own flush()
     * called with the same status. Other nodes are left alone.
     *
     * @param FlowStatusInterface $flowStatus
     *
     * @return static
     */
    protected function flushNodes(FlowStatusInterface $flowStatus): self
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                $this->flowMap->incrementFlow('num_flush');
                $this->triggerEvent(YaEtlEvent::FLOW_FLUSH, $node);
                continue;
            }

            if (!($node instanceof BranchNodeInterface)) {
                continue;
            }

            // start with only flushing YaEtl and extends
            $branchFlow = $node->getPayload();
            if ($branchFlow instanceof static) {
                /* @var YaEtl $branchFlow */
                $branchFlow->flush($flowStatus);
            }
        }

        return $this;
    }
402
}
403