Passed
Pull Request — master (#16)
by Harry
07:40 queued 02:51
created

Pool::getExceptions()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
cc 1
nc 1
nop 0
crap 1
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Exception;
17
use Graze\DataStructure\Collection\Collection;
18
use Graze\ParallelProcess\Event\EventDispatcherTrait;
19
use Graze\ParallelProcess\Event\PoolRunEvent;
20
use Graze\ParallelProcess\Event\RunEvent;
21
use Graze\ParallelProcess\Exceptions\NotRunningException;
22
use InvalidArgumentException;
23
use Symfony\Component\Process\Process;
24
use Throwable;
25
26
class Pool extends Collection implements RunInterface
27
{
28
    use EventDispatcherTrait;
29
30
    const CHECK_INTERVAL = 0.1;
31
    const NO_MAX         = -1;
32
33
    /** @var RunInterface[] */
34
    protected $items = [];
35
    /** @var RunInterface[] */
36
    protected $running = [];
37
    /** @var RunInterface[] */
38
    protected $waiting = [];
39
    /** @var RunInterface[] */
40
    protected $finished = [];
41
    /** @var float */
42
    private $started;
43
    /** @var float */
44
    private $finishedTime;
45
    /** @var int */
46
    private $maxSimultaneous = -1;
47
    /** @var bool */
48
    private $runInstantly = false;
49
    /** @var string[] */
50
    private $tags;
51
    /** @var Exception[]|Throwable[] */
52
    private $exceptions = [];
53
54
    /**
55
     * Pool constructor.
56
     *
57
     * Set the default callbacks here
58
     *
59
     * @param RunInterface[]|Process[] $items
60
     * @param int                      $maxSimultaneous Maximum number of simultaneous processes
61
     * @param bool                     $runInstantly    Run any added processes immediately if they are not already
62
     *                                                  running
63
     * @param array                    $tags
64
     */
65 33
    public function __construct(
66
        array $items = [],
67
        $maxSimultaneous = self::NO_MAX,
68
        $runInstantly = false,
69
        array $tags = []
70
    ) {
71 33
        parent::__construct($items);
72
73 33
        $this->maxSimultaneous = $maxSimultaneous;
74 33
        $this->runInstantly = $runInstantly;
75
76 33
        if ($this->runInstantly) {
77 1
            $this->start();
78
        }
79 33
        $this->tags = $tags;
80 33
    }
81
82
    /**
83
     * @return string[]
84
     */
85 49
    protected function getEventNames()
86
    {
87
        return [
88 49
            RunEvent::STARTED,
89 49
            RunEvent::COMPLETED,
90 49
            RunEvent::FAILED,
91 49
            RunEvent::UPDATED,
92 49
            PoolRunEvent::POOL_RUN_ADDED,
93
        ];
94
    }
95
96
    /**
     * Add a new run (or a process, which is wrapped in a run) to the pool
     *
     * A listener is attached to the item so that the exceptions of a failed run are collected on the pool.
     * If the pool is already running, or is set to run instantly, the item is started (or queued) straight away.
     *
     * @param RunInterface|Process $item
     * @param array                $tags If a process is supplied, these are added to create a run.
     *                                   This is ignored when adding a run
     *
     * @return $this
     *
     * @throws InvalidArgumentException if the item is neither a `Process` nor a `RunInterface`
     * @throws NotRunningException      if a running item is added while the pool is not running
     *                                  and is not set to run instantly
     */
    public function add($item, array $tags = [])
    {
        if ($item instanceof Process) {
            return $this->addProcess($item, $tags);
        }

        if (!$item instanceof RunInterface) {
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
        }

        if (!($this->isRunning() || $this->runInstantly) && $item->isRunning()) {
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
        }

        parent::add($item);

        $this->dispatch(PoolRunEvent::POOL_RUN_ADDED, new PoolRunEvent($this, $item));
        $item->addListener(
            RunEvent::FAILED,
            function (RunEvent $event) {
                // Append instead of using the array union operator (`+=`): a union keeps the existing
                // value for any key that is already present, so with numerically indexed lists the
                // exceptions of every failed run after the first one would be silently dropped.
                $this->exceptions = array_merge(
                    $this->exceptions,
                    array_values($event->getRun()->getExceptions())
                );
            }
        );

        if ($this->isRunning() || $this->runInstantly) {
            $this->startRun($item);
        }

        return $this;
    }
135
136
    /**
137
     * Add a new process to the pool using the default callbacks
138
     *
139
     * @param Process $process
140
     * @param array   $tags
141
     *
142
     * @return $this
143
     */
144 25
    private function addProcess(Process $process, array $tags = [])
145
    {
146 25
        return $this->add(new Run($process, $tags));
147
    }
148
149
    /**
     * Start all the processes running
     *
     * Every item is handed to `startRun()`, which either starts it or queues it when the maximum number of
     * simultaneous runs has been reached.
     *
     * `RunEvent::STARTED` is dispatched once per call that leaves something running: on the very first
     * start `startRun()` sets the start time and dispatches the event itself, on any later call it is done here.
     *
     * @return $this
     */
    public function start()
    {
        // startRun() announces the first start on its own; remember whether that is about to happen so the
        // start time is not overwritten and STARTED is not dispatched a second time below.
        $firstStart = is_null($this->started);

        foreach ($this->items as $run) {
            $this->startRun($run);
        }

        if (!$firstStart && count($this->running) > 0) {
            $this->started = microtime(true);
            $this->dispatch(RunEvent::STARTED, new RunEvent($this));
        }

        return $this;
    }
167
168
    /**
     * Start a single run, or put it on the waiting list when the pool is already at capacity
     *
     * The first run that is actually started also records the pool's start time and dispatches
     * `RunEvent::STARTED` for the pool.
     *
     * @param RunInterface $run
     */
    private function startRun(RunInterface $run)
    {
        $atCapacity = $this->maxSimultaneous !== static::NO_MAX
            && count($this->running) >= $this->maxSimultaneous;

        if ($atCapacity) {
            $this->waiting[] = $run;
            return;
        }

        $run->start();
        $this->running[] = $run;

        if ($this->started === null) {
            $this->started = microtime(true);
            $this->dispatch(RunEvent::STARTED, new RunEvent($this));
        }
    }
186
187
    /**
     * Start the pool and block until `poll()` reports that nothing is running any more
     *
     * @param float $checkInterval Seconds to sleep between two polls
     *
     * @return bool true if all processes were successful
     */
    public function run($checkInterval = self::CHECK_INTERVAL)
    {
        $this->start();

        // usleep() works in microseconds
        $pauseMicroseconds = (int) ($checkInterval * 1000000);

        while (true) {
            if (!$this->poll()) {
                break;
            }
            usleep($pauseMicroseconds);
        }

        return $this->isSuccessful();
    }
205
206
    /**
     * Fill the free slots with waiting runs
     *
     * Does nothing when the pool has no maximum. Otherwise waiting runs are started, oldest first, until
     * either the free slots counted on entry are used up or the waiting list is empty.
     */
    private function checkFinished()
    {
        if ($this->maxSimultaneous === static::NO_MAX) {
            return;
        }

        // number of runs that may still be started before the maximum is reached
        $freeSlots = $this->maxSimultaneous - count($this->running);

        while ($freeSlots > 0 && count($this->waiting) > 0) {
            $next = array_shift($this->waiting);
            $next->start();
            $this->running[] = $next;
            $freeSlots--;
        }
    }
221
222
    /**
     * Determine if at least one item in the pool has started
     *
     * @return bool false when the pool is empty or no item reports that it has started
     */
    public function hasStarted()
    {
        $anyStarted = false;

        foreach ($this->items as $item) {
            if ($item->hasStarted()) {
                $anyStarted = true;
                break;
            }
        }

        return $anyStarted;
    }
236
237
    /**
     * Poll every running run, move the finished ones out of the running list and start waiting runs
     * if there is capacity
     *
     * `RunEvent::UPDATED` is dispatched on every call. Once nothing is left running the finish time is
     * recorded and either `RunEvent::COMPLETED` or `RunEvent::FAILED` is dispatched, depending on
     * `isSuccessful()`.
     *
     * @return bool true while at least one run is still running, false otherwise
     */
    public function poll()
    {
        foreach ($this->running as $key => $current) {
            if ($current->poll()) {
                continue;
            }

            $this->finished[] = $current;
            $this->running[$key] = null;
        }
        // remove the entries that were blanked out above
        $this->running = array_filter($this->running);

        $this->checkFinished();

        $this->dispatch(RunEvent::UPDATED, new RunEvent($this));

        if ($this->isRunning()) {
            return true;
        }

        $this->finishedTime = microtime(true);
        $outcome = $this->isSuccessful() ? RunEvent::COMPLETED : RunEvent::FAILED;
        $this->dispatch($outcome, new RunEvent($this));

        return false;
    }
268
269
    /**
270
     * @return bool
271
     */
272 45
    public function isRunning()
273
    {
274 45
        return count($this->running) > 0;
275
    }
276
277
    /**
     * Return if the pool has started and every item in it was successful
     *
     * @return bool false if nothing has started yet or if any item reports that it was not successful
     */
    public function isSuccessful()
    {
        if (!$this->hasStarted()) {
            return false;
        }

        $allSuccessful = true;

        foreach ($this->items as $item) {
            if (!$item->isSuccessful()) {
                $allSuccessful = false;
                break;
            }
        }

        return $allSuccessful;
    }
296
297
    /**
298
     * Get a list of all the currently running runs
299
     *
300
     * @return RunInterface[]
301
     */
302 4
    public function getRunning()
303
    {
304 4
        return $this->running;
305
    }
306
307
    /**
308
     * Get a list of all the current waiting runs
309
     *
310
     * @return RunInterface[]
311
     */
312 4
    public function getWaiting()
313
    {
314 4
        return $this->waiting;
315
    }
316
317
    /**
318
     * Get a list of all the finished runs
319
     *
320
     * @return RunInterface[]
321
     */
322 1
    public function getFinished()
323
    {
324 1
        return $this->finished;
325
    }
326
327
    /**
328
     * @return int
329
     */
330 3
    public function getMaxSimultaneous()
331
    {
332 3
        return $this->maxSimultaneous;
333
    }
334
335
    /**
336
     * @param int $maxSimultaneous
337
     *
338
     * @return $this
339
     */
340 2
    public function setMaxSimultaneous($maxSimultaneous)
341
    {
342 2
        $this->maxSimultaneous = $maxSimultaneous;
343 2
        return $this;
344
    }
345
346
    /**
347
     * @return bool
348
     */
349 2
    public function isRunInstantly()
350
    {
351 2
        return $this->runInstantly;
352
    }
353
354
    /**
355
     * @param bool $runInstantly
356
     *
357
     * @return $this
358
     */
359 3
    public function setRunInstantly($runInstantly)
360
    {
361 3
        $this->runInstantly = $runInstantly;
362 3
        return $this;
363
    }
364
365
    /**
366
     * Get a set of tags associated with this run
367
     *
368
     * @return array
369
     */
370 1
    public function getTags()
371
    {
372 1
        return $this->tags;
373
    }
374
375
    /**
     * @return float number of seconds this run has been running or did run for (0 for not started)
     */
    public function getDuration()
    {
        // still going: measure against the current time
        if ($this->isRunning()) {
            return microtime(true) - $this->started;
        }

        // nothing running any more but something did run: use the recorded finish time
        if ($this->hasStarted()) {
            return $this->finishedTime - $this->started;
        }

        return 0;
    }
387
388
    /**
     * Get the progress of the pool as the number of finished runs out of all the runs in the pool
     *
     * The third value is the fraction `finished / total` (between 0 and 1). It is reported as 0 for an
     * empty pool instead of dividing by zero.
     *
     * @return float[]|null an array of values of the current position, max, and percentage. null if not applicable
     */
    public function getProgress()
    {
        $total = count($this->items);
        $done = count($this->finished);

        // an empty pool has nothing to progress through; dividing by zero raises a warning on PHP 7
        // and throws a DivisionByZeroError on PHP 8
        $ratio = $total > 0 ? $done / $total : 0.0;

        return [$done, $total, $ratio];
    }
395
396
    /**
397
     * If the run was unsuccessful, get the error if applicable
398
     *
399
     * @return Exception[]|Throwable[]
400
     */
401 1
    public function getExceptions()
402
    {
403 1
        return $this->exceptions;
404
    }
405
}
406