| 1 | <?php |
||
| 2 | /** |
||
| 3 | * This file is part of graze/parallel-process. |
||
| 4 | * |
||
| 5 | * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com> |
||
| 6 | * |
||
| 7 | * For the full copyright and license information, please view the LICENSE |
||
| 8 | * file that was distributed with this source code. |
||
| 9 | * |
||
| 10 | * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md |
||
| 11 | * @link https://github.com/graze/parallel-process |
||
| 12 | */ |
||
| 13 | |||
| 14 | namespace Graze\ParallelProcess; |
||
| 15 | |||
| 16 | use Exception; |
||
| 17 | use Graze\DataStructure\Collection\Collection; |
||
| 18 | use Graze\ParallelProcess\Event\EventDispatcherTrait; |
||
| 19 | use Graze\ParallelProcess\Event\PoolRunEvent; |
||
| 20 | use Graze\ParallelProcess\Event\PriorityChangedEvent; |
||
| 21 | use Graze\ParallelProcess\Event\RunEvent; |
||
| 22 | use InvalidArgumentException; |
||
| 23 | use Symfony\Component\Process\Process; |
||
| 24 | use Throwable; |
||
| 25 | |||
| 26 | /** |
||
| 27 | * Class Pool |
||
| 28 | * |
||
| 29 | * A Pool is an arbitrary collection of runs that can be used to group runs together when displaying with a |
||
| 30 | * Table |
||
| 31 | * |
||
| 32 | * A Pool can transition from `not_running` back to `running` again. But it cannot transition back to `not_started`. |
||
| 33 | * This means that multiple `COMPLETED` and `STARTED` events can be sent out for a single pool. |
||
| 34 | * |
||
| 35 | * ``` |
||
| 36 | * not_started -> running <-> not_running |
||
| 37 | * ``` |
||
| 38 | * |
||
| 39 | * @package Graze\ParallelProcess |
||
| 40 | */ |
||
class Pool extends Collection implements RunInterface, PoolInterface, PrioritisedInterface
{
    use EventDispatcherTrait;
    use RunningStateTrait;
    use PrioritisedTrait;

    /** @var RunInterface[] the runs in this pool (iterated by start() and isSuccessful()) */
    protected $items = [];
    /** @var RunInterface[] runs that were not running and had not started when added, until they report STARTED */
    protected $waiting = [];
    /** @var RunInterface[] runs that have reported STARTED (or were running when added) and not yet COMPLETED */
    protected $running = [];
    /** @var RunInterface[] runs that have reported COMPLETED (or had already finished when added) */
    protected $complete = [];
    /** @var Exception[]|Throwable[] exceptions merged in from child runs that reported FAILED */
    private $exceptions = [];
    /** @var array tags supplied to the constructor */
    private $tags;

    /**
     * Pool constructor.
     *
     * @param RunInterface[]|Process[] $runs     Initial runs; each one is passed through add()
     * @param array                    $tags     Tags to associate with this pool
     * @param float                    $priority Priority of this pool
     */
    public function __construct(array $runs = [], array $tags = [], $priority = 1.0)
    {
        parent::__construct([]);

        $this->tags = $tags;

        // Go through add() for every run (instead of handing $runs to the parent constructor) so each
        // run is sorted into waiting/running/complete and gets this pool's listeners attached.
        foreach ($runs as $run) {
            $this->add($run);
        }
        $this->priority = $priority;
    }

    /**
     * Add a run to this pool and start tracking its state.
     *
     * A Symfony `Process` is wrapped in a `ProcessRun` (using `$tags`) before being added. When `$item` is already
     * a `RunInterface`, `$tags` is not used.
     *
     * @param RunInterface|Process $item
     * @param array                $tags Tags given to the `ProcessRun` created for a `Process`
     *
     * @return $this
     *
     * @throws InvalidArgumentException if `$item` is neither a `Process` nor a `RunInterface`
     */
    public function add($item, array $tags = [])
    {
        if ($item instanceof Process) {
            return $this->add(new ProcessRun($item, $tags));
        }
        if (!$item instanceof RunInterface) {
            throw new InvalidArgumentException('item must implement `RunInterface`');
        }

        parent::add($item);

        // Sort the run into the bucket matching its state at the time it is added
        $status = 'waiting';
        if ($item->isRunning()) {
            $status = 'running';
            $this->running[] = $item;
        } elseif ($item->hasStarted()) {
            // started but no longer running: treat it as already finished
            $status = 'finished';
            $this->complete[] = $item;
        } else {
            $this->waiting[] = $item;
        }

        $item->addListener(RunEvent::STARTED, [$this, 'onRunStarted']);
        $item->addListener(RunEvent::COMPLETED, [$this, 'onRunCompleted']);
        $item->addListener(RunEvent::FAILED, [$this, 'onRunFailed']);

        $this->dispatch(PoolRunEvent::POOL_RUN_ADDED, new PoolRunEvent($this, $item));

        // A run that is running (or has already run) means this pool has started too
        if ($status !== 'waiting' && $this->state !== static::STATE_RUNNING) {
            $this->setStarted();
            $this->dispatch(RunEvent::STARTED, new RunEvent($this));
        }
        // Adding an already finished run only finishes the pool when nothing else is still running,
        // otherwise the pool would report itself as complete while child runs are in flight.
        if ($status === 'finished'
            && $this->state !== static::STATE_NOT_RUNNING
            && count($this->running) === 0
        ) {
            $this->setFinished();
            $this->dispatch(RunEvent::COMPLETED, new RunEvent($this));
        }

        return $this;
    }

