Passed
Pull Request — master (#1)
by Fabrice
03:00
created

YaEtl::qualify()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 1
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
/*
4
 * This file is part of YaEtl.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\Flows\FlowStatusInterface;
13
use fab2s\NodalFlow\NodalFlow;
14
use fab2s\NodalFlow\NodalFlowException;
15
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
16
use fab2s\NodalFlow\Nodes\BranchNode;
17
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
18
use fab2s\NodalFlow\Nodes\NodeInterface;
19
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
20
use fab2s\NodalFlow\YaEtlException;
21
use fab2s\YaEtl\Extractors\AggregateExtractor;
22
use fab2s\YaEtl\Extractors\ExtractorInterface;
23
use fab2s\YaEtl\Extractors\JoinableInterface;
24
use fab2s\YaEtl\Extractors\OnClauseInterface;
25
use fab2s\YaEtl\Loaders\LoaderInterface;
26
use fab2s\YaEtl\Qualifiers\QualifierInterface;
27
use fab2s\YaEtl\Transformers\TransformerInterface;
28
29
/**
30
 * Class YaEtl
31
 */
32
class YaEtl extends NodalFlow
33
{
34
    /**
35
     * @var array
36
     */
37
    protected $flowIncrements = [
38
        'num_extract'     => 0,
39
        'num_extractor'   => 0,
40
        'num_join'        => 0,
41
        'num_joiner'      => 0,
42
        'num_merge'       => 0,
43
        'num_records'     => 'num_iterate',
44
        'num_transform'   => 0,
45
        'num_transformer' => 0,
46
        'num_qualifier'   => 0,
47
        'num_branch'      => 0,
48
        'num_load'        => 0,
49
        'num_loader'      => 0,
50
        'num_flush'       => 0,
51
    ];
52
53
    /**
54
     * The reverse aggregate lookup table
55
     *
56
     * @var array
57
     */
58
    protected $reverseAggregateTable = [];
59
60
    /**
61
     * @var bool
62
     */
63
    protected $forceFlush = false;
64
65
    /**
     * Registers an Extractor in the Flow, optionally aggregating it
     * with an Extractor that was registered earlier
     *
     * @param ExtractorInterface      $extractor
     * @param null|ExtractorInterface $aggregateWith The extractor instance to aggregate with,
     *                                               or null to add $extractor on its own
     *
     * @throws YaEtlException
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function from(ExtractorInterface $extractor, ExtractorInterface $aggregateWith = null)
    {
        if ($aggregateWith === null) {
            // stand alone extractor: it is added to the flow and counted
            parent::add($extractor);
            $this->flowMap->incrementFlow('num_extractor');

            return $this;
        }

        // aggregation is handled (including counters adjustments) by aggregateTo()
        $this->aggregateTo($extractor, $aggregateWith);

        return $this;
    }
87
88
    /**
     * Adds a Qualifier to the Flow
     *
     * The Qualifier is added as a regular Node and
     * the `num_qualifier` counter is incremented.
     *
     * @param QualifierInterface $qualifier
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function qualify(QualifierInterface $qualifier)
    {
        // go through NodalFlow's add() since our own add() is disabled
        parent::add($qualifier);

        $this->flowMap->incrementFlow('num_qualifier');

        return $this;
    }
102
103
    /**
     * Disables NodalFlow's generic add(): Nodes must be added
     * through the YaEtl grammar methods of this class instead
     *
     * @param NodeInterface $node Ignored, the call always fails
     *
     * @throws YaEtlException Always
     */
    public function add(NodeInterface $node)
    {
        $message = 'add() is not directly available, use YaEtl grammar instead';

        throw new YaEtlException($message);
    }
114
115
    /**
     * By default, branched flows will only see their
     * `flush()` method called when the top most parent
     * triggers its own `flush()`.
     * It makes sense most of the time to do so, as
     * the most common use case for branches so far is
     * to deal with one record at a time without generating
     * records (even when left joining). In such a case,
     * the `flush()` method really needs to be called by the flow
     * exactly when the top most flow's one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often.
     * Also note that the branch will also be flushed at the end
     * of its top most parent flow.
     *
     * @param bool $forceFlush Any value is accepted and converted to bool
     *
     * @return $this
     */
    public function forceFlush($forceFlush)
    {
        // normalize whatever was given to a strict boolean
        $this->forceFlush = \boolval($forceFlush);

        return $this;
    }
141
142
    /**
     * Adds a Joiner to a specific Extractor in the Flow
     *
     * @param JoinableInterface $extractor The joiner being added to the Flow
     * @param JoinableInterface $joinFrom  The extractor the joiner is joined from
     * @param OnClauseInterface $onClause  The clause shared by both sides of the join
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause)
    {
        // the joined-from side is told about the clause first
        $joinFrom->registerJoinerOnClause($onClause);

        // then the joiner is wired to its source and clause
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        parent::add($extractor);

        $this->flowMap->incrementFlow('num_joiner');

        return $this;
    }
164
165
    /**
     * Adds a Transformer to the Flow
     *
     * The Transformer is added as a regular Node and
     * the `num_transformer` counter is incremented.
     *
     * @param TransformerInterface $transformer
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function transform(TransformerInterface $transformer)
    {
        // go through NodalFlow's add() since our own add() is disabled
        parent::add($transformer);

        $this->flowMap->incrementFlow('num_transformer');

        return $this;
    }
181
182
    /**
     * Adds a Loader to the Flow
     *
     * The Loader is added as a regular Node and
     * the `num_loader` counter is incremented.
     *
     * @param LoaderInterface $loader
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function to(LoaderInterface $loader)
    {
        // go through NodalFlow's add() since our own add() is disabled
        parent::add($loader);

        $this->flowMap->incrementFlow('num_loader');

        return $this;
    }
198
199
    /**
     * Adds a Branch (Flow) to the Flow
     *
     * The given Flow is wrapped in a BranchNode before being added.
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function branch(YaEtl $flow, $isAReturningVal = false)
    {
        $branchNode = new BranchNode($flow, $isAReturningVal);
        parent::add($branchNode);

        $this->flowMap->incrementFlow('num_branch');

        return $this;
    }
217
218
    /**
     * Triggered right after the flow stops
     *
     * Flushes this flow first, then hands over
     * to NodalFlow's own flowEnd().
     *
     * @return $this
     */
    public function flowEnd()
    {
        // no status given: flush() decides if flushing is deferred to the parent
        $this->flush();
        parent::flowEnd();

        return $this;
    }
231
232
    /**
     * KISS method to expose basic stats
     *
     * Starts from NodalFlow's stats, adds this flow's own counters
     * to their `*_total` counterparts and adds a human readable
     * `report` entry rendered from the scalar stats.
     *
     * @return array<string,integer|string>
     */
    public function getStats()
    {
        $stats = parent::getStats();

        // fold each of this flow's counters into the matching total
        foreach (array_keys($this->flowIncrements) as $counter) {
            $stats[$counter . '_total'] += $stats[$counter];
        }

        $template = '[YaEtl]({FLOW_STATUS}) {NUM_EXTRACTOR_TOTAL} Extractor - {NUM_EXTRACT_TOTAL} Extract - {NUM_RECORDS_TOTAL} Record ({NUM_ITERATE_TOTAL} Iterations)
[YaEtl] {NUM_JOINER_TOTAL} Joiner - {NUM_JOIN_TOTAL} Join - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_BRANCH_TOTAL} Branch
[YaEtl] {NUM_TRANSFORMER_TOTAL} Transformer - {NUM_TRANSFORM_TOTAL} Transform - {NUM_LOADER_TOTAL} Loader - {NUM_LOAD_TOTAL} Flush
[YaEtl] Time : {DURATION} - Memory: {MIB} MiB';

        // build the {PLACEHOLDER} => value map out of every non array stat
        $replacements = [];
        foreach ($stats as $name => $stat) {
            if (is_array($stat)) {
                continue;
            }

            $placeholder = '{' . strtoupper($name) . '}';

            // numbers are formatted: no decimals for integers, two otherwise
            $replacements[$placeholder] = is_numeric($stat)
                ? \number_format($stat, is_int($stat) ? 0 : 2, '.', ' ')
                : $stat;
        }

        $stats['report'] = str_replace(array_keys($replacements), array_values($replacements), $template);

        return $stats;
    }
268
269
    /**
     * Tells if the flow is set to force flush
     * Only used when branched (to tell the parent)
     *
     * @return bool True when forceFlush holds a truthy value
     */
    public function isForceFlush()
    {
        return (bool) $this->forceFlush;
    }
279
280
    /**
     * Used internally to aggregate Extractors
     *
     * When the target already sits in an aggregate Node, $extractor is
     * simply added to it. Otherwise the target's slot in the flow is
     * replaced by a new AggregateExtractor holding both extractors.
     *
     * @param ExtractorInterface $extractor     The extractor to add to the aggregate
     * @param ExtractorInterface $aggregateWith The extractor (already part of this flow) to aggregate with
     *
     * @throws YaEtlException     When $aggregateWith is neither in the flow nor in a known aggregate
     * @throws NodalFlowException
     *
     * @return $this
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith)
    {
        // aggregate with target Node
        $aggregateWithNodeId = $aggregateWith->getId();
        $aggregateWithIdx    = $this->flowMap->getNodeIndex($aggregateWithNodeId);
        if ($aggregateWithIdx === null) {
            if (!isset($this->reverseAggregateTable[$aggregateWithNodeId])) {
                throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
            }

            // the target is not directly in the flow map but we know it from
            // a previous aggregation: use the slot index recorded at that time
            // instead of looking up $this->nodes with a null index
            $aggregateWithIdx = $this->reverseAggregateTable[$aggregateWithNodeId];
        }

        /** @var TraversableNodeInterface $aggregateWithNode */
        $aggregateWithNode = $this->nodes[$aggregateWithIdx];
        if ($aggregateWithNode instanceof AggregateNodeInterface) {
            // the slot already holds an aggregate, just extend it
            $aggregateWithNode->addTraversable($extractor);
            $this->reverseAggregateTable[$extractor->getId()] = $aggregateWithIdx;

            return $this;
        }

        $aggregateNode = new AggregateExtractor(true);
        // keep track of this extractor before we bury it in the aggregate
        $this->reverseAggregateTable[$aggregateWithNode->getId()] = $aggregateWithIdx;
        // now replace its slot in the main tree
        $this->replace($aggregateWithIdx, $aggregateNode);
        $aggregateNode->addTraversable($aggregateWithNode)
            ->addTraversable($extractor);

        // adjust counters as we did remove the $aggregateWith Extractor from this flow
        $reg = &$this->registry->get($this->getId());
        --$reg['flowStats']['num_extractor'];

        // aggregate node did take care of setting carrier
        $this->reverseAggregateTable[$aggregateNode->getId()] = $aggregateWithIdx;
        $this->reverseAggregateTable[$extractor->getId()]     = $aggregateWithIdx;

        return $this;
    }
