Parallel::prepareForProcessing()   C
last analyzed

Complexity

Conditions 9
Paths 20

Size

Total Lines 48
Code Lines 29

Duplication

Lines 48
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 90

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 48
loc 48
ccs 0
cts 39
cp 0
rs 5.5102
cc 9
eloc 29
nc 20
nop 0
crap 90
1
<?php
2
3
namespace tomzx\Dataflow\Processor;
4
5
use InvalidArgumentException;
6
use Pool;
7
use RuntimeException;
8
use tomzx\Dataflow\Node;
9
use tomzx\Dataflow\Processor\Parallel\Job;
10
use tomzx\Dataflow\Result;
11
use Worker;
12
13
// TODO: Remove duplicate code from the Sequential processor <[email protected]>
14
class Parallel extends Processor {
15
	/**
	 * Node objects keyed by node index. Each one wraps the callable of the matching entry of
	 * $this->nodes and is later asked for its output() when building arguments for its dependents.
	 *
	 * @var \tomzx\Dataflow\Node[]
	 */
20
	private $processingNodes = [];
21
22
	/**
	 * The iteration at which the nodes can be processed (useful for parallelization).
	 * Each entry is the list of node indexes which are submitted together during process().
	 *
	 * @var array[]
	 */
27
	private $processIteration = [];
28
29
	/**
	 * Size given to the Pool created in process().
	 *
	 * @var int
	 */
32
	private $size;
33
34
	/**
	 * Create a parallel processor.
	 *
	 * @param int $size Size handed to the Pool created by process(); defaults to 5.
	 */
	public function __construct($size = 5)
	{
		// Only stored here; the pool itself is created lazily by process().
		$this->size = $size;
	}
41
42
	/**
	 * Register the graph definition and compute everything process() needs to run it.
	 *
	 * After delegating to the parent, the nodes are validated, wrapped into Node objects
	 * and ordered into processing iterations.
	 *
	 * @param array $nodes Node definitions, forwarded as-is to the parent processor.
	 * @throws \RuntimeException When the pthreads extension is not loaded.
	 */
	public function initialize(array $nodes)
	{
		$threadingAvailable = extension_loaded('pthreads');
		if ($threadingAvailable === false) {
			throw new RuntimeException('Cannot use the Parallel processor without the pthreads extension.');
		}

		parent::initialize($nodes);

		// Order matters: validation first, then node creation, then scheduling.
		$this->validateNodes();
		$this->buildProcessingNodes();
		$this->prepareForProcessing();
	}
57
58
	/**
	 * Check that every node definition is usable before anything is built from it.
	 *
	 * A definition is an array whose first value is the callable and whose remaining
	 * values are the indexes of the nodes it depends on.
	 *
	 * @return void
	 * @throws \InvalidArgumentException When a definition is empty, when its first value is not
	 *                                   callable, or when it refers to a node index which does not exist.
	 */
	private function validateNodes()
	{
		// First pass: every definition must start with a callable.
		foreach ($this->nodes as $index => $definition) {
			if (empty($definition)) {
				throw new InvalidArgumentException('A node should have at least a callable function');
			}

			$candidate = $definition[0];
			if (is_callable($candidate)) {
				continue;
			}

			throw new InvalidArgumentException('Node index = ' . $index . '. Expected type callable as first array value, but got type ' . gettype($candidate) . '.');
		}

		// Second pass: every referenced dependency must be the index of a declared node.
		$knownIndexes = array_keys($this->nodes);
		foreach ($this->nodes as $index => $definition) {
			$unknownIndexes = array_diff(array_slice($definition, 1), $knownIndexes);
			if (empty($unknownIndexes)) {
				continue;
			}

			throw new InvalidArgumentException('Node index = ' . $index . '. Nodes [' . implode(', ', $unknownIndexes) . '] do not exist. Cannot create a graph to missing nodes.');
		}
	}
83
84
	/**
	 * Wrap the callable of every node definition into a Node object, keyed by node index.
	 *
	 * @return void
	 */
	private function buildProcessingNodes()
	{
		// Start from scratch so that initializing the processor again with another graph
		// does not keep Node objects which belonged to the previous one.
		$this->processingNodes = [];

		foreach ($this->nodes as $nodeIndex => $nodes) {
			// The first value of a definition is its callable (checked by validateNodes()).
			$this->processingNodes[$nodeIndex] = new Node($nodes[0]);
		}
	}
94
95
	/**
	 * Compute the order in which the nodes can be processed.
	 *
	 * Fills $this->processIteration with one list of node indexes per iteration. A node is
	 * placed in an iteration once all of its dependencies have been placed in earlier ones.
	 *
	 * @return void
	 * @throws \RuntimeException When none of the remaining nodes is free of unresolved
	 *                           dependencies (e.g. the graph contains a cycle).
	 */
	private function prepareForProcessing()
	{
		// Start from an empty schedule so that initializing the processor again does not
		// append the new iterations after the ones computed for a previous graph.
		$this->processIteration = [];

		$dependents = [];
		$dependencies = [];
		$dependencyCount = [];
		foreach ($this->nodes as $nodeIndex => $nodes) {
			// Make sure nodes nobody depends on still have an (empty) list of dependents.
			if ( ! array_key_exists($nodeIndex, $dependents)) {
				$dependents[$nodeIndex] = [];
			}

			// Everything after the callable is the index of a dependency.
			$nodeDependencies = array_slice($nodes, 1);
			foreach ($nodeDependencies as $dependency) {
				$dependents[$dependency][] = $nodeIndex;
			}
			$dependencies[$nodeIndex] = array_flip($nodeDependencies);
			$dependencyCount[$nodeIndex] = count($nodeDependencies);
		}

		while ( ! empty($dependencyCount)) {
			// Order the nodes by their number of unresolved dependencies so that we may
			// indicate that the nodes which have no unresolved dependencies left are ready to be processed.
			asort($dependencyCount);

			if (reset($dependencyCount) !== 0) {
				throw new RuntimeException('Cannot create a sequence of execution for this graph.');
			}

			$iteration = [];
			// The loop works on the counts as they were when it started: nodes freed while
			// it runs are only picked up by the next pass of the enclosing while loop.
			foreach ($dependencyCount as $nodeIndex => $count) {
				if ($count !== 0) {
					// Sorted ascending: nothing after this point is ready yet.
					break;
				}

				assert(empty($dependencies[$nodeIndex]));
				assert($dependencyCount[$nodeIndex] === 0);

				$iteration[] = $nodeIndex;

				unset($dependencies[$nodeIndex]);
				unset($dependencyCount[$nodeIndex]);
				// This node is now scheduled: it no longer blocks the nodes depending on it.
				foreach ($dependents[$nodeIndex] as $dependent) {
					unset($dependencies[$dependent][$nodeIndex]);
					--$dependencyCount[$dependent];
				}
				unset($dependents[$nodeIndex]);
			}

			$this->processIteration[] = $iteration;
		}
	}
146
147
	/**
	 * Submit every node to a pool of workers, one scheduling iteration at a time.
	 *
	 * A node receives the outputs of its dependencies as arguments; when it has none to
	 * receive, it is given the arguments passed to this method instead.
	 *
	 * @param array $arguments Arguments for the nodes which get no arguments from dependencies.
	 * @return \tomzx\Dataflow\Result Result built from all the processing nodes.
	 */
	public function process(array $arguments)
	{
		$pool = new Pool($this->size, Worker::class);

		/** @var \tomzx\Dataflow\Node $node */
		foreach ($this->processIteration as $nodes) {
			foreach ($nodes as $nodeIndex) {
				$node = $this->processingNodes[$nodeIndex];
				// Use a per-node variable: overwriting $arguments here would lose the caller's
				// arguments for every node handled after the first one having dependencies.
				$nodeArguments = $this->getArguments($nodeIndex) ?: $arguments;

				$pool->submit(new Job($node, $nodeArguments));
			}
			// TODO: Improve parallelization by creating threads based on dependency resolution
			// instead of by iteration <[email protected]>
			// NOTE(review): shutdown() is presumably relied upon to wait for the jobs of this
			// iteration before the next one is submitted - confirm against the pthreads version in use.
			$pool->shutdown();
		}

		return new Result($this->processingNodes);
	}
170
171
	/**
	 * Collect the outputs of the nodes a given node depends on, in declaration order.
	 *
	 * @param int|string $nodeIndex Index of the node whose arguments are wanted.
	 * @return array Outputs of its dependencies; empty when the node declares none.
	 */
	private function getArguments($nodeIndex)
	{
		$outputs = [];
		// Everything after the callable in a definition is the index of a dependency.
		foreach (array_slice($this->nodes[$nodeIndex], 1) as $dependencyIndex) {
			$outputs[] = $this->processingNodes[$dependencyIndex]->output();
		}

		return $outputs;
	}
184
}
185