Passed
Push — master ( 2f2d63...13c5ea )
by Fabrice
02:00
created

NodalFlow::flowEnd()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
c 0
b 0
f 0
rs 9.4285
cc 2
eloc 4
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of NodalFlow.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/NodalFlow
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\NodalFlow;
11
12
use fab2s\NodalFlow\Flows\FlowAbstract;
13
use fab2s\NodalFlow\Flows\FlowInterface;
14
use fab2s\NodalFlow\Flows\FlowMap;
15
use fab2s\NodalFlow\Flows\FlowRegistry;
16
use fab2s\NodalFlow\Flows\FlowStatus;
17
use fab2s\NodalFlow\Flows\InterrupterInterface;
18
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
19
use fab2s\NodalFlow\Nodes\ExecNodeInterface;
20
use fab2s\NodalFlow\Nodes\NodeInterface;
21
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
22
23
/**
24
 * Class NodalFlow
25
 */
26
class NodalFlow extends FlowAbstract
27
{
28
    /**
29
     * Flow steps triggering callbacks
30
     */
31
    const FLOW_START    = 'start';
32
    const FLOW_PROGRESS = 'progress';
33
    const FLOW_SUCCESS  = 'success';
34
    const FLOW_FAIL     = 'fail';
35
36
    /**
37
     * @var array
38
     */
39
    protected $flowIncrements = [];
40
41
    /**
42
     * The current Node index
43
     *
44
     * @var int
45
     */
46
    protected $nodeIdx = 0;
47
48
    /**
49
     * The last index value
50
     *
51
     * @var int
52
     */
53
    protected $lastIdx = 0;
54
55
    /**
56
     * The number of Node in this Flow
57
     *
58
     * @var int
59
     */
60
    protected $nodeCount = 0;
61
62
    /**
63
     * Instantiate a Flow
64
     */
65
    public function __construct()
66
    {
67
        $this->flowMap  = new FlowMap($this, $this->flowIncrements);
68
        $this->registry = new FlowRegistry;
69
    }
70
71
    /**
72
     * Adds a Node to the flow
73
     *
74
     * @param NodeInterface $node
75
     *
76
     * @throws NodalFlowException
77
     *
78
     * @return $this
79
     */
80
    public function add(NodeInterface $node)
81
    {
82
        if ($node instanceof BranchNodeInterface) {
83
            // this node is a branch
84
            $childFlow = $node->getPayload();
85
            $this->branchFlowCheck($childFlow);
86
            $childFlow->setParent($this);
87
        }
88
89
        $node->setCarrier($this);
90
91
        $this->flowMap->register($node, $this->nodeIdx);
92
        $this->nodes[$this->nodeIdx] = $node;
93
94
        ++$this->nodeIdx;
95
96
        return $this;
97
    }
98
99
    /**
100
     * Adds a Payload Node to the Flow
101
     *
102
     * @param callable $payload
103
     * @param mixed    $isAReturningVal
104
     * @param mixed    $isATraversable
105
     *
106
     * @return $this
107
     */
108
    public function addPayload(callable $payload, $isAReturningVal, $isATraversable = false)
109
    {
110
        $node = PayloadNodeFactory::create($payload, $isAReturningVal, $isATraversable);
111
112
        $this->add($node);
113
114
        return $this;
115
    }
116
117
    /**
118
     * Execute the flow
119
     *
120
     * @param null|mixed $param The eventual init argument to the first node
121
     *                          or, in case of a branch, the last relevant
122
     *                          argument from upstream Flow
123
     *
124
     * @throws NodalFlowException
125
     *
126
     * @return mixed the last result of the
127
     *               last returning value node
128
     */
129
    public function exec($param = null)
130
    {
131
        try {
132
            $result = $this->rewind()
133
                    ->flowStart()
134
                    ->recurse($param);
135
136
            // set flowStatus to make sure that we have the proper
137
            // value in flowEnd even when overridden without (or when
138
            // improperly) calling parent
139
            if ($this->flowStatus->isRunning()) {
140
                $this->flowStatus = new FlowStatus(FlowStatus::FLOW_CLEAN);
141
            }
142
143
            $this->flowEnd();
144
145
            return $result;
146
        } catch (\Exception $e) {
147
            $this->flowStatus = new FlowStatus(FlowStatus::FLOW_EXCEPTION);
148
            $this->flowEnd();
149
            if ($e instanceof NodalFlowException) {
150
                throw $e;
151
            }
152
153
            throw new NodalFlowException('Flow execution failed', 0, $e, [
154
                'nodeMap' => $this->getNodeMap(),
155
            ]);
156
        }
157
    }
158
159
    /**
160
     * Rewinds the Flow
161
     *
162
     * @return $this
163
     */
164
    public function rewind()
165
    {
166
        $this->nodeCount       = count($this->nodes);
167
        $this->lastIdx         = $this->nodeCount - 1;
168
        $this->break           = false;
169
        $this->continue        = false;
170
        $this->interruptNodeId = null;
171
172
        return $this;
173
    }
174
175
    /**
176
     * Break the flow's execution, conceptually similar to breaking
177
     * a regular loop
178
     *
179
     * @param InterrupterInterface|null $flowInterrupt
180
     *
181
     * @return $this
182
     */
183
    public function breakFlow(InterrupterInterface $flowInterrupt = null)
184
    {
185
        return $this->interruptFlow(InterrupterInterface::TYPE_BREAK, $flowInterrupt);
186
    }
187
188
    /**
189
     * Continue the flow's execution, conceptually similar to continuing
190
     * a regular loop
191
     *
192
     * @param InterrupterInterface|null $flowInterrupt
193
     *
194
     * @return $this
195
     */
196
    public function continueFlow(InterrupterInterface $flowInterrupt = null)
197
    {
198
        return $this->interruptFlow(InterrupterInterface::TYPE_CONTINUE, $flowInterrupt);
199
    }
200
201
    /**
202
     * @param FlowInterface $flow
203
     *
204
     * @throws NodalFlowException
205
     */
206
    protected function branchFlowCheck(FlowInterface $flow)
207
    {
208
        if (
209
            // this flow has parent already
210
            $flow->hasParent() ||
211
            // adding root flow in itself
212
            $this->getRootFlow($flow)->getId() === $this->getRootFlow($this)->getId()
213
        ) {
214
            throw new NodalFlowException('Cannot reuse Flow within Branches', 1, null, [
215
                'flowId'             => $this->getId(),
216
                'BranchFlowId'       => $flow->getId(),
217
                'BranchFlowParentId' => $flow->hasParent() ? $flow->getParent()->getId() : null,
218
            ]);
219
        }
220
    }
221
222
    /**
223
     * Triggered just before the flow starts
224
     *
225
     * @return $this
226
     */
227
    protected function flowStart()
228
    {
229
        $this->flowMap->incrementFlow('num_exec')->flowStart();
230
        $this->triggerCallback(static::FLOW_START);
231
232
        // flow is started
233
        $this->flowStatus = new FlowStatus(FlowStatus::FLOW_RUNNING);
234
235
        return $this;
236
    }
237
238
    /**
239
     * Triggered right after the flow stops
240
     *
241
     * @return $this
242
     */
243
    protected function flowEnd()
244
    {
245
        $this->flowMap->flowEnd();
246
247
        $this->triggerCallback($this->flowStatus->isException() ? static::FLOW_FAIL : static::FLOW_SUCCESS);
248
249
        return $this;
250
    }
251
252
    /**
253
     * Recurse over nodes which may as well be Flows and
254
     * Traversable ...
255
     * Welcome to the abysses of recursion or iter-recursion ^^
256
     *
257
     * `recurse` perform kind of an hybrid recursion as the
258
     * Flow is effectively iterating and recurring over its
259
     * Nodes, which may as well be seen as over itself
260
     *
261
     * Iterating tends to limit the amount of recursion levels:
262
     * recursion is only triggered when executing a Traversable
263
     * Node's downstream Nodes while every consecutive exec
264
     * Nodes are executed within a while loop.
265
     * And recursion keeps the size of the recursion context
266
     * to a minimum as pretty much everything is done by the
267
     * iterating instance
268
     *
269
     * @param mixed $param
270
     * @param int   $nodeIdx
271
     *
272
     * @return mixed the last value returned by the last
273
     *               returning value Node in the flow
274
     */
275
    protected function recurse($param = null, $nodeIdx = 0)
276
    {
277
        while ($nodeIdx <= $this->lastIdx) {
278
            $node      = $this->nodes[$nodeIdx];
279
            $nodeStats = &$this->flowMap->getNodeStat($node->getId());
280
281
            $returnVal = $node->isReturningVal();
282
283
            if ($node->isTraversable()) {
284
                /** @var TraversableNodeInterface $node */
285
                foreach ($node->getTraversable($param) as $value) {
286
                    if ($returnVal) {
287
                        // pass current $value as next param
288
                        $param = $value;
289
                    }
290
291
                    ++$nodeStats['num_iterate'];
292
                    if (!($nodeStats['num_iterate'] % $this->progressMod)) {
293
                        $this->triggerCallback(static::FLOW_PROGRESS, $node);
294
                    }
295
296
                    $param = $this->recurse($param, $nodeIdx + 1);
297
                    if ($this->continue) {
298
                        if ($this->continue = $this->interruptNode($node)) {
299
                            // since we want to bubble the continue upstream
300
                            // we break here waiting for next $param if any
301
                            ++$nodeStats['num_break'];
302
                            break;
303
                        }
304
305
                        // we drop one iteration
306
                        ++$nodeStats['num_continue'];
307
                        continue;
308
                    }
309
310
                    if ($this->break) {
311
                        // we drop all subsequent iterations
312
                        ++$nodeStats['num_break'];
313
                        $this->break = $this->interruptNode($node);
314
                        break;
315
                    }
316
                }
317
318
                // we reached the end of this Traversable and executed all its downstream Nodes
319
                ++$nodeStats['num_exec'];
320
321
                return $param;
322
            }
323
324
            /** @var ExecNodeInterface $node */
325
            $value = $node->exec($param);
326
            ++$nodeStats['num_exec'];
327
328
            if ($this->continue) {
329
                ++$nodeStats['num_continue'];
330
                // a continue does not need to bubble up unless
331
                // it specifically targets a node in this flow
332
                // or targets an upstream flow
333
                $this->continue = $this->interruptNode($node);
334
335
                return $param;
336
            }
337
338
            if ($this->break) {
339
                ++$nodeStats['num_break'];
340
                // a break always need to bubble up to the first upstream Traversable if any
341
                return $param;
342
            }
343
344
            if ($returnVal) {
345
                // pass current $value as next param
346
                $param = $value;
347
            }
348
349
            ++$nodeIdx;
350
        }
351
352
        // we reached the end of this recursion
353
        return $param;
354
    }
355
}
356