1 | <?php |
||
2 | |||
3 | /* |
||
4 | * This file is part of YaEtl |
||
5 | * (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl |
||
6 | * This source file is licensed under the MIT license which you will |
||
7 | * find in the LICENSE file or at https://opensource.org/licenses/MIT |
||
8 | */ |
||
9 | |||
10 | namespace fab2s\YaEtl; |
||
11 | |||
12 | use fab2s\NodalFlow\Flows\FlowEventAbstract; |
||
13 | use fab2s\NodalFlow\Flows\FlowInterface; |
||
14 | use fab2s\NodalFlow\Flows\FlowStatusInterface; |
||
15 | use fab2s\NodalFlow\NodalFlow; |
||
16 | use fab2s\NodalFlow\NodalFlowException; |
||
17 | use fab2s\NodalFlow\Nodes\AggregateNodeInterface; |
||
18 | use fab2s\NodalFlow\Nodes\BranchNode; |
||
19 | use fab2s\NodalFlow\Nodes\BranchNodeInterface; |
||
20 | use fab2s\NodalFlow\Nodes\NodeInterface; |
||
21 | use fab2s\NodalFlow\Nodes\TraversableNodeInterface; |
||
22 | use fab2s\YaEtl\Events\YaEtlEvent; |
||
23 | use fab2s\YaEtl\Extractors\AggregateExtractor; |
||
24 | use fab2s\YaEtl\Extractors\ExtractorInterface; |
||
25 | use fab2s\YaEtl\Extractors\JoinableInterface; |
||
26 | use fab2s\YaEtl\Extractors\OnClauseInterface; |
||
27 | use fab2s\YaEtl\Loaders\LoaderInterface; |
||
28 | use fab2s\YaEtl\Qualifiers\QualifierInterface; |
||
29 | use fab2s\YaEtl\Transformers\TransformerInterface; |
||
30 | |||
/**
 * Class YaEtl
 *
 * NodalFlow extension exposing an ETL grammar (from / join / qualify /
 * transform / to / branch) on top of NodalFlow's generic `add()`, and
 * taking care of calling loaders' `flush()` at the right time.
 */
class YaEtl extends NodalFlow
{
    /**
     * Flow level counters maintained by YaEtl.
     * Integer values are initial counts. `num_records` maps to the string
     * 'num_iterate' instead; presumably NodalFlow treats it as an alias of
     * that counter (TODO confirm against NodalFlow's FlowMap).
     *
     * @var array
     */
    protected $flowIncrements = [
        'num_extract'     => 0,
        'num_extractor'   => 0,
        'num_join'        => 0,
        'num_joiner'      => 0,
        'num_merge'       => 0,
        'num_records'     => 'num_iterate',
        'num_transform'   => 0,
        'num_transformer' => 0,
        'num_qualifier'   => 0,
        'num_qualify'     => 0,
        'num_branch'      => 0,
        'num_load'        => 0,
        'num_loader'      => 0,
        'num_flush'       => 0,
    ];

    /**
     * The reverse aggregate lookup table.
     * Maps the node id of every extractor that was moved into an
     * AggregateExtractor to that aggregate, so that further extractors
     * can be aggregated with any of its members.
     *
     * @var array
     */
    protected $reverseAggregateTable = [];

    /**
     * @var bool
     */
    protected $forceFlush = false;

    /**
     * Adds an extractor to the Flow, optionally aggregated with another
     * extractor already registered in this Flow (directly or through an
     * aggregate).
     *
     * @param ExtractorInterface      $extractor
     * @param ExtractorInterface|null $aggregateWith Use the extractor instance you want to aggregate with
     *
     * @throws YaEtlException     when $aggregateWith is not part of this Flow
     * @throws NodalFlowException
     *
     * @return static
     */
    public function from(ExtractorInterface $extractor, ?ExtractorInterface $aggregateWith = null): self
    {
        if ($aggregateWith !== null) {
            return $this->aggregateTo($extractor, $aggregateWith);
        }

        parent::add($extractor);
        $this->flowMap->incrementFlow('num_extractor');

        return $this;
    }

    /**
     * Adds a Qualifier to the Flow
     *
     * @param QualifierInterface $qualifier
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function qualify(QualifierInterface $qualifier): self
    {
        parent::add($qualifier);
        $this->flowMap->incrementFlow('num_qualifier');

        return $this;
    }

    /**
     * Override NodalFlow's add method to prohibit its direct usage:
     * nodes must be added through the YaEtl grammar so that the
     * matching counters are maintained.
     *
     * @param NodeInterface $node
     *
     * @throws YaEtlException always
     *
     * @return FlowInterface
     */
    public function add(NodeInterface $node): FlowInterface
    {
        throw new YaEtlException('add() is not directly available, use YaEtl grammar instead');
    }

