NodalFlow   A
last analyzed

Complexity

Total Complexity 32

Size/Duplication

Total Lines 337
Duplicated Lines 0 %

Importance

Changes 30
Bugs 4 Features 4
Metric Value
eloc 108
c 30
b 4
f 4
dl 0
loc 337
rs 9.84
wmc 32

11 Methods

Rating   Name   Duplication   Size   Complexity  
A addPayload() 0 7 1
A replace() 0 14 2
A add() 0 17 2
A __construct() 0 4 1
A sendTo() 0 13 3
C recurse() 0 79 12
A exec() 0 22 3
A flowEnd() 0 15 2
A rewind() 0 9 1
A flowStart() 0 10 1
A branchFlowCheck() 0 12 4
1
<?php
2
3
/*
4
 * This file is part of NodalFlow.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/NodalFlow
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\NodalFlow;
11
12
use Exception;
13
use fab2s\NodalFlow\Events\FlowEventInterface;
14
use fab2s\NodalFlow\Flows\FlowAbstract;
15
use fab2s\NodalFlow\Flows\FlowInterface;
16
use fab2s\NodalFlow\Flows\FlowMap;
17
use fab2s\NodalFlow\Flows\FlowRegistry;
18
use fab2s\NodalFlow\Flows\FlowStatus;
19
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
20
use fab2s\NodalFlow\Nodes\ExecNodeInterface;
21
use fab2s\NodalFlow\Nodes\NodeInterface;
22
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
23
24
/**
 * Class NodalFlow
 *
 * A Flow holding an indexed list of Nodes which are executed in order by
 * `exec()` (from the first Node) or `sendTo()` (from an arbitrary Node).
 * Traversable Nodes trigger a recursion over their downstream Nodes for
 * each value they yield, while exec Nodes are run one after another.
 */
class NodalFlow extends FlowAbstract
{
    /**
     * Increments handed to the FlowMap when this Flow is instantiated
     *
     * @var array
     */
    protected $flowIncrements = [];

    /**
     * The number of Nodes in this Flow, refreshed by `rewind()`
     *
     * @var int
     */
    protected $nodeCount = 0;

    /**
     * Instantiate a Flow
     *
     * @throws NodalFlowException
     */
    public function __construct()
    {
        $this->flowMap  = new FlowMap($this, $this->flowIncrements);
        $this->registry = new FlowRegistry;
    }

    /**
     * Adds a Node to the flow
     *
     * When the Node is a Branch, its payload (the child Flow) is checked
     * for reuse and gets this Flow set as its parent before the Node is
     * registered.
     *
     * @param NodeInterface $node
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function add(NodeInterface $node): FlowInterface
    {
        if ($node instanceof BranchNodeInterface) {
            // this node is a branch
            $childFlow = $node->getPayload();
            $this->branchFlowCheck($childFlow);
            $childFlow->setParent($this);
        }

        $node->setCarrier($this);

        $this->flowMap->register($node, $this->nodeIdx);
        $this->nodes[$this->nodeIdx] = $node;

        // nodeIdx now points one past the last Node, ready for the next add()
        ++$this->nodeIdx;

        return $this;
    }

    /**
     * Adds a Payload Node to the Flow
     *
     * The Node is built by PayloadNodeFactory from the callable and then
     * added like any other Node.
     *
     * @param callable $payload
     * @param bool     $isAReturningVal
     * @param bool     $isATraversable
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function addPayload(callable $payload, bool $isAReturningVal, bool $isATraversable = false): FlowInterface
    {
        $node = PayloadNodeFactory::create($payload, $isAReturningVal, $isATraversable);

        $this->add($node);

        return $this;
    }

    /**
     * Replaces a node with another one
     *
     * NOTE(review): unlike add(), no Branch specific handling (reuse check,
     * setParent) is performed here - confirm this is intended.
     *
     * @param int           $nodeIdx an existing index in this Flow's nodes
     * @param NodeInterface $node
     *
     * @throws NodalFlowException when $nodeIdx does not match an existing Node
     *
     * @return static
     */
    public function replace(int $nodeIdx, NodeInterface $node): FlowInterface
    {
        if (!isset($this->nodes[$nodeIdx])) {
            throw new NodalFlowException('Argument 1 should be a valid index in nodes', 1, null, [
                'nodeIdx' => $nodeIdx,
                'node'    => get_class($node),
            ]);
        }

        $node->setCarrier($this);
        $this->nodes[$nodeIdx] = $node;
        $this->flowMap->register($node, $nodeIdx, true);

        return $this;
    }

