1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace tomzx\Dataflow\Processor; |
4
|
|
|
|
5
|
|
|
use InvalidArgumentException; |
6
|
|
|
use RuntimeException; |
7
|
|
|
use tomzx\Dataflow\Node; |
8
|
|
|
use tomzx\Dataflow\Result; |
9
|
|
|
|
10
|
|
|
class Sequential extends Processor { |
11
|
|
|
/** |
12
|
|
|
* Node containing the callable and its result after an iteration of call to process(). |
13
|
|
|
* |
14
|
|
|
* @var array |
15
|
|
|
*/ |
16
|
|
|
private $processingNodes = []; |
17
|
|
|
|
18
|
|
|
/** |
19
|
|
|
* The iteration at which they can be processed (useful for parallelization). |
20
|
|
|
* |
21
|
|
|
* @var array |
22
|
|
|
*/ |
23
|
|
|
private $processIteration = []; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* @param array $nodes |
27
|
|
|
*/ |
28
|
10 |
|
public function initialize(array $nodes) |
29
|
|
|
{ |
30
|
10 |
|
parent::initialize($nodes); |
31
|
|
|
|
32
|
10 |
|
$this->validateNodes(); |
33
|
7 |
|
$this->buildProcessingNodes(); |
34
|
7 |
|
$this->prepareForProcessing(); |
35
|
5 |
|
} |
36
|
|
|
|
37
|
|
|
/** |
38
|
|
|
* @return void |
39
|
|
|
*/ |
40
|
10 |
View Code Duplication |
private function validateNodes() |
|
|
|
|
41
|
|
|
{ |
42
|
10 |
|
foreach ($this->nodes as $nodeIndex => $nodes) { |
43
|
10 |
|
if (empty($nodes)) { |
44
|
1 |
|
throw new InvalidArgumentException('A node should have at least a callable function'); |
45
|
|
|
} |
46
|
|
|
|
47
|
9 |
|
$callable = $nodes[0]; |
48
|
9 |
|
if ( ! is_callable($callable)) { |
49
|
1 |
|
throw new InvalidArgumentException('Node index = ' . $nodeIndex . '. Expected type callable as first array value, but got type ' . gettype($callable) . '.'); |
50
|
|
|
} |
51
|
8 |
|
} |
52
|
|
|
|
53
|
8 |
|
$identifiers = array_keys($this->nodes); |
54
|
8 |
|
foreach ($this->nodes as $nodeIndex => $nodes) { |
55
|
8 |
|
$testedNodes = array_slice($nodes, 1); |
56
|
8 |
|
$missingNodes = array_diff($testedNodes, $identifiers); |
57
|
8 |
|
if ( ! empty($missingNodes)) { |
58
|
1 |
|
throw new InvalidArgumentException('Node index = ' . $nodeIndex . '. Nodes [' . implode(', ', $missingNodes) . '] do not exist. Cannot create a graph to missing nodes.'); |
59
|
|
|
} |
60
|
7 |
|
} |
61
|
7 |
|
} |
62
|
|
|
|
63
|
|
|
/** |
64
|
|
|
* @return void |
65
|
|
|
*/ |
66
|
7 |
View Code Duplication |
private function buildProcessingNodes() |
|
|
|
|
67
|
|
|
{ |
68
|
7 |
|
foreach ($this->nodes as $nodeIndex => $nodes) { |
69
|
7 |
|
$callable = $nodes[0]; |
70
|
7 |
|
$this->processingNodes[$nodeIndex] = new Node($callable); |
71
|
7 |
|
} |
72
|
7 |
|
} |
73
|
|
|
|
74
|
|
|
/** |
75
|
|
|
* @return void |
76
|
|
|
*/ |
77
|
7 |
View Code Duplication |
private function prepareForProcessing() |
|
|
|
|
78
|
|
|
{ |
79
|
7 |
|
$dependents = []; |
80
|
7 |
|
$dependencies = []; |
81
|
7 |
|
$dependencyCount = []; |
82
|
7 |
|
foreach ($this->nodes as $nodeIndex => $nodes) { |
83
|
7 |
|
if ( ! array_key_exists($nodeIndex, $dependents)) { |
84
|
7 |
|
$dependents[$nodeIndex] = []; |
85
|
7 |
|
} |
86
|
|
|
|
87
|
7 |
|
$nodeDependencies = array_splice($nodes, 1); |
88
|
7 |
|
foreach ($nodeDependencies as $dependency) { |
89
|
6 |
|
$dependents[$dependency][] = $nodeIndex; |
90
|
7 |
|
} |
91
|
7 |
|
$dependencies[$nodeIndex] = array_flip($nodeDependencies); |
92
|
7 |
|
$dependencyCount[$nodeIndex] = count($nodeDependencies); |
93
|
7 |
|
} |
94
|
|
|
|
95
|
7 |
|
while ( ! empty($dependencyCount)) { |
96
|
|
|
// Order the nodes by their number of unresolved dependencies so that we may |
97
|
|
|
// indicate that the nodes which have no unresolved dependencies left are ready to be processed. |
98
|
7 |
|
asort($dependencyCount); |
99
|
|
|
|
100
|
7 |
|
if (reset($dependencyCount) !== 0) { |
101
|
2 |
|
throw new RuntimeException('Cannot create a sequence of execution for this graph.'); |
102
|
|
|
} |
103
|
|
|
|
104
|
5 |
|
$iteration =& $this->processIteration[]; |
105
|
5 |
|
foreach ($dependencyCount as $nodeIndex => $count) { |
106
|
5 |
|
if ($count !== 0) { |
107
|
4 |
|
break; |
108
|
|
|
} |
109
|
|
|
|
110
|
5 |
|
assert(empty($dependencies[$nodeIndex])); |
111
|
5 |
|
assert($dependencyCount[$nodeIndex] === 0); |
112
|
|
|
|
113
|
5 |
|
$iteration[] = $nodeIndex; |
114
|
|
|
|
115
|
5 |
|
unset($dependencies[$nodeIndex]); |
116
|
5 |
|
unset($dependencyCount[$nodeIndex]); |
117
|
5 |
|
foreach ($dependents[$nodeIndex] as $dependent) { |
118
|
4 |
|
unset($dependencies[$dependent][$nodeIndex]); |
119
|
4 |
|
--$dependencyCount[$dependent]; |
120
|
5 |
|
} |
121
|
5 |
|
unset($dependents[$nodeIndex]); |
122
|
5 |
|
} |
123
|
5 |
|
} |
124
|
5 |
|
} |
125
|
|
|
|
126
|
|
|
/** |
127
|
|
|
* @param array $arguments |
128
|
|
|
* @return \tomzx\Dataflow\Result |
129
|
|
|
*/ |
130
|
5 |
|
public function process(array $arguments) |
131
|
|
|
{ |
132
|
|
|
/** @var \tomzx\Dataflow\Node $node */ |
133
|
5 |
|
foreach ($this->processIteration as $nodes) { |
134
|
5 |
|
foreach ($nodes as $nodeIndex) { |
135
|
5 |
|
$node = $this->processingNodes[$nodeIndex]; |
136
|
5 |
|
$arguments = $this->getArguments($nodeIndex) ?: $arguments; |
137
|
5 |
|
call_user_func_array([$node, 'process'], $arguments); |
138
|
5 |
|
} |
139
|
5 |
|
} |
140
|
|
|
|
141
|
5 |
|
return new Result($this->processingNodes); |
142
|
|
|
} |
143
|
|
|
|
144
|
|
|
/** |
145
|
|
|
* @param int|string $nodeIndex |
146
|
|
|
* @return array |
147
|
|
|
*/ |
148
|
5 |
View Code Duplication |
private function getArguments($nodeIndex) |
|
|
|
|
149
|
|
|
{ |
150
|
5 |
|
$argumentsIndex = array_slice($this->nodes[$nodeIndex], 1); |
151
|
5 |
|
$arguments = []; |
152
|
5 |
|
foreach ($argumentsIndex as $nodeIndex) { |
153
|
4 |
|
$arguments[] = $this->processingNodes[$nodeIndex]->output(); |
154
|
5 |
|
} |
155
|
5 |
|
return $arguments; |
156
|
|
|
} |
157
|
|
|
} |
158
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.