Test Failed
Pull Request — master (#10)
by Alice
04:15
created

ThreadPool   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 268
Duplicated Lines 0 %

Test Coverage

Coverage 30.86%

Importance

Changes 0
Metric Value
eloc 82
dl 0
loc 268
ccs 25
cts 81
cp 0.3086
rs 9.68
c 0
b 0
f 0
wmc 34

19 Methods

Rating   Name   Duplication   Size   Complexity  
A getRunningThreads() 0 3 1
A getMaxRunningThreadNb() 0 3 1
A getToRunThreads() 0 3 1
A __construct() 0 8 1
A setMaxRunningThreadNb() 0 5 1
A initRun() 0 3 1
A resetRun() 0 8 2
A run() 0 10 2
A startRunStatus() 0 5 2
A addThread() 0 5 1
A isRunningThreads() 0 9 4
A checkEnv() 0 9 3
A getThreads() 0 3 1
A processThread() 0 18 3
A createThreadProcess() 0 20 3
A setThreads() 0 5 1
A waitOnThreads() 0 17 4
A isCli() 0 3 1
A __destruct() 0 3 1
1
<?php
2
3
namespace Wonderland\Thread;
4
5
use Wonderland\Thread\Event\PoolEvent;
6
use Wonderland\Thread\Exception\ThreadException;
7
8
/**
 * Pool running a set of AbstractThread instances, each one in its own forked child process
 * (pcntl_fork), with a configurable cap on the number of children alive at the same time.
 *
 * The pool notifies PoolEvent events at each step of its life cycle through notify().
 */
class ThreadPool extends AbstractThreadPoolMediator
{
	/**
	 * Pause between two polling passes over the running children, expressed in
	 * microseconds (the unit usleep() expects): 50000 microseconds = 50 ms = 0.05 s.
	 */
	private const SLEEP_TIME_MICROSECONDS = 50000;

	/** @var AbstractThread[] $threads every thread registered in the pool */
	private $threads;

	/** @var AbstractThread[] $toRunThreads threads of the current run that are not forked yet */
	private $toRunThreads;

	/** @var AbstractThread[] $runningThreads threads whose child process has been forked and not reaped yet */
	private $runningThreads;

	/** @var bool $isRunning true once the first child of the current run has been forked */
	private $isRunning;

	/** @var int $maxRunningThreadNb cap on simultaneously running children; 0 or less means no limit */
	private $maxRunningThreadNb;

	/**
	 * ThreadPool constructor.
	 *
	 * Starts with no registered thread, no run in progress and no cap configured.
	 */
	public function __construct()
	{
		parent::__construct();
		$this->threads = [];
		$this->runningThreads = [];
		$this->toRunThreads = [];
		$this->isRunning = false;
		$this->maxRunningThreadNb = 0;
	}

	/**
	 * Makes one non-blocking attempt to reap a terminated child process.
	 */
	public function __destruct()
	{
		// The pool can be instantiated where the pcntl extension is not loaded (run() refuses
		// to work outside of php-cli anyway); the destructor must not die on an undefined
		// function in that case.
		if (function_exists('pcntl_waitpid')) {
			pcntl_waitpid(-1, $status, WNOHANG);
		}
	}

	/**
	 * @return AbstractThread[] every thread registered in the pool
	 */
	public function getThreads(): array
	{
		return $this->threads;
	}

	/**
	 * Replaces the whole list of registered threads.
	 *
	 * @param AbstractThread[] $threads
	 * @return ThreadPool
	 */
	public function setThreads(array $threads): self
	{
		$this->threads = $threads;

		return $this;
	}

	/**
	 * Appends one thread to the list of registered threads.
	 *
	 * @param AbstractThread $thread
	 * @return ThreadPool
	 */
	public function addThread(AbstractThread $thread): self
	{
		$this->threads[] = $thread;

		return $this;
	}

	/**
	 * @return int the cap on simultaneously running children (0 or less: no limit)
	 */
	public function getMaxRunningThreadNb(): int
	{
		return $this->maxRunningThreadNb;
	}

	/**
	 * Sets the cap on simultaneously running children. A value of 0 or less means no limit.
	 *
	 * @param int $maxRunningThreadNb
	 * @return ThreadPool
	 */
	public function setMaxRunningThreadNb(int $maxRunningThreadNb): self
	{
		$this->maxRunningThreadNb = $maxRunningThreadNb;

		return $this;
	}

	/**
	 * @return AbstractThread[] threads of the current run that are not forked yet
	 */
	public function getToRunThreads(): array
	{
		return $this->toRunThreads;
	}

	/**
	 * @return AbstractThread[] threads whose child process has been forked and not reaped yet
	 */
	public function getRunningThreads(): array
	{
		return $this->runningThreads;
	}

	/**
	 * Runs every registered thread in a forked child process and returns once all the
	 * children have terminated.
	 *
	 * @throws ThreadException when not run from php-cli, when the pool is empty or when a fork fails
	 */
	public function run()
	{
		$this->checkEnv();
		$this->initRun();

		// Each pass forks as many waiting threads as allowed, then polls the running children.
		while ($this->isRunningThreads()) {
			$this->waitOnThreads();
		}

		$this->resetRun();
	}

	/**
	 * Forks waiting threads while a slot is available, then tells whether at least one
	 * child is still running.
	 *
	 * @return bool
	 * @throws ThreadException when a fork fails
	 */
	private function isRunningThreads(): bool
	{
		// The default cap is 0: reading it as a hard limit would mean that no thread is ever
		// forked and that run() silently does nothing, so 0 or less stands for "no limit".
		$unlimited = $this->maxRunningThreadNb <= 0;

		while (count($this->toRunThreads) > 0
			&& ($unlimited || count($this->runningThreads) < $this->maxRunningThreadNb)
		) {
			$this->createThreadProcess(array_shift($this->toRunThreads));
		}

		return count($this->runningThreads) > 0;
	}

	/**
	 * Forks the current process: the child processes the thread, the parent registers it
	 * as running.
	 *
	 * The error and child branches are excluded from the coverage: the unit tests only run
	 * in cli, and the coverage is collected by the parent process, never by a forked child.
	 *
	 * @param AbstractThread $thread
	 * @throws ThreadException when the fork fails
	 */
	private function createThreadProcess(AbstractThread $thread): void
	{
		$pid = pcntl_fork();

		switch ($pid) {
			case -1: // error forking
				// @codeCoverageIgnoreStart
				throw new ThreadException('Error while trying to fork. Check your server installation');
				// @codeCoverageIgnoreEnd
			case 0: // child
				// @codeCoverageIgnoreStart
				$thread->setMediator($this->getMediator());
				// processThread() ends with exit(), the child never goes past this call.
				$this->processThread($thread);
				break;
				// @codeCoverageIgnoreEnd
			default: // parent
				$thread->setPid($pid);
				$this->runningThreads[] = $thread;
				$this->notify(PoolEvent::POOL_NEW_THREAD, $thread);
				$this->startRunStatus();
		}
	}

	/**
	 * Polls every running child once, without blocking, and removes the ones that are over.
	 */
	private function waitOnThreads(): void
	{
		$this->notify(PoolEvent::POOL_PRE_WAIT_TICK);
		foreach ($this->runningThreads as $k => $thread) {
			$res = pcntl_waitpid($thread->getPid(), $status, WNOHANG);
			$this->notify(PoolEvent::POOL_WAIT_TICK_PID);

			// With WNOHANG, 0 means that the child is still running; -1 (error) and a
			// positive pid (child reaped) both mean there is nothing left to wait for.
			if ($res === -1 || $res > 0) {
				$this->notify(PoolEvent::POOL_WAIT_TICK_PID_REMOVED, $thread);
				unset($this->runningThreads[$k]);
			}
		}
		$this->notify(PoolEvent::POOL_POST_WAIT_TICK);

		// Only pause when a child is still running: otherwise the caller can immediately
		// fork the waiting threads or end the run.
		if (count($this->runningThreads) > 0) {
			usleep(self::SLEEP_TIME_MICROSECONDS);
		}
	}

	/**
	 * Runs the thread, notifies the event matching its response and terminates the current
	 * process with that response as exit status.
	 *
	 * @codeCoverageIgnore Can't be tested since this is only run in a child process, which doesn't go through
	 * the unit-test coverage, only collected in the main process
	 * @param AbstractThread $thread
	 * @throws ThreadException
	 */
	private function processThread(AbstractThread $thread): void
	{
		$this->notify(PoolEvent::THREAD_PRE_PROCESS, $thread);
		$response = $thread->run();
		$this->notify(PoolEvent::THREAD_POST_PROCESS, $thread);

		switch ($response) {
			case AbstractThread::EXIT_STATUS_SUCCESS:
				$this->notify(PoolEvent::THREAD_EXIT_SUCCESS, $thread);
				break;
			case AbstractThread::EXIT_STATUS_ERROR:
				$this->notify(PoolEvent::THREAD_EXIT_ERROR, $thread);
				break;
			default:
				$this->notify(PoolEvent::THREAD_EXIT_UNKNOWN, $thread);
		}

		// Static analysis flags exit as a bad practice outside of a cli scripting context.
		// It is deliberate here: this method is only called from the "child" branch of
		// createThreadProcess(), and the forked child has to stop instead of carrying on
		// with the parent's code.
		exit($response);
	}

	/**
	 * Checks that the pool is allowed to run: php-cli only, and at least one registered thread.
	 *
	 * The "not in cli" exception can't be tested since php-unit is only run in a cli environment.
	 *
	 * @throws ThreadException
	 */
	private function checkEnv(): void
	{
		if (false === $this->isCli()) {
			// @codeCoverageIgnoreStart
			throw new ThreadException('Error. It is not safe to use process forking in other way than php-cli');
			// @codeCoverageIgnoreEnd
		}
		if (0 === count($this->threads)) {
			throw new ThreadException('Error. Can\'t run child threads processes without any added in the Pool');
		}
	}

	/**
	 * Prepares a new run by resetting the run state.
	 */
	private function initRun(): void
	{
		$this->resetRun();
	}

	/**
	 * @return bool true when PHP runs with the cli SAPI
	 */
	private function isCli(): bool
	{
		return PHP_SAPI === 'cli';
	}

	/**
	 * Flags the pool as running and notifies POOL_RUN_START, once per run.
	 */
	private function startRunStatus(): void
	{
		if (false === $this->isRunning) {
			$this->notify(PoolEvent::POOL_RUN_START);
			$this->isRunning = true;
		}
	}

	/**
	 * Notifies POOL_RUN_STOP when a run was in progress, then puts the pool back in its
	 * "ready to run" state: every registered thread waiting, none running.
	 */
	private function resetRun(): void
	{
		if (true === $this->isRunning) {
			$this->notify(PoolEvent::POOL_RUN_STOP);
		}
		$this->isRunning = false;
		$this->toRunThreads = $this->threads;
		$this->runningThreads = [];
	}
}
279