Failed Conditions
Push — master ( 76937d...f7687f )
by Alexander
03:05
created

ProcessIterator   B

Complexity

Total Complexity 36

Size/Duplication

Total Lines 359
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Test Coverage

Coverage 100%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 36
c 3
b 0
f 0
lcom 1
cbo 2
dl 0
loc 359
ccs 107
cts 107
cp 1
rs 8.8

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 16 3
A runAll() 0 8 2
A addProcess() 0 22 3
A setUpdateInterval() 0 6 1
A limit() 0 6 1
A rewind() 0 7 1
C next() 0 61 12
A setProcessException() 0 4 1
A getProcessException() 0 6 2
A current() 0 8 2
A key() 0 9 2
A valid() 0 8 2
A updateWorkingSet() 0 18 4
1
<?php
2
/**
3
 * This file is part of the ProcessIterator library.
4
 * For the full copyright and license information, please view
5
 * the LICENSE file that was distributed with this source code.
6
 *
7
 * @copyright Alexander Obuhovich <[email protected]>
8
 * @link      https://github.com/console-helpers/process-iterator
9
 */
10
11
namespace ConsoleHelpers\ProcessIterator;
12
13
14
use Symfony\Component\Process\Exception\ProcessFailedException;
15
use Symfony\Component\Process\Process;
16
17
/**
18
 * ProcessIterator aggregates processes and allows you to respond to them
19
 * in the order they are executed. This is useful because it minimizes the amount of
20
 * time your program spends waiting on parallel processes.
21
 *
22
 *   $processes = array(
23
 *     'a.txt' => new \Symfony\Component\Process\Process('wc -c a.txt'),
24
 *     'b.txt' => new \Symfony\Component\Process\Process('wc -c b.txt'),
25
 *     'c.txt' => new \Symfony\Component\Process\Process('wc -c c.txt'),
26
 *   );
27
 *
28
 *   foreach (new ProcessIterator($processes) as $key => $process) {
29
 *     // IMPORTANT: Keys are preserved, but the order of elements is not.
30
 *     // Iteration is done over the processes in the order they are executed,
31
 *     // so the fastest process is the one you'll get first. This allows you
32
 *     // to start doing followup processing as soon as possible.
33
 *
34
 *     $stderr = $process->getErrorOutput();
35
 *     $stdout = $process->getOutput();
36
 *     do_some_processing($stdout);
37
 *   }
38
 */
