Pool   A
last analyzed

Complexity

Total Complexity 28

Size/Duplication

Total Lines 218
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 88.37%

Importance

Changes 0
Metric Value
wmc 28
lcom 1
cbo 4
dl 0
loc 218
ccs 76
cts 86
cp 0.8837
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 2
A add() 0 20 3
A setWorkerInstance() 0 8 2
A rebuild() 0 12 2
A spawn() 0 13 2
A kill() 0 19 4
A killAll() 0 6 2
A boot() 0 16 4
A ping() 0 10 3
A getWorkers() 0 4 1
A setOutput() 0 4 1
A getOutput() 0 8 2
1
<?php
2
/**
3
 * Nyx
4
 *
5
 * (The MIT license)
6
 * Copyright (c) 2016 Luke Steadman
7
 *
8
 * Permission is hereby granted, free of charge, to any person obtaining a copy
9
 * of this software and associated * documentation files (the "Software"), to
10
 * deal in the Software without restriction, including without limitation the
11
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
12
 * sell copies of the Software, and to permit persons to whom the Software is
13
 * furnished to do so, subject to the following conditions:
14
 * The above copyright notice and this permission notice shall be included in
15
 * all copies or substantial portions of the Software.
16
 *
17
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23
 * IN THE SOFTWARE.
24
 *
25
 * @package Nyx
26
 */
27
namespace Nyx;
28
29
class Pool implements PoolInterface, OutputableInterface
30
{
31
    /**
32
     * Default amount of workers to spawn
33
     *
34
     * @var int
35
     */
36
    const DEFAULT_WORKER_COUNT = 1;
37
38
    /**
39
     * Immutable. Number of workers this pool maintain
40
     *
41
     * @var int
42
     */
43
    protected $numberOfWorkers;
44
45
    /**
46
     * Pool of worker objects
47
     *
48
     * @var array
49
     */
50
    protected $workers = array();
51
52
    /**
53
     * @var WorkerInterface
54
     */
55
    protected $workerInstance;
56
57
    /**
58
     * @var OutputInterface
59
     */
60
    protected $output;
61
62
    /**
63
     * Timestamp of when the pool first started
64
     *
65
     * @var
66
     */
67
    protected $started;
68
69
    /**
70
     * Pool constructor.
71
     *
72
     * @param null $numberOfWorkers
73
     */
74 9
    public function __construct($numberOfWorkers = null)
75
    {
76 9
        if (is_null($numberOfWorkers)) {
77 9
            $numberOfWorkers = static::DEFAULT_WORKER_COUNT;
78 9
        }
79
80 9
        $this->numberOfWorkers = $numberOfWorkers;
81 9
    }
82
83
    /**
84
     * {@inheritdoc}
85
     */
86 8
    public function add(WorkerInterface $worker)
87
    {
88
        try {
89 8
            $this->setWorkerInstance($worker);
90 8
        } catch (\Exception $e) {
91 6
            $this->getOutput()->write('[x] Unable to set worker instance: ' . $e->getMessage());
92
        }
93
94 8
        $cmd1 = $worker->getProcess()->getCommand()->getCmd();
95 8
        $cmd2 = $this->workerInstance->getProcess()->getCommand()->getCmd();
96
97 8
        if ($cmd1 === $cmd2) {
98 8
            $this->workers[] = $worker;
99 8
        } else {
100 1
            $msg = '$worker must be an instance of WorkerInterface and the same as';
101 1
            throw new \Exception($msg . get_class($worker));
102
        }
103
104 8
        return $this;
105
    }
106
107
    /**
108
     * {@inheritdoc}
109
     */
110 8
    public function setWorkerInstance(WorkerInterface $worker)
111
    {
112 8
        if (is_null($this->workerInstance)) {
113 8
            $this->workerInstance = clone $worker;
114 8
        } else {
115 7
            throw new \Exception('setWorkerInstance already called');
116
        }
117 8
    }
118
119
    /**
120
     * {@inheritdoc}
121
     */
122 1
    public function rebuild()
123
    {
124
        // Close
125 1
        $this->killAll();
126
127
        // Then spawn new ones
128 1
        for ($i = 0; $i < $this->numberOfWorkers; $i++) {
129 1
            $this->spawn();
130 1
        }
131
132 1
        return true;
133
    }
134
135
    /**
136
     * {@inheritdoc}
137
     */
138 5
    public function spawn()
139
    {
140
        // Create a clone worker
141 5
        if ($this->numberOfWorkers > count($this->getWorkers())) {
142 5
            $worker = clone $this->workerInstance;
143 5
            $this->add($worker);
144 5
            $worker->start();
145 5
        } else {
146 1
            throw new \Exception('Maximum number of workers allocated for this pool ('.$this->numberOfWorkers.').');
147
        }
148
149 5
        return $this;
150
    }
151
152
    /**
153
     * {@inheritdoc}
154
     */
155 2
    public function kill($pid = null)
156
    {
157 2
        $worker = false;
158
159 2
        if (is_null($pid)) {
160 1
            $worker = reset($this->workers);
161 1
            $pid = key($this->workers);
162 2
        } elseif (array_key_exists($pid, $this->workers)) {
163 1
            $worker = $this->workers[$pid];
164 1
        }
165
166 2
        if ($worker) {
167 2
            $worker->getProcess()->close();
168 2
            unset($this->workers[$pid]);
169 2
            $this->getOutput()->write('[x] Worker closed.');
170 2
        }
171
172 2
        return $this;
173
    }
174
175
    /**
176
     * {@inheritdoc}
177
     */
178 1
    public function killAll()
179
    {
180 1
        foreach ($this->workers as $pid => $worker) {
181 1
            $this->kill($pid);
182 1
        }
183 1
    }
184
185
    /**
186
     * {@inheritdoc}
187
     */
188 3
    public function boot()
189
    {
190 3
        if (!$this->started) {
191
            // Spawn the amount of workers required, if not already set.
192 3
            while (count($this->workers) < $this->numberOfWorkers) {
193 3
                $this->spawn();
194 3
            }
195
196 3
            foreach ($this->workers as $worker) {
197
                // Only start the worker if it isn't running
198 3
                $worker->start();
199 3
            }
200
201 3
            $this->started = true;
202 3
        }
203 3
    }
204
205
    /**
206
     * {@inheritdoc}
207
     */
208
    public function ping()
209
    {
210
        foreach ($this->getWorkers() as $pid => $worker) {
211
            // If the worker has stopped, restart it
212
            if (!$worker->getProcess()->isRunning()) {
213
                $this->getOutput()->write('[x] Worker stopped. Spawning a new one.');
214
                $this->kill($pid)->spawn();
215
            }
216
        }
217
    }
218
219
    /**
220
     * {@inheritdoc}
221
     */
222 7
    public function getWorkers()
223
    {
224 7
        return (array) $this->workers;
225
    }
226
227
    /**
228
     * {@inheritdoc}
229
     */
230 9
    public function setOutput(OutputInterface $output)
231
    {
232 9
        $this->output = $output;
233 9
    }
234
235
    /**
236
     * {@inheritdoc}
237
     */
238 6
    public function getOutput()
239
    {
240 6
        if (is_null($this->output)) {
241
            $this->setOutput(new Console());
242
        }
243
244 6
        return $this->output;
245
    }
246
}
247