    /**
     * Rewinds the Flow and executes it starting from the targeted Node,
     * or from the first Node when no Node id is provided
     *
     * @param string|null $nodeId
     * @param mixed|null  $param
     *
     * @throws Exception
     * @throws NodalFlowException when $nodeId cannot be resolved to an index
     *
     * @return mixed
     */
    public function sendTo(string $nodeId = null, $param = null)
    {
        $nodeIndex = 0;
        if ($nodeId !== null) {
            $nodeIndex = $this->flowMap->getNodeIndex($nodeId);
            // compare strictly: index 0 is a valid target (the first Node)
            // and must not be mistaken for an unknown Node
            if ($nodeIndex === null || $nodeIndex === false) {
                throw new NodalFlowException('Cannot sendTo without valid Node target', 1, null, [
                    'flowId' => $this->getId(),
                    'nodeId' => $nodeId,
                ]);
            }
        }

        return $this->rewind()->recurse($param, $nodeIndex);
    }

    /**
     * Execute the flow
     *
     * @param null|mixed $param The eventual init argument to the first node
     *                          or, in case of a branch, the last relevant
     *                          argument from upstream Flow
     *
     * @throws NodalFlowException
     * @throws Exception any Exception caught during execution is re-thrown
     *                   after flowEnd() has been called
     *
     * @return mixed the last result of the
     *               last returning value node
     */
    public function exec($param = null)
    {
        try {
            $result = $this->rewind()
                ->flowStart()
                ->recurse($param);

            // set flowStatus to make sure that we have the proper
            // value in flowEnd even when overridden without (or when
            // improperly) calling parent
            if ($this->flowStatus->isRunning()) {
                $this->flowStatus = new FlowStatus(FlowStatus::FLOW_CLEAN);
            }

            $this->flowEnd();

            return $result;
        } catch (Exception $e) {
            // record the failure before flowEnd() so it picks the fail event
            $this->flowStatus = new FlowStatus(FlowStatus::FLOW_EXCEPTION, $e);
            $this->flowEnd();

            throw $e;
        }
    }

    /**
     * Rewinds the Flow
     *
     * Refreshes the Node count and last index from the current Node list
     * and clears any pending interruption (break / continue / target).
     *
     * @return $this
     */
    public function rewind(): FlowInterface
    {
        $this->nodeCount       = count($this->nodes);
        $this->lastIdx         = $this->nodeCount - 1;
        $this->break           = false;
        $this->continue        = false;
        $this->interruptNodeId = null;

        return $this;
    }

    /**
     * Makes sure a Flow can be used as a Branch of this Flow
     *
     * @param FlowInterface $flow
     *
     * @throws NodalFlowException when $flow already has a parent or shares
     *                            its root Flow with this Flow
     */
    protected function branchFlowCheck(FlowInterface $flow)
    {
        if (
            // this flow has parent already
            $flow->hasParent() ||
            // adding root flow in itself
            $this->getRootFlow($flow)->getId() === $this->getRootFlow($this)->getId()
        ) {
            throw new NodalFlowException('Cannot reuse Flow within Branches', 1, null, [
                'flowId'             => $this->getId(),
                'BranchFlowId'       => $flow->getId(),
                'BranchFlowParentId' => $flow->hasParent() ? $flow->getParent()->getId() : null,
            ]);
        }
    }

    /**
     * Triggered just before the flow starts
     *
     * @return $this
     */
    protected function flowStart(): self
    {
        $this->flowMap->incrementFlow('num_exec')->flowStart();
        $this->listActiveEvent(!$this->hasParent())->triggerEvent(FlowEventInterface::FLOW_START);
        // flow started status kicks in after Event start to hint eventual children
        // this way, root flow is only running when a record hits a branch
        // and triggers a child flow flowStart() call
        $this->flowStatus = new FlowStatus(FlowStatus::FLOW_RUNNING);

        return $this;
    }

