Completed
Pull Request — master (#15)
by Harry
06:15
created

Pool::isRunInstantly()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 3
rs 10
c 0
b 0
f 0
ccs 0
cts 0
cp 0
cc 1
eloc 1
nc 1
nop 0
crap 2
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\DataStructure\Collection\Collection;
17
use Graze\ParallelProcess\Exceptions\NotRunningException;
18
use InvalidArgumentException;
19
use Symfony\Component\Process\Process;
20
21
/**
 * A collection of runs that can be started together, optionally capping how many are active at once.
 *
 * Raw Symfony `Process` instances are wrapped in a `Run` that uses the pool's default callbacks. Runs that cannot
 * be started because the pool is at its limit are held in `$waiting` and started from `poll()` once running items
 * have finished.
 */
class Pool extends Collection implements RunInterface
{
    /** Default number of seconds `run()` sleeps between two polls */
    const CHECK_INTERVAL = 0.1;
    /** Value of `$maxSimultaneous` that disables the limit */
    const NO_MAX         = -1;

    /** @var RunInterface[] */
    protected $items = [];
    /** @var RunInterface[] Runs the pool has started and not yet seen finish in `poll()` */
    protected $running = [];
    /** @var RunInterface[] Runs queued because the pool was at its limit */
    protected $waiting = [];
    /** @var callable|null */
    protected $onSuccess;
    /** @var callable|null */
    protected $onFailure;
    /** @var callable|null */
    protected $onProgress;
    /** @var callable|null */
    protected $onStart;
    /** @var int */
    private $maxSimultaneous = -1;
    /** @var bool */
    private $runInstantly = false;

    /**
     * Pool constructor.
     *
     * Set the default callbacks here. Any `Process` in `$items` is wrapped in a `Run` using these callbacks, exactly
     * as `add()` does, so that every stored item is a `RunInterface`.
     *
     * @param RunInterface[]|Process[] $items
     * @param callable|null            $onSuccess       function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param callable|null            $onFailure       function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param callable|null            $onProgress      function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param callable|null            $onStart         function (Process $process, float $duration, string $last,
     *                                                  string $lastType) : void
     * @param int                      $maxSimultaneous Maximum number of simultaneous processes
     * @param bool                     $runInstantly    Run any added processes immediately if they are not already
     *                                                  running
     *
     * @throws InvalidArgumentException if an item is neither a `Process` nor a `RunInterface`
     */
    public function __construct(
        array $items = [],
        callable $onSuccess = null,
        callable $onFailure = null,
        callable $onProgress = null,
        callable $onStart = null,
        $maxSimultaneous = self::NO_MAX,
        $runInstantly = false
    ) {
        // Normalise up front: a raw Process stored in the collection would fail the RunInterface
        // type hint of startRun() as soon as the pool is started.
        $runs = [];
        foreach ($items as $key => $item) {
            if ($item instanceof Process) {
                $item = new Run($item, $onSuccess, $onFailure, $onProgress, $onStart);
            } elseif (!$item instanceof RunInterface) {
                throw new InvalidArgumentException(
                    "__construct: Can only add `RunInterface` or `Process` to this collection"
                );
            }
            $runs[$key] = $item;
        }

        parent::__construct($runs);

        $this->onSuccess = $onSuccess;
        $this->onFailure = $onFailure;
        $this->onProgress = $onProgress;
        $this->onStart = $onStart;
        $this->maxSimultaneous = $maxSimultaneous;
        $this->runInstantly = $runInstantly;

        if ($this->runInstantly) {
            $this->start();
        }
    }

    /**
     * Set the default success callback used when a `Process` is wrapped by this pool.
     *
     * @param callable|null $onSuccess function (Process $process, float $duration, string $last, string $lastType) :
     *                                 void
     *
     * @return $this
     */
    public function setOnSuccess($onSuccess)
    {
        $this->onSuccess = $onSuccess;
        return $this;
    }

    /**
     * Set the default failure callback used when a `Process` is wrapped by this pool.
     *
     * @param callable|null $onFailure function (Process $process, float $duration, string $last, string $lastType) :
     *                                 void
     *
     * @return $this
     */
    public function setOnFailure($onFailure)
    {
        $this->onFailure = $onFailure;
        return $this;
    }

    /**
     * Set the default progress callback used when a `Process` is wrapped by this pool.
     *
     * @param callable|null $onProgress function (Process $process, float $duration, string $last, string $lastType) :
     *                                  void
     *
     * @return $this
     */
    public function setOnProgress($onProgress)
    {
        $this->onProgress = $onProgress;
        return $this;
    }

    /**
     * Set the default start callback used when a `Process` is wrapped by this pool.
     *
     * @param callable|null $onStart function (Process $process, float $duration, string $last, string $lastType) :
     *                               void
     *
     * @return $this
     */
    public function setOnStart($onStart)
    {
        $this->onStart = $onStart;
        return $this;
    }

    /**
     * Add a new process to the pool
     *
     * A `Process` is wrapped in a `Run` first. If the pool is running, or is set to run instantly, the item is
     * started (or queued when the pool is at its limit) straight away.
     *
     * @param RunInterface|Process $item
     *
     * @return $this
     *
     * @throws InvalidArgumentException if the item is neither a `Process` nor a `RunInterface`
     * @throws NotRunningException      if the item is already running but the pool is neither running nor set to
     *                                  run instantly
     */
    public function add($item)
    {
        if ($item instanceof Process) {
            return $this->addProcess($item);
        }

        // The parameter type is only documented, not enforced, so this guard is still needed at
        // runtime even though static analysis considers $item to always be a RunInterface.
        if (!$item instanceof RunInterface) {
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
        }

        $startNow = $this->isRunning() || $this->runInstantly;

        if (!$startNow && $item->isRunning()) {
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
        }

        parent::add($item);

        if ($startNow) {
            $this->startRun($item);
        }

        return $this;
    }

    /**
     * Add a new process to the pool using the default callbacks
     *
     * @param Process $process
     *
     * @return $this
     */
    protected function addProcess(Process $process)
    {
        return $this->add(
            new Run(
                $process,
                $this->onSuccess,
                $this->onFailure,
                $this->onProgress,
                $this->onStart
            )
        );
    }

    /**
     * Start all the processes running
     *
     * Items the pool already holds in its running or waiting list are skipped, so calling this more than once (or
     * after items were started by `add()`) does not start or queue the same run twice.
     *
     * @return $this
     */
    public function start()
    {
        foreach ($this->items as $run) {
            if ($this->isTracked($run)) {
                continue;
            }
            $this->startRun($run);
        }

        return $this;
    }

    /**
     * Determine if the pool already holds this exact run in its running or waiting list
     *
     * @param RunInterface $run
     *
     * @return bool
     */
    private function isTracked(RunInterface $run)
    {
        // strict comparison so that objects are matched by identity rather than by equal properties
        return in_array($run, $this->running, true) || in_array($run, $this->waiting, true);
    }

    /**
     * Start a run (or queue it if we are running the maximum number of processes already)
     *
     * @param RunInterface $run
     */
    private function startRun(RunInterface $run)
    {
        if ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous) {
            $run->start();
            $this->running[] = $run;
        } else {
            $this->waiting[] = $run;
        }
    }

    /**
     * Blocking call to run processes;
     *
     * Starts the pool, then polls it every `$checkInterval` seconds until `poll()` reports nothing is running.
     *
     * @param float $checkInterval Seconds between checks
     *
     * @return bool true if all processes were successful
     */
    public function run($checkInterval = self::CHECK_INTERVAL)
    {
        $this->start();

        // usleep() takes microseconds
        $interval = (int) ($checkInterval * 1000000);
        while ($this->poll()) {
            usleep($interval);
        }

        return $this->isSuccessful();
    }

    /**
     * Check when a run has finished, if there are processes waiting, start them
     *
     * Does nothing when no limit is set, because in that case nothing is ever queued by `startRun()`.
     */
    private function checkFinished()
    {
        if ($this->maxSimultaneous === static::NO_MAX) {
            return;
        }

        // fill every free slot from the front of the queue
        while (count($this->waiting) > 0 && count($this->running) < $this->maxSimultaneous) {
            $run = array_shift($this->waiting);
            $run->start();
            $this->running[] = $run;
        }
    }

    /**
     * Determine if any item has run
     *
     * @return bool
     */
    public function hasStarted()
    {
        foreach ($this->items as $run) {
            if ($run->hasStarted()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Are any of the processes running
     *
     * Polls every run in the running list, keeps only those whose `poll()` returns a truthy value, and then starts
     * waiting runs if slots have become free.
     *
     * @return bool
     */
    public function poll()
    {
        $this->running = array_filter(
            $this->running,
            function (RunInterface $run) {
                return $run->poll();
            }
        );

        $this->checkFinished();

        return $this->isRunning();
    }

    /**
     * Determine if the pool currently has any run in its running list
     *
     * The list is only pruned by `poll()`, so this reflects the state as of the last poll.
     *
     * @return bool
     */
    public function isRunning()
    {
        return count($this->running) > 0;
    }

    /**
     * Return if all runs have started and were successful
     *
     * @return bool false if no item has started, or if any item reports it was not successful
     */
    public function isSuccessful()
    {
        if (!$this->hasStarted()) {
            return false;
        }

        foreach ($this->items as $run) {
            if (!$run->isSuccessful()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Get a list of all the currently running runs
     *
     * @return RunInterface[]
     */
    public function getRunning()
    {
        return $this->running;
    }

    /**
     * Get a list of all the current waiting runs
     *
     * @return RunInterface[]
     */
    public function getWaiting()
    {
        return $this->waiting;
    }

    /**
     * @return int The maximum number of simultaneous runs, or `NO_MAX` when there is no limit
     */
    public function getMaxSimultaneous()
    {
        return $this->maxSimultaneous;
    }

    /**
     * Set the maximum number of simultaneous runs
     *
     * Only stores the value; the new limit is read the next time a run is started or the pool is polled.
     *
     * @param int $maxSimultaneous
     *
     * @return $this
     */
    public function setMaxSimultaneous($maxSimultaneous)
    {
        $this->maxSimultaneous = $maxSimultaneous;
        return $this;
    }

    /**
     * @return bool
     */
    public function isRunInstantly()
    {
        return $this->runInstantly;
    }

    /**
     * Set whether items are started as soon as they are added
     *
     * Only stores the flag; items already in the pool are not started by this call.
     *
     * @param bool $runInstantly
     *
     * @return $this
     */
    public function setRunInstantly($runInstantly)
    {
        $this->runInstantly = $runInstantly;
        return $this;
    }
}
373