Completed
Pull Request — master (#7)
by Fabrice
02:29
created

NodalFlow::addPayload()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 3
1
<?php
2
3
/*
4
 * This file is part of NodalFlow.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/NodalFlow
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\NodalFlow;
11
12
use fab2s\NodalFlow\Flows\FlowAbstract;
13
use fab2s\NodalFlow\Flows\FlowInterface;
14
use fab2s\NodalFlow\Flows\FlowMap;
15
use fab2s\NodalFlow\Flows\FlowRegistry;
16
use fab2s\NodalFlow\Flows\FlowStatus;
17
use fab2s\NodalFlow\Nodes\BranchNodeInterface;
18
use fab2s\NodalFlow\Nodes\ExecNodeInterface;
19
use fab2s\NodalFlow\Nodes\NodeInterface;
20
use fab2s\NodalFlow\Nodes\TraversableNodeInterface;
21
22
/**
 * Class NodalFlow
 *
 * Flow implementation holding an ordered list of Nodes. Execution
 * iterates over consecutive exec Nodes and recurses into the
 * downstream Nodes of every Traversable Node (see `recurse`).
 */
class NodalFlow extends FlowAbstract
{
    /**
     * Flow steps triggering callbacks
     */
    const FLOW_START    = 'start';
    const FLOW_PROGRESS = 'progress';
    const FLOW_SUCCESS  = 'success';
    const FLOW_FAIL     = 'fail';

    /**
     * Handed over to the FlowMap at construction time
     *
     * NOTE(review): presumably custom Flow increments definitions,
     * confirm against FlowMap
     *
     * @var array
     */
    protected $flowIncrements = [];

    /**
     * The index the next added Node will be stored at,
     * incremented each time a Node is added
     *
     * @var int
     */
    protected $nodeIdx = 0;

    /**
     * The last index value, computed in `rewind`
     *
     * @var int
     */
    protected $lastIdx = 0;

    /**
     * The number of Node in this Flow, computed in `rewind`
     *
     * @var int
     */
    protected $nodeCount = 0;

    /**
     * Instantiate a Flow
     *
     * Creates the FlowMap bound to this Flow and a fresh FlowRegistry
     */
    public function __construct()
    {
        $this->flowMap  = new FlowMap($this, $this->flowIncrements);
        $this->registry = new FlowRegistry;
    }

    /**
     * Adds a Node to the flow
     *
     * When the Node is a Branch, its payload Flow is first checked
     * for reuse and then attached to this Flow as its parent.
     *
     * @param NodeInterface $node
     *
     * @throws NodalFlowException
     *
     * @return $this
     */
    public function add(NodeInterface $node)
    {
        if ($node instanceof BranchNodeInterface) {
            // this node is a branch
            $childFlow = $node->getPayload();
            $this->branchFlowCheck($childFlow);
            $childFlow->setParent($this);
        }

        $node->setCarrier($this);

        // register before storing so a failing registration
        // leaves the Node list untouched
        $this->flowMap->register($node, $this->nodeIdx);
        $this->nodes[$this->nodeIdx] = $node;

        ++$this->nodeIdx;

        return $this;
    }

    /**
     * Adds a Payload Node to the Flow
     *
     * The Node is built by PayloadNodeFactory from the payload
     * and then added through `add`
     *
     * @param callable $payload
     * @param mixed    $isAReturningVal
     * @param mixed    $isATraversable
     *
     * @return $this
     */
    public function addPayload(callable $payload, $isAReturningVal, $isATraversable = false)
    {
        $node = PayloadNodeFactory::create($payload, $isAReturningVal, $isATraversable);

        $this->add($node);

        return $this;
    }

    /**
     * Rewinds the Flow and runs it starting at the targeted Node,
     * or at the first Node when no Node id is provided
     *
     * @param string|null $nodeId
     * @param mixed|null  $param
     *
     * @throws NodalFlowException
     *
     * @return mixed
     */
    public function sendTo($nodeId = null, $param = null)
    {
        $nodeIndex = 0;
        if ($nodeId !== null) {
            $nodeIndex = $this->flowMap->getNodeIndex($nodeId);
            // strict comparison is required here: the first Node is
            // registered at index 0, which a truthiness test would
            // mistake for "not found"
            // NOTE(review): assumes getNodeIndex() returns null (or false)
            // for an unknown Node id, confirm against FlowMap
            if ($nodeIndex === null || $nodeIndex === false) {
                throw new NodalFlowException('Cannot sendTo without valid Node target', 1, null, [
                    'flowId' => $this->getId(),
                    'nodeId' => $nodeId,
                ]);
            }
        }

        return $this->rewind()->recurse($param, $nodeIndex);
    }

    /**
     * Execute the flow
     *
     * Any exception raised while running marks the Flow status as
     * exception and still triggers `flowEnd`. NodalFlowException are
     * re-thrown as is, any other Exception is wrapped in a
     * NodalFlowException carrying the Node map.
     *
     * @param null|mixed $param The eventual init argument to the first node
     *                          or, in case of a branch, the last relevant
     *                          argument from upstream Flow
     *
     * @throws NodalFlowException
     *
     * @return mixed the last result of the
     *               last returning value node
     */
    public function exec($param = null)
    {
        try {
            $result = $this->rewind()
                    ->flowStart()
                    ->recurse($param);

            // set flowStatus to make sure that we have the proper
            // value in flowEnd even when overridden without (or when
            // improperly) calling parent
            if ($this->flowStatus->isRunning()) {
                $this->flowStatus = new FlowStatus(FlowStatus::FLOW_CLEAN);
            }

            $this->flowEnd();

            return $result;
        } catch (\Exception $e) {
            $this->flowStatus = new FlowStatus(FlowStatus::FLOW_EXCEPTION);
            $this->flowEnd();
            if ($e instanceof NodalFlowException) {
                throw $e;
            }

            throw new NodalFlowException('Flow execution failed', 0, $e, [
                'nodeMap' => $this->getNodeMap(),
            ]);
        }
    }

    /**
     * Rewinds the Flow
     *
     * Recomputes the Node count and last index from the current
     * Node list and clears any pending interruption state
     *
     * @return $this
     */
    public function rewind()
    {
        $this->nodeCount       = count($this->nodes);
        $this->lastIdx         = $this->nodeCount - 1;
        $this->break           = false;
        $this->continue        = false;
        $this->interruptNodeId = null;

        return $this;
    }

    /**
     * Makes sure a Flow can be used as a Branch in this Flow
     *
     * A Flow is rejected when it already has a parent or when it
     * shares its root Flow with this Flow
     *
     * @param FlowInterface $flow
     *
     * @throws NodalFlowException
     */
    protected function branchFlowCheck(FlowInterface $flow)
    {
        if (
            // this flow has parent already
            $flow->hasParent() ||
            // adding root flow in itself
            $this->getRootFlow($flow)->getId() === $this->getRootFlow($this)->getId()
        ) {
            throw new NodalFlowException('Cannot reuse Flow within Branches', 1, null, [
                'flowId'             => $this->getId(),
                'BranchFlowId'       => $flow->getId(),
                'BranchFlowParentId' => $flow->hasParent() ? $flow->getParent()->getId() : null,
            ]);
        }
    }

    /**
     * Triggered just before the flow starts
     *
     * Increments the Flow `num_exec` counter, notifies the FlowMap,
     * triggers the start callback and sets the status to running
     *
     * @return $this
     */
    protected function flowStart()
    {
        $this->flowMap->incrementFlow('num_exec')->flowStart();
        $this->triggerCallback(static::FLOW_START);

        // flow is started
        $this->flowStatus = new FlowStatus(FlowStatus::FLOW_RUNNING);

        return $this;
    }

    /**
     * Triggered right after the flow stops
     *
     * Notifies the FlowMap and triggers the fail callback when the
     * Flow status is exception, the success callback otherwise
     *
     * @return $this
     */
    protected function flowEnd()
    {
        $this->flowMap->flowEnd();

        $this->triggerCallback($this->flowStatus->isException() ? static::FLOW_FAIL : static::FLOW_SUCCESS);

        return $this;
    }

    /**
     * Recurse over nodes which may as well be Flows and
     * Traversable ...
     * Welcome to the abysses of recursion or iter-recursion ^^
     *
     * `recurse` performs a kind of hybrid recursion as the
     * Flow is effectively iterating and recurring over its
     * Nodes, which may as well be seen as over itself
     *
     * Iterating tends to limit the amount of recursion levels:
     * recursion is only triggered when executing a Traversable
     * Node's downstream Nodes while every consecutive exec
     * Nodes are executed within a while loop.
     * And recursion keeps the size of the recursion context
     * to a minimum as pretty much everything is done by the
     * iterating instance
     *
     * @param mixed $param
     * @param int   $nodeIdx
     *
     * @return mixed the last value returned by the last
     *               returning value Node in the flow
     */
    protected function recurse($param = null, $nodeIdx = 0)
    {
        while ($nodeIdx <= $this->lastIdx) {
            $node      = $this->nodes[$nodeIdx];
            // taken by reference so the increments below
            // are written directly into the FlowMap stats
            $nodeStats = &$this->flowMap->getNodeStat($node->getId());

            $returnVal = $node->isReturningVal();

            if ($node->isTraversable()) {
                /** @var TraversableNodeInterface $node */
                foreach ($node->getTraversable($param) as $value) {
                    if ($returnVal) {
                        // pass current $value as next param
                        $param = $value;
                    }

                    ++$nodeStats['num_iterate'];
                    if (!($nodeStats['num_iterate'] % $this->progressMod)) {
                        $this->triggerCallback(static::FLOW_PROGRESS, $node);
                    }

                    // run every downstream Node for the current value
                    $param = $this->recurse($param, $nodeIdx + 1);
                    if ($this->continue) {
                        if ($this->continue = $this->interruptNode($node)) {
                            // since we want to bubble the continue upstream
                            // we break here waiting for next $param if any
                            ++$nodeStats['num_break'];
                            break;
                        }

                        // we drop one iteration
                        ++$nodeStats['num_continue'];
                        continue;
                    }

                    if ($this->break) {
                        // we drop all subsequent iterations
                        ++$nodeStats['num_break'];
                        $this->break = $this->interruptNode($node);
                        break;
                    }
                }

                // we reached the end of this Traversable and executed all its downstream Nodes
                ++$nodeStats['num_exec'];

                return $param;
            }

            /** @var ExecNodeInterface $node */
            $value = $node->exec($param);
            ++$nodeStats['num_exec'];

            if ($this->continue) {
                ++$nodeStats['num_continue'];
                // a continue does not need to bubble up unless
                // it specifically targets a node in this flow
                // or targets an upstream flow
                $this->continue = $this->interruptNode($node);

                return $param;
            }

            if ($this->break) {
                ++$nodeStats['num_break'];
                // a break always need to bubble up to the first upstream Traversable if any
                return $param;
            }

            if ($returnVal) {
                // pass current $value as next param
                $param = $value;
            }

            ++$nodeIdx;
        }

        // we reached the end of this recursion
        return $param;
    }
}
352