Passed
Pull Request — master (#15)
by Harry
02:42
created

Pool::isRunInstantly()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 3
rs 10
c 0
b 0
f 0
ccs 0
cts 2
cp 0
cc 1
eloc 1
nc 1
nop 0
crap 2
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\DataStructure\Collection\Collection;
17
use Graze\ParallelProcess\Exceptions\NotRunningException;
18
use InvalidArgumentException;
19
use Symfony\Component\Process\Process;
20
21
class Pool extends Collection implements RunInterface
22
{
23
    /** Default number of seconds run() waits between two polls */
    const CHECK_INTERVAL = 0.1;
    /** Value of $maxSimultaneous meaning that the number of concurrent runs is not limited */
    const NO_MAX         = -1;

    /** @var RunInterface[] Runs held by this pool */
    protected $items = [];
    /** @var RunInterface[] Runs that have been started and have not yet been filtered out by poll() */
    protected $running = [];
    /** @var RunInterface[] Runs queued because the simultaneous limit had been reached */
    protected $waiting = [];
    /** @var callable|null Success callback handed to runs created from a Process */
    protected $onSuccess;
    /** @var callable|null Failure callback handed to runs created from a Process */
    protected $onFailure;
    /** @var callable|null Progress callback handed to runs created from a Process */
    protected $onProgress;
    /** @var callable|null Start callback handed to runs created from a Process */
    protected $onStart;
    /** @var int Maximum number of runs started at the same time, or self::NO_MAX for no limit */
    private $maxSimultaneous = self::NO_MAX;
    /** @var bool Whether runs are started as soon as they are added */
    private $runInstantly = false;
44
45
    /**
     * Pool constructor.
     *
     * Stores the default callbacks and the initial set of items. Any raw Process in $items is wrapped in a Run
     * using the supplied callbacks, so that every stored item implements RunInterface (start(), poll() and
     * hasStarted() are called on the items later on).
     *
     * @param RunInterface[]|Process[] $items
     * @param callable|null            $onSuccess       function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param callable|null            $onFailure       function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param callable|null            $onProgress      function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param callable|null            $onStart         function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param int                      $maxSimultaneous Maximum number of simultaneous processes
     * @param bool                     $runInstantly    Run any added processes immediately if they are not already
     *                                                  running
     */
    public function __construct(
        array $items = [],
        callable $onSuccess = null,
        callable $onFailure = null,
        callable $onProgress = null,
        callable $onStart = null,
        $maxSimultaneous = self::NO_MAX,
        $runInstantly = false
    ) {
        // A bare Process has no hasStarted()/poll() and is rejected by startRun()'s type hint,
        // so wrap it the same way addProcess() does. Keys are preserved.
        $runs = [];
        foreach ($items as $key => $item) {
            $runs[$key] = ($item instanceof Process)
                ? new Run($item, $onSuccess, $onFailure, $onProgress, $onStart)
                : $item;
        }

        parent::__construct($runs);

        $this->onSuccess = $onSuccess;
        $this->onFailure = $onFailure;
        $this->onProgress = $onProgress;
        $this->onStart = $onStart;
        $this->maxSimultaneous = $maxSimultaneous;
        $this->runInstantly = $runInstantly;

        if ($this->runInstantly) {
            $this->start();
        }
    }
89
90
    /**
     * Replace the success callback used for runs created from a Process.
     *
     * @param callable|null $onSuccess function (Process $process, float $duration, string $last, string $lastType) :
     *                                 void
     *
     * @return $this
     */
    public function setOnSuccess($onSuccess)
    {
        $this->onSuccess = $onSuccess;

        return $this;
    }
101
102
    /**
     * Replace the failure callback used for runs created from a Process.
     *
     * @param callable|null $onFailure function (Process $process, float $duration, string $last, string $lastType) :
     *                                 void
     *
     * @return $this
     */
    public function setOnFailure($onFailure)
    {
        $this->onFailure = $onFailure;

        return $this;
    }
113
114
    /**
     * Replace the progress callback used for runs created from a Process.
     *
     * @param callable|null $onProgress function (Process $process, float $duration, string $last, string $lastType) :
     *                                  void
     *
     * @return $this
     */
    public function setOnProgress($onProgress)
    {
        $this->onProgress = $onProgress;

        return $this;
    }
125
126
    /**
     * Replace the start callback used for runs created from a Process.
     *
     * @param callable|null $onStart function (Process $process, float $duration, string $last, string $lastType) :
     *                               void
     *
     * @return $this
     */
    public function setOnStart($onStart)
    {
        $this->onStart = $onStart;

        return $this;
    }
137
138
    /**
     * Add an item to the pool.
     *
     * A Process is delegated to addProcess(), which wraps it in a Run and adds that instead. The item is started
     * straight away when the pool is already running or is configured to run instantly.
     *
     * @param RunInterface|Process $item
     *
     * @return $this
     *
     * @throws InvalidArgumentException when $item is neither a Process nor a RunInterface
     * @throws NotRunningException      when $item is already running but the pool is neither running nor set to
     *                                  run instantly
     */
    public function add($item)
    {
        if ($item instanceof Process) {
            return $this->addProcess($item);
        }

        // The docblock is only a hint: callers can still pass anything, so keep the runtime guard.
        if (!($item instanceof RunInterface)) {
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
        }

        $poolIsActive = $this->isRunning() || $this->runInstantly;
        if (!$poolIsActive && $item->isRunning()) {
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
        }

        parent::add($item);

        // Evaluated again after the item has been stored, exactly as before storing it.
        $shouldStartNow = $this->isRunning() || $this->runInstantly;
        if ($shouldStartNow) {
            $this->startRun($item);
        }

        return $this;
    }
167
168
    /**
     * Wrap a process in a Run configured with the pool's default callbacks and add it to the pool.
     *
     * @param Process $process
     *
     * @return $this
     */
    protected function addProcess(Process $process)
    {
        $run = new Run($process, $this->onSuccess, $this->onFailure, $this->onProgress, $this->onStart);

        return $this->add($run);
    }
187
188
    /**
     * Hand every item of the pool to startRun(), which starts it or queues it.
     *
     * @return $this
     */
    public function start()
    {
        $items = $this->items;
        foreach ($items as $item) {
            $this->startRun($item);
        }

        return $this;
    }
201
202
    /**
     * Start a run, or queue it if the maximum number of simultaneous runs has been reached.
     *
     * A run that this pool is already tracking (running or waiting) is left untouched.
     *
     * @param RunInterface $run
     */
    private function startRun(RunInterface $run)
    {
        // The same run can reach this method twice: add()/the constructor start it when runInstantly is set,
        // and run() calls start() over every item again. Tracking it twice would inflate the running count
        // and could start a queued duplicate a second time once the original has finished.
        if (in_array($run, $this->running, true) || in_array($run, $this->waiting, true)) {
            return;
        }

        if ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous) {
            $run->start();
            $this->running[] = $run;
        } else {
            $this->waiting[] = $run;
        }
    }
216
217
    /**
     * Blocking call: start every run, then poll until nothing is running any more.
     *
     * @param float $checkInterval Seconds between checks
     *
     * @return bool true if all processes were successful
     */
    public function run($checkInterval = self::CHECK_INTERVAL)
    {
        $this->start();

        // usleep() takes a whole number of microseconds; convert explicitly instead of passing a float.
        $interval = (int) round($checkInterval * 1000000);

        while ($this->poll()) {
            usleep($interval);
        }

        return $this->isSuccessful();
    }
235
236
    /**
     * Start waiting runs while there is capacity for them.
     *
     * Runs are taken from the front of the waiting queue, started and moved to the running list until either
     * the queue is empty or the simultaneous limit is reached. When the limit is NO_MAX every waiting run is
     * started: runs may have been queued under an earlier limit that setMaxSimultaneous() has since removed.
     */
    private function checkFinished()
    {
        while (count($this->waiting) > 0
            && ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous)
        ) {
            $run = array_shift($this->waiting);
            $run->start();
            $this->running[] = $run;
        }
    }
