Passed
Pull Request — master (#18)
by Harry
04:35 queued 23s
created

PriorityPool   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 198
Duplicated Lines 0 %

Test Coverage

Coverage 88.41%

Importance

Changes 0
Metric Value
wmc 32
eloc 60
dl 0
loc 198
rs 9.84
c 0
b 0
f 0
ccs 61
cts 69
cp 0.8841

11 Methods

Rating   Name   Duplication   Size   Complexity  
B add() 0 33 11
A poll() 0 5 1
A setRunInstantly() 0 4 1
A setMaxSimultaneous() 0 4 1
A startRun() 0 3 1
A run() 0 10 2
A __construct() 0 17 3
A getMaxSimultaneous() 0 3 1
A start() 0 5 1
B startNext() 0 17 9
A isRunInstantly() 0 3 1
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess;
15
16
use Graze\ParallelProcess\Event\PoolRunEvent;
17
use Graze\ParallelProcess\Exceptions\NotRunningException;
18
use SplPriorityQueue;
19
use Symfony\Component\Process\Process;
20
21
/**
22
 * A PriorityPool lets you control how many child runs execute at once and the order in which they are started
23
 */
24
class PriorityPool extends Pool
{
    /** Sentinel value for the maximum number of simultaneous runs meaning "no limit" */
    const NO_MAX = -1;

    /**
     * Runs that have been added but not yet started, keyed by their priority.
     *
     * Note: SplPriorityQueue does not define the extraction order of entries sharing the same priority.
     *
     * @var SplPriorityQueue
     */
    protected $waitingQueue;
    /** @var int Maximum number of runs allowed to be running at once, or self::NO_MAX for no limit */
    private $maxSimultaneous = self::NO_MAX;
    /** @var bool When true, queued runs are started as they are added even if the pool is not yet running */
    private $runInstantly = false;
    /** @var bool Becomes true once the parent constructor has returned; startNext() does nothing before that */
    private $initialised = false;

    /**
     * Pool constructor.
     *
     * @param RunInterface[]|Process[] $items
     * @param int                      $maxSimultaneous Maximum number of simultaneous processes
     *                                                  (self::NO_MAX for no limit). Cast to int.
     * @param bool                     $runInstantly    Run any added processes immediately if they are not already
     *                                                  running
     * @param array                    $tags
     * @param float                    $priority
     */
    public function __construct(
        array $items = [],
        $maxSimultaneous = self::NO_MAX,
        $runInstantly = false,
        array $tags = [],
        $priority = 1.0
    ) {
        // Normalise the scalar options: startNext() compares the limit strictly against self::NO_MAX, so a
        // value such as "-1" or -1.0 would otherwise match neither branch and no run would ever be started.
        $this->maxSimultaneous = (int) $maxSimultaneous;
        $this->runInstantly = (bool) $runInstantly;

        // extract() should hand back the run itself, not the priority or a [data, priority] pair
        $queue = new SplPriorityQueue();
        $queue->setExtractFlags(SplPriorityQueue::EXTR_DATA);
        $this->waitingQueue = $queue;

        // The queue and options must exist before the parent constructor receives the items.
        parent::__construct($items, $tags, $priority);

        // Only now may startNext() start anything, so that all initial items are queued first.
        $this->initialised = true;
        if ($this->shouldStartQueuedRuns()) {
            $this->startNext();
        }
    }

    /**
     * Add a new process to the pool
     *
     * A child pool is not stored itself: its current runs are added individually, and any run added to it
     * later is forwarded to this pool through a POOL_RUN_ADDED listener.
     *
     * @param RunInterface|PoolInterface|Process $item
     * @param array                              $tags If a process is supplied, these are added to create a run.
     *                                                 This is ignored when adding a run
     *
     * @return $this
     *
     * @throws NotRunningException when $item is already running while this pool is neither running nor set to
     *                             run instantly
     */
    public function add($item, array $tags = [])
    {
        if ($item instanceof RunInterface && !$this->shouldStartQueuedRuns() && $item->isRunning()) {
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
        }

        // special handling of child collections, a pool should only care about leaf nodes
        if ($item instanceof PoolInterface) {
            return $this->addRunsFromPool($item);
        }

        parent::add($item, $tags);

        // Runs that have already been started elsewhere are tracked by the parent but never queued here.
        if ($item instanceof RunInterface && !$item->hasStarted()) {
            $this->waitingQueue->insert($item, $item->getPriority());
            if ($this->shouldStartQueuedRuns()) {
                $this->startNext();
            }
        }

        return $this;
    }

    /**
     * Start all the processes running
     *
     * Starts as many queued runs as the simultaneous limit allows and returns without waiting for them.
     *
     * @return $this
     */
    public function start()
    {
        $this->startNext();

        return $this;
    }

    /**
     * Blocking call to run processes;
     *
     * Starts the queued runs, then polls until the pool is no longer running, sleeping between polls.
     *
     * @param float $checkInterval Seconds between checks
     *
     * @return bool `true` if all the runs were successful
     */
    public function run($checkInterval = self::CHECK_INTERVAL)
    {
        $this->startNext();

        // usleep() takes microseconds
        $sleepMicroseconds = (int) ($checkInterval * 1000000);
        while ($this->poll()) {
            usleep($sleepMicroseconds);
        }

        return $this->isSuccessful();
    }

    /**
     * Poll the parent pool, then start any queued runs that now fit within the simultaneous limit
     *
     * @return bool the result of isRunning() after polling
     */
    public function poll()
    {
        parent::poll();
        $this->startNext();

        return $this->isRunning();
    }

    /**
     * Whether queued runs should be started as soon as they are added
     *
     * @return bool `true` if the pool is running or has been told to run items instantly
     */
    private function shouldStartQueuedRuns()
    {
        return $this->isRunning() || $this->runInstantly;
    }

    /**
     * Add every run of a child pool to this pool, now and whenever the child pool gains a run later
     *
     * @param PoolInterface $pool
     *
     * @return $this
     */
    private function addRunsFromPool(PoolInterface $pool)
    {
        $pool->addListener(
            PoolRunEvent::POOL_RUN_ADDED,
            function (PoolRunEvent $event) {
                $this->add($event->getRun());
            }
        );

        foreach ($pool->getAll() as $child) {
            $this->add($child);
        }

        return $this;
    }

    /**
     * Actually start a run
     *
     * @param RunInterface $run
     */
    private function startRun(RunInterface $run)
    {
        $run->start();
    }

    /**
     * Start queued runs, highest priority first, until the queue is empty or the simultaneous limit is reached
     */
    private function startNext()
    {
        // this allows us to wait until all runs are added in the constructor before running any.
        // Thus preserving the priority they have supplied.
        if (!$this->initialised) {
            return;
        }

        if ($this->maxSimultaneous === self::NO_MAX) {
            // No limit: drain the whole queue.
            while ($this->waitingQueue->valid()) {
                $this->startRun($this->waitingQueue->extract());
            }

            return;
        }

        // Limited: count the runs started in this call on top of those already running.
        $active = count($this->running);
        while ($active < $this->maxSimultaneous && $this->waitingQueue->valid()) {
            $this->startRun($this->waitingQueue->extract());
            $active++;
        }
    }

    /**
     * @return int Maximum number of simultaneous runs, or self::NO_MAX when there is no limit
     */
    public function getMaxSimultaneous()
    {
        return $this->maxSimultaneous;
    }

    /**
     * @param int $maxSimultaneous Maximum number of simultaneous runs (self::NO_MAX for no limit). Cast to int.
     *
     * @return $this
     */
    public function setMaxSimultaneous($maxSimultaneous)
    {
        // Stored as an int so the strict comparison against self::NO_MAX in startNext() behaves as documented.
        $this->maxSimultaneous = (int) $maxSimultaneous;

        return $this;
    }

    /**
     * @return bool
     */
    public function isRunInstantly()
    {
        return $this->runInstantly;
    }

    /**
     * @param bool $runInstantly
     *
     * @return $this
     */
    public function setRunInstantly($runInstantly)
    {
        $this->runInstantly = (bool) $runInstantly;

        return $this;
    }
}
224