Passed
Pull Request — master (#18)
by Harry
04:35 queued 23s
created

Pool::checkFinished()   A

Complexity

Conditions 6
Paths 3

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 6

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 9
rs 9.2222
c 0
b 0
f 0
ccs 8
cts 8
cp 1
cc 6
nc 3
nop 0
crap 6
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Exception;
17
use Graze\DataStructure\Collection\Collection;
18
use Graze\ParallelProcess\Event\EventDispatcherTrait;
19
use Graze\ParallelProcess\Event\PoolRunEvent;
20
use Graze\ParallelProcess\Event\RunEvent;
21
use InvalidArgumentException;
22
use Symfony\Component\Process\Process;
23
use Throwable;
24
25
/**
 * Class Pool
 *
 * A Pool is an arbitrary collection of runs that can be used to group runs together when displaying with a
 * Table
 *
 * The pool listens to the events of every run added to it and files each run under waiting, running or
 * complete. It implements RunInterface itself, so a pool can be used wherever a single run is expected.
 *
 * @package Graze\ParallelProcess
 */
class Pool extends Collection implements RunInterface, PoolInterface
{
    use EventDispatcherTrait;
    use RunningStateTrait;

    /** @var RunInterface[] the runs held by this pool (iterated by start() and isSuccessful()) */
    protected $items = [];
    /** @var RunInterface[] runs that had not started when they were added and have not started since */
    protected $waiting = [];
    /** @var RunInterface[] runs that are currently running */
    protected $running = [];
    /** @var RunInterface[] runs that have completed, or had already finished when they were added */
    protected $complete = [];
    /** @var Exception[]|Throwable[] exceptions collected from failed child runs */
    private $exceptions = [];
    /** @var array */
    private $tags;
    /** @var float */
    private $priority;

    /**
     * Pool constructor.
     *
     * @param RunInterface[] $runs     Runs (or Processes) to add to the pool straight away
     * @param array          $tags     Tags associated with this pool
     * @param float          $priority Priority of this pool
     *
     * @throws InvalidArgumentException if any entry of $runs is neither a Process nor a RunInterface
     */
    public function __construct(array $runs = [], array $tags = [], $priority = 1.0)
    {
        parent::__construct([]);

        $this->tags = $tags;

        // Go through add() so each run is classified and has its listeners attached
        foreach ($runs as $run) {
            $this->add($run);
        }
        $this->priority = $priority;
    }

    /**
     * Add a run to this pool.
     *
     * A Symfony Process is wrapped in a ProcessRun first. The run is filed as running, complete or waiting
     * depending on its current state, and the pool subscribes to its started, completed and failed events.
     * If the run has already started, the pool marks itself as started as well.
     *
     * @param RunInterface|Process $item
     * @param array                $tags Only used when $item is a Process: passed to the wrapping ProcessRun
     *
     * @return $this
     *
     * @throws InvalidArgumentException if $item is neither a Process nor a RunInterface
     */
    public function add($item, array $tags = [])
    {
        if ($item instanceof Process) {
            return $this->add(new ProcessRun($item, $tags));
        }
        if (!$item instanceof RunInterface) {
            throw new InvalidArgumentException('item must implement `RunInterface`');
        }

        parent::add($item);

        $alreadyStarted = true;
        if ($item->isRunning()) {
            $this->running[] = $item;
        } elseif ($item->hasStarted()) {
            // Started but no longer running: the run has finished, so it belongs with the completed runs
            // (this is the list that getFinished() and getProgress() read from)
            $this->complete[] = $item;
        } else {
            $alreadyStarted = false;
            $this->waiting[] = $item;
        }

        $item->addListener(RunEvent::STARTED, [$this, 'onRunStarted']);
        $item->addListener(RunEvent::COMPLETED, [$this, 'onRunCompleted']);
        $item->addListener(RunEvent::FAILED, [$this, 'onRunFailed']);

        $this->dispatch(PoolRunEvent::POOL_RUN_ADDED, new PoolRunEvent($this, $item));

        if ($alreadyStarted) {
            $this->ensureStarted();
        }

        return $this;
    }

    /**
     * When a run starts, check our current state and start ourselves if required
     *
     * The run is moved from the waiting list to the running list, and an UPDATED event is dispatched.
     *
     * @param RunEvent $event
     */
    public function onRunStarted(RunEvent $event)
    {
        $run = $event->getRun();
        $index = array_search($run, $this->waiting, true);
        if ($index !== false) {
            unset($this->waiting[$index]);
        }
        $this->running[] = $run;
        $this->ensureStarted();
        $this->dispatch(RunEvent::UPDATED, new RunEvent($this));
    }

    /**
     * When a run is completed, check if everything has finished
     *
     * The run is moved from the running list to the complete list and an UPDATED event is dispatched. Once
     * nothing is waiting or running, the pool is marked as finished and dispatches SUCCESSFUL or FAILED,
     * followed by COMPLETED.
     *
     * @param RunEvent $event
     */
    public function onRunCompleted(RunEvent $event)
    {
        $run = $event->getRun();
        $index = array_search($run, $this->running, true);
        if ($index !== false) {
            unset($this->running[$index]);
        }
        $this->complete[] = $run;
        $this->dispatch(RunEvent::UPDATED, new RunEvent($this));

        if (count($this->waiting) !== 0 || count($this->running) !== 0) {
            return;
        }

        $this->setFinished();
        $outcome = $this->isSuccessful() ? RunEvent::SUCCESSFUL : RunEvent::FAILED;
        $this->dispatch($outcome, new RunEvent($this));
        $this->dispatch(RunEvent::COMPLETED, new RunEvent($this));
    }

    /**
     * Handle any errors returned from the child run
     *
     * The exceptions of the failed run are appended to the exceptions of this pool.
     *
     * @param RunEvent $event
     */
    public function onRunFailed(RunEvent $event)
    {
        $this->exceptions = array_merge($this->exceptions, $event->getRun()->getExceptions());
    }

    /**
     * Has this run been started before
     *
     * @return bool
     */
    public function hasStarted()
    {
        return $this->getState() !== static::STATE_NOT_STARTED;
    }

    /**
     * Start all children that have not been started yet
     *
     * @return $this
     *
     * @throws \Graze\ParallelProcess\Exceptions\NotRunningException
     */
    public function start()
    {
        foreach ($this->items as $run) {
            if (!$run->hasStarted()) {
                $run->start();
            }
        }
        return $this;
    }

    /**
     * Was this run successful
     *
     * @return bool `true` only if the pool is in the not-running state and every child run was successful
     */
    public function isSuccessful()
    {
        if ($this->getState() !== static::STATE_NOT_RUNNING) {
            return false;
        }
        foreach ($this->items as $run) {
            if (!$run->isSuccessful()) {
                return false;
            }
        }
        return true;
    }

    /**
     * If the run was unsuccessful, get the errors collected from the failed child runs
     *
     * @return Exception[]|Throwable[]
     */
    public function getExceptions()
    {
        return $this->exceptions;
    }

    /**
     * We think this is running
     *
     * @return bool
     */
    public function isRunning()
    {
        return $this->getState() === static::STATE_RUNNING;
    }

    /**
     * Polls every running child, then reports whether this pool is still running
     *
     * @return bool
     */
    public function poll()
    {
        foreach ($this->running as $run) {
            $run->poll();
        }
        return $this->isRunning();
    }

    /**
     * Get a set of tags associated with this run
     *
     * @return array
     */
    public function getTags()
    {
        return $this->tags;
    }

    /**
     * Get the progress of this pool as [number of completed runs, total number of runs, fraction completed].
     *
     * An empty pool reports a fraction of 0.0 rather than dividing by zero.
     *
     * @return float[]|null an array of values of the current position, max, and percentage. null if not applicable
     */
    public function getProgress()
    {
        $done = count($this->complete);
        $total = count($this->items);

        return [$done, $total, $total > 0 ? $done / $total : 0.0];
    }

    /**
     * @return float
     */
    public function getPriority()
    {
        return $this->priority;
    }

    /**
     * @param float $priority
     *
     * @return $this
     */
    public function setPriority($priority)
    {
        $this->priority = $priority;
        return $this;
    }

    /**
     * The names of the events this pool dispatches
     *
     * @return string[]
     */
    protected function getEventNames()
    {
        return [
            RunEvent::STARTED,
            RunEvent::COMPLETED,
            RunEvent::SUCCESSFUL,
            RunEvent::FAILED,
            RunEvent::UPDATED,
            PoolRunEvent::POOL_RUN_ADDED,
        ];
    }

    /**
     * Run this pool of runs and block until they are complete.
     *
     * Starts every run that has not started yet, then polls repeatedly, sleeping between polls, until the
     * pool is no longer running.
     *
     * @param float $interval Time to sleep between polls, in seconds
     *
     * @return bool `true` if all the runs were successful
     */
    public function run($interval = self::CHECK_INTERVAL)
    {
        $this->start();

        // usleep() takes microseconds
        $sleep = (int) ($interval * 1000000);
        while ($this->poll()) {
            usleep($sleep);
        }

        return $this->isSuccessful();
    }

    /**
     * @return RunInterface[] the waiting runs, re-indexed from zero
     */
    public function getWaiting()
    {
        return array_values($this->waiting);
    }

    /**
     * @return RunInterface[] the running runs, re-indexed from zero
     */
    public function getRunning()
    {
        return array_values($this->running);
    }

    /**
     * @return RunInterface[] the completed runs, re-indexed from zero
     */
    public function getFinished()
    {
        return array_values($this->complete);
    }

    /**
     * Mark this pool as started and dispatch a STARTED event, unless it has already been started
     */
    private function ensureStarted()
    {
        if ($this->getState() === static::STATE_NOT_STARTED) {
            $this->setStarted();
            $this->dispatch(RunEvent::STARTED, new RunEvent($this));
        }
    }
}
341