    /**
     * By default, branched flows only see their `flush()` method
     * called when the top most parent triggers its own `flush()`.
     * This makes sense most of the time, as the most common use case
     * for branches so far is to deal with one record at a time without
     * generating records (even when left joining). In such cases, the
     * branch's `flush()` really needs to be called exactly when the
     * top most flow's one is.
     *
     * Set to true if you are generating many records in a branch
     * and it makes sense to flush the branch more often.
     * Note that the branch will still be flushed at the end
     * of its top most parent flow.
     *
     * @param bool $forceFlush
     *
     * @return static
     */
    public function forceFlush(bool $forceFlush): self
    {
        $this->forceFlush = $forceFlush;

        return $this;
    }

    /**
     * Adds a Joiner to a specific Extractor in the Flow.
     * The on clause is registered on $joinFrom, then $extractor is wired
     * with both $joinFrom and the on clause before being added.
     *
     * @param JoinableInterface $extractor The joiner to add
     * @param JoinableInterface $joinFrom  The extractor to join from
     * @param OnClauseInterface $onClause
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function join(JoinableInterface $extractor, JoinableInterface $joinFrom, OnClauseInterface $onClause): self
    {
        $joinFrom->registerJoinerOnClause($onClause);
        $extractor->setJoinFrom($joinFrom);
        $extractor->setOnClause($onClause);

        parent::add($extractor);
        $this->flowMap->incrementFlow('num_joiner');

        return $this;
    }

    /**
     * Adds a Transformer to the Flow
     *
     * @param TransformerInterface $transformer
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function transform(TransformerInterface $transformer): self
    {
        parent::add($transformer);
        $this->flowMap->incrementFlow('num_transformer');

        return $this;
    }

    /**
     * Adds a Loader to the Flow
     *
     * @param LoaderInterface $loader
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function to(LoaderInterface $loader): self
    {
        parent::add($loader);
        $this->flowMap->incrementFlow('num_loader');

        return $this;
    }

    /**
     * Adds a Branch (Flow) to the Flow
     *
     * @param YaEtl $flow            The Branch to add in this Flow
     * @param bool  $isAReturningVal To indicate if this Branch Flow is a true Branch or just
     *                               a bag of Nodes to execute at this location of the Flow
     *
     * @throws NodalFlowException
     *
     * @return static
     */
    public function branch(self $flow, $isAReturningVal = false): self
    {
        parent::add(new BranchNode($flow, $isAReturningVal));
        $this->flowMap->incrementFlow('num_branch');

        return $this;
    }

    /**
     * Triggered right after the flow stops.
     * Flushes loaders (see flush() for when branched flows skip this)
     * before delegating to NodalFlow.
     *
     * @return static
     */
    public function flowEnd(): NodalFlow
    {
        $this->flush();

        parent::flowEnd();

        return $this;
    }

    /**
     * KISS method to expose basic stats.
     * Returns NodalFlow's stats where each YaEtl counter has been added to
     * its `_total` counterpart, plus a human readable `report` entry.
     *
     * @return array
     */
    public function getStats(): array
    {
        $stats = parent::getStats();

        $tpl = '[YaEtl]({FLOW_STATUS}) {NUM_EXTRACTOR_TOTAL} Extractor - {NUM_EXTRACT_TOTAL} Extract - {NUM_RECORDS_TOTAL} Record ({NUM_ITERATE_TOTAL} Iterations)
[YaEtl] {NUM_JOINER_TOTAL} Joiner - {NUM_JOIN_TOTAL} Join - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_QUALIFIER_TOTAL} Qualifier - {NUM_QUALIFY_TOTAL} Qualify
[YaEtl] {NUM_TRANSFORMER_TOTAL} Transformer - {NUM_TRANSFORM_TOTAL} Transform - {NUM_LOADER_TOTAL} Loader - {NUM_LOAD_TOTAL} Load
[YaEtl] {NUM_BRANCH_TOTAL} Branch - {NUM_CONTINUE_TOTAL} Continue - {NUM_BREAK_TOTAL} Break - {NUM_FLUSH_TOTAL} Flush
[YaEtl] Time : {DURATION} - Memory: {MIB} MiB';

        // add this flow's own YaEtl counters to their `_total` counterparts
        foreach (array_keys($this->flowIncrements) as $key) {
            $stats[$key . '_total'] += $stats[$key];
        }

        $vars = [];
        foreach ($stats as $varName => $value) {
            // array entries cannot be rendered in the report
            if (is_array($value)) {
                continue;
            }

            $placeholder = '{' . strtoupper($varName) . '}';
            if (is_numeric($value)) {
                // integers without decimals, anything else with two
                $vars[$placeholder] = \number_format($value, is_int($value) ? 0 : 2, '.', ' ');
                continue;
            }

            $vars[$placeholder] = $value;
        }

        $stats['report'] = str_replace(array_keys($vars), array_values($vars), $tpl);

