Passed
Push — master ( 5d141f...eaa040 )
by Fabrice
01:54
created

NodalFlow::getNodeStats()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 3
nop 0
1
<?php
2
3
/*
4
 * This file is part of NodalFlow.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/NodalFlow
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\NodalFlow;
11
12
use fab2s\NodalFlow\Callbacks\CallbackInterface;
13
use fab2s\NodalFlow\Flows\FlowInterface;
14
use fab2s\NodalFlow\Flows\FlowStatus;
15
use fab2s\NodalFlow\Flows\FlowStatusInterface;
16
use fab2s\NodalFlow\Nodes\AggregateNodeInterface;
17
use fab2s\NodalFlow\Nodes\BranchNode;
18
use fab2s\NodalFlow\Nodes\NodeInterface;
19
20
/**
21
 * Class NodalFlow
22
 */
23
class NodalFlow implements FlowInterface
24
{
25
    /**
26
     * Flow steps triggering callbacks
27
     */
28
    const FLOW_START    = 'start';
29
    const FLOW_PROGRESS = 'progress';
30
    const FLOW_SUCCESS  = 'success';
31
    const FLOW_FAIL     = 'fail';
32
33
    /**
34
     * The parent Flow, only set when branched
35
     *
36
     * @var FlowInterface
37
     */
38
    public $parent;
39
40
    /**
41
     * This Flow id
42
     *
43
     * @var string
44
     */
45
    protected $flowId;
46
47
    /**
48
     * The underlying node structure
49
     *
50
     * @var array
51
     */
52
    protected $nodes = [];
53
54
    /**
55
     * The current Node index
56
     *
57
     * @var int
58
     */
59
    protected $nodeIdx = 0;
60
61
    /**
62
     * The last index value
63
     *
64
     * @var int
65
     */
66
    protected $lastIdx = 0;
67
68
    /**
69
     * The number of Node in this Flow
70
     *
71
     * @var int
72
     */
73
    protected $nodeCount = 0;
74
75
    /**
76
     * The number of iteration within this Flow
77
     *
78
     * @var int
79
     */
80
    protected $numIterate = 0;
81
82
    /**
83
     * The current registered Callback class if any
84
     *
85
     * @var CallbackInterface|null
86
     */
87
    protected $callBack;
88
89
    /**
90
     * Progress modulo to apply
91
     * Set to x if you want to trigger
92
     * progress every x iterations in flow
93
     *
94
     * @var int
95
     */
96
    protected $progressMod = 1024;
97
98
    /**
99
     * The default Node Map values
100
     *
101
     * @var array
102
     */
103
    protected $nodeMapDefault = [
104
        'class'        => null,
105
        'branchId'     => null,
106
        'hash'         => null,
107
        'index'        => 0,
108
        'num_exec'     => 0,
109
        'num_iterate'  => 0,
110
    ];
111
112
    /**
113
     * The default Node stats values
114
     *
115
     * @var array
116
     */
117
    protected $nodeStatsDefault = [
118
        'num_exec'     => 0,
119
        'num_iterate'  => 0,
120
    ];
121
122
    /**
123
     * Node stats values
124
     *
125
     * @var array
126
     */
127
    protected $nodeStats = [];
128
129
    /**
130
     * The object map, used to enforce object unicity within the Flow
131
     *
132
     * @var array
133
     */
134
    protected $objectMap = [];
135
136
    /**
137
     * The Node Map
138
     *
139
     * @var array
140
     */
141
    protected $nodeMap = [];
142
143
    /**
144
     * The Flow stats default values
145
     *
146
     * @var array
147
     */
148
    protected $statsDefault = [
149
        'start'    => null,
150
        'end'      => null,
151
        'duration' => null,
152
        'mib'      => null,
153
    ];
154
155
    /**
156
     * The Flow Stats
157
     *
158
     * @var array
159
     */
160
    protected $stats = [
161
        'invocations' => [],
162
    ];
163
164
    /**
165
     * Number of exec calls in thhis Flow
166
     *
167
     * @var int
168
     */
169
    protected $numExec = 0;
170
171
    /**
172
     * Continue flag
173
     *
174
     * @var bool
175
     */
176
    protected $continue = false;
177
178
    /**
179
     * Break Flag
180
     *
181
     * @var bool
182
     */
183
    protected $break = false;
184
185
    /**
186
     * Current Flow Status
187
     *
188
     * @var FlowStatusInterface
189
     */
190
    protected $flowStatus;
191
192
    /**
193
     * Current nonce
194
     *
195
     * @var int
196
     */
197
    private static $nonce = 0;
198
199
    /**
200
     * Instantiate a Flow
201
     */
202
    public function __construct()
203
    {
204
        $this->flowId = $this->uniqId();
205
        $this->stats += $this->statsDefault;
206
    }
207
208
    /**
209
     * Adds a Node to the flow
210
     *
211
     * @param NodeInterface $node
212
     *
213
     * @return $this
214
     */
215
    public function add(NodeInterface $node)
216
    {
217
        $nodeHash = $this->objectHash($node);
218
219
        if ($node instanceof BranchNode) {
220
            // this node is a branch, set it's parent
221
            $node->getPayload()->setParent($this);
1 ignored issue
show
Bug introduced by
The method setParent cannot be called on $node->getPayload() (of type callable).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
222
        }
223
224
        $node->setCarrier($this)->setNodeHash($nodeHash);
225
226
        $this->nodes[$this->nodeIdx] = $node;
227
        $this->nodeMap[$nodeHash]    = \array_replace($this->nodeMapDefault, [
228
            'class'    => \get_class($node),
229
            'branchId' => $this->flowId,
230
            'hash'     => $nodeHash,
231
            'index'    => $this->nodeIdx,
232
        ]);
233
234
        // register references to nodeStats to increment faster
235
        // nodeStats can also be used as reverse lookup table
236
        $this->nodeStats[$this->nodeIdx] = &$this->nodeMap[$nodeHash];
237
238
        ++$this->nodeIdx;
239
240
        return $this;
241
    }
242
243
    /**
244
     * Adds a Payload Node to the Flow
245
     *
246
     * @param callable $payload
247
     * @param mixed    $isAReturningVal
248
     * @param mixed    $isATraversable
249
     *
250
     * @return $this
251
     */
252
    public function addPayload(callable $payload, $isAReturningVal, $isATraversable = false)
253
    {
254
        $node = PayloadNodeFactory::create($payload, $isAReturningVal, $isATraversable);
255
256
        $this->add($node);
257
258
        return $this;
259
    }
260
261
    /**
262
     * Register callback class
263
     *
264
     * @param CallbackInterface $callBack
265
     *
266
     * @return $this
267
     */
268
    public function setCallBack(CallbackInterface $callBack)
269
    {
270
        $this->callBack = $callBack;
271
272
        return $this;
273
    }
274
275
    /**
276
     * Set parent Flow, happens only when branched
277
     *
278
     * @param FlowInterface $flow
279
     *
280
     * @return $this
281
     */
282
    public function setParent(FlowInterface $flow)
283
    {
284
        $this->parent = $flow;
285
286
        return $this;
287
    }
288
289
    /**
290
     * Get eventual parent Flow
291
     *
292
     * @return FlowInterface
293
     */
294
    public function getParent()
295
    {
296
        return $this->parent;
297
    }
298
299
    /**
300
     * Tells if this flow has a parent
301
     *
302
     * @return bool
303
     */
304
    public function hasParent()
305
    {
306
        return !empty($this->parent);
307
    }
308
309
    /**
310
     * Generates a truely unique id for the Flow context
311
     *
312
     * @return string
313
     */
314
    public function uniqId()
315
    {
316
        // while we're at it, drop any doubt about
317
        // colliding from here
318
        return \sha1(uniqid() . $this->getNonce());
319
    }
320
321
    /**
322
     * Generate a friendly (read humanly distinguishable) object hash
323
     *
324
     * @param object $object
325
     *
326
     * @return string
327
     */
328
    public function objectHash($object)
329
    {
330
        return \sha1(\spl_object_hash($object));
331
    }
332
333
    /**
334
     * Execute the flow
335
     *
336
     * @param null|mixed $param The eventual init argument to the first node
337
     *                          or, in case of a branch, the last relevant
338
     *                          argument from upstream Flow
339
     *
340
     * @return mixed the last result of the
341
     *               last returning value node
342
     */
343
    public function exec($param = null)
344
    {
345
        try {
346
            $result = $this->rewind()
347
                    ->flowStart()
348
                    ->recurse($param);
349
            // set flowStatus to make sure that we have the proper
350
            // value in flowEnd even when overridden without (or when
351
            // improperly) calling parent
352
            if ($this->flowStatus->isRunning()) {
353
                $this->flowStatus = new FlowStatus(FlowStatus::FLOW_CLEAN);
354
            }
355
356
            $this->flowEnd();
357
358
            return $result;
359
        } catch (\Exception $e) {
360
            $this->flowStatus = new FlowStatus(FlowStatus::FLOW_EXCEPTION);
361
            $this->flowEnd();
362
            if ($e instanceof NodalFlowException) {
363
                throw $e;
364
            }
365
366
            throw new NodalFlowException('Flow execution failed', 0, $e, [
367
                'nodeMap' => $this->getNodeMap(),
368
            ]);
369
        }
370
    }
371
372
    /**
373
     * Computes a human readable duration string from floating seconds
374
     *
375
     * @param float $seconds
376
     *
377
     * @return array
0 ignored issues
show
Documentation introduced by
Consider making the return type a bit more specific; maybe use array<string,integer|string>.

This check looks for the generic type array as a return type and suggests a more specific type. This type is inferred from the actual code.

Loading history...
378
     */
379
    public function duration($seconds)
380
    {
381
        $result = [
382
            'hour'     => (int) \floor($seconds / 3600),
383
            'min'      => (int) \floor(($seconds / 60) % 60),
384
            'sec'      => $seconds % 60,
385
            'ms'       => (int) \round(\fmod($seconds, 1) * 1000),
386
        ];
387
388
        $durationStr = '';
389
        foreach ($result as $unit => $value) {
390
            if (!empty($value)) {
391
                $durationStr .= $value . "$unit ";
392
            }
393
        }
394
395
        $result['durationStr'] = \trim($durationStr);
396
397
        return $result;
398
    }
399
400
    /**
401
     * Resets Nodes stats, used prior to Flow's re-exec
402
     *
403
     * @return $this
404
     */
405
    public function resetNodeStats()
406
    {
407
        foreach ($this->nodeStats as &$nodeStat) {
408
            $nodeStat = \array_replace($nodeStat, $this->nodeStatsDefault);
409
        }
410
411
        return $this;
412
    }
413
414
    /**
415
     * Get the stats array with latest Node stats
416
     *
417
     * @return array
418
     */
419
    public function getStats()
420
    {
421
        foreach ($this->nodes as $node) {
422
            if (\is_a($node, BranchNode::class)) {
423
                $this->stats['branches'][$node->getPayload()->getFlowId()] = $node->getPayload()->getStats();
424
            }
425
        }
426
427
        return $this->stats;
428
    }
429
430
    /**
431
     * Return the Flow id as set during instantiation
432
     *
433
     * @return string
434
     */
435
    public function getFlowId()
436
    {
437
        return $this->flowId;
438
    }
439
440
    /**
441
     * Get the Node array
442
     *
443
     * @return array
444
     */
445
    public function getNodes()
446
    {
447
        return $this->nodes;
448
    }
449
450
    /**
451
     * Generate Node Map
452
     *
453
     * @return array
454
     */
455
    public function getNodeMap()
456
    {
457
        foreach ($this->nodes as $node) {
458
            if (\is_a($node, BranchNode::class)) {
459
                $this->nodeMap[$node->getNodeHash()]['nodes'] = $node->getPayload()->getNodeMap();
460
                continue;
461
            }
462
463
            if ($node instanceof AggregateNodeInterface) {
464
                foreach ($node->getNodeCollection() as $aggregatedNode) {
465
                    $this->nodeMap[$node->getNodeHash()]['nodes'][$aggregatedNode->getNodeHash()] = [
466
                        'class' => \get_class($aggregatedNode),
467
                        'hash'  => $aggregatedNode->getNodeHash(),
468
                    ];
469
                }
470
                continue;
471
            }
472
        }
473
474
        return $this->nodeMap;
475
    }
476
477
    /**
478
     * Get the Node stats
479
     *
480
     * @return array
481
     */
482
    public function getNodeStats()
483
    {
484
        foreach ($this->nodes as $nodeIdx => $node) {
485
            if (\is_a($node, BranchNode::class)) {
486
                $this->nodeStats[$nodeIdx]['nodes'] = $node->getPayload()->getNodeStats();
487
            }
488
        }
489
490
        return $this->nodeStats;
491
    }
492
493
    /**
494
     * Rewinds the Flow
495
     *
496
     * @return $this
497
     */
498
    public function rewind()
499
    {
500
        $this->nodeCount = count($this->nodes);
501
        $this->lastIdx   = $this->nodeCount - 1;
502
        $this->nodeIdx   = 0;
503
504
        return $this;
505
    }
506
507
    /**
508
     * Define the progress modulo, Progress Callback will be
509
     * triggered upon each iteration in the flow modulo $progressMod
510
     *
511
     * @param int $progressMod
512
     *
513
     * @return $this
514
     */
515
    public function setProgressMod($progressMod)
516
    {
517
        $this->progressMod = max(1, (int) $progressMod);
518
519
        return $this;
520
    }
521
522
    /**
523
     * Get current $progressMod
524
     *
525
     * @return int
526
     */
527
    public function getProgressMod()
528
    {
529
        return $this->progressMod;
530
    }
531
532
    /**
533
     * The Flow status can either indicate be:
534
     *      - clean (isClean()): everything went well
535
     *      - dirty (isDirty()): one Node broke the flow
536
     *      - exception (isException()): an exception was raised during the flow
537
     *
538
     * @return FlowStatusInterface
539
     */
540
    public function getFlowStatus()
541
    {
542
        return $this->flowStatus;
543
    }
544
545
    /**
546
     * Break the flow's execution, conceptuially similar to breaking
547
     * a regular loop
548
     *
549
     * @return $this
550
     */
551
    public function breakFlow()
552
    {
553
        $this->flowStatus = new FlowStatus(FlowStatus::FLOW_DIRTY);
554
555
        $this->break = true;
556
557
        return $this;
558
    }
559
560
    /**
561
     * Continue the flow's execution, conceptuially similar to continuing
562
     * a regular loop
563
     *
564
     * @return $this
565
     */
566
    public function continueFlow()
567
    {
568
        $this->continue = true;
569
570
        return $this;
571
    }
572
573
    /**
574
     * Triggered just before the flow starts
575
     *
576
     * @return $this
577
     */
578
    protected function flowStart()
579
    {
580
        ++$this->numExec;
581
        $this->triggerCallback(static::FLOW_START);
582
        $this->stats['start']                                = \microtime(true);
583
        $this->stats['invocations'][$this->numExec]['start'] = $this->stats['start'];
584
        // flow is started
585
        $this->flowStatus = new FlowStatus(FlowStatus::FLOW_RUNNING);
586
587
        return $this;
588
    }
589
590
    /**
591
     * Triggered right after the flow stops
592
     *
593
     * @return $this
594
     */
595
    protected function flowEnd()
596
    {
597
        $this->stats['end']                                     = \microtime(true);
598
        $this->stats['invocations'][$this->numExec]['end']      = $this->stats['end'];
599
        $this->stats['duration']                                = $this->stats['end'] - $this->stats['start'];
600
        $this->stats['invocations'][$this->numExec]['duration'] = $this->stats['duration'];
601
        $this->stats['mib']                                     = \memory_get_peak_usage(true) / 1048576;
602
        $this->stats['invocations'][$this->numExec]['mib']      = $this->stats['mib'];
603
604
        $this->triggerCallback($this->flowStatus->isException() ? static::FLOW_FAIL : static::FLOW_SUCCESS);
605
606
        return $this;
607
    }
608
609
    /**
610
     * Return a simple nonce, fully valid within each flow
611
     *
612
     * @return int
613
     */
614
    protected function getNonce()
615
    {
616
        return self::$nonce++;
617
    }
618
619
    /**
620
     * Recurse over flows and nodes which may
621
     * as well be Traversable ...
622
     * Welcome to the abysses of recursion ^^
623
     *
624
     * @param mixed $param
625
     * @param int   $nodeIdx
626
     *
627
     * @return mixed the last value returned by the last
628
     *               returning value Node in the flow
629
     */
630
    protected function recurse($param = null, $nodeIdx = 0)
631
    {
632
        // the while construct here saves as many recursion depth
633
        // as there are exec nodes in the flow
634
        while ($nodeIdx <= $this->lastIdx) {
635
            $node      = $this->nodes[$nodeIdx];
636
            $nodeStat  = &$this->nodeStats[$nodeIdx];
637
            $returnVal = $node->isReturningVal();
638
639
            if ($node->isTraversable()) {
640
                foreach ($node->getTraversable($param) as $value) {
641
                    if ($returnVal) {
642
                        // pass current $value as next param
643
                        // else keep last $param
644
                        $param = $value;
645
                    }
646
647
                    ++$nodeStat['num_iterate'];
648
                    ++$this->numIterate;
649
                    if (!($this->numIterate % $this->progressMod)) {
650
                        $this->triggerCallback(static::FLOW_PROGRESS, $node);
651
                    }
652
653
                    // here this means that if a deeper child does return something
654
                    // its result will buble up to the first node as param in case
655
                    // one of the previous node is a Traversable
656
                    // It's of course up to each node to decide what to do with the
657
                    // input param.
658
                    $param = $this->recurse($param, $nodeIdx + 1);
659
                    if ($this->continue) {
660
                        // we drop one iteration
661
                        // could be because there is no matching join record from somewhere
662
                        $this->continue = false;
663
                        continue;
664
                    }
665
666
                    if ($this->break) {
667
                        // we drop all subsequent iterations
668
                        break;
669
                    }
670
                }
671
672
                // we reached the end of the flow
673
                ++$nodeStat['num_exec'];
674
675
                return $param;
676
            }
677
678
            $value = $node->exec($param);
679
            ++$nodeStat['num_exec'];
680
681
            if ($this->continue || $this->break) {
682
                return $param;
683
            }
684
685
            if ($returnVal) {
686
                // pass current $value as next param
687
                $param = $value;
688
            }
689
690
            ++$nodeIdx;
691
        }
692
693
        // we reached the end of the flow
694
        return $param;
695
    }
696
697
    /**
698
     * KISS helper to trigger Callback slots
699
     *
700
     * @param string             $which
701
     * @param null|NodeInterface $node
702
     *
703
     * @return $this
704
     */
705
    protected function triggerCallback($which, NodeInterface $node = null)
706
    {
707
        if (null !== $this->callBack) {
708
            $this->callBack->$which($this, $node);
709
        }
710
711
        return $this;
712
    }
713
}
714