Completed
Push — master ( 33b2da...23d19d )
by Harry
01:24
created

Pool::setMaxSimultaneous()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
c 0
b 0
f 0
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\DataStructure\Collection\Collection;
17
use Graze\ParallelProcess\Exceptions\NotRunningException;
18
use InvalidArgumentException;
19
use Symfony\Component\Process\Process;
20
21
class Pool extends Collection implements RunInterface
22
{
23
    const CHECK_INTERVAL = 0.1;
24
    const NO_MAX         = -1;
25
26
    /** @var RunInterface[] */
27
    protected $items = [];
28
    /** @var RunInterface[] */
29
    protected $running = [];
30
    /** @var RunInterface[] */
31
    protected $waiting = [];
32
    /** @var callable|null */
33
    protected $onSuccess;
34
    /** @var callable|null */
35
    protected $onFailure;
36
    /** @var callable|null */
37
    protected $onProgress;
38
    /** @var int */
39
    private $maxSimultaneous = -1;
40
41
    /**
42
     * Pool constructor.
43
     *
44
     * Set the default callbacks here
45
     *
46
     * @param RunInterface[]|Process[] $items
47
     * @param callable|null            $onSuccess  function (Process $process, float $duration, string $last) : void
48
     * @param callable|null            $onFailure  function (Process $process, float $duration, string $last) : void
49
     * @param callable|null            $onProgress function (Process $process, float $duration, string $last) : void
50
     * @param int                      $maxSimultaneous
51
     */
52 View Code Duplication
    public function __construct(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
53
        array $items = [],
54
        callable $onSuccess = null,
55
        callable $onFailure = null,
56
        callable $onProgress = null,
57
        $maxSimultaneous = self::NO_MAX
58
    ) {
59
        parent::__construct($items);
60
61
        $this->onSuccess = $onSuccess;
62
        $this->onFailure = $onFailure;
63
        $this->onProgress = $onProgress;
64
        $this->maxSimultaneous = $maxSimultaneous;
65
    }
66
67
    /**
68
     * @param callable|null $onSuccess function (Process $process, float $duration, string $last) : void
69
     *
70
     * @return $this
71
     */
72
    public function setOnSuccess($onSuccess)
73
    {
74
        $this->onSuccess = $onSuccess;
75
        return $this;
76
    }
77
78
    /**
79
     * @param callable|null $onFailure function (Process $process, float $duration, string $last) : void
80
     *
81
     * @return $this
82
     */
83
    public function setOnFailure($onFailure)
84
    {
85
        $this->onFailure = $onFailure;
86
        return $this;
87
    }
88
89
    /**
90
     * @param callable|null $onProgress function (Process $process, float $duration, string $last) : void
91
     *
92
     * @return $this
93
     */
94
    public function setOnProgress($onProgress)
95
    {
96
        $this->onProgress = $onProgress;
97
        return $this;
98
    }
99
100
    /**
101
     * Add a new process to the pool
102
     *
103
     * @param RunInterface|Process $item
104
     *
105
     * @return $this
106
     */
107
    public function add($item)
108
    {
109
        if ($item instanceof Process) {
110
            return $this->addProcess($item);
111
        }
112
113
        if (!$item instanceof RunInterface) {
114
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
115
        }
116
117
        if (!$this->isRunning() && $item->isRunning()) {
118
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
119
        }
120
121
        parent::add($item);
122
123
        if ($this->isRunning()) {
124
            $this->startRun($item);
125
        }
126
127
        return $this;
128
    }
129
130
    /**
131
     * Add a new process to the pool using the default callbacks
132
     *
133
     * @param Process $process
134
     *
135
     * @return $this
136
     */
137
    protected function addProcess(Process $process)
138
    {
139
        return $this->add(new Run(
140
            $process,
141
            $this->onSuccess,
142
            $this->onFailure,
143
            $this->onProgress
144
        ));
145
    }
146
147
    /**
148
     * Start all the processes running
149
     *
150
     * @return $this
151
     */
152
    public function start()
153
    {
154
        foreach ($this->items as $run) {
155
            $this->startRun($run);
156
        }
157
158
        return $this;
159
    }
160
161
    /**
162
     * Start a run (or queue it if we are running the maximum number of processes already)
163
     *
164
     * @param RunInterface $run
165
     */
166
    private function startRun(RunInterface $run)
167
    {
168
        if ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous) {
169
            $run->start();
170
            $this->running[] = $run;
171
        } else {
172
            $this->waiting[] = $run;
173
        }
174
    }
175
176
    /**
177
     * Blocking call to run processes;
178
     *
179
     * @param float $checkInterval Seconds between checks
180
     *
181
     * @return bool true if all processes were successful
182
     */
183
    public function run($checkInterval = self::CHECK_INTERVAL)
184
    {
185
        $this->start();
186
        $interval = $checkInterval * 1000000;
187
188
        while ($this->poll()) {
189
            usleep($interval);
190
        }
191
192
        return $this->isSuccessful();
193
    }
194
195
    /**
196
     * Check when a run has finished, if there are processes waiting, start them
197
     */
198
    private function checkFinished()
199
    {
200
        if ($this->maxSimultaneous !== static::NO_MAX
201
            && count($this->waiting) > 0
202
            && count($this->running) < $this->maxSimultaneous) {
203
            for ($i = count($this->running); $i < $this->maxSimultaneous && count($this->waiting) > 0; $i++) {
204
                $run = array_shift($this->waiting);
205
                $run->start();
206
                $this->running[] = $run;
207
            }
208
        }
209
    }
210
211
    /**
212
     * Determine if any item has run
213
     *
214
     * @return bool
215
     */
216
    public function hasStarted()
217
    {
218
        foreach ($this->items as $run) {
219
            if ($run->hasStarted()) {
220
                return true;
221
            }
222
        }
223
        return false;
224
    }
225
226
    /**
227
     * Are any of the processes running
228
     *
229
     * @return bool
230
     */
231
    public function poll()
232
    {
233
        /** @var Run[] $running */
234
        $this->running = array_filter($this->running, function (RunInterface $run) {
235
            return $run->poll();
236
        });
237
238
        $this->checkFinished();
239
240
        return $this->isRunning();
241
    }
242
243
    /**
244
     * @return bool
245
     */
246
    public function isRunning()
247
    {
248
        return count($this->running) > 0;
249
    }
250
251
    /**
252
     * Return if all runs have started and were successful
253
     *
254
     * @return bool
255
     */
256
    public function isSuccessful()
257
    {
258
        if (!$this->hasStarted()) {
259
            return false;
260
        }
261
262
        foreach ($this->items as $run) {
263
            if (!$run->isSuccessful()) {
264
                return false;
265
            }
266
        }
267
268
        return true;
269
    }
270
271
    /**
272
     * Get a list of all the currently running runs
273
     *
274
     * @return RunInterface[]
275
     */
276
    public function getRunning()
277
    {
278
        return $this->running;
279
    }
280
281
    /**
282
     * Get a list of all the current waiting runs
283
     *
284
     * @return RunInterface[]
285
     */
286
    public function getWaiting()
287
    {
288
        return $this->waiting;
289
    }
290
291
    /**
292
     * @return int
293
     */
294
    public function getMaxSimultaneous()
295
    {
296
        return $this->maxSimultaneous;
297
    }
298
299
    /**
300
     * @param int $maxSimultaneous
301
     *
302
     * @return $this
303
     */
304
    public function setMaxSimultaneous($maxSimultaneous)
305
    {
306
        $this->maxSimultaneous = $maxSimultaneous;
307
        return $this;
308
    }
309
}
310