Test Failed
Pull Request — master (#10)
by Alice
01:55
created

ThreadPool::resetRun()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 0
dl 0
loc 8
ccs 0
cts 0
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Wonderland\Thread;
4
5
use Wonderland\Thread\Event\PoolEvent;
6
use Wonderland\Thread\Exception\ThreadException;
7
8
class ThreadPool extends AbstractThreadPoolMediator
9
{
10
	// 50000 microseconds = 0.05s pause per polling round (usleep() takes microseconds, not milliseconds)
11
	private const SLEEP_TIME_MS = 50000;
12
13
	/** @var AbstractThread[] $threads */
14
	private $threads;
15
16
	/** @var AbstractThread[] $toRunThreads */
17
	private $toRunThreads;
18
19
	/** @var AbstractThread[] $runningThreads */
20
	private $runningThreads;
21
22
	/** @var bool $isRunning */
23
	private $isRunning;
24
25
	/** @var int $maxRunningThreadNb */
26
	private $maxRunningThreadNb;
27
28
	/**
	 * Creates an empty pool: no registered, queued or running threads, not
	 * flagged as running, and a maximum of 0 simultaneously running threads.
	 */
	public function __construct()
	{
		parent::__construct();

		$this->isRunning = false;
		$this->maxRunningThreadNb = 0;
		$this->threads = [];
		$this->toRunThreads = [];
		$this->runningThreads = [];
	}
40
41
	/**
	 * On destruction, makes a single non-blocking pcntl_waitpid() call
	 * (pid -1, WNOHANG); the reported status is discarded.
	 */
	public function __destruct()
	{
		$ignoredStatus = null;
		pcntl_waitpid(-1, $ignoredStatus, WNOHANG);
	}
48
49
	/**
	 * Returns the threads registered in the pool.
	 *
	 * @return AbstractThread[]
	 */
	public function getThreads(): array
	{
		return $this->threads;
	}
56
57
	/**
	 * Replaces the whole list of registered threads.
	 *
	 * @param AbstractThread[] $threads new list of registered threads
	 * @return ThreadPool the pool itself, to allow chaining
	 */
	public function setThreads(array $threads): self
	{
		$this->threads = $threads;

		return $this;
	}
67
68
	/**
	 * Appends one thread to the list of registered threads.
	 *
	 * @param AbstractThread $thread thread to register
	 * @return ThreadPool the pool itself, to allow chaining
	 */
	public function addThread(AbstractThread $thread): self
	{
		$this->threads[] = $thread;

		return $this;
	}
78
79
	/**
	 * Appends a thread directly to the to-run queue, without adding it to the
	 * list of registered threads.
	 *
	 * Note: resetRun() overwrites the queue with the registered threads, so a
	 * thread queued here does not survive a reset. Presumably meant to be called
	 * while the pool is running - TODO confirm against callers.
	 *
	 * @param AbstractThread $thread thread to queue
	 * @return ThreadPool the pool itself, to allow chaining
	 */
	public function addLiveThread(AbstractThread $thread): self
	{
		$this->toRunThreads[] = $thread;

		return $this;
	}
85
86
	/**
	 * Returns the maximum number of threads allowed to run at the same time.
	 *
	 * @return int
	 */
	public function getMaxRunningThreadNb(): int
	{
		return $this->maxRunningThreadNb;
	}
93 2
94
	/**
	 * Sets the maximum number of threads allowed to run at the same time.
	 *
	 * @param int $maxRunningThreadNb upper bound compared against the running list when forking
	 * @return ThreadPool the pool itself, to allow chaining
	 */
	public function setMaxRunningThreadNb(int $maxRunningThreadNb): self
	{
		$this->maxRunningThreadNb = $maxRunningThreadNb;

		return $this;
	}
104
105
	/**
	 * Returns the threads still queued, i.e. not forked yet.
	 *
	 * @return AbstractThread[]
	 */
	public function getToRunThreads(): array
	{
		return $this->toRunThreads;
	}
112
113
	/**
	 * Returns the threads currently held in the running list.
	 *
	 * @return AbstractThread[]
	 */
	public function getRunningThreads(): array
	{
		return $this->runningThreads;
	}
120 1
121
	/**
	 * Runs the pool: checks the environment, resets the run state, then keeps
	 * alternating between starting queued threads and polling the running ones
	 * until isRunningThreads() reports none left; finally resets the state again.
	 *
	 * @throws ThreadException
	 */
	public function run()
	{
		$this->checkEnv();
		$this->initRun();

		while ($this->isRunningThreads()) {
			$this->waitOnThreads();
		}

		$this->resetRun();
	}
135 1
136 1
	/**
	 * Forks queued threads for as long as the running list holds fewer than
	 * maxRunningThreadNb entries, then tells whether any thread is running.
	 *
	 * With maxRunningThreadNb at 0 (the constructor default) the loop condition
	 * is never true, so nothing is forked here.
	 *
	 * @return bool true when the running list is not empty
	 * @throws ThreadException
	 */
	private function isRunningThreads(): bool
	{
		while ([] !== $this->toRunThreads && count($this->runningThreads) < $this->maxRunningThreadNb) {
			$nextThread = array_shift($this->toRunThreads);
			$this->createThreadProcess($nextThread);
		}

		return [] !== $this->runningThreads;
	}
151
152 1
	/**
	 * Forks a process for the given thread.
	 *
	 * In the child (fork returned 0) the thread receives the pool's mediator and
	 * is handed to processThread(). In the parent the child's pid is stored on
	 * the thread, the thread is appended to the running list, POOL_NEW_THREAD is
	 * notified and the pool is flagged as running.
	 *
	 * Parts of this method can't be unit-tested: tests can't run in a web
	 * context, and they never execute the child branch (pid 0) since the
	 * coverage is collected by the parent process.
	 *
	 * @param AbstractThread $thread
	 * @throws ThreadException when the fork fails
	 */
	private function createThreadProcess(AbstractThread $thread)
	{
		$pid = pcntl_fork();

		// Fork failure.
		if (-1 === $pid) {
			// @codeCoverageIgnoreStart
			throw new ThreadException('Error while trying to fork. Check your server installation');
			// @codeCoverageIgnoreEnd
		}

		// Child process.
		if (0 === $pid) {
			// @codeCoverageIgnoreStart
			$thread->setMediator($this->getMediator());
			$this->processThread($thread);

			return;
			// @codeCoverageIgnoreEnd
		}

		// Parent process: keep track of the freshly forked child.
		$thread->setPid($pid);
		$this->runningThreads[] = $thread;
		$this->notify(PoolEvent::POOL_NEW_THREAD, $thread);
		$this->startRunStatus();
	}
181 1
182
	/**
	 * Performs one polling round over the running threads.
	 *
	 * Each thread's pid is checked with a non-blocking pcntl_waitpid(); a result
	 * of -1 or a positive value removes the thread from the running list. The
	 * pre/post tick and per-pid events are notified along the way, then the
	 * method sleeps for SLEEP_TIME_MS (a microsecond value handed to usleep()).
	 */
	private function waitOnThreads()
	{
		$this->notify(PoolEvent::POOL_PRE_WAIT_TICK);

		foreach ($this->runningThreads as $index => $runningThread) {
			$waitResult = pcntl_waitpid($runningThread->getPid(), $status, WNOHANG);
			$this->notify(PoolEvent::POOL_WAIT_TICK_PID);

			$isOver = -1 === $waitResult || $waitResult > 0;
			if ($isOver) {
				$this->notify(PoolEvent::POOL_WAIT_TICK_PID_REMOVED, $runningThread);
				unset($this->runningThreads[$index]);
			}
		}

		$this->notify(PoolEvent::POOL_POST_WAIT_TICK);

		usleep(self::SLEEP_TIME_MS);
	}
203
204
	/**
	 * Runs the thread, notifies the exit event matching its response and then
	 * calls exit() with that response.
	 *
	 * @codeCoverageIgnore Can't be tested since this is only run in a child process, which doesn't go through
	 * the unit-test coverage (only collected in the main process)
	 * @param AbstractThread $thread
	 * @throws ThreadException
	 */
	private function processThread(AbstractThread $thread)
	{
		$this->notify(PoolEvent::THREAD_PRE_PROCESS, $thread);
		$response = $thread->run();
		$this->notify(PoolEvent::THREAD_POST_PROCESS, $thread);

		// Pick the exit event from the thread's response, then notify it once.
		switch ($response) {
			case AbstractThread::EXIT_STATUS_SUCCESS:
				$exitEvent = PoolEvent::THREAD_EXIT_SUCCESS;
				break;
			case AbstractThread::EXIT_STATUS_ERROR:
				$exitEvent = PoolEvent::THREAD_EXIT_ERROR;
				break;
			default:
				$exitEvent = PoolEvent::THREAD_EXIT_UNKNOWN;
		}
		$this->notify($exitEvent, $thread);

		exit($response);
	}
229
230
	/**
	 * Verifies that the pool can be run: PHP must be running under the cli SAPI
	 * and at least one thread must be registered.
	 *
	 * The non-cli exception can't be tested since php-unit only runs in a cli
	 * environment.
	 *
	 * @throws ThreadException when not in cli, or when no thread is registered
	 */
	private function checkEnv()
	{
		if (!$this->isCli()) {
			// @codeCoverageIgnoreStart
			throw new ThreadException('Error. It is not safe to use process forking in other way than php-cli');
			// @codeCoverageIgnoreEnd
		}

		if ([] === $this->threads) {
			throw new ThreadException('Error. Can\'t run child threads processes without any added in the Pool');
		}
	}
245
246
	/**
	 * Prepares a new run by resetting the run state (delegates to resetRun()).
	 */
	private function initRun()
	{
		$this->resetRun();
	}
253
254
	/**
	 * Tells whether PHP is running under the command-line SAPI.
	 *
	 * @return bool true when PHP_SAPI is 'cli'
	 */
	private function isCli(): bool
	{
		return 'cli' === PHP_SAPI;
	}
261 1
262
	/**
	 * Flags the pool as running and notifies POOL_RUN_START; does nothing when
	 * the pool is already flagged as running.
	 */
	private function startRunStatus()
	{
		if ($this->isRunning) {
			return;
		}

		$this->notify(PoolEvent::POOL_RUN_START);
		$this->isRunning = true;
	}
272 1
273 1
	/**
	 * Resets the run state: notifies POOL_RUN_STOP when the pool was flagged as
	 * running, clears the running list, refills the to-run queue with the
	 * registered threads and clears the running flag.
	 */
	private function resetRun()
	{
		if ($this->isRunning) {
			$this->notify(PoolEvent::POOL_RUN_STOP);
		}

		$this->runningThreads = [];
		$this->toRunThreads = $this->threads;
		$this->isRunning = false;
	}
285
286
}
287