    /**
     * Triggered right after the flow stops
     *
     * Fires the success event, or the fail event along with the Node that
     * was being executed (if any) when the Flow status holds an exception.
     *
     * @return $this
     */
    protected function flowEnd(): self
    {
        $this->flowMap->flowEnd();
        $eventName = FlowEventInterface::FLOW_SUCCESS;
        $node      = null;
        if ($this->flowStatus->isException()) {
            $eventName = FlowEventInterface::FLOW_FAIL;
            // nodeIdx points one past the last Node until recurse() reaches
            // a Node, so a failure occurring earlier (eg during flowStart or
            // on an empty Flow) has no matching Node to report
            $node      = $this->nodes[$this->nodeIdx] ?? null;
        }

        // restore nodeIdx
        $this->nodeIdx = $this->lastIdx + 1;
        $this->triggerEvent($eventName, $node);

        return $this;
    }

    /**
     * Recurse over nodes which may as well be Flows and Traversable ...
     * Welcome to the abysses of recursion or iter-recursion ^^
     *
     * `recurse` performs a kind of hybrid recursion as the Flow
     * is effectively iterating and recurring over its Nodes,
     * which may as well be seen as over itself
     *
     * Iterating tends to limit the amount of recursion levels:
     * recursion is only triggered when executing a Traversable
     * Node's downstream Nodes while every consecutive exec
     * Nodes are executed within the while loop.
     * The size of the recursion context is kept to a minimum
     * as pretty much everything is done by the iterating instance
     *
     * @param mixed $param   the argument passed to the Node at $nodeIdx
     * @param int   $nodeIdx the index of the first Node to execute
     *
     * @return mixed the last value returned by the last
     *               returning value Node in the flow
     */
    protected function recurse($param = null, int $nodeIdx = 0)
    {
        while ($nodeIdx <= $this->lastIdx) {
            $node          = $this->nodes[$nodeIdx];
            // keep track of the current Node index at instance level
            $this->nodeIdx = $nodeIdx;
            // bound by reference so increments below land in the FlowMap stats
            $nodeStats     = &$this->flowMap->getNodeStat($node->getId());
            $returnVal     = $node->isReturningVal();

            if ($node->isTraversable()) {
                /** @var TraversableNodeInterface $node */
                foreach ($node->getTraversable($param) as $value) {
                    if ($returnVal) {
                        // pass current $value as next param
                        $param = $value;
                    }

                    ++$nodeStats['num_iterate'];
                    // fire a progress event every progressMod iterations
                    if (!($nodeStats['num_iterate'] % $this->progressMod)) {
                        $this->triggerEvent(FlowEventInterface::FLOW_PROGRESS, $node);
                    }

                    // execute every downstream Node with the current value
                    $param = $this->recurse($param, $nodeIdx + 1);
                    if ($this->continue) {
                        if ($this->continue = $this->interruptNode($node)) {
                            // since we want to bubble the continue upstream
                            // we break here waiting for next $param if any
                            ++$nodeStats['num_break'];
                            break;
                        }

                        // we drop one iteration
                        ++$nodeStats['num_continue'];
                        continue;
                    }

                    if ($this->break) {
                        // we drop all subsequent iterations
                        ++$nodeStats['num_break'];
                        $this->break = $this->interruptNode($node);
                        break;
                    }
                }

                // we reached the end of this Traversable and executed all its downstream Nodes
                ++$nodeStats['num_exec'];

                return $param;
            }

            /** @var ExecNodeInterface $node */
            $value = $node->exec($param);
            ++$nodeStats['num_exec'];

            if ($this->continue) {
                ++$nodeStats['num_continue'];
                // a continue does not need to bubble up unless
                // it specifically targets a node in this flow
                // or targets an upstream flow
                $this->continue = $this->interruptNode($node);

                return $param;
            }

            if ($this->break) {
                ++$nodeStats['num_break'];
                // a break always needs to bubble up to the first upstream Traversable if any
                return $param;
            }

            if ($returnVal) {
                // pass current $value as next param
                $param = $value;
            }

            ++$nodeIdx;
        }

        // we reached the end of this recursion
        return $param;
    }
}
366