Passed
Push — master ( c649ab...71b580 )
by Harry
02:19
created

Pool::setRunInstantly()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 3
cts 3
cp 1
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\DataStructure\Collection\Collection;
17
use Graze\ParallelProcess\Exceptions\NotRunningException;
18
use InvalidArgumentException;
19
use Symfony\Component\Process\Process;
20
21
class Pool extends Collection implements RunInterface
22
{
23
    const CHECK_INTERVAL = 0.1;
24
    const NO_MAX         = -1;
25
26
    /** @var RunInterface[] */
27
    protected $items = [];
28
    /** @var RunInterface[] */
29
    protected $running = [];
30
    /** @var RunInterface[] */
31
    protected $waiting = [];
32
    /** @var callable|null */
33
    protected $onSuccess;
34
    /** @var callable|null */
35
    protected $onFailure;
36
    /** @var callable|null */
37
    protected $onProgress;
38
    /** @var callable|null */
39
    protected $onStart;
40
    /** @var int */
41
    private $maxSimultaneous = -1;
42
    /** @var bool */
43
    private $runInstantly = false;
44
45
    /**
46
     * Pool constructor.
47
     *
48
     * Set the default callbacks here
49
     *
50
     * @param RunInterface[]|Process[] $items
51
     * @param callable|null            $onSuccess       function (Process $process, float $duration, string $last,
52
     *                                                  string $lastType) : void
53
     * @param callable|null            $onFailure       function (Process $process, float $duration, string $last,
54
     *                                                  string $lastType) : void
55
     * @param callable|null            $onProgress      function (Process $process, float $duration, string $last,
56
     *                                                  string $lastType) : void
57
     * @param callable|null            $onStart         function (Process $process, float $duration, string $last,
58
     *                                                  string $lastType) : void
59
     * @param int                      $maxSimultaneous Maximum number of simulatneous processes
60
     * @param bool                     $runInstantly    Run any added processes immediately if they are not already
61
     *                                                  running
62
     */
63 31
    public function __construct(
64
        array $items = [],
65
        callable $onSuccess = null,
66
        callable $onFailure = null,
67
        callable $onProgress = null,
68
        callable $onStart = null,
69
        $maxSimultaneous = self::NO_MAX,
70
        $runInstantly = false
71
    ) {
72 31
        parent::__construct($items);
73
74 31
        $this->onSuccess = $onSuccess;
75 31
        $this->onFailure = $onFailure;
76 31
        $this->onProgress = $onProgress;
77 31
        $this->onStart = $onStart;
78 31
        $this->maxSimultaneous = $maxSimultaneous;
79 31
        $this->runInstantly = $runInstantly;
80
81 31
        if ($this->runInstantly) {
82 1
            $this->start();
83
        }
84 31
    }
85
86
    /**
87
     * @param callable|null $onSuccess function (Process $process, float $duration, string $last, string $lastType) :
88
     *                                 void
89
     *
90
     * @return $this
91
     */
92 1
    public function setOnSuccess($onSuccess)
93
    {
94 1
        $this->onSuccess = $onSuccess;
95 1
        return $this;
96
    }
97
98
    /**
99
     * @param callable|null $onFailure function (Process $process, float $duration, string $last, string $lastType) :
100
     *                                 void
101
     *
102
     * @return $this
103
     */
104 1
    public function setOnFailure($onFailure)
105
    {
106 1
        $this->onFailure = $onFailure;
107 1
        return $this;
108
    }
109
110
    /**
111
     * @param callable|null $onProgress function (Process $process, float $duration, string $last, string $lastType) :
112
     *                                  void
113
     *
114
     * @return $this
115
     */
116 1
    public function setOnProgress($onProgress)
117
    {
118 1
        $this->onProgress = $onProgress;
119 1
        return $this;
120
    }
121
122
    /**
123
     * @param callable|null $onStart function (Process $process, float $duration, string $last, string $lastType) :
124
     *                               void
125
     *
126
     * @return $this
127
     */
128 1
    public function setOnStart($onStart)
129
    {
130 1
        $this->onStart = $onStart;
131 1
        return $this;
132
    }
133
134
    /**
135
     * Add a new process to the pool
136
     *
137
     * @param RunInterface|Process $item
138
     *
139
     * @return $this
140
     */
141 39
    public function add($item)
142
    {
143 39
        if ($item instanceof Process) {
144 12
            return $this->addProcess($item);
145
        }
146
147 39
        if (!$item instanceof RunInterface) {
0 ignored issues
show
introduced by
$item is always a sub-type of Graze\ParallelProcess\RunInterface.
Loading history...
148 1
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
149
        }
150
151 38
        if (!($this->isRunning() || $this->runInstantly) && $item->isRunning()) {
152 1
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
153
        }
154
155 37
        parent::add($item);
156
157 37
        if ($this->isRunning() || $this->runInstantly) {
158 4
            $this->startRun($item);
159
        }
160
161 37
        return $this;
162
    }
163
164
    /**
165
     * Add a new process to the pool using the default callbacks
166
     *
167
     * @param Process $process
168
     *
169
     * @return $this
170
     */
171 12
    protected function addProcess(Process $process)
172
    {
173 12
        return $this->add(
174 12
            new Run(
175 12
                $process,
176 12
                $this->onSuccess,
177 12
                $this->onFailure,
178 12
                $this->onProgress,
179 12
                $this->onStart
180
            )
181
        );
182
    }
183
184
    /**
185
     * Start all the processes running
186
     *
187
     * @return $this
188
     */
189 30
    public function start()
190
    {
191 30
        foreach ($this->items as $run) {
192 30
            $this->startRun($run);
193
        }
194
195 30
        return $this;
196
    }
197
198
    /**
199
     * Start a run (or queue it if we are running the maximum number of processes already)
200
     *
201
     * @param RunInterface $run
202
     */
203 32
    private function startRun(RunInterface $run)
204
    {
205 32
        if ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous) {
206 32
            $run->start();
207 32
            $this->running[] = $run;
208
        } else {
209 2
            $this->waiting[] = $run;
210
        }
211 32
    }
