Passed
Pull Request — master (#19)
by Harry
04:08 queued 12s
created

PriorityPool::start()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
ccs 3
cts 3
cp 1
cc 1
nc 1
nop 0
crap 1
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\ParallelProcess\Event\PoolRunEvent;
17
use Graze\ParallelProcess\Event\PriorityChangedEvent;
18
use Graze\ParallelProcess\Exceptions\NotRunningException;
19
use SplPriorityQueue;
20
use Symfony\Component\Process\Process;
21
22
/**
23
 * A PriorityPool allows you to manage how many, and the order on which to run child runs
24
 */
25
class PriorityPool extends Pool
26
{
27
    const NO_MAX = -1;
28
29
    /** @var SplPriorityQueue */
30
    protected $waitingQueue;
31
    /** @var int */
32
    private $maxSimultaneous = -1;
33
    /** @var bool */
34
    private $runInstantly = false;
35
    /** @var bool */
36
    private $initialised = false;
37
38
    /**
39
     * Pool constructor.
40
     *
41
     * @param RunInterface[]|Process[] $items
42
     * @param int                      $maxSimultaneous Maximum number of simultaneous processes
43
     * @param bool                     $runInstantly    Run any added processes immediately if they are not already
44
     *                                                  running
45
     * @param array                    $tags
46
     * @param float                    $priority
47
     */
48 49
    public function __construct(
49
        array $items = [],
50
        $maxSimultaneous = self::NO_MAX,
51
        $runInstantly = false,
52
        array $tags = [],
53
        $priority = 1.0
54
    ) {
55 49
        $this->maxSimultaneous = $maxSimultaneous;
56 49
        $this->runInstantly = $runInstantly;
57 49
        $this->waitingQueue = new SplPriorityQueue();
58 49
        $this->waitingQueue->setExtractFlags(SplPriorityQueue::EXTR_DATA);
59
60 49
        parent::__construct($items, $tags, $priority);
61
62 49
        $this->initialised = true;
63 49
        if ($this->isRunning() || $runInstantly) {
64 1
            $this->startNext();
65
        }
66 49
    }
67
68
    /**
69
     * Add a new process to the pool
70
     *
71
     * @param RunInterface|PoolInterface|Process $item
72
     * @param array                              $tags If a process is supplied, these are added to create a run.
73
     *                                                 This is ignored when adding a run
74
     *
75
     * @return $this
76
     */
77 39
    public function add($item, array $tags = [])
78
    {
79 39
        if ($item instanceof RunInterface
80 38
            && !($this->isRunning()
81 39
                 || $this->runInstantly)
82 39
            && $item->isRunning()) {
83 1
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
84
        }
85
86
        // special handling of child collections, a pool should only care about leaf nodes
87 38
        if ($item instanceof PoolInterface) {
88 1
            $item->addListener(
89 1
                PoolRunEvent::POOL_RUN_ADDED,
90 1
                function (PoolRunEvent $event) {
91 1
                    $this->add($event->getRun());
92 1
                }
93
            );
94 1
            foreach ($item->getAll() as $child) {
95 1
                $this->add($child);
96
            }
97 1
            return $this;
98
        }
99
100 38
        parent::add($item, $tags);
101
102 37
        if ($item instanceof RunInterface && !$item->hasStarted()) {
103 37
            $this->waitingQueue->insert($item, $item->getPriority());
104 37
            if ($this->runInstantly) {
105 2
                $this->startNext();
106
            }
107
        }
108
109 37
        if ($item instanceof PrioritisedInterface) {
110 33
            $item->addListener(PriorityChangedEvent::CHANGED, [$this, 'onPriorityChanged']);
0 ignored issues
show
Bug introduced by
The method addListener() does not exist on Graze\ParallelProcess\PrioritisedInterface. Since it exists in all sub-types, consider adding an abstract or default implementation to Graze\ParallelProcess\PrioritisedInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

110
            $item->/** @scrutinizer ignore-call */ 
111
                   addListener(PriorityChangedEvent::CHANGED, [$this, 'onPriorityChanged']);
Loading history...
Bug introduced by
The method addListener() does not exist on Symfony\Component\Process\Process. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

110
            $item->/** @scrutinizer ignore-call */ 
111
                   addListener(PriorityChangedEvent::CHANGED, [$this, 'onPriorityChanged']);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
111
        }
112
113 37
        return $this;
114
    }
115
116
    /**
117
     * @param PriorityChangedEvent $event
118
     */
119 1
    public function onPriorityChanged(PriorityChangedEvent $event)
120
    {
121 1
        $index = array_search($event->getItem(), $this->waiting, true);
122 1
        if ($index !== false) {
123
            // we are unable to delete an item from a SplPriorityQueue, so we delete it and start again here
124 1
            $this->waitingQueue = new SplPriorityQueue();
125 1
            foreach ($this->waiting as $item) {
126 1
                $this->waitingQueue->insert($item, $item->getPriority());
127
            }
128
        }
129 1
    }
130
131
    /**
132
     * Start all the processes running
133
     *
134
     * @return $this
135
     */
136 3
    public function start()
137
    {
138 3
        $this->startNext();
139
140 3
        return $this;
141
    }
142
143
    /**
144
     * Blocking call to run processes;
145
     *
146
     * @param float $checkInterval Seconds between checks
147
     *
148
     * @return bool `true` if all the runs were successful
149
     */
150 23
    public function run($checkInterval = self::CHECK_INTERVAL)
151
    {
152 23
        $this->startNext();
153
154 23
        $interval = (int) ($checkInterval * 1000000);
155 23
        while ($this->poll()) {
156 21
            usleep($interval);
157
        }
158
159 23
        return $this->isSuccessful();
160
    }
161
162
    /**
163
     * @return bool
164
     */
165 28
    public function poll()
166
    {
167 28
        parent::poll();
168 28
        $this->startNext();
169 28
        return $this->isRunning();
170
    }
171
172
    /**
173
     * Actually start a run
174
     *
175
     * @param RunInterface $run
176
     */
177 29
    private function startRun(RunInterface $run)
178
    {
179 29
        $run->start();
180 29
    }
181
182
    /**
183
     * Check when a run has finished, if there are processes waiting, start them
184
     */
185 29
    private function startNext()
186
    {
187
        // this allows us to wait until all runs are added in the constructor before running any.
188
        // Thus preserving the priority they have supplied.
189 29
        if (!$this->initialised) {
190 1
            return;
191
        }
192
193 29
        if ($this->maxSimultaneous !== static::NO_MAX
194 29
            && $this->waitingQueue->valid()
195 29
            && count($this->running) < $this->maxSimultaneous) {
196 4
            for ($i = count($this->running); $i < $this->maxSimultaneous && $this->waitingQueue->valid(); $i++) {
197 4
                $this->startRun($this->waitingQueue->extract());
198
            }
199 27
        } elseif ($this->maxSimultaneous === static::NO_MAX) {
200 25
            while ($this->waitingQueue->valid()) {
201 25
                $this->startRun($this->waitingQueue->extract());
202
            }
203
        }
204 29
    }
205
206
    /**
207
     * @return int
208
     */
209 4
    public function getMaxSimultaneous()
210
    {
211 4
        return $this->maxSimultaneous;
212
    }
213
214
    /**
215
     * @param int $maxSimultaneous
216
     *
217
     * @return $this
218
     */
219 4
    public function setMaxSimultaneous($maxSimultaneous)
220
    {
221 4
        $this->maxSimultaneous = $maxSimultaneous;
222 4
        return $this;
223
    }
224
225
    /**
226
     * @return bool
227
     */
228 3
    public function isRunInstantly()
229
    {
230 3
        return $this->runInstantly;
231
    }
232
233
    /**
234
     * @param bool $runInstantly
235
     *
236
     * @return PriorityPool
237
     */
238 3
    public function setRunInstantly($runInstantly)
239
    {
240 3
        $this->runInstantly = $runInstantly;
241 3
        return $this;
242
    }
243
}
244