39
class ProcessIterator implements \Iterator
40
{
41
42
	/**
43
	 * Key of the processes, that are waiting to be executed.
44
	 *
45
	 * @var array
46
	 */
47
	protected $waitingQueue = array();
48
49
	/**
50
	 * Keys of the processes, that are currently running.
51
	 *
52
	 * @var array
53
	 */
54
	protected $runningQueue = array();
55
56
	/**
57
	 * Processes to iterate over.
58
	 *
59
	 * @var Process[]
60
	 */
61
	protected $processes = array();
62
63
	/**
64
	 * All of the processes must be executed successfully.
65
	 *
66
	 * @var boolean
67
	 */
68
	protected $mustRun = false;
69
70
	/**
71
	 * Last exception, thrown by each process.
72
	 *
73
	 * @var \Exception[]
74
	 */
75
	protected $exceptions = array();
76
77
	/**
78
	 * Current process key.
79
	 *
80
	 * @var mixed
81
	 */
82
	protected $key;
83
84
	/**
85
	 * Maximal number of simultaneously executing processes.
86
	 *
87
	 * @var integer
88
	 */
89
	protected $limit;
90
91
	/**
92
	 * Maximal amount of time to wait before iterator will yield the result.
93
	 *
94
	 * @var float
95
	 */
96
	protected $timeout;
97
98
	/**
99
	 * Result waiting timeout was reached.
100
	 *
101
	 * @var boolean
102
	 */
103
	protected $isTimeout = false;
104
105
	/**
106
	 * Create a new iterator over a list of processes.
107
	 *
108
	 * @param array   $processes List of processes to execute.
109
	 * @param boolean $must_run  All of the processes must be executed successfully.
110
	 *
111
	 * @throws \InvalidArgumentException When unknown elements are present in $processes array.
112
	 */
113
	public function __construct(array $processes, $must_run = false)
114
	{
115 14
		$filtered_processes = array_filter($processes, function ($process) {
116 14
			return ($process instanceof Process) && !$process->isRunning();
117 14
		});
118
119 14
		if ( count($filtered_processes) !== count($processes) ) {
120 2
			throw new \InvalidArgumentException(sprintf(
121 2
				'The $processes argument must be an array of non-running instances of "%s" class.',
122
				'\Symfony\Component\Process\Process'
123 2
			));
124
		}
125
126 12
		$this->processes = $processes;
127 12
		$this->mustRun = $must_run;
128 12
	}
129
130
	/**
131
	 * Block until all processes have executed.
132
	 *
133
	 * @return self
134
	 */
135 1
	public function runAll()
136
	{
137 1
		foreach ( $this as $process ) {
138 1
			$process->wait();
139 1
		}
140
141 1
		return $this;
142
	}
143
144
	/**
145
	 * Add another process to the set of processes. This is useful if you have a
146
	 * set of processes to run mostly in parallel, but some processes depend on
147
	 * others.
148
	 *
149
	 * @param Process $process Process to add to iterator.
150
	 * @param mixed   $key     Key.
151
	 *
152
	 * @return self
153
	 * @throws \InvalidArgumentException When given key is already in use.
154
	 */
155 4
	public function addProcess(Process $process, $key = null)
156
	{
157 4
		if ( $key === null ) {
158 1
			$this->processes[] = $process;
159 1
			end($this->processes);
160 1
			$this->waitingQueue[] = key($this->processes);
161 1
		}
162
		else {
163 3
			if ( !isset($this->processes[$key]) ) {
164 1
				$this->processes[$key] = $process;
165 1
				$this->waitingQueue[] = $key;
166 1
			}
167
			else {
168 2
				throw new \InvalidArgumentException('The "' . $key . '" key is already in use.');
169
			}
170
		}
171
172
		// Start running the process if we don't have $this->limit processes running already.
173 2
		$this->updateWorkingSet();
174
175 2
		return $this;
176
	}
177
178
	/**
179
	 * Set a maximum amount of time you want to wait before the iterator will
180
	 * yield a result. If no process has executed yet, the iterator will yield
181
	 * empty string for key and null for value. Among other potential uses, you
182
	 * can use this to show some busy indicator:
183
	 *   $processes = (new ProcessIterator($processes))
184
	 *     ->setUpdateInterval(1);
185
	 *   foreach ($processes as $process) {
186
	 *     if ($process === null) {
187
	 *       echo "Still working...\n";
188
	 *     } else {
189
	 *       // ...
190
	 *     }
191
	 *   }
192
	 * This will echo "Still working..." once per second as long as processes are
193
	 * resolving. By default, ProcessIterator never yields null.
194
	 *
195
	 * @param float $interval Maximum number of seconds to block waiting on processes before yielding null.
196
	 *
197
	 * @return self
198
	 */
199 1
	public function setUpdateInterval($interval)
200
	{
201 1
		$this->timeout = $interval;
202
203 1
		return $this;
204
	}
205
206
	/**
207
	 * Limit the number of simultaneously executing processes.
208
	 *  $processes = (new ProcessIterator($processes))
209
	 *    ->limit(4);
210
	 *  foreach ($processes as $process) {
211
	 *    // Run no more than 4 processes simultaneously.
212
	 *  }
213
	 *
214
	 * @param integer $max Maximum number of simultaneously running processes allowed.
215
	 *
216
	 * @return self
217
	 */
218 3
	public function limit($max)
219
	{
220 3
		$this->limit = $max;
221
222 3
		return $this;
223
	}
224
225
	/**
226
	 * Rewind the Iterator to the first element.
227
	 *
228
	 * @return void
229
	 */
230 10
	public function rewind()
231
	{
232 10
		$this->waitingQueue = array_keys($this->processes);
233 10
		$this->runningQueue = array();
234 10
		$this->updateWorkingSet();
235 10
		$this->next();
236 10
	}
237
238
	/**
239
	 * Move forward to next element.
240
	 *
241
	 * @return void
242
	 */
0 ignored issues
show
Coding Style Documentation introduced by
Missing @throws tag in function comment
Loading history...
243 10
	public function next()
244
	{
245 10
		$this->key = null;
246
247 10
		if ( !count($this->waitingQueue) ) {
248 10
			return;
249
		}
250
251 10
		$start = microtime(true);
252 10
		$timeout = $this->timeout;
253 10
		$this->isTimeout = false;
254
255 10
		$executed_index = null;
256
257
		do {
258 10
			foreach ( $this->runningQueue as $index => $process_key ) {
259 10
				$process = $this->processes[$process_key];
260
261
				try {
262 10
					$process->checkTimeout();
263
264 10
					if ( $process->isTerminated() ) {
265
						// Mimics behavior of "Process::mustRun" method.
266 10
						if ( $this->mustRun && $process->getExitCode() !== 0 ) {
267 1
							throw new ProcessFailedException($process);
268
						}
269
270 10
						if ( $executed_index === null ) {
271 10
							$executed_index = $index;
272 10
						}
273
274 10
						continue;
275
					}
276
				}
277 8
				catch ( \Exception $exception ) {
278 2
					$this->setProcessException($process_key, $exception);
279 2
					$executed_index = $index;
280 2
					break;
281
				}
282 10
			}
283
284 10
			if ( $executed_index === null ) {
285
				// Check for a setUpdateInterval() timeout.
286 7
				if ( $timeout !== null ) {
287 1
					$elapsed = microtime(true) - $start;
288
289 1
					if ( $elapsed > $timeout ) {
290 1
						$this->isTimeout = true;
291
292 1
						return;
293
					}
294 1
				}
295
296 7
				usleep(1000);
297 7
			}
298 10
		} while ( $executed_index === null );
299
300 10
		$this->key = $this->waitingQueue[$executed_index];
301 10
		unset($this->waitingQueue[$executed_index]);
302 10
		$this->updateWorkingSet();
303 10
	}
304
305
	/**
306
	 * Remembers exception, associated with a process.
307
	 *
308
	 * @param mixed      $key       Process key.
309
	 * @param \Exception $exception Exception.
310
	 *
311
	 * @return void
312
	 */
313 2
	protected function setProcessException($key, \Exception $exception)
314
	{
315 2
		$this->exceptions[$key] = $exception;
316 2
	}
317
318
	/**
319
	 * Gets exception, associated with a process.
320
	 *
321
	 * @return \Exception|null
322
	 */
323 4
	public function getProcessException()
324
	{
325 4
		$key = $this->key();
326
327 4
		return isset($this->exceptions[$key]) ? $this->exceptions[$key] : null;
328
	}
329
330
	/**
331
	 * Return the current element.
332
	 *
333
	 * @return Process|null
334
	 */
335 10
	public function current()
336
	{
337 10
		if ( $this->isTimeout ) {
338 1
			return null;
339
		}
340
341 10
		return $this->processes[$this->key];
342
	}
343
344
	/**
345
	 * Return the key of the current element.
346
	 *
347
	 * @return mixed|null
348
	 */
349 8
	public function key()
350
	{
351 8
		if ( $this->isTimeout ) {
352
			// Returning "null" only works since PHP 5.5, so return empty string instead.
353 1
			return '';
354
		}
355
356 8
		return $this->key;
357
	}
358
359
	/**
360
	 * Checks if current position is valid.
361
	 *
362
	 * @return boolean
363
	 */
364 10
	public function valid()
365
	{
366 10
		if ( $this->isTimeout ) {
367 1
			return true;
368
		}
369
370 10
		return ($this->key !== null);
371
	}
372
373
	/**
374
	 * Ensure, that needed number of processes are running in parallel.
375
	 *
376
	 * @return void
377
	 */
378 10
	protected function updateWorkingSet()
379
	{
380 10
		$old_running_queue = $this->runningQueue;
381
382 10
		if ( $this->limit ) {
383 3
			$this->runningQueue = array_slice($this->waitingQueue, 0, $this->limit, true);
384 3
		}
385
		else {
386 7
			$this->runningQueue = $this->waitingQueue;
387
		}
388
389
		// Start processes, that were just added to the running queue.
390 10
		foreach ( $this->runningQueue as $index => $process_key ) {
391 10
			if ( !isset($old_running_queue[$index]) ) {
392 10
				$this->processes[$process_key]->start();
393 10
			}
394 10
		}
395 10
	}
396
397
}
398