Completed
Pull Request — master (#1)
by Fabrice
04:13
created

NodalFlow::getId()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of NodalFlow.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/NodalFlow
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\NodalFlow;
11
12
use fab2s\NodalFlow\Callbacks\CallbackInterface;
13
use fab2s\NodalFlow\Flows\FlowIdTrait;
14
use fab2s\NodalFlow\Flows\FlowInterface;
15
use fab2s\NodalFlow\Flows\FlowMap;
16
use fab2s\NodalFlow\Flows\FlowMapInterface;
17
use fab2s\NodalFlow\Flows\FlowStatus;
18
use fab2s\NodalFlow\Flows\FlowStatusInterface;
19
use fab2s\NodalFlow\Flows\InterrupterInterface;
20
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
21
use fab2s\NodalFlow\Nodes\ExecNodeInterface;
22
use fab2s\NodalFlow\Nodes\NodeInterface;
23
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
24
25
/**
 * Class NodalFlow
 *
 * FlowInterface implementation holding an ordered list of Nodes and
 * executing them in sequence (see recurse()), while tracking the Flow
 * status, break / continue interruptions and optional callbacks.
 */
class NodalFlow implements FlowInterface
{
    use FlowIdTrait;

    /**
     * Names of the callback slots triggered along a Flow execution.
     * Each value is used as the method name invoked on the registered
     * callback instance (see triggerCallback()).
     */
    const FLOW_START    = 'start';
    const FLOW_PROGRESS = 'progress';
    const FLOW_SUCCESS  = 'success';
    const FLOW_FAIL     = 'fail';

    /**
     * The parent Flow, null unless one was assigned through setParent()
     * (add() does so on the payload of every branch Node it receives)
     *
     * @var FlowInterface|null
     */
    public $parent;

    /**
     * Handed over to the FlowMap constructor when this Flow is instantiated
     *
     * @var array
     */
    protected $flowIncrements = [];

    /**
     * The Nodes of this Flow, keyed by insertion index
     *
     * @var NodeInterface[]
     */
    protected $nodes = [];

    /**
     * Index the next added Node will be stored at
     *
     * @var int
     */
    protected $nodeIdx = 0;

    /**
     * Index of the last Node, refreshed by rewind()
     *
     * @var int
     */
    protected $lastIdx = 0;

    /**
     * Number of Nodes in this Flow, refreshed by rewind()
     *
     * @var int
     */
    protected $nodeCount = 0;

    /**
     * Number of iterations performed over Traversable Nodes
     * within this Flow
     *
     * @var int
     */
    protected $numIterate = 0;

    /**
     * The registered callback instance, if any
     *
     * @var CallbackInterface|null
     */
    protected $callBack;

    /**
     * Progress modulo: the progress callback is triggered
     * every $progressMod iterations in the Flow
     *
     * @var int
     */
    protected $progressMod = 1024;

    /**
     * Raised when a continue interruption is pending
     *
     * @var bool
     */
    protected $continue = false;

    /**
     * Raised when a break interruption is pending
     *
     * @var bool
     */
    protected $break = false;

    /**
     * Current Flow status, null until the Flow is started
     * or interrupted for the first time
     *
     * @var FlowStatusInterface|null
     */
    protected $flowStatus;

    /**
     * @var FlowMapInterface
     */
    protected $flowMap;

    /**
     * Target of the pending interruption, reset to null by rewind()
     *
     * @var string|bool|null
     */
    protected $interruptNodeId;

    /**
     * Instantiate a Flow together with its FlowMap
     */
    public function __construct()
    {
        $this->flowMap = new FlowMap($this, $this->flowIncrements);
    }

    /**
     * Appends a Node to the Flow
     *
     * The Node gets this Flow as carrier and is registered in the
     * FlowMap under the index it is stored at.
     *
     * @param NodeInterface $node
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function add(NodeInterface $node)
    {
        $isBranch = $node instanceof BranchNodeInterface;
        if ($isBranch) {
            // a branch carries a payload of its own: make this Flow its parent
            $node->getPayload()->setParent($this);
        }

        $node->setCarrier($this);

        $this->flowMap->register($node, $this->nodeIdx);
        $this->nodes[$this->nodeIdx] = $node;

        ++$this->nodeIdx;

        return $this;
    }

    /**
     * Builds a Node out of a callable through PayloadNodeFactory
     * and appends it to the Flow
     *
     * @param callable $payload
     * @param mixed    $isAReturningVal forwarded as is to the factory
     * @param mixed    $isATraversable  forwarded as is to the factory
     *
     * @return $this
     */
    public function addPayload(callable $payload, $isAReturningVal, $isATraversable = false)
    {
        $this->add(PayloadNodeFactory::create($payload, $isAReturningVal, $isATraversable));

        return $this;
    }

    /**
     * Registers the callback instance notified of Flow events
     *
     * @param CallbackInterface $callBack
     *
     * @return $this
     */
    public function setCallBack(CallbackInterface $callBack)
    {
        $this->callBack = $callBack;

        return $this;
    }

    /**
     * Sets the eventual Node target of an interrupt signal.
     * Set it to :
     * - a Node id to target that Node
     * - true to interrupt every upstream Node in this Flow
     * - false to only interrupt up to the first upstream
     *     Traversable in this Flow
     *
     * @param string|bool $interruptNodeId
     *
     * @return $this
     */
    public function setInterruptNodeId($interruptNodeId)
    {
        $this->interruptNodeId = $interruptNodeId;

        return $this;
    }

    /**
     * Sets the parent Flow, which only happens when branched
     *
     * @param FlowInterface $flow
     *
     * @return $this
     */
    public function setParent(FlowInterface $flow)
    {
        $this->parent = $flow;

        return $this;
    }

    /**
     * Gets the eventual parent Flow
     *
     * @return FlowInterface|null null when no parent was set
     */
    public function getParent()
    {
        return $this->parent;
    }

    /**
     * Tells whether this Flow has a parent
     *
     * @return bool
     */
    public function hasParent()
    {
        if (empty($this->parent)) {
            return false;
        }

        return true;
    }

    /**
     * Executes the Flow
     *
     * The Flow is rewound and started, its Nodes are run, and flowEnd()
     * is always invoked, whether the run completed or raised an exception.
     *
     * @param null|mixed $param The eventual init argument to the first node
     *                          or, in case of a branch, the last relevant
     *                          argument from upstream Flow
     *
     * @throws NodalFlowException either the one raised during the run, or a new
     *                            one wrapping any other \Exception as previous
     *
     * @return mixed the last result of the
     *               last returning value node
     */
    public function exec($param = null)
    {
        try {
            $result = $this->rewind()
                ->flowStart()
                ->recurse($param);

            // a status still "running" at this point means nothing went wrong:
            // settle it here so flowEnd() sees the proper value even when
            // overridden without (or when improperly) calling parent
            if ($this->flowStatus->isRunning()) {
                $this->flowStatus = new FlowStatus(FlowStatus::FLOW_CLEAN);
            }

            $this->flowEnd();

            return $result;
        } catch (NodalFlowException $e) {
            // already one of ours: flag the failure and let it through untouched
            $this->flowStatus = new FlowStatus(FlowStatus::FLOW_EXCEPTION);
            $this->flowEnd();

            throw $e;
        } catch (\Exception $e) {
            // foreign exception: flag the failure and wrap it with the node map
            $this->flowStatus = new FlowStatus(FlowStatus::FLOW_EXCEPTION);
            $this->flowEnd();

            throw new NodalFlowException('Flow execution failed', 0, $e, [
                'nodeMap' => $this->getNodeMap(),
            ]);
        }
    }

    /**
     * Gets the stats array as provided by the FlowMap
     *
     * @return array
     */
    public function getStats()
    {
        return $this->flowMap->getStats();
    }

    /**
     * Gets the FlowMap instance attached to this Flow
     *
     * @return FlowMapInterface
     */
    public function getFlowMap()
    {
        return $this->flowMap;
    }

    /**
     * getId() alias kept for backward compatibility
     *
     * @deprecated use `getId` instead
     *
     * @return string
     */
    public function getFlowId()
    {
        return $this->getId();
    }

    /**
     * Gets the Node array
     *
     * @return NodeInterface[]
     */
    public function getNodes()
    {
        return $this->nodes;
    }

    /**
     * Gets the Node Map as provided by the FlowMap
     *
     * @return array
     */
    public function getNodeMap()
    {
        return $this->flowMap->getNodeMap();
    }

    /**
     * Rewinds the Flow: refreshes the Node count and last index
     * and clears any pending interruption state
     *
     * @return $this
     */
    public function rewind()
    {
        $total = count($this->nodes);

        $this->nodeCount       = $total;
        $this->lastIdx         = $total - 1;
        $this->break           = false;
        $this->continue        = false;
        $this->interruptNodeId = null;

        return $this;
    }

    /**
     * Defines the progress modulo: the progress callback is triggered
     * every $progressMod iterations in the Flow.
     * The value is cast to int and never goes below 1.
     *
     * @param int $progressMod
     *
     * @return $this
     */
    public function setProgressMod($progressMod)
    {
        $modulo = (int) $progressMod;

        $this->progressMod = $modulo < 1 ? 1 : $modulo;

        return $this;
    }

    /**
     * Gets the current progress modulo
     *
     * @return int
     */
    public function getProgressMod()
    {
        return $this->progressMod;
    }

    /**
     * Gets the Flow status, which can be:
     *      - clean (isClean()): everything went well
     *      - dirty (isDirty()): one Node broke the flow
     *      - exception (isException()): an exception was raised during the flow
     *
     * @return FlowStatusInterface|null null as long as the Flow was
     *                                  neither started nor interrupted
     */
    public function getFlowStatus()
    {
        return $this->flowStatus;
    }

    /**
     * Breaks the Flow's execution, conceptually similar to breaking
     * a regular loop
     *
     * @param InterrupterInterface|null $flowInterrupt
     *
     * @return $this
     */
    public function breakFlow(InterrupterInterface $flowInterrupt = null)
    {
        return $this->interruptFlow(InterrupterInterface::TYPE_BREAK, $flowInterrupt);
    }

    /**
     * Continues the Flow's execution, conceptually similar to continuing
     * a regular loop
     *
     * @param InterrupterInterface|null $flowInterrupt
     *
     * @return $this
     */
    public function continueFlow(InterrupterInterface $flowInterrupt = null)
    {
        return $this->interruptFlow(InterrupterInterface::TYPE_CONTINUE, $flowInterrupt);
    }

    /**
     * Raises the flag matching the interrupt type and counts it in the
     * FlowMap; a break additionally marks the Flow as dirty. When an
     * interrupter is provided, it receives the type and is propagated
     * from this Flow.
     *
     * @param string                    $interruptType one of the InterrupterInterface::TYPE_* values
     * @param InterrupterInterface|null $flowInterrupt
     *
     * @throws NodalFlowException when the type is neither continue nor break
     *
     * @return $this
     */
    public function interruptFlow($interruptType, InterrupterInterface $flowInterrupt = null)
    {
        switch ($interruptType) {
            case InterrupterInterface::TYPE_CONTINUE:
                $this->continue = true;
                $this->flowMap->incrementFlow('num_continue');
                break;
            case InterrupterInterface::TYPE_BREAK:
                $this->flowStatus = new FlowStatus(FlowStatus::FLOW_DIRTY);
                $this->break      = true;
                $this->flowMap->incrementFlow('num_break');
                break;
            default:
                throw new NodalFlowException('FlowInterrupt Type missing');
        }

        if (null !== $flowInterrupt) {
            $flowInterrupt->setType($interruptType)->propagate($this);
        }

        return $this;
    }

    /**
     * Tells whether a pending interruption must keep bubbling
     * upstream after reaching the given Node
     *
     * @param NodeInterface $node
     *
     * @return bool false when no target is set or when $node is the
     *              target, true otherwise
     */
    protected function interruptNode(NodeInterface $node)
    {
        if (!$this->interruptNodeId) {
            // no target: propagation stops right here
            return false;
        }

        // keep bubbling up until the targeted Node is matched
        return $this->interruptNodeId !== $node->getId();
    }

    /**
     * Triggered just before the Flow starts: counts the execution,
     * notifies the FlowMap, fires the start callback and then
     * marks the Flow as running
     *
     * @return $this
     */
    protected function flowStart()
    {
        $this->flowMap->incrementFlow('num_exec')->flowStart();
        $this->triggerCallback(static::FLOW_START);

        // from now on the Flow is running
        $this->flowStatus = new FlowStatus(FlowStatus::FLOW_RUNNING);

        return $this;
    }

    /**
     * Triggered right after the Flow stops: notifies the FlowMap and
     * fires the fail callback when the status is an exception, the
     * success callback otherwise
     *
     * @return $this
     */
    protected function flowEnd()
    {
        $this->flowMap->flowEnd();

        $hasFailed = $this->flowStatus->isException();
        $this->triggerCallback($hasFailed ? static::FLOW_FAIL : static::FLOW_SUCCESS);

        return $this;
    }

    /**
     * Runs the Nodes starting at $nodeIdx.
     *
     * Consecutive exec Nodes are run one after the other within a
     * single loop. A Traversable Node is iterated instead, and each
     * value it yields triggers one recursion over its downstream
     * Nodes; the method returns once the Traversable is exhausted
     * or interrupted. Recursion depth is therefore driven by the
     * number of Traversable Nodes rather than the number of Nodes.
     *
     * @param mixed $param
     * @param int   $nodeIdx
     *
     * @return mixed the last value returned by the last
     *               returning value Node in the flow
     */
    protected function recurse($param = null, $nodeIdx = 0)
    {
        for (; $nodeIdx <= $this->lastIdx; ++$nodeIdx) {
            $node        = $this->nodes[$nodeIdx];
            $nodeStats   = &$this->flowMap->getNodeStat($node->getId());
            $isReturning = $node->isReturningVal();

            if (!$node->isTraversable()) {
                /** @var ExecNodeInterface $node */
                $result = $node->exec($param);
                ++$nodeStats['num_exec'];

                if ($this->continue) {
                    ++$nodeStats['num_continue'];
                    // a continue does not need to bubble up unless
                    // it specifically targets a node in this flow
                    // or targets an upstream flow
                    $this->continue = $this->interruptNode($node);

                    return $param;
                }

                if ($this->break) {
                    ++$nodeStats['num_break'];

                    // a break always bubbles up to the first upstream Traversable if any
                    return $param;
                }

                if ($isReturning) {
                    // the Node's result becomes the next Node's param
                    $param = $result;
                }

                // move on to the next Node
                continue;
            }

            /** @var TraversableNodeInterface $node */
            foreach ($node->getTraversable($param) as $item) {
                if ($isReturning) {
                    // the yielded value becomes the downstream param
                    $param = $item;
                }

                ++$nodeStats['num_iterate'];
                ++$this->numIterate;
                if (0 === $this->numIterate % $this->progressMod) {
                    $this->triggerCallback(static::FLOW_PROGRESS, $node);
                }

                // run every downstream Node with the current param
                $param = $this->recurse($param, $nodeIdx + 1);

                if ($this->continue) {
                    $this->continue = $this->interruptNode($node);
                    if ($this->continue) {
                        // the continue has to bubble further upstream:
                        // stop iterating here and wait for the next
                        // upstream param if any
                        ++$nodeStats['num_break'];
                        break;
                    }

                    // the continue stops here: one iteration is dropped
                    ++$nodeStats['num_continue'];
                    continue;
                }

                if ($this->break) {
                    // every subsequent iteration is dropped
                    ++$nodeStats['num_break'];
                    $this->break = $this->interruptNode($node);
                    break;
                }
            }

            // done with this Traversable, its downstream Nodes were run from within the loop
            ++$nodeStats['num_exec'];

            return $param;
        }

        // no Node left at this recursion level
        return $param;
    }

    /**
     * KISS helper to trigger callback slots: calls the method named
     * $which on the registered callback, if any, with this Flow and
     * the eventual Node as arguments
     *
     * @param string             $which one of the FLOW_* slot names
     * @param null|NodeInterface $node
     *
     * @return $this
     */
    protected function triggerCallback($which, NodeInterface $node = null)
    {
        if (null === $this->callBack) {
            return $this;
        }

        $this->callBack->$which($this, $node);

        return $this;
    }
}
623