Completed
Pull Request — master (#12)
by Harry
01:54
created

Pool::setOnStart()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 3
cts 3
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\DataStructure\Collection\Collection;
17
use Graze\ParallelProcess\Exceptions\NotRunningException;
18
use InvalidArgumentException;
19
use Symfony\Component\Process\Process;
20
21
class Pool extends Collection implements RunInterface
22
{
23
    const CHECK_INTERVAL = 0.1;
24
    const NO_MAX         = -1;
25
26
    /** @var RunInterface[] */
27
    protected $items = [];
28
    /** @var RunInterface[] */
29
    protected $running = [];
30
    /** @var RunInterface[] */
31
    protected $waiting = [];
32
    /** @var callable|null */
33
    protected $onSuccess;
34
    /** @var callable|null */
35
    protected $onFailure;
36
    /** @var callable|null */
37
    protected $onProgress;
38
    /** @var callable|null */
39
    protected $onStart;
40
    /** @var int */
41
    private $maxSimultaneous = -1;
42
43
    /**
44
     * Pool constructor.
45
     *
46
     * Set the default callbacks here
47
     *
48
     * @param RunInterface[]|Process[] $items
49
     * @param callable|null            $onSuccess  function (Process $process, float $duration, string $last, string
50
     *                                             $lastType) : void
51
     * @param callable|null            $onFailure  function (Process $process, float $duration, string $last, string
52
     *                                             $lastType) : void
53
     * @param callable|null            $onProgress function (Process $process, float $duration, string $last, string
54
     *                                             $lastType) : void
55
     * @param callable|null            $onStart    function (Process $process, float $duration, string $last, string
56
     *                                             $lastType) : void
57
     * @param int                      $maxSimultaneous
58
     */
59 27 View Code Duplication
    public function __construct(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
60
        array $items = [],
61
        callable $onSuccess = null,
62
        callable $onFailure = null,
63
        callable $onProgress = null,
64
        callable $onStart = null,
65
        $maxSimultaneous = self::NO_MAX
66
    ) {
67 27
        parent::__construct($items);
68
69 27
        $this->onSuccess = $onSuccess;
70 27
        $this->onFailure = $onFailure;
71 27
        $this->onProgress = $onProgress;
72 27
        $this->onStart = $onStart;
73 27
        $this->maxSimultaneous = $maxSimultaneous;
74 27
    }
75
76
    /**
77
     * @param callable|null $onSuccess function (Process $process, float $duration, string $last, string $lastType) :
78
     *                                 void
79
     *
80
     * @return $this
81
     */
82 1
    public function setOnSuccess($onSuccess)
83
    {
84 1
        $this->onSuccess = $onSuccess;
85 1
        return $this;
86
    }
87
88
    /**
89
     * @param callable|null $onFailure function (Process $process, float $duration, string $last, string $lastType) :
90
     *                                 void
91
     *
92
     * @return $this
93
     */
94 1
    public function setOnFailure($onFailure)
95
    {
96 1
        $this->onFailure = $onFailure;
97 1
        return $this;
98
    }
99
100
    /**
101
     * @param callable|null $onProgress function (Process $process, float $duration, string $last, string $lastType) :
102
     *                                  void
103
     *
104
     * @return $this
105
     */
106 1
    public function setOnProgress($onProgress)
107
    {
108 1
        $this->onProgress = $onProgress;
109 1
        return $this;
110
    }
111
112
    /**
113
     * @param callable|null $onStart function (Process $process, float $duration, string $last, string $lastType) :
114
     *                               void
115
     *
116
     * @return $this
117
     */
118 1
    public function setOnStart($onStart)
119
    {
120 1
        $this->onStart = $onStart;
121 1
        return $this;
122
    }
123
124
    /**
125
     * Add a new process to the pool
126
     *
127
     * @param RunInterface|Process $item
128
     *
129
     * @return $this
130
     */
131 36
    public function add($item)
132
    {
133 36
        if ($item instanceof Process) {
134 12
            return $this->addProcess($item);
135
        }
136
137 36
        if (!$item instanceof RunInterface) {
138 1
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
139
        }
140
141 35
        if (!$this->isRunning() && $item->isRunning()) {
142 1
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
143
        }
144
145 34
        parent::add($item);
146
147 34
        if ($this->isRunning()) {
148 2
            $this->startRun($item);
149 2
        }
150
151 34
        return $this;
152
    }
153
154
    /**
155
     * Add a new process to the pool using the default callbacks
156
     *
157
     * @param Process $process
158
     *
159
     * @return $this
160
     */
161 12
    protected function addProcess(Process $process)
162
    {
163 12
        return $this->add(
164 12
            new Run(
165 12
                $process,
166 12
                $this->onSuccess,
167 12
                $this->onFailure,
168 12
                $this->onProgress,
169 12
                $this->onStart
170 12
            )
171 12
        );
172
    }
173
174
    /**
175
     * Start all the processes running
176
     *
177
     * @return $this
178
     */
179 29
    public function start()
180
    {
181 29
        foreach ($this->items as $run) {
182 29
            $this->startRun($run);
183 29
        }
184
185 29
        return $this;
186
    }
187
188
    /**
189
     * Start a run (or queue it if we are running the maximum number of processes already)
190
     *
191
     * @param RunInterface $run
192
     */
193 29
    private function startRun(RunInterface $run)
194
    {
195 29
        if ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous) {
196 29
            $run->start();
197 29
            $this->running[] = $run;
198 29
        } else {
199 2
            $this->waiting[] = $run;
200
        }
201 29
    }
202
203
    /**
204
     * Blocking call to run processes;
205
     *
206
     * @param float $checkInterval Seconds between checks
207
     *
208
     * @return bool true if all processes were successful
209
     */
210 26
    public function run($checkInterval = self::CHECK_INTERVAL)
211
    {
212 26
        $this->start();
213 26
        $interval = $checkInterval * 1000000;
214
215 26
        while ($this->poll()) {
216 22
            usleep($interval);
217 22
        }
218
219 26
        return $this->isSuccessful();
220
    }
221
222
    /**
223
     * Check when a run has finished, if there are processes waiting, start them
224
     */
225 27
    private function checkFinished()
226
    {
227 27
        if ($this->maxSimultaneous !== static::NO_MAX
228 27
            && count($this->waiting) > 0
229 27
            && count($this->running) < $this->maxSimultaneous) {
230 1
            for ($i = count($this->running); $i < $this->maxSimultaneous && count($this->waiting) > 0; $i++) {
231 1
                $run = array_shift($this->waiting);
232 1
                $run->start();
233 1
                $this->running[] = $run;
234 1
            }
235 1
        }
236 27
    }
237
238
    /**
239
     * Determine if any item has run
240
     *
241
     * @return bool
242
     */
243 28
    public function hasStarted()
244
    {
245 28
        foreach ($this->items as $run) {
246 27
            if ($run->hasStarted()) {
247 26
                return true;
248
            }
249 3
        }
250 3
        return false;
251
    }
252
253
    /**
254
     * Are any of the processes running
255
     *
256
     * @return bool
257
     */
258 27
    public function poll()
259
    {
260
        /** @var Run[] $running */
261 27
        $this->running = array_filter(
262 27
            $this->running,
263 27
            function (RunInterface $run) {
264 27
                return $run->poll();
265
            }
266 27
        );
267
268 27
        $this->checkFinished();
269
270 27
        return $this->isRunning();
271
    }
272
273
    /**
274
     * @return bool
275
     */
276 36
    public function isRunning()
277
    {
278 36
        return count($this->running) > 0;
279
    }
280
281
    /**
282
     * Return if all runs have started and were successful
283
     *
284
     * @return bool
285
     */
286 28
    public function isSuccessful()
287
    {
288 28
        if (!$this->hasStarted()) {
289 2
            return false;
290
        }
291
292 26
        foreach ($this->items as $run) {
293 26
            if (!$run->isSuccessful()) {
294 9
                return false;
295
            }
296 18
        }
297
298 17
        return true;
299
    }
300
301
    /**
302
     * Get a list of all the currently running runs
303
     *
304
     * @return RunInterface[]
305
     */
306 4
    public function getRunning()
307
    {
308 4
        return $this->running;
309
    }
310
311
    /**
312
     * Get a list of all the current waiting runs
313
     *
314
     * @return RunInterface[]
315
     */
316 4
    public function getWaiting()
317
    {
318 4
        return $this->waiting;
319
    }
320
321
    /**
322
     * @return int
323
     */
324 3
    public function getMaxSimultaneous()
325
    {
326 3
        return $this->maxSimultaneous;
327
    }
328
329
    /**
330
     * @param int $maxSimultaneous
331
     *
332
     * @return $this
333
     */
334 2
    public function setMaxSimultaneous($maxSimultaneous)
335
    {
336 2
        $this->maxSimultaneous = $maxSimultaneous;
337 2
        return $this;
338
    }
339
}
340