251
252
    /**
     * Determine whether at least one item of the pool reports that it has started.
     *
     * @return bool false when the pool is empty or no item has started
     */
    public function hasStarted()
    {
        $anyStarted = false;

        foreach ($this->items as $item) {
            if ($item->hasStarted()) {
                $anyStarted = true;
                break;
            }
        }

        return $anyStarted;
    }
266
267
    /**
     * Poll every running run, drop the ones that are no longer running, start waiting runs if there is room
     * and report whether anything is still running.
     *
     * @return bool true while at least one run is still running
     */
    public function poll()
    {
        // Keep only the runs whose poll() is truthy; keys are kept as they were.
        $stillRunning = [];
        foreach ($this->running as $key => $run) {
            if ($run->poll()) {
                $stillRunning[$key] = $run;
            }
        }
        $this->running = $stillRunning;

        $this->checkFinished();

        return $this->isRunning();
    }
286
287
    /**
     * Whether the pool currently tracks at least one running run.
     *
     * @return bool
     */
    public function isRunning()
    {
        return !empty($this->running);
    }
294
295
    /**
     * Return whether the pool has started and every item reports success.
     *
     * @return bool false when nothing has started or when any item is not successful
     */
    public function isSuccessful()
    {
        if ($this->hasStarted()) {
            foreach ($this->items as $item) {
                if (!$item->isSuccessful()) {
                    return false;
                }
            }

            return true;
        }

        return false;
    }
314
315
    /**
     * Get the runs the pool currently tracks as running.
     *
     * @return RunInterface[]
     */
    public function getRunning()
    {
        $running = $this->running;

        return $running;
    }
324
325
    /**
     * Get the runs that are queued and waiting to be started.
     *
     * @return RunInterface[]
     */
    public function getWaiting()
    {
        $waiting = $this->waiting;

        return $waiting;
    }
334
335
    /**
     * Get the configured limit of simultaneous runs.
     *
     * @return int the limit, or NO_MAX when the pool is not limited
     */
    public function getMaxSimultaneous()
    {
        $limit = $this->maxSimultaneous;

        return $limit;
    }
342
343
    /**
     * Set the limit of simultaneous runs.
     *
     * @param int $maxSimultaneous the new limit, or NO_MAX for no limit
     *
     * @return $this
     */
    public function setMaxSimultaneous($maxSimultaneous)
    {
        $this->maxSimultaneous = $maxSimultaneous;

        return $this;
    }
353
354
    /**
     * Whether the pool starts runs as soon as they are added.
     *
     * @return bool
     */
    public function isRunInstantly()
    {
        $instantly = $this->runInstantly;

        return $instantly;
    }
361
362
    /**
     * Choose whether runs are started as soon as they are added.
     *
     * @param bool $runInstantly
     *
     * @return $this
     */
    public function setRunInstantly($runInstantly)
    {
        $this->runInstantly = $runInstantly;

        return $this;
    }
372
}
373