Completed
Pull Request — master (#12)
by Harry
11:21
created

Pool::setOnStart()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 3
cts 3
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
/**
 * This file is part of graze/parallel-process.
 *
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 *
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
 * @link    https://github.com/graze/parallel-process
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\DataStructure\Collection\Collection;
17
use Graze\ParallelProcess\Exceptions\NotRunningException;
18
use InvalidArgumentException;
19
use Symfony\Component\Process\Process;
20
21
/**
 * A collection of runs that can be started together and polled until they have all finished.
 *
 * Plain `Process` instances added to the pool are wrapped in a `Run` using the pool's default callbacks.
 * When a maximum number of simultaneous runs is configured, additional runs are queued and started as
 * running ones finish.
 */
class Pool extends Collection implements RunInterface
{
    /** Default number of seconds to sleep between polls in run() */
    const CHECK_INTERVAL = 0.1;
    /** Value of $maxSimultaneous meaning "no limit on the number of simultaneous runs" */
    const NO_MAX         = -1;

    /** @var RunInterface[] All the runs that have been added to this pool */
    protected $items = [];
    /** @var RunInterface[] Runs that have been started and were still running at the last poll */
    protected $running = [];
    /** @var RunInterface[] Runs queued because the simultaneous limit had been reached */
    protected $waiting = [];
    /** @var callable|null Default success callback for processes added to this pool */
    protected $onSuccess;
    /** @var callable|null Default failure callback for processes added to this pool */
    protected $onFailure;
    /** @var callable|null Default progress callback for processes added to this pool */
    protected $onProgress;
    /** @var callable|null Default start callback for processes added to this pool */
    private $onStart;
    /** @var int Maximum number of runs to have running at once, or NO_MAX for no limit */
    private $maxSimultaneous = -1;

    /**
     * Pool constructor.
     *
     * Set the default callbacks here
     *
     * @param RunInterface[]|Process[] $items
     * @param callable|null            $onSuccess  function (Process $process, float $duration, string $last, string
     *                                             $lastType) : void
     * @param callable|null            $onFailure  function (Process $process, float $duration, string $last, string
     *                                             $lastType) : void
     * @param callable|null            $onProgress function (Process $process, float $duration, string $last, string
     *                                             $lastType) : void
     * @param callable|null            $onStart    function (Process $process, float $duration, string $last, string
     *                                             $lastType) : void
     * @param int                      $maxSimultaneous
     */
    public function __construct(
        array $items = [],
        callable $onSuccess = null,
        callable $onFailure = null,
        callable $onProgress = null,
        callable $onStart = null,
        $maxSimultaneous = self::NO_MAX
    ) {
        // Configure the pool before handing $items to the parent. If the parent constructor routes the items
        // through add(), any Process among them is wrapped by addProcess() using these callbacks, so they
        // must already be in place.
        // NOTE(review): assumes Collection::__construct() adds $items via add() - confirm against the parent.
        $this->onSuccess = $onSuccess;
        $this->onFailure = $onFailure;
        $this->onProgress = $onProgress;
        $this->onStart = $onStart;
        $this->maxSimultaneous = $maxSimultaneous;

        parent::__construct($items);
    }

    /**
     * Set the default success callback used when wrapping a Process added to this pool
     *
     * @param callable|null $onSuccess function (Process $process, float $duration, string $last, string $lastType) :
     *                                 void
     *
     * @return $this
     */
    public function setOnSuccess($onSuccess)
    {
        $this->onSuccess = $onSuccess;

        return $this;
    }

    /**
     * Set the default failure callback used when wrapping a Process added to this pool
     *
     * @param callable|null $onFailure function (Process $process, float $duration, string $last, string $lastType) :
     *                                 void
     *
     * @return $this
     */
    public function setOnFailure($onFailure)
    {
        $this->onFailure = $onFailure;

        return $this;
    }

    /**
     * Set the default progress callback used when wrapping a Process added to this pool
     *
     * @param callable|null $onProgress function (Process $process, float $duration, string $last, string $lastType) :
     *                                  void
     *
     * @return $this
     */
    public function setOnProgress($onProgress)
    {
        $this->onProgress = $onProgress;

        return $this;
    }

    /**
     * Set the default start callback used when wrapping a Process added to this pool
     *
     * @param callable|null $onStart function (Process $process, float $duration, string $last, string $lastType) :
     *                               void
     *
     * @return $this
     */
    public function setOnStart($onStart)
    {
        $this->onStart = $onStart;

        return $this;
    }

    /**
     * Add a new process to the pool
     *
     * A Process is wrapped in a Run using the pool's default callbacks. If the pool is already running the
     * item is started (or queued) straight away.
     *
     * @param RunInterface|Process $item
     *
     * @return $this
     *
     * @throws InvalidArgumentException when $item is neither a Process nor a RunInterface
     * @throws NotRunningException      when $item is already running but the pool is not
     */
    public function add($item)
    {
        if ($item instanceof Process) {
            return $this->addProcess($item);
        }

        if (!$item instanceof RunInterface) {
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
        }

        if (!$this->isRunning() && $item->isRunning()) {
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
        }

        parent::add($item);

        if ($this->isRunning()) {
            $this->startRun($item);
        }

        return $this;
    }

    /**
     * Add a new process to the pool using the default callbacks
     *
     * @param Process $process
     *
     * @return $this
     */
    protected function addProcess(Process $process)
    {
        $run = new Run(
            $process,
            $this->onSuccess,
            $this->onFailure,
            $this->onProgress,
            $this->onStart
        );

        return $this->add($run);
    }

    /**
     * Start all the processes running
     *
     * Items beyond the simultaneous limit are queued rather than started.
     *
     * @return $this
     */
    public function start()
    {
        foreach ($this->items as $run) {
            $this->startRun($run);
        }

        return $this;
    }

    /**
     * Start a run (or queue it if we are running the maximum number of processes already)
     *
     * @param RunInterface $run
     */
    private function startRun(RunInterface $run)
    {
        $hasCapacity = $this->maxSimultaneous === static::NO_MAX
            || count($this->running) < $this->maxSimultaneous;

        if ($hasCapacity) {
            $run->start();
            $this->running[] = $run;
        } else {
            $this->waiting[] = $run;
        }
    }

    /**
     * Blocking call to run processes
     *
     * Starts every item, then polls until nothing is running, sleeping between polls.
     *
     * @param float $checkInterval Seconds between checks
     *
     * @return bool true if all processes were successful
     */
    public function run($checkInterval = self::CHECK_INTERVAL)
    {
        $this->start();

        // usleep() takes a whole number of microseconds, so convert explicitly instead of passing a float
        $interval = (int) round($checkInterval * 1000000);

        while ($this->poll()) {
            usleep($interval);
        }

        return $this->isSuccessful();
    }

    /**
     * Check when a run has finished, if there are processes waiting, start them
     *
     * Waiting runs are started in the order they were queued until the simultaneous limit is reached again.
     */
    private function checkFinished()
    {
        if ($this->maxSimultaneous === static::NO_MAX) {
            return;
        }

        while (count($this->waiting) > 0 && count($this->running) < $this->maxSimultaneous) {
            $run = array_shift($this->waiting);
            $run->start();
            $this->running[] = $run;
        }
    }

    /**
     * Determine if any item has run
     *
     * @return bool true as soon as one item reports that it has started
     */
    public function hasStarted()
    {
        foreach ($this->items as $run) {
            if ($run->hasStarted()) {
                return true;
            }
        }

        return false;
    }

    /**
     * Poll every running item, start waiting items if there is room, and report if anything is still running
     *
     * @return bool true if at least one item is still running
     */
    public function poll()
    {
        // array_filter() preserves keys; re-index so that the running list stays a plain sequential array
        $this->running = array_values(
            array_filter(
                $this->running,
                function (RunInterface $run) {
                    return $run->poll();
                }
            )
        );

        $this->checkFinished();

        return $this->isRunning();
    }

    /**
     * Determine if any run in the pool is currently running
     *
     * @return bool
     */
    public function isRunning()
    {
        return count($this->running) > 0;
    }

    /**
     * Return if all runs have started and were successful
     *
     * Returns false when no item has started yet.
     *
     * @return bool
     */
    public function isSuccessful()
    {
        if (!$this->hasStarted()) {
            return false;
        }

        foreach ($this->items as $run) {
            if (!$run->isSuccessful()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Get a list of all the currently running runs
     *
     * @return RunInterface[]
     */
    public function getRunning()
    {
        return $this->running;
    }

    /**
     * Get a list of all the current waiting runs
     *
     * @return RunInterface[]
     */
    public function getWaiting()
    {
        return $this->waiting;
    }

    /**
     * Get the maximum number of runs allowed to run at once (NO_MAX means no limit)
     *
     * @return int
     */
    public function getMaxSimultaneous()
    {
        return $this->maxSimultaneous;
    }

    /**
     * Set the maximum number of runs allowed to run at once (NO_MAX means no limit)
     *
     * @param int $maxSimultaneous
     *
     * @return $this
     */
    public function setMaxSimultaneous($maxSimultaneous)
    {
        $this->maxSimultaneous = $maxSimultaneous;

        return $this;
    }
}
337