Passed
Push — master ( 44183e...5f9ead )
by Fabrice
02:05
created

YaEtl::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 0
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
<?php
2
3
/*
4
 * This file is part of YaEtl.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl;
11
12
use fab2s\NodalFlow\Flows\FlowStatusInterface;
13
use fab2s\NodalFlow\NodalFlow;
14
use fab2s\NodalFlow\NodalFlowException;
15
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
16
use fab2s\NodalFlow\Nodes\BranchNode;
17
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
18
use fab2s\NodalFlow\Nodes\NodeInterface;
19
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
20
use fab2s\NodalFlow\YaEtlException;
21
use fab2s\YaEtl\Events\YaEtlEvent;
22
use fab2s\YaEtl\Extractors\AggregateExtractor;
23
use fab2s\YaEtl\Extractors\ExtractorInterface;
24
use fab2s\YaEtl\Extractors\JoinableInterface;
25
use fab2s\YaEtl\Extractors\OnClauseInterface;
26
use fab2s\YaEtl\Loaders\LoaderInterface;
27
use fab2s\YaEtl\Qualifiers\QualifierInterface;
28
use fab2s\YaEtl\Transformers\TransformerInterface;
29
30
/**
31
 * Class YaEtl
32
 */
33
class YaEtl extends NodalFlow
34
{
35
    /**
     * Flow increment keys declared by YaEtl. getStats() walks these keys
     * and adds each `$stats[$key]` into `$stats[$key . '_total']`.
     * NOTE(review): 'num_records' is declared with the string 'num_iterate'
     * rather than a counter - presumably an alias of the iteration counter
     * resolved by the parent flow; confirm against NodalFlow.
     *
36
     * @var array
37
     */
38
    protected $flowIncrements = [
39
        'num_extract'     => 0,
40
        'num_extractor'   => 0,
41
        'num_join'        => 0,
42
        'num_joiner'      => 0,
43
        'num_merge'       => 0,
44
        'num_records'     => 'num_iterate',
45
        'num_transform'   => 0,
46
        'num_transformer' => 0,
47
        'num_qualifier'   => 0,
48
        'num_qualify'     => 0,
49
        'num_branch'      => 0,
50
        'num_load'        => 0,
51
        'num_loader'      => 0,
52
        'num_flush'       => 0,
53
    ];
54
55
    /**
56
     * The reverse aggregate lookup table: filled by aggregateTo(), it maps
     * a node id to the index (in this flow's node list) of the slot that
     * holds the aggregate the node was attached to.
57
     *
58
     * @var array
59
     */
60
    protected $reverseAggregateTable = [];
61
62
    /**
     * Force flush flag, written by forceFlush() and read by isForceFlush().
     *
63
     * @var bool
64
     */
65
    protected $forceFlush = false;
66
67
    /**
     * Builds the flow: runs the NodalFlow constructor first, then installs
     * a YaEtlEvent bound to this flow as the shared event instance.
     *
     * @throws NodalFlowException
     */
    public function __construct()
    {
        parent::__construct();

        $yaEtlEvent        = new YaEtlEvent($this);
        $this->sharedEvent = $yaEtlEvent;
    }
77
78
    /**
     * Registers an Extractor, either as a node of its own or aggregated
     * with an Extractor that was registered before.
     *
     * @param ExtractorInterface      $extractor     The Extractor to register
     * @param null|ExtractorInterface $aggregateWith The Extractor instance to aggregate with,
     *                                               null to add $extractor as a regular node
     *
     * @throws YaEtlException
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function from(ExtractorInterface $extractor, ExtractorInterface $aggregateWith = null)
    {
        if ($aggregateWith === null) {
            // plain extractor: goes straight into the flow and is counted
            parent::add($extractor);
            $this->flowMap->incrementFlow('num_extractor');

            return $this;
        }

        // aggregation keeps its own book keeping, see aggregateTo()
        $this->aggregateTo($extractor, $aggregateWith);

        return $this;
    }
100
101
    /**
     * Adds a Qualifier to the Flow and counts it under 'num_qualifier'
     *
     * @param QualifierInterface $qualifier The Qualifier to append
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function qualify(QualifierInterface $qualifier)
    {
        // add() is locked on this class, the parent implementation is the way in
        parent::add($qualifier);

        $this->flowMap->incrementFlow('num_qualifier');

        return $this;
    }
115
116
    /**
     * Locks NodalFlow's generic add(): nodes must be registered through
     * the dedicated methods of this class, so this always throws.
     *
     * @param NodeInterface $node Ignored
     *
     * @throws YaEtlException Always
     */
    public function add(NodeInterface $node)
    {
        $message = 'add() is not directly available, use YaEtl grammar instead';

        throw new YaEtlException($message);
    }
127
128
    /**
     * By default, branched flows only see their `flush()` method
     * called when the top most parent triggers its own `flush()`.
     * It makes sense most of the time, as the most common use case
     * for branches so far is to deal with one record at a time
     * without generating records (even when left joining). In such
     * a case, the branch `flush()` really needs to be called exactly
     * when the top most flow's one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often.
     * Also note that the branch will also be flushed at the end
     * of its top most parent flow.
     *
     * @param bool $forceFlush Any value, cast to bool
     *
     * @return $this
     */
    public function forceFlush($forceFlush)
    {
        $this->forceFlush = $forceFlush ? true : false;

        return $this;
    }
154
155
    /**
     * Adds a Joiner to the Flow, joined against a specific Extractor
     *
     * The ON clause is first registered on the Extractor joined from, then
     * the Joiner is given both its source and the clause before it is
     * added to the Flow and counted under 'num_joiner'.
     *
     * @param JoinableInterface $extractor The Joiner to add
     * @param JoinableInterface $joinFrom  The Extractor to join from
     * @param OnClauseInterface $onClause  The clause linking both
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause)
    {
        // wiring order is kept as is: source side first, then the joiner
        $joinFrom->registerJoinerOnClause($onClause);

        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        parent::add($extractor);

        $this->flowMap->incrementFlow('num_joiner');

        return $this;
    }
177
178
    /**
     * Adds a Transformer to the Flow and counts it under 'num_transformer'
     *
     * @param TransformerInterface $transformer The Transformer to append
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function transform(TransformerInterface $transformer)
    {
        // add() is locked on this class, the parent implementation is the way in
        parent::add($transformer);

        $this->flowMap->incrementFlow('num_transformer');

        return $this;
    }
194
195
    /**
     * Adds a Loader to the Flow and counts it under 'num_loader'
     *
     * @param LoaderInterface $loader The Loader to append
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function to(LoaderInterface $loader)
    {
        // add() is locked on this class, the parent implementation is the way in
        parent::add($loader);

        $this->flowMap->incrementFlow('num_loader');

        return $this;
    }
211
212
    /**
     * Adds a Branch (Flow) to the Flow, wrapped in a BranchNode and
     * counted under 'num_branch'
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function branch(YaEtl $flow, $isAReturningVal = false)
    {
        $branchNode = new BranchNode($flow, $isAReturningVal);

        parent::add($branchNode);
        $this->flowMap->incrementFlow('num_branch');

        return $this;
    }
230
231
    /**
     * Triggered right after the flow stops: flushes first (with no explicit
     * status), then hands over to NodalFlow's own flowEnd().
     *
     * @return $this
     */
    public function flowEnd()
    {
        // flush must happen before the parent wraps the flow up
        $this->flush();
        parent::flowEnd();

        return $this;
    }
244
245
    /**
     * KISS method to expose basic stats
     *
     * Starts from the parent's stats, folds every YaEtl increment into its
     * `<key>_total` entry, then renders a text report stored under 'report'.
     * Array values are left out of the report; numeric values are formatted
     * with a space as thousands separator (no decimals for integers, two
     * otherwise).
     *
     * @return array<string,integer|string>
     */
    public function getStats()
    {
        $stats = parent::getStats();

        $tpl = '[YaEtl]({FLOW_STATUS}) {NUM_EXTRACTOR_TOTAL} Extractor - {NUM_EXTRACT_TOTAL} Extract - {NUM_RECORDS_TOTAL} Record ({NUM_ITERATE_TOTAL} Iterations)
[YaEtl] {NUM_JOINER_TOTAL} Joiner - {NUM_JOIN_TOTAL} Join - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_QUALIFIER_TOTAL} Qualifier - {NUM_QUALIFY_TOTAL} Qualify
[YaEtl] {NUM_TRANSFORMER_TOTAL} Transformer - {NUM_TRANSFORM_TOTAL} Transform - {NUM_LOADER_TOTAL} Loader - {NUM_LOAD_TOTAL} Load
[YaEtl] {NUM_BRANCH_TOTAL} Branch - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_FLUSH_TOTAL} Flush
[YaEtl] Time : {DURATION} - Memory: {MIB} MiB';

        foreach (array_keys($this->flowIncrements) as $key) {
            $totalKey = $key . '_total';
            // a key the parent did not report counts as 0 instead of
            // raising an undefined index notice
            $total   = isset($stats[$totalKey]) ? $stats[$totalKey] : 0;
            $current = isset($stats[$key]) ? $stats[$key] : 0;

            $stats[$totalKey] = $total + $current;
        }

        $vars = [];
        foreach ($stats as $varName => $value) {
            if (is_array($value)) {
                // nested structures have no place holder in the report
                continue;
            }

            $placeHolder = '{' . strtoupper($varName) . '}';
            if (is_numeric($value)) {
                $vars[$placeHolder] = \number_format($value, is_int($value) ? 0 : 2, '.', ' ');
                continue;
            }

            $vars[$placeHolder] = $value;
        }

        $stats['report'] = str_replace(array_keys($vars), array_values($vars), $tpl);

        return $stats;
    }
