Passed
Push — master ( b05304...84ccec )
by Harry
02:04
created

PriorityPool::add()   C

Complexity

Conditions 12
Paths 9

Size

Total Lines 37
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 12

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 37
ccs 23
cts 23
cp 1
rs 6.9666
c 0
b 0
f 0
cc 12
nc 9
nop 2
crap 12

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\ParallelProcess\Event\DispatcherInterface;
17
use Graze\ParallelProcess\Event\PoolRunEvent;
18
use Graze\ParallelProcess\Event\PriorityChangedEvent;
19
use Graze\ParallelProcess\Exceptions\NotRunningException;
20
use SplPriorityQueue;
21
use Symfony\Component\Process\Process;
22
23
/**
24
 * A PriorityPool allows you to manage how many child runs are started at once, and the order in which they are started
25
 */
26
class PriorityPool extends Pool
{
    const NO_MAX = -1;

    /** @var SplPriorityQueue Runs that have been added but not yet started, keyed by their priority */
    protected $waitingQueue;
    /** @var int Upper bound used by startNext(), or NO_MAX to start everything that is waiting */
    private $maxSimultaneous = self::NO_MAX;
    /** @var bool When true, add() calls startNext() straight after queueing a run */
    private $runInstantly = false;
    /** @var bool Becomes true once the parent constructor has returned; startNext() is a no-op before that */
    private $initialised = false;

    /**
     * PriorityPool constructor.
     *
     * @param RunInterface[]|Process[] $items
     * @param int                      $maxSimultaneous Maximum number of simultaneous processes
     * @param bool                     $runInstantly    Run any added processes immediately if they are not already
     *                                                  running
     * @param array                    $tags
     * @param float                    $priority
     */
    public function __construct(
        array $items = [],
        $maxSimultaneous = self::NO_MAX,
        $runInstantly = false,
        array $tags = [],
        $priority = 1.0
    ) {
        $this->maxSimultaneous = $maxSimultaneous;
        $this->runInstantly = $runInstantly;
        $this->waitingQueue = $this->createWaitingQueue();

        parent::__construct($items, $tags, $priority);

        // only from this point on is startNext() allowed to start anything
        $this->initialised = true;
        if ($this->isRunning() || $runInstantly) {
            $this->startNext();
        }
    }

    /**
     * Add a new process to the pool
     *
     * A child pool is never stored itself: each of its current runs is added individually, and a listener is
     * attached so that runs added to the child pool later are added here as well.
     *
     * @param RunInterface|PoolInterface|Process $item
     * @param array                              $tags If a process is supplied, these are added to create a run.
     *                                                 This is ignored when adding a run
     *
     * @return $this
     *
     * @throws NotRunningException when a run that is already running is added while this pool is neither running
     *                             nor set to run instantly
     */
    public function add($item, array $tags = [])
    {
        if ($item instanceof RunInterface
            && !$this->isRunning()
            && !$this->runInstantly
            && $item->isRunning()) {
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
        }

        // special handling of child collections, a pool should only care about leaf nodes
        if ($item instanceof PoolInterface) {
            return $this->addPool($item);
        }

        parent::add($item, $tags);

        if ($item instanceof RunInterface && !$item->hasStarted()) {
            $this->waitingQueue->insert($item, $item->getPriority());
            if ($this->runInstantly) {
                $this->startNext();
            }
        }

        if ($item instanceof PrioritisedInterface && $item instanceof DispatcherInterface) {
            // the queue has to be rebuilt when the priority of a waiting item changes
            $item->addListener(PriorityChangedEvent::CHANGED, [$this, 'onPriorityChanged']);
        }

        return $this;
    }

    /**
     * Add every run of a child pool to this pool, now and whenever the child pool reports a newly added run
     *
     * @param PoolInterface $pool
     *
     * @return $this
     */
    private function addPool(PoolInterface $pool)
    {
        $pool->addListener(
            PoolRunEvent::POOL_RUN_ADDED,
            function (PoolRunEvent $event) {
                $this->add($event->getRun());
            }
        );

        foreach ($pool->getAll() as $child) {
            $this->add($child);
        }

        return $this;
    }

    /**
     * Rebuild the waiting queue when the priority of an item that is still waiting has changed
     *
     * @param PriorityChangedEvent $event
     */
    public function onPriorityChanged(PriorityChangedEvent $event)
    {
        if (!in_array($event->getItem(), $this->waiting, true)) {
            return;
        }

        // an SplPriorityQueue can not remove or re-prioritise a single entry, so a fresh queue is filled
        // from the list of waiting items using their current priorities
        $this->waitingQueue = $this->createWaitingQueue();
        foreach ($this->waiting as $item) {
            $this->waitingQueue->insert($item, $item->getPriority());
        }
    }

    /**
     * Start all the processes running
     *
     * @return $this
     */
    public function start()
    {
        $this->startNext();

        return $this;
    }

    /**
     * Blocking call to run processes
     *
     * Starts whatever is allowed to start, then polls, sleeping for $checkInterval between polls, until poll()
     * reports that nothing is running any more.
     *
     * @param float $checkInterval Seconds between checks
     *
     * @return bool `true` if all the runs were successful
     */
    public function run($checkInterval = self::CHECK_INTERVAL)
    {
        $this->startNext();

        // usleep() takes microseconds
        $interval = (int) ($checkInterval * 1000000);
        while ($this->poll()) {
            usleep($interval);
        }

        return $this->isSuccessful();
    }

    /**
     * Poll the pool, then start any waiting runs that are now allowed to start
     *
     * @return bool the value of isRunning() after the waiting runs have been considered
     */
    public function poll()
    {
        parent::poll();
        $this->startNext();
        return $this->isRunning();
    }

    /**
     * Create an empty waiting queue whose extract() returns only the queued item
     *
     * @return SplPriorityQueue
     */
    private function createWaitingQueue()
    {
        $queue = new SplPriorityQueue();
        $queue->setExtractFlags(SplPriorityQueue::EXTR_DATA);

        return $queue;
    }

    /**
     * Actually start a run
     *
     * @param RunInterface $run
     */
    private function startRun(RunInterface $run)
    {
        $run->start();
    }

    /**
     * Check when a run has finished, if there are processes waiting, start them
     */
    private function startNext()
    {
        // this allows us to wait until all runs are added in the constructor before running any.
        // Thus preserving the priority they have supplied.
        if (!$this->initialised) {
            return;
        }

        if ($this->maxSimultaneous === static::NO_MAX) {
            // no limit: start everything that is waiting
            while ($this->waitingQueue->valid()) {
                $this->startRun($this->waitingQueue->extract());
            }

            return;
        }

        // limited: count upwards from the number of runs currently running, one step per run started here
        for ($active = count($this->running);
             $active < $this->maxSimultaneous && $this->waitingQueue->valid();
             $active++) {
            $this->startRun($this->waitingQueue->extract());
        }
    }

    /**
     * @return int
     */
    public function getMaxSimultaneous()
    {
        return $this->maxSimultaneous;
    }

    /**
     * @param int $maxSimultaneous
     *
     * @return $this
     */
    public function setMaxSimultaneous($maxSimultaneous)
    {
        $this->maxSimultaneous = $maxSimultaneous;
        return $this;
    }

    /**
     * @return bool
     */
    public function isRunInstantly()
    {
        return $this->runInstantly;
    }

    /**
     * @param bool $runInstantly
     *
     * @return $this
     */
    public function setRunInstantly($runInstantly)
    {
        $this->runInstantly = $runInstantly;
        return $this;
    }
}
245