1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace tomzx\Dataflow\Processor; |
4
|
|
|
|
5
|
|
|
use InvalidArgumentException; |
6
|
|
|
use Pool; |
7
|
|
|
use RuntimeException; |
8
|
|
|
use tomzx\Dataflow\Node; |
9
|
|
|
use tomzx\Dataflow\Processor\Parallel\Job; |
10
|
|
|
use tomzx\Dataflow\Result; |
11
|
|
|
use Worker; |
12
|
|
|
|
13
|
|
|
// TODO: Remove duplicate code from the Sequential processor <[email protected]> |
14
|
|
|
class Parallel extends Processor {
	/**
	 * Node wrapping each callable, keyed by node index. After process() runs, each node holds
	 * the output of its callable, which getArguments() reads to feed the node's dependents.
	 *
	 * @var array
	 */
	private $processingNodes = [];

	/**
	 * Execution schedule: a list of iterations, each being a list of node indices whose
	 * dependencies are all computed by earlier iterations. The nodes of a single iteration
	 * do not depend on one another, so they can be processed in parallel.
	 *
	 * @var array
	 */
	private $processIteration = [];

	/**
	 * Number of worker threads in each pool created by process().
	 *
	 * @var int
	 */
	private $size;

	/**
	 * @param int $size Number of worker threads per pool.
	 */
	public function __construct($size = 5)
	{
		$this->size = $size;
	}

	/**
	 * Validate the node graph and compute its parallel execution schedule.
	 *
	 * Each node is expected to be an array [callable, dependencyIndex, ...] where every
	 * dependencyIndex is the key of another node. Calling this again replaces any schedule
	 * computed by a previous call.
	 *
	 * NOTE(review): the helpers below read $this->nodes; this assumes parent::initialize()
	 * stores $nodes there — confirm against Processor.
	 *
	 * @param array $nodes
	 * @throws \RuntimeException When pthreads is not loaded or the graph contains a cycle.
	 * @throws \InvalidArgumentException When a node is malformed or references an unknown node.
	 */
	public function initialize(array $nodes)
	{
		if ( ! extension_loaded('pthreads')) {
			throw new RuntimeException('Cannot use the Parallel processor without the pthreads extension.');
		}

		parent::initialize($nodes);

		$this->validateNodes();
		$this->buildProcessingNodes();
		$this->prepareForProcessing();
	}

	/**
	 * Check that every node is a non-empty array whose first element is callable and whose
	 * remaining elements are indices of existing nodes.
	 *
	 * @return void
	 * @throws \InvalidArgumentException
	 */
	private function validateNodes()
	{
		foreach ($this->nodes as $nodeIndex => $node) {
			if (empty($node)) {
				throw new InvalidArgumentException('A node should have at least a callable function');
			}

			// Without this check a non-array definition (e.g. a bare closure) would trigger a
			// fatal "cannot use object as array" error on the index access below.
			if ( ! is_array($node)) {
				throw new InvalidArgumentException('Node index = ' . $nodeIndex . '. Expected type array, but got type ' . gettype($node) . '.');
			}

			$callable = $node[0];
			if ( ! is_callable($callable)) {
				throw new InvalidArgumentException('Node index = ' . $nodeIndex . '. Expected type callable as first array value, but got type ' . gettype($callable) . '.');
			}
		}

		$identifiers = array_keys($this->nodes);
		foreach ($this->nodes as $nodeIndex => $node) {
			$missingNodes = array_diff(array_slice($node, 1), $identifiers);
			if ( ! empty($missingNodes)) {
				throw new InvalidArgumentException('Node index = ' . $nodeIndex . '. Nodes [' . implode(', ', $missingNodes) . '] do not exist. Cannot create a graph to missing nodes.');
			}
		}
	}

	/**
	 * Wrap each node's callable in a Node, keyed by node index.
	 *
	 * @return void
	 */
	private function buildProcessingNodes()
	{
		// Start from scratch so a repeated initialize() does not keep nodes of a previous graph.
		$this->processingNodes = [];
		foreach ($this->nodes as $nodeIndex => $node) {
			$this->processingNodes[$nodeIndex] = new Node($node[0]);
		}
	}

	/**
	 * Layered topological sort of the node graph into $this->processIteration.
	 *
	 * Each pass collects every node whose dependencies have all been scheduled, records them
	 * as one iteration, then releases their dependents.
	 *
	 * @return void
	 * @throws \RuntimeException When no order exists (the graph contains a cycle).
	 */
	private function prepareForProcessing()
	{
		// Reset so a repeated initialize() does not append a second copy of the schedule.
		$this->processIteration = [];

		// $dependents[x] lists the nodes that consume x's output (one entry per reference).
		// $pending[x] counts the dependency references of x that are not yet scheduled.
		$dependents = array_fill_keys(array_keys($this->nodes), []);
		$pending = [];
		foreach ($this->nodes as $nodeIndex => $node) {
			$nodeDependencies = array_slice($node, 1);
			foreach ($nodeDependencies as $dependency) {
				$dependents[$dependency][] = $nodeIndex;
			}
			$pending[$nodeIndex] = count($nodeDependencies);
		}

		while ( ! empty($pending)) {
			$ready = array_keys($pending, 0, true);
			if (empty($ready)) {
				// Every remaining node waits on another remaining node: a cycle.
				throw new RuntimeException('Cannot create a sequence of execution for this graph.');
			}

			// Nodes becoming ready while this loop runs are not in $ready, so they are
			// deferred to the next iteration, after the nodes they depend on.
			foreach ($ready as $nodeIndex) {
				unset($pending[$nodeIndex]);
				foreach ($dependents[$nodeIndex] as $dependent) {
					--$pending[$dependent];
				}
			}

			$this->processIteration[] = $ready;
		}
	}

	/**
	 * Run every node, one iteration of the schedule at a time, each iteration on a thread pool.
	 *
	 * NOTE(review): outputs written by jobs running in worker threads are only visible to this
	 * thread (and to getArguments()/Result) if Node/Job are pthreads Threaded objects; plain
	 * objects are serialized into the thread. Confirm against Node and Parallel\Job.
	 *
	 * @param array $arguments Arguments passed to nodes that have no dependencies.
	 * @return \tomzx\Dataflow\Result
	 */
	public function process(array $arguments)
	{
		foreach ($this->processIteration as $nodeIndexes) {
			// A fresh pool per iteration; shutdown() waits for the submitted jobs, acting as a
			// barrier so the next iteration only starts once the outputs it reads exist.
			$pool = new Pool($this->size, Worker::class);

			foreach ($nodeIndexes as $nodeIndex) {
				// Dependent nodes receive their dependencies' outputs; root nodes (empty list)
				// receive the caller's arguments. Kept in a separate variable so the caller's
				// $arguments are never overwritten.
				$nodeArguments = $this->getArguments($nodeIndex) ?: $arguments;

				$pool->submit(new Job($this->processingNodes[$nodeIndex], $nodeArguments));
			}

			// TODO: Improve parallelization by creating threads based on dependency resolution
			// instead of by iteration <[email protected]>
			$pool->shutdown();
		}

		return new Result($this->processingNodes);
	}

	/**
	 * Collect the outputs of a node's dependencies, in the order they were declared.
	 *
	 * @param int|string $nodeIndex
	 * @return array Empty when the node has no dependencies.
	 */
	private function getArguments($nodeIndex)
	{
		$arguments = [];
		foreach (array_slice($this->nodes[$nodeIndex], 1) as $dependencyIndex) {
			$arguments[] = $this->processingNodes[$dependencyIndex]->output();
		}
		return $arguments;
	}
}
185
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.