282
283
    /**
     * Tells if the flow is set to force flush
     * Only used when branched (to tell the parent)
     *
     * @return bool True when the force flush flag holds a truthy value
     */
    public function isForceFlush()
    {
        return (bool) $this->forceFlush;
    }
293
294
    /**
     * Used internally to aggregate Extractors
     *
     * When the slot targeted by $aggregateWith already holds an aggregate,
     * $extractor is simply added to it. Otherwise a new AggregateExtractor
     * takes over the slot and receives both $aggregateWith and $extractor.
     * Every node involved is recorded in the reverse aggregate table under
     * the index of that slot.
     *
     * @param ExtractorInterface $extractor     The Extractor to aggregate
     * @param ExtractorInterface $aggregateWith The Extractor to aggregate with
     *
     * @throws YaEtlException     When $aggregateWith is neither indexed by the flow map
     *                            nor known to the reverse aggregate table
     * @throws NodalFlowException
     *
     * @return $this
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith)
    {
        // aggregate with target Node
        $aggregateWithNodeId = $aggregateWith->getId();
        $aggregateWithIdx    = $this->flowMap->getNodeIndex($aggregateWithNodeId);
        if ($aggregateWithIdx === null) {
            if (!isset($this->reverseAggregateTable[$aggregateWithNodeId])) {
                throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
            }

            // the flow map has no index for this node, but the reverse table
            // does: use the slot recorded there rather than reading
            // $this->nodes with a null index
            $aggregateWithIdx = $this->reverseAggregateTable[$aggregateWithNodeId];
        }

        /** @var TraversableNodeInterface $aggregateWithNode */
        $aggregateWithNode = $this->nodes[$aggregateWithIdx];
        if ($aggregateWithNode instanceof AggregateNodeInterface) {
            // the slot already holds an aggregate, just join it
            $aggregateWithNode->addTraversable($extractor);
            $this->reverseAggregateTable[$extractor->getId()] = $aggregateWithIdx;

            return $this;
        }

        $aggregateNode = new AggregateExtractor(true);
        // keep track of this extractor before we bury it in the aggregate
        $this->reverseAggregateTable[$aggregateWithNode->getId()] = $aggregateWithIdx;
        // now replace its slot in the main tree
        $this->replace($aggregateWithIdx, $aggregateNode);
        $aggregateNode->addTraversable($aggregateWithNode)
            ->addTraversable($extractor);

        // adjust counters as we did remove the $aggregateWith Extractor from this flow
        $reg = &$this->registry->get($this->getId());
        --$reg['flowStats']['num_extractor'];

        // aggregate node did take care of setting carrier
        $this->reverseAggregateTable[$aggregateNode->getId()] = $aggregateWithIdx;
        $this->reverseAggregateTable[$extractor->getId()]     = $aggregateWithIdx;

        return $this;
    }