327
328
    /**
     * Calls each WorkFlow's loaders and branch flush method
     *
     * With a status (flush driven by a parent flow) nodes are always flushed
     * using that status. Without one, a flow that has a parent and is not set
     * to force flush does nothing, otherwise nodes are flushed using this
     * flow's own status.
     *
     * @param FlowStatusInterface|null $flowStatus
     *
     * @return $this
     */
    protected function flush(FlowStatusInterface $flowStatus = null)
    {
        if ($flowStatus !== null) {
            // use parent's status
            return $this->flushNodes($flowStatus);
        }

        $deferToParent = $this->hasParent() && !$this->isForceFlush();
        if ($deferToParent) {
            // we'll get another chance at this
            return $this;
        }

        // use current status
        return $this->flushNodes($this->flowStatus);
    }
350
351
    /**
     * Actually flush nodes
     *
     * Every Loader of this flow is flushed (and counted in `num_flush`),
     * and the flush is propagated to every branch whose payload is
     * a YaEtl flow or any class extending it.
     *
     * @param FlowStatusInterface $flowStatus The status handed to each Loader and branch
     *
     * @return $this
     */
    protected function flushNodes(FlowStatusInterface $flowStatus)
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                $this->flowMap->incrementFlow('num_flush');
                continue;
            }

            // start with only flushing YaEtl and extends
            if ($node instanceof BranchNodeInterface) {
                $flow = $node->getPayload();
                // test against YaEtl itself rather than the late bound class:
                // a flow extending YaEtl must still flush a plain YaEtl
                // (or any other YaEtl extend) branch
                if ($flow instanceof self) {
                    /* @var YaEtl $flow */
                    $flow->flush($flowStatus);
                }
            }
        }

        return $this;
    }
380
}
381