        return $stats;
    }

    /**
     * Tells if the flow is set to force flush
     * Only used when branched (to tell the parent)
     *
     * @return bool
     */
    public function isForceFlush(): bool
    {
        return !empty($this->forceFlush);
    }

    /**
     * Lets NodalFlow initialize the dispatch arguments, then swaps
     * the event instance for a YaEtlEvent bound to this flow.
     *
     * @param string $class
     *
     * @throws \ReflectionException
     *
     * @return static
     */
    protected function initDispatchArgs(string $class): FlowEventAbstract
    {
        parent::initDispatchArgs($class);
        $this->dispatchArgs[$this->eventInstanceKey] = new YaEtlEvent($this);

        return $this;
    }

    /**
     * Used internally to aggregate Extractors.
     *
     * If $aggregateWith already belongs to an aggregate, $extractor is
     * added to it. Otherwise $aggregateWith's slot in this flow is replaced
     * by a new AggregateExtractor holding both extractors. In both cases
     * every member is recorded in the reverse aggregate table, so any of
     * them can later be used as an aggregation target.
     *
     * @param ExtractorInterface $extractor
     * @param ExtractorInterface $aggregateWith
     *
     * @throws YaEtlException     when $aggregateWith is not part of this flow
     * @throws NodalFlowException
     *
     * @return static
     */
    protected function aggregateTo(ExtractorInterface $extractor, ExtractorInterface $aggregateWith): self
    {
        $aggregateWithNodeId = $aggregateWith->getId();
        $aggregateWithIdx    = $this->flowMap->getNodeIndex($aggregateWithNodeId);

        // $aggregateWith was already buried in an aggregate: join it there
        if (isset($this->reverseAggregateTable[$aggregateWithNodeId])) {
            /** @var AggregateNodeInterface $aggregateWithNode */
            $aggregateWithNode = $this->reverseAggregateTable[$aggregateWithNodeId];
            $aggregateWithNode->addTraversable($extractor);
            // the new member must be usable as an aggregation target too
            $this->reverseAggregateTable[$extractor->getId()] = $aggregateWithNode;

            return $this;
        }

        if ($aggregateWithIdx === null) {
            throw new YaEtlException('Cannot aggregate with orphaned Node:' . \get_class($aggregateWith));
        }

        $aggregateNode = new AggregateExtractor(true);
        // keep track of both extractors before we bury them in the aggregate,
        // as neither will be directly indexed by this flow anymore
        $this->reverseAggregateTable[$aggregateWithNodeId] = $aggregateNode;
        $this->reverseAggregateTable[$extractor->getId()]  = $aggregateNode;
        // now replace $aggregateWith's slot in the main tree
        $this->replace($aggregateWithIdx, $aggregateNode);
        $aggregateNode->addTraversable($aggregateWith)
            ->addTraversable($extractor);

        // adjust counters as we did remove the $aggregateWith Extractor from this flow
        $reg = &$this->registry->get($this->getId());
        --$reg['flowStats']['num_extractor'];

        return $this;
    }

    /**
     * Calls each WorkFlow's loaders and branch flush method.
     *
     * Without a status (own flow end), a branched flow does nothing unless
     * it is set to force flush, as its top most parent will flush it.
     * With a status (called by a parent), nodes are always flushed
     * using that status.
     *
     * @param FlowStatusInterface|null $flowStatus
     *
     * @return static
     */
    protected function flush(?FlowStatusInterface $flowStatus = null): self
    {
        if ($flowStatus !== null) {
            // use parent's status
            return $this->flushNodes($flowStatus);
        }

        if ($this->hasParent() && !$this->isForceFlush()) {
            // we'll get another chance at this
            return $this;
        }

        // use current status
        return $this->flushNodes($this->flowStatus);
    }

    /**
     * Actually flush nodes: calls flush() on each loader (counting and
     * triggering a FLOW_FLUSH event for each) and recurses into YaEtl
     * branches with the same status.
     *
     * @param FlowStatusInterface $flowStatus
     *
     * @return static
     */
    protected function flushNodes(FlowStatusInterface $flowStatus): self
    {
        foreach ($this->nodes as $node) {
            if ($node instanceof LoaderInterface) {
                $node->flush($flowStatus);
                $this->flowMap->incrementFlow('num_flush');
                $this->triggerEvent(YaEtlEvent::FLOW_FLUSH, $node);
                continue;
            }

            // start with only flushing YaEtl and extends
            if ($node instanceof BranchNodeInterface) {
                $flow = $node->getPayload();
                // `self`, not `static`: a subclass must still flush
                // branches that are plain YaEtl (or other subclasses)
                if ($flow instanceof self) {
                    /** @var YaEtl $flow */
                    $flow->flush($flowStatus);
                }
            }
        }

        return $this;
    }
}
||
398 |