Completed
Pull Request — master (#2)
by Alexander
02:18
created

ProcessIterator::setUpdateInterval()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
/**
3
 * This file is part of the ProcessIterator library.
4
 * For the full copyright and license information, please view
5
 * the LICENSE file that was distributed with this source code.
6
 *
7
 * @copyright Alexander Obuhovich <[email protected]>
8
 * @link      https://github.com/console-helpers/process-iterator
9
 */
10
11
namespace ConsoleHelpers\ProcessIterator;
12
13
14
use Symfony\Component\Process\Process;
15
16
/**
 * ProcessIterator aggregates processes and allows you to respond to them
 * in the order they finish. This is useful because it minimizes the amount of
 * time your program spends waiting on parallel processes.
 *
 *   $processes = array(
 *     'a.txt' => new \Symfony\Component\Process\Process('wc -c a.txt'),
 *     'b.txt' => new \Symfony\Component\Process\Process('wc -c b.txt'),
 *     'c.txt' => new \Symfony\Component\Process\Process('wc -c c.txt'),
 *   );
 *
 *   foreach (new ProcessIterator($processes) as $key => $process) {
 *     // IMPORTANT: Keys are preserved, but the order of elements is not.
 *     // Iteration is done over the processes in the order they finish,
 *     // so the fastest process is the one you'll get first. This allows you
 *     // to start doing followup processing as soon as possible.
 *
 *     $stderr = $process->getErrorOutput();
 *     $stdout = $process->getOutput();
 *     do_some_processing($stdout);
 *   }
 */
class ProcessIterator implements \Iterator
{

	/**
	 * Keys of the processes, that were not yet yielded (both queued and running ones).
	 *
	 * @var array
	 */
	protected $waitingQueue = array();

	/**
	 * Keys of the processes, that are currently running. This is always a slice
	 * of the waiting queue, that shares its indexes.
	 *
	 * @var array
	 */
	protected $runningQueue = array();

	/**
	 * Processes to iterate over.
	 *
	 * @var Process[]
	 */
	protected $processes = array();

	/**
	 * Last exception, thrown by each process (indexed by process key).
	 *
	 * @var \Exception[]
	 */
	protected $exceptions = array();

	/**
	 * Current process key.
	 *
	 * @var mixed
	 */
	protected $key;

	/**
	 * Maximal number of simultaneously executing processes (empty value means "no limit").
	 *
	 * @var integer|null
	 */
	protected $limit;

	/**
	 * Maximal amount of time (in seconds) to wait before iterator will yield the result.
	 * The "null" means, that iterator blocks until some process finishes.
	 *
	 * @var float|null
	 */
	protected $timeout;

	/**
	 * Result waiting timeout was reached.
	 *
	 * @var boolean
	 */
	protected $isTimeout = false;

	/**
	 * Create a new iterator over a list of processes.
	 *
	 * @param array $processes List of processes to execute.
	 *
	 * @throws \InvalidArgumentException When unknown elements are present in $processes array.
	 */
	public function __construct(array $processes)
	{
		$filtered_processes = array_filter($processes, function ($process) {
			return ($process instanceof Process) && !$process->isRunning();
		});

		if ( count($filtered_processes) !== count($processes) ) {
			throw new \InvalidArgumentException(sprintf(
				'The $processes argument must be an array of non-running instances of "%s" class.',
				'\Symfony\Component\Process\Process'
			));
		}

		$this->processes = $processes;
	}

	/**
	 * Block until all processes have executed.
	 *
	 * @return self
	 */
	public function runAll()
	{
		foreach ( $this as $process ) {
			$process->wait();
		}

		return $this;
	}

	/**
	 * Add another process to the set of processes. This is useful if you have a
	 * set of processes to run mostly in parallel, but some processes depend on
	 * others.
	 *
	 * @param Process $process Process to add to iterator.
	 * @param mixed   $key     Key.
	 *
	 * @return self
	 * @throws \InvalidArgumentException When given key is already in use.
	 */
	public function addProcess(Process $process, $key = null)
	{
		if ( $key !== null && isset($this->processes[$key]) ) {
			throw new \InvalidArgumentException('The "' . $key . '" key is already in use.');
		}

		if ( $key === null ) {
			// Let PHP pick the next numeric key and then read it back.
			$this->processes[] = $process;
			end($this->processes);
			$key = key($this->processes);
		}
		else {
			$this->processes[$key] = $process;
		}

		$this->waitingQueue[] = $key;

		// Start running the process if we don't have $this->limit processes running already.
		$this->updateWorkingSet();

		return $this;
	}

	/**
	 * Set a maximum amount of time you want to wait before the iterator will
	 * yield a result. If no process has executed yet, the iterator will yield
	 * empty string for key and null for value. Among other potential uses, you
	 * can use this to show some busy indicator:
	 *   $processes = (new ProcessIterator($processes))
	 *     ->setUpdateInterval(1);
	 *   foreach ($processes as $process) {
	 *     if ($process === null) {
	 *       echo "Still working...\n";
	 *     } else {
	 *       // ...
	 *     }
	 *   }
	 * This will echo "Still working..." once per second as long as processes are
	 * resolving. By default, ProcessIterator never yields null.
	 *
	 * @param float|null $interval Maximum number of seconds to block waiting on processes before yielding null.
	 *                             Passing null restores the default (block until a process finishes).
	 *
	 * @return self
	 */
	public function setUpdateInterval($interval)
	{
		// Keep "null" as is, because it means "no interval" in the "next" method.
		$this->timeout = $interval === null ? null : (float)$interval;

		return $this;
	}

	/**
	 * Limit the number of simultaneously executing processes.
	 *  $processes = (new ProcessIterator($processes))
	 *    ->limit(4);
	 *  foreach ($processes as $process) {
	 *    // Run no more than 4 processes simultaneously.
	 *  }
	 *
	 * @param integer $max Maximum number of simultaneously running processes allowed (0 or null for no limit).
	 *
	 * @return self
	 * @throws \InvalidArgumentException When negative limit is given.
	 */
	public function limit($max)
	{
		// Negative value would be used as negative length in "array_slice", which
		// can leave running queue empty and make the "next" method wait forever.
		if ( $max !== null && $max < 0 ) {
			throw new \InvalidArgumentException('The limit must not be negative.');
		}

		$this->limit = $max;

		return $this;
	}

	/**
	 * Rewind the Iterator to the first element.
	 *
	 * All processes are queued again and results of previous iteration are forgotten.
	 *
	 * @return void
	 */
	#[\ReturnTypeWillChange]
	public function rewind()
	{
		$this->waitingQueue = array_keys($this->processes);
		$this->runningQueue = array();

		// Exceptions from previous pass must not be reported for this pass.
		$this->exceptions = array();

		$this->updateWorkingSet();
		$this->next();
	}

	/**
	 * Move forward to next element.
	 *
	 * Polls running processes (once per millisecond) until one of them terminates or
	 * throws an exception. When update interval is set and it elapses first, then
	 * timeout flag is raised instead and no process is removed from the queue.
	 *
	 * @return void
	 */
	#[\ReturnTypeWillChange]
	public function next()
	{
		$this->key = null;
		$this->isTimeout = false;

		if ( !count($this->waitingQueue) ) {
			return;
		}

		$start = microtime(true);
		$timeout = $this->timeout;

		$executed_index = null;

		do {
			foreach ( $this->runningQueue as $index => $process_key ) {
				$process = $this->processes[$process_key];

				try {
					$process->checkTimeout();

					if ( $process->isTerminated() ) {
						// Remember only first terminated process, but keep checking timeouts of others.
						if ( $executed_index === null ) {
							$executed_index = $index;
						}

						continue;
					}
				}
				catch ( \Exception $exception ) {
					// Failed process is yielded right away together with its exception.
					$this->setProcessException($process_key, $exception);
					$executed_index = $index;
					break;
				}
			}

			if ( $executed_index === null ) {
				// Check for a setUpdateInterval() timeout.
				if ( $timeout !== null ) {
					$elapsed = microtime(true) - $start;

					if ( $elapsed > $timeout ) {
						$this->isTimeout = true;

						return;
					}
				}

				usleep(1000);
			}
		} while ( $executed_index === null );

		// Running queue shares indexes with waiting queue, so index can be used on both.
		$this->key = $this->waitingQueue[$executed_index];
		unset($this->waitingQueue[$executed_index]);
		$this->updateWorkingSet();
	}

	/**
	 * Remembers exception, associated with a process.
	 *
	 * @param mixed      $key       Process key.
	 * @param \Exception $exception Exception.
	 *
	 * @return void
	 */
	protected function setProcessException($key, \Exception $exception)
	{
		$this->exceptions[$key] = $exception;
	}

	/**
	 * Gets exception, associated with the current process.
	 *
	 * @return \Exception|null
	 */
	public function getProcessException()
	{
		$key = $this->key();

		return isset($this->exceptions[$key]) ? $this->exceptions[$key] : null;
	}

	/**
	 * Return the current element.
	 *
	 * @return Process|null Process or null, when update interval has elapsed.
	 */
	#[\ReturnTypeWillChange]
	public function current()
	{
		if ( $this->isTimeout ) {
			return null;
		}

		return $this->processes[$this->key];
	}

	/**
	 * Return the key of the current element.
	 *
	 * @return mixed|null Process key or empty string, when update interval has elapsed.
	 */
	#[\ReturnTypeWillChange]
	public function key()
	{
		if ( $this->isTimeout ) {
			// Returning "null" only works since PHP 5.5, so return empty string instead.
			return '';
		}

		return $this->key;
	}

	/**
	 * Checks if current position is valid.
	 *
	 * @return boolean
	 */
	#[\ReturnTypeWillChange]
	public function valid()
	{
		// Timeout is a valid position, that yields null to the caller.
		if ( $this->isTimeout ) {
			return true;
		}

		return ($this->key !== null);
	}

	/**
	 * Ensure, that needed number of processes are running in parallel.
	 *
	 * @return void
	 */
	protected function updateWorkingSet()
	{
		$old_running_queue = $this->runningQueue;

		if ( $this->limit ) {
			$this->runningQueue = array_slice($this->waitingQueue, 0, $this->limit, true);
		}
		else {
			$this->runningQueue = $this->waitingQueue;
		}

		// Start processes, that were just added to the running queue.
		foreach ( $this->runningQueue as $index => $process_key ) {
			if ( isset($old_running_queue[$index]) ) {
				continue;
			}

			$process = $this->processes[$process_key];

			// Process, added via "addProcess" before iteration began, is already
			// running and starting it for a second time would throw an exception.
			if ( !$process->isRunning() ) {
				$process->start();
			}
		}
	}

}
383