Issues (65)

src/Pool.php (1 issue)

Severity
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Exception;
17
use Graze\DataStructure\Collection\Collection;
18
use Graze\ParallelProcess\Event\EventDispatcherTrait;
19
use Graze\ParallelProcess\Event\PoolRunEvent;
20
use Graze\ParallelProcess\Event\PriorityChangedEvent;
21
use Graze\ParallelProcess\Event\RunEvent;
22
use InvalidArgumentException;
23
use Symfony\Component\Process\Process;
24
use Throwable;
25
26
/**
27
 * Class Pool
28
 *
29
 * A Pool is a arbitrary collection of runs that can be used to group runs together when displaying with a
30
 * Table
31
 *
32
 * A Pool can transition from `not_running` back to `running` again. But it cannot transition back to `not_started`.
33
 * This means that multiple `COMPLETED` and `STARTED` events can be sent out for a single pool.
34
 *
35
 * ```
36
 * not_started -> running <-> not_running
37
 * ```
38
 *
39
 * @package Graze\ParallelProcess
40
 */
41
class Pool extends Collection implements RunInterface, PoolInterface, PrioritisedInterface
42
{
43
    use EventDispatcherTrait;
44
    use RunningStateTrait;
45
    use PrioritisedTrait;
46
47
    /** @var RunInterface[] */
48
    protected $items = [];
49
    /** @var RunInterface[] */
50
    protected $waiting = [];
51
    /** @var RunInterface[] */
52
    protected $running = [];
53
    /** @var RunInterface[] */
54
    protected $complete = [];
55
    /** @var Exception[]|Throwable[] */
56
    private $exceptions = [];
57
    /** @var array */
58
    private $tags;
59
60
    /**
61
     * RunCollection constructor.
62
     *
63
     * @param RunInterface[] $runs
64
     * @param array          $tags
65
     * @param float          $priority
66
     */
67 73
    public function __construct(array $runs = [], array $tags = [], $priority = 1.0)
68
    {
69 73
        parent::__construct([]);
70
71 73
        $this->tags = $tags;
72
73 73
        array_map([$this, 'add'], $runs);
74 73
        $this->priority = $priority;
75 73
    }
76
77
    /**
78
     * @param RunInterface|Process $item
79
     * @param array                $tags
80
     *
81
     * @return $this
82
     */
83 57
    public function add($item, array $tags = [])
84
    {
85 57
        if ($item instanceof Process) {
86 32
            return $this->add(new ProcessRun($item, $tags));
87
        }
88 57
        if (!$item instanceof RunInterface) {
0 ignored issues
show
$item is always a sub-type of Graze\ParallelProcess\RunInterface.
Loading history...
89 2
            throw new InvalidArgumentException('item must implement `RunInterface`');
90
        }
91
92 55
        parent::add($item);
93 55
        $status = 'waiting';
94 55
        if ($item->isRunning()) {
95 1
            $status = 'running';
96 1
            $this->running[] = $item;
97 55
        } elseif ($item->hasStarted()) {
98 1
            $status = 'finished';
99 1
            $this->complete[] = $item;
100 1
        } else {
101 54
            $this->waiting[] = $item;
102
        }
103
104 55
        $item->addListener(RunEvent::STARTED, [$this, 'onRunStarted']);
105 55
        $item->addListener(RunEvent::COMPLETED, [$this, 'onRunCompleted']);
106 55
        $item->addListener(RunEvent::FAILED, [$this, 'onRunFailed']);
107
108 55
        $this->dispatch(PoolRunEvent::POOL_RUN_ADDED, new PoolRunEvent($this, $item));
109
110 55
        if ($status != 'waiting' && $this->state != static::STATE_RUNNING) {
111 1
            $this->setStarted();
112 1
            $this->dispatch(RunEvent::STARTED, new RunEvent($this));
113 1
        }
114 55
        if ($status == 'finished' && $this->state != static::STATE_NOT_RUNNING) {
115 1
            $this->setFinished();
116 1
            $this->dispatch(RunEvent::COMPLETED, new RunEvent($this));
117 1
        }
118
119 55
        return $this;
120
    }
121
122
    /**
123
     * When a run starts, check our current state and start ourselves in required
124
     *
125
     * @param RunEvent $event
126
     */
127 37
    public function onRunStarted(RunEvent $event)
128
    {
129 37
        $index = array_search($event->getRun(), $this->waiting, true);
130 37
        if ($index !== false) {
131 37
            unset($this->waiting[$index]);
132 37
        }
133 37
        $this->running[] = $event->getRun();
134 37
        if ($this->state == static::STATE_NOT_STARTED) {
135 37
            $this->setStarted();
136 37
            $this->dispatch(RunEvent::STARTED, new RunEvent($this));
137 37
        }
138 37
        $this->dispatch(RunEvent::UPDATED, new RunEvent($this));
139 37
    }
140
141
    /**
142
     * When a run is completed, check if everything has finished
143
     *
144
     * @param RunEvent $event
145
     */
146 36
    public function onRunCompleted(RunEvent $event)
147
    {
148 36
        $index = array_search($event->getRun(), $this->running, true);
149 36
        if ($index !== false) {
150 35
            unset($this->running[$index]);
151 35
        }
152 36
        $this->complete[] = $event->getRun();
153 36
        $this->dispatch(RunEvent::UPDATED, new RunEvent($this));
154 36
        if (count($this->waiting) === 0 && count($this->running) === 0) {
155 34
            $this->setFinished();
156 34
            if ($this->isSuccessful()) {
157 28
                $this->dispatch(RunEvent::SUCCESSFUL, new RunEvent($this));
158 28
            } else {
159 6
                $this->dispatch(RunEvent::FAILED, new RunEvent($this));
160
            }
161 34
            $this->dispatch(RunEvent::COMPLETED, new RunEvent($this));
162 34
        }
163 36
    }
164
165
    /**
166
     * Handle any errors returned from the child run
167
     *
168
     * @param RunEvent $event
169
     */
170 7
    public function onRunFailed(RunEvent $event)
171
    {
172 7
        $this->exceptions = array_merge($this->exceptions, $event->getRun()->getExceptions());
173 7
    }
174
175
    /**
176
     * Has this run been started before
177
     *
178
     * @return bool
179
     */
180 14
    public function hasStarted()
181
    {
182 14
        return $this->getState() !== static::STATE_NOT_STARTED;
183
    }
184
185
    /**
186
     * Start all non running children
187
     *
188
     * @return $this
189
     *
190
     * @throws \Graze\ParallelProcess\Exceptions\NotRunningException
191
     */
192 8
    public function start()
193
    {
194 8
        foreach ($this->items as $run) {
195 8
            if (!$run->hasStarted()) {
196 8
                $run->start();
197 8
            }
198 8
        }
199 8
        return $this;
200
    }
201
202
    /**
203
     * Was this run successful
204
     *
205
     * @return bool
206
     */
207 40
    public function isSuccessful()
208
    {
209 40
        if ($this->getState() === static::STATE_NOT_RUNNING) {
210 34
            foreach ($this->items as $run) {
211 34
                if (!$run->isSuccessful()) {
212 6
                    return false;
213
                }
214 29
            }
215 28
            return true;
216
        }
217 10
        return false;
218
    }
219
220
    /**
221
     * If the run was unsuccessful, get the error if applicable
222
     *
223
     * @return Exception[]|Throwable[]
224
     */
225 1
    public function getExceptions()
226
    {
227 1
        return $this->exceptions;
228
    }
229
230
    /**
231
     * We think this is running
232
     *
233
     * @return bool
234
     */
235 62
    public function isRunning()
236
    {
237 62
        return $this->getState() === static::STATE_RUNNING;
238
    }
239
240
    /**
241
     * Pools to see if this process is running
242
     *
243
     * @return bool
244
     */
245 35
    public function poll()
246
    {
247 35
        foreach ($this->running as $run) {
248 25
            $run->poll();
249 35
        }
250 35
        return $this->isRunning();
251
    }
252
253
    /**
254
     * Get a set of tags associated with this run
255
     *
256
     * @return array
257
     */
258 3
    public function getTags()
259
    {
260 3
        return $this->tags;
261
    }
262
263
    /**
264
     * @return float[]|null an array of values of the current position, max, and percentage. null if not applicable
265
     */
266 3
    public function getProgress()
267
    {
268 3
        return [count($this->complete), count($this->items), count($this->complete) / count($this->items)];
269
    }
270
271
    /**
272
     * @return string[]
273
     */
274 62
    protected function getEventNames()
275
    {
276
        return [
277 62
            RunEvent::STARTED,
278 62
            RunEvent::COMPLETED,
279 62
            RunEvent::SUCCESSFUL,
280 62
            RunEvent::FAILED,
281 62
            RunEvent::UPDATED,
282 62
            PoolRunEvent::POOL_RUN_ADDED,
283 62
            PriorityChangedEvent::CHANGED,
284 62
        ];
285
    }
286
287
    /**
288
     * Run this pool of runs and block until they are complete.
289
     *
290
     * Note this will run the parent pool
291
     *
292
     * @param float $interval
293
     *
294
     * @return bool `true` if all the runs were successful
295
     */
296 7
    public function run($interval = self::CHECK_INTERVAL)
297
    {
298 7
        $this->start();
299
300 7
        $sleep = (int) ($interval * 1000000);
301 7
        while ($this->poll()) {
302 1
            usleep($sleep);
303 1
        }
304
305 7
        return $this->isSuccessful();
306
    }
307
308
    /**
309
     * @return RunInterface[]
310
     */
311 7
    public function getWaiting()
312
    {
313 7
        return array_values($this->waiting);
314
    }
315
316
    /**
317
     * @return RunInterface[]
318
     */
319 6
    public function getRunning()
320
    {
321 6
        return array_values($this->running);
322
    }
323
324
    /**
325
     * @return RunInterface[]
326
     */
327 6
    public function getFinished()
328
    {
329 6
        return array_values($this->complete);
330
    }
331
}
332