    /**
     * When a run starts, move it from waiting to running and start ourselves if required
     *
     * The pool is (re)started whenever it is not currently running, so a pool that had already finished goes
     * back to `running` and dispatches `STARTED` again, as described in the class documentation.
     *
     * @param RunEvent $event
     */
    public function onRunStarted(RunEvent $event)
    {
        $run = $event->getRun();

        $index = array_search($run, $this->waiting, true);
        if ($index !== false) {
            unset($this->waiting[$index]);
        }
        $this->running[] = $run;

        if ($this->state !== static::STATE_RUNNING) {
            $this->setStarted();
            $this->dispatch(RunEvent::STARTED, new RunEvent($this));
        }
        $this->dispatch(RunEvent::UPDATED, new RunEvent($this));
    }

    /**
     * When a run is completed, record it and check if everything has finished
     *
     * Once no runs are waiting or running, the pool is marked as finished and dispatches `SUCCESSFUL` or `FAILED`
     * followed by `COMPLETED`.
     *
     * @param RunEvent $event
     */
    public function onRunCompleted(RunEvent $event)
    {
        $run = $event->getRun();

        $index = array_search($run, $this->running, true);
        if ($index !== false) {
            unset($this->running[$index]);
        }
        $this->complete[] = $run;
        $this->dispatch(RunEvent::UPDATED, new RunEvent($this));

        if (count($this->waiting) === 0 && count($this->running) === 0) {
            $this->setFinished();
            if ($this->isSuccessful()) {
                $this->dispatch(RunEvent::SUCCESSFUL, new RunEvent($this));
            } else {
                $this->dispatch(RunEvent::FAILED, new RunEvent($this));
            }
            $this->dispatch(RunEvent::COMPLETED, new RunEvent($this));
        }
    }

    /**
     * Collect the exceptions reported by a failed child run
     *
     * @param RunEvent $event
     */
    public function onRunFailed(RunEvent $event)
    {
        $this->exceptions = array_merge($this->exceptions, $event->getRun()->getExceptions());
    }

    /**
     * Has this pool been started before
     *
     * @return bool `true` once the state is anything other than `not_started`
     */
    public function hasStarted()
    {
        return $this->getState() !== static::STATE_NOT_STARTED;
    }

    /**
     * Start all children that have not been started yet
     *
     * @return $this
     *
     * @throws \Graze\ParallelProcess\Exceptions\NotRunningException
     */
    public function start()
    {
        foreach ($this->items as $run) {
            if (!$run->hasStarted()) {
                $run->start();
            }
        }
        return $this;
    }

    /**
     * Was this pool successful
     *
     * @return bool `true` only when the pool is in the `not_running` state and every run in it was successful
     */
    public function isSuccessful()
    {
        if ($this->getState() !== static::STATE_NOT_RUNNING) {
            return false;
        }

        foreach ($this->items as $run) {
            if (!$run->isSuccessful()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Get the exceptions collected from child runs that reported a failure
     *
     * @return Exception[]|Throwable[]
     */
    public function getExceptions()
    {
        return $this->exceptions;
    }

    /**
     * We think this is running
     *
     * @return bool
     */
    public function isRunning()
    {
        return $this->getState() === static::STATE_RUNNING;
    }

    /**
     * Polls every running child, then reports if this pool is still running
     *
     * @return bool
     */
    public function poll()
    {
        foreach ($this->running as $run) {
            $run->poll();
        }
        return $this->isRunning();
    }

    /**
     * Get the set of tags associated with this pool
     *
     * @return array
     */
    public function getTags()
    {
        return $this->tags;
    }

    /**
     * @return float[]|null an array of: number of completed runs, total number of runs, and the completed fraction
     *                      (completed / total). The fraction is `0.0` for a pool with no runs.
     */
    public function getProgress()
    {
        $finished = count($this->complete);
        $total = count($this->items);

        // An empty pool has no meaningful ratio; avoid dividing by zero
        // (a warning on PHP 5/7, a DivisionByZeroError on PHP 8).
        $fraction = $total > 0 ? $finished / $total : 0.0;

        return [$finished, $total, $fraction];
    }

    /**
     * The names of the events this pool dispatches
     *
     * @return string[]
     */
    protected function getEventNames()
    {
        return [
            RunEvent::STARTED,
            RunEvent::COMPLETED,
            RunEvent::SUCCESSFUL,
            RunEvent::FAILED,
            RunEvent::UPDATED,
            PoolRunEvent::POOL_RUN_ADDED,
            PriorityChangedEvent::CHANGED,
        ];
    }

    /**
     * Run this pool of runs and block until they are complete.
     *
     * Starts every run that has not started yet, then polls, sleeping for `$interval` between polls, until the
     * pool is no longer running.
     *
     * @param float $interval Time to wait between polls, in seconds (converted to microseconds for `usleep`)
     *
     * @return bool `true` if all the runs were successful
     */
    public function run($interval = self::CHECK_INTERVAL)
    {
        $this->start();

        $sleep = (int) ($interval * 1000000);
        while ($this->poll()) {
            usleep($sleep);
        }

        return $this->isSuccessful();
    }

    /**
     * @return RunInterface[] the waiting runs, re-indexed from zero
     */
    public function getWaiting()
    {
        return array_values($this->waiting);
    }

    /**
     * @return RunInterface[] the running runs, re-indexed from zero
     */
    public function getRunning()
    {
        return array_values($this->running);
    }

    /**
     * @return RunInterface[] the completed runs, re-indexed from zero
     */
    public function getFinished()
    {
        return array_values($this->complete);
    }
}
||
| 332 |