Completed
Pull Request — master (#1)
by Fabrice
06:06
created

NodalFlow::interruptFlow()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 22
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 22
c 0
b 0
f 0
rs 8.9197
cc 4
eloc 16
nc 5
nop 2
1
<?php
2
3
/*
4
 * This file is part of NodalFlow.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/NodalFlow
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\NodalFlow;
11
12
use fab2s\NodalFlow\Callbacks\CallbackInterface;
13
use fab2s\NodalFlow\Flows\FlowAbstract;
14
use fab2s\NodalFlow\Flows\FlowMap;
15
use fab2s\NodalFlow\Flows\FlowStatus;
16
use fab2s\NodalFlow\Flows\InterrupterInterface;
17
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
18
use fab2s\NodalFlow\Nodes\ExecNodeInterface;
19
use fab2s\NodalFlow\Nodes\NodeInterface;
20
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
21
22
/**
 * Class NodalFlow
 *
 * Flow implementation which registers Nodes in insertion order and
 * executes them by iterating over consecutive exec Nodes and recursing
 * into the downstream Nodes of every Traversable Node.
 */
class NodalFlow extends FlowAbstract
{
    /**
     * Callback slot names: each value is used as the name of the
     * method invoked on the registered Callback instance
     */
    const FLOW_START    = 'start';
    const FLOW_PROGRESS = 'progress';
    const FLOW_SUCCESS  = 'success';
    const FLOW_FAIL     = 'fail';

    /**
     * Handed over to the FlowMap when this Flow is instantiated
     *
     * @var array
     */
    protected $flowIncrements = [];

    /**
     * Index the next added Node will be stored at
     *
     * @var int
     */
    protected $nodeIdx = 0;

    /**
     * Index of the last Node, refreshed by rewind()
     *
     * @var int
     */
    protected $lastIdx = 0;

    /**
     * Number of Nodes in this Flow, refreshed by rewind()
     *
     * @var int
     */
    protected $nodeCount = 0;

    /**
     * Number of values yielded so far by Traversable Nodes
     * within this Flow
     *
     * @var int
     */
    protected $numIterate = 0;

    /**
     * The registered Callback instance, if any
     *
     * @var CallbackInterface|null
     */
    protected $callBack;

    /**
     * Instantiate a Flow and its FlowMap
     */
    public function __construct()
    {
        $this->flowMap = new FlowMap($this, $this->flowIncrements);
    }

    /**
     * Appends a Node to the Flow
     *
     * The Node gets this Flow as carrier, is registered in the
     * FlowMap and stored at the current Node index.
     *
     * @param NodeInterface $node
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function add(NodeInterface $node)
    {
        if ($node instanceof BranchNodeInterface) {
            // a branch carries a Flow as payload: make this Flow its parent
            $node->getPayload()->setParent($this);
        }

        $node->setCarrier($this);

        $this->flowMap->register($node, $this->nodeIdx);
        // store at the current index, then move on to the next slot
        $this->nodes[$this->nodeIdx++] = $node;

        return $this;
    }

    /**
     * Builds a Payload Node through PayloadNodeFactory and
     * appends it to the Flow
     *
     * @param callable $payload
     * @param mixed    $isAReturningVal forwarded as is to the factory
     * @param mixed    $isATraversable  forwarded as is to the factory
     *
     * @return $this
     */
    public function addPayload(callable $payload, $isAReturningVal, $isATraversable = false)
    {
        $this->add(PayloadNodeFactory::create($payload, $isAReturningVal, $isATraversable));

        return $this;
    }

    /**
     * Registers the Callback instance notified of Flow events
     *
     * @param CallbackInterface $callBack
     *
     * @return $this
     */
    public function setCallBack(CallbackInterface $callBack)
    {
        $this->callBack = $callBack;

        return $this;
    }

    /**
     * Executes the Flow
     *
     * Any exception raised along the way flags the Flow status as
     * exception and triggers flowEnd() before being thrown again:
     * NodalFlowException instances are thrown untouched, any other
     * exception is wrapped into a NodalFlowException carrying the
     * Node map as context.
     *
     * @param null|mixed $param The eventual init argument to the first node
     *                          or, in case of a branch, the last relevant
     *                          argument from upstream Flow
     *
     * @throws NodalFlowException
     *
     * @return mixed the value returned by recurse()
     */
    public function exec($param = null)
    {
        try {
            $result = $this->rewind()->flowStart()->recurse($param);

            // a status still "running" at this point means nothing
            // interrupted the Flow: flag it clean so flowEnd gets the
            // proper value even when overridden without (or when
            // improperly) calling parent
            if ($this->flowStatus->isRunning()) {
                $this->flowStatus = new FlowStatus(FlowStatus::FLOW_CLEAN);
            }

            $this->flowEnd();

            return $result;
        } catch (\Exception $e) {
            $this->flowStatus = new FlowStatus(FlowStatus::FLOW_EXCEPTION);
            $this->flowEnd();

            if (!($e instanceof NodalFlowException)) {
                // wrap foreign exceptions, keeping the original as previous
                $e = new NodalFlowException('Flow execution failed', 0, $e, [
                    'nodeMap' => $this->getNodeMap(),
                ]);
            }

            throw $e;
        }
    }

    /**
     * Rewinds the Flow: clears interruption state and refreshes
     * the Node count and last index from the registered Nodes
     *
     * @return $this
     */
    public function rewind()
    {
        $this->interruptNodeId = null;
        $this->continue        = false;
        $this->break           = false;
        $this->nodeCount       = count($this->nodes);
        $this->lastIdx         = $this->nodeCount - 1;

        return $this;
    }

    /**
     * Requests a break of the Flow's execution, conceptually
     * similar to breaking a regular loop
     *
     * @param InterrupterInterface|null $flowInterrupt
     *
     * @return $this
     */
    public function breakFlow(InterrupterInterface $flowInterrupt = null)
    {
        return $this->interruptFlow(InterrupterInterface::TYPE_BREAK, $flowInterrupt);
    }

    /**
     * Requests a continue of the Flow's execution, conceptually
     * similar to continuing a regular loop
     *
     * @param InterrupterInterface|null $flowInterrupt
     *
     * @return $this
     */
    public function continueFlow(InterrupterInterface $flowInterrupt = null)
    {
        return $this->interruptFlow(InterrupterInterface::TYPE_CONTINUE, $flowInterrupt);
    }

    /**
     * Triggered just before the Flow starts: updates the FlowMap,
     * fires the start callback then flags the Flow as running
     *
     * @return $this
     */
    protected function flowStart()
    {
        $this->flowMap->incrementFlow('num_exec')->flowStart();
        $this->triggerCallback(static::FLOW_START);

        // from now on the Flow is considered running
        $this->flowStatus = new FlowStatus(FlowStatus::FLOW_RUNNING);

        return $this;
    }

    /**
     * Triggered right after the Flow stops: updates the FlowMap and
     * fires the fail callback when the Flow status is an exception,
     * the success callback otherwise
     *
     * @return $this
     */
    protected function flowEnd()
    {
        $this->flowMap->flowEnd();

        $slot = $this->flowStatus->isException() ? static::FLOW_FAIL : static::FLOW_SUCCESS;
        $this->triggerCallback($slot);

        return $this;
    }

    /**
     * Executes Nodes starting at $nodeIdx
     *
     * Consecutive exec Nodes are executed within a loop. Reaching a
     * Traversable Node, every value it yields triggers a recursion
     * over its downstream Nodes (starting at $nodeIdx + 1), and the
     * method returns once the Traversable is exhausted or interrupted.
     * Recursion depth is therefore bound to the number of Traversable
     * Nodes rather than to the total number of Nodes.
     *
     * @param mixed $param
     * @param int   $nodeIdx
     *
     * @return mixed the last value returned by the last
     *               returning value Node in the flow
     */
    protected function recurse($param = null, $nodeIdx = 0)
    {
        for (; $nodeIdx <= $this->lastIdx; ++$nodeIdx) {
            $node           = $this->nodes[$nodeIdx];
            $nodeStats      = &$this->flowMap->getNodeStat($node->getId());
            $isReturningVal = $node->isReturningVal();

            if (!$node->isTraversable()) {
                /** @var ExecNodeInterface $node */
                $value = $node->exec($param);
                ++$nodeStats['num_exec'];

                if ($this->continue) {
                    ++$nodeStats['num_continue'];
                    // a continue does not need to bubble up unless
                    // it specifically targets a node in this flow
                    // or targets an upstream flow
                    $this->continue = $this->interruptNode($node);

                    return $param;
                }

                if ($this->break) {
                    ++$nodeStats['num_break'];

                    // a break always needs to bubble up to the first
                    // upstream Traversable if any
                    return $param;
                }

                if ($isReturningVal) {
                    // the Node's result becomes the next Node's param
                    $param = $value;
                }

                // proceed with the next Node
                continue;
            }

            /** @var TraversableNodeInterface $node */
            foreach ($node->getTraversable($param) as $value) {
                if ($isReturningVal) {
                    // the yielded value becomes the downstream param
                    $param = $value;
                }

                ++$nodeStats['num_iterate'];
                ++$this->numIterate;
                if (0 === $this->numIterate % $this->progressMod) {
                    $this->triggerCallback(static::FLOW_PROGRESS, $node);
                }

                // run every downstream Node with the current param
                $param = $this->recurse($param, $nodeIdx + 1);

                if ($this->continue) {
                    $this->continue = $this->interruptNode($node);
                    if ($this->continue) {
                        // the continue has to bubble upstream: stop
                        // here, waiting for the next $param if any
                        ++$nodeStats['num_break'];
                        break;
                    }

                    // only this iteration is dropped
                    ++$nodeStats['num_continue'];
                } elseif ($this->break) {
                    // every subsequent iteration is dropped
                    ++$nodeStats['num_break'];
                    $this->break = $this->interruptNode($node);
                    break;
                }
            }

            // this Traversable is done, along with all its downstream Nodes
            ++$nodeStats['num_exec'];

            return $param;
        }

        // no more Nodes at this recursion level
        return $param;
    }

    /**
     * KISS helper invoking the method named $which on the
     * registered Callback instance; does nothing when no
     * Callback is registered
     *
     * @param string             $which
     * @param null|NodeInterface $node
     *
     * @return $this
     */
    protected function triggerCallback($which, NodeInterface $node = null)
    {
        if (null === $this->callBack) {
            return $this;
        }

        $this->callBack->$which($this, $node);

        return $this;
    }
}
377