341
342
    /**
     * Calls each WorkFlow's loaders and branch flush method
     *
     * With an explicit status (handed over by a parent flow), nodes are
     * flushed with it. Without one, a flow that has a parent and is not set
     * to force flush does nothing; any other flow flushes its nodes with
     * its own current status.
     *
     * @param FlowStatusInterface|null $flowStatus
     *
     * @return $this
     */
    protected function flush(FlowStatusInterface $flowStatus = null)
    {
        if ($flowStatus !== null) {
            // use parent's status
            return $this->flushNodes($flowStatus);
        }

        $isDeferred = $this->hasParent() && !$this->isForceFlush();
        if ($isDeferred) {
            // we'll get another chance at this
            return $this;
        }

        // use current status
        return $this->flushNodes($this->flowStatus);
    }
364
365
    /**
     * Actually flush nodes
     *
     * Loaders are flushed with the given status, counted under 'num_flush'
     * and announced with a YaEtlEvent::FLOW_FLUSH event. Branch nodes get
     * their payload flushed with the same status when that payload is a
     * YaEtl flow (or any class extending it); other nodes are left alone.
     *
     * @param FlowStatusInterface $flowStatus The status handed to each flush() call
     *
     * @return $this
     */
    protected function flushNodes(FlowStatusInterface $flowStatus)
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                $this->flowMap->incrementFlow('num_flush');
                $this->triggerEvent(YaEtlEvent::FLOW_FLUSH, $node);
                continue;
            }

            if (!($node instanceof BranchNodeInterface)) {
                continue;
            }

            // start with only flushing YaEtl and extends.
            // The check is made against self, not static: with late static
            // binding, a flow extending YaEtl would never flush a branch that
            // is a plain YaEtl (or a sibling class), even though branch()
            // accepts any YaEtl instance
            $flow = $node->getPayload();
            if ($flow instanceof self) {
                /* @var YaEtl $flow */
                $flow->flush($flowStatus);
            }
        }

        return $this;
    }
395
}
396