212
213
    /**
214
     * Blocking call to run processes;
215
     *
216
     * @param float $checkInterval Seconds between checks
217
     *
218
     * @return bool true if all processes were successful
219
     */
220 26
    public function run($checkInterval = self::CHECK_INTERVAL)
221
    {
222 26
        $this->start();
223
224 26
        $interval = (int) ($checkInterval * 1000000);
225 26
        while ($this->poll()) {
226 22
            usleep($interval);
227
        }
228
229 26
        return $this->isSuccessful();
230
    }
231
232
    /**
233
     * Check when a run has finished, if there are processes waiting, start them
234
     */
235 30
    private function checkFinished()
236
    {
237 30
        if ($this->maxSimultaneous !== static::NO_MAX
238 30
            && count($this->waiting) > 0
239 30
            && count($this->running) < $this->maxSimultaneous) {
240 1
            for ($i = count($this->running); $i < $this->maxSimultaneous && count($this->waiting) > 0; $i++) {
241 1
                $run = array_shift($this->waiting);
242 1
                $run->start();
243 1
                $this->running[] = $run;
244
            }
245
        }
246 30
    }
247
248
    /**
249
     * Determine if any item has run
250
     *
251
     * @return bool
252
     */
253 31
    public function hasStarted()
254
    {
255 31
        foreach ($this->items as $run) {
256 30
            if ($run->hasStarted()) {
257 30
                return true;
258
            }
259
        }
260 3
        return false;
261
    }
262
263
    /**
264
     * Are any of the processes running
265
     *
266
     * @return bool
267
     */
268 30
    public function poll()
269
    {
270
        /** @var Run[] $running */
271 30
        $this->running = array_filter(
272 30
            $this->running,
273 30
            function (RunInterface $run) {
274 30
                return $run->poll();
275 30
            }
276
        );
277
278 30
        $this->checkFinished();
279
280 30
        return $this->isRunning();
281
    }
282
283
    /**
284
     * @return bool
285
     */
286 39
    public function isRunning()
287
    {
288 39
        return count($this->running) > 0;
289
    }
290
291
    /**
292
     * Return if all runs have started and were successful
293
     *
294
     * @return bool
295
     */
296 31
    public function isSuccessful()
297
    {
298 31
        if (!$this->hasStarted()) {
299 2
            return false;
300
        }
301
302 29
        foreach ($this->items as $run) {
303 29
            if (!$run->isSuccessful()) {
304 29
                return false;
305
            }
306
        }
307
308 20
        return true;
309
    }
310
311
    /**
312
     * Get a list of all the currently running runs
313
     *
314
     * @return RunInterface[]
315
     */
316 4
    public function getRunning()
317
    {
318 4
        return $this->running;
319
    }
320
321
    /**
322
     * Get a list of all the current waiting runs
323
     *
324
     * @return RunInterface[]
325
     */
326 4
    public function getWaiting()
327
    {
328 4
        return $this->waiting;
329
    }
330
331
    /**
332
     * @return int
333
     */
334 3
    public function getMaxSimultaneous()
335
    {
336 3
        return $this->maxSimultaneous;
337
    }
338
339
    /**
340
     * @param int $maxSimultaneous
341
     *
342
     * @return $this
343
     */
344 2
    public function setMaxSimultaneous($maxSimultaneous)
345
    {
346 2
        $this->maxSimultaneous = $maxSimultaneous;
347 2
        return $this;
348
    }
349
350
    /**
351
     * @return bool
352
     */
353 2
    public function isRunInstantly()
354
    {
355 2
        return $this->runInstantly;
356
    }
357
358
    /**
359
     * @param bool $runInstantly
360
     *
361
     * @return Pool
362
     */
363 3
    public function setRunInstantly($runInstantly)
364
    {
365 3
        $this->runInstantly = $runInstantly;
366 3
        return $this;
367
    }
368
}
369