Issues (5)

src/ThreadPool.php (1 issue)

Severity
1
<?php
2
3
namespace Wonderland\Thread;
4
5
use Wonderland\Thread\Event\Event;
6
use Wonderland\Thread\Exception\ThreadException;
7
8
class ThreadPool extends AbstractThreadPoolMediator
9
{
10
	// 0.05s: the value is passed to usleep(), which expects microseconds (50 000 µs), not milliseconds
11
	private const SLEEP_TIME_MS = 50000;
12
13
	/** @var Thread[] $threads */
14
	private $threads;
15
16
	/** @var Thread[] $toRunThreads */
17
	private $toRunThreads;
18
19
	/** @var Thread[] $runningThreads */
20
	private $runningThreads;
21
22
	/** @var bool $isRunning */
23
	private $isRunning;
24
25
	/** @var int $maxRunningThreadNb */
26
	private $maxRunningThreadNb;
27
28
	/**
29
	 * ThreadPool constructor.
30
	 */
31 9
	public function __construct()
	{
		parent::__construct();

		// A fresh pool is idle and has a concurrency limit of 0.
		$this->isRunning = false;
		$this->maxRunningThreadNb = 0;

		// No thread is registered, queued or running yet.
		$this->threads = [];
		$this->toRunThreads = [];
		$this->runningThreads = [];
	}
40
41
	/**
42
	 *
43
	 */
44 1
	public function __destruct()
	{
		// Non-blocking (WNOHANG) wait on any child process (-1); the reported status is not used.
		$ignoredStatus = null;
		pcntl_waitpid(-1, $ignoredStatus, WNOHANG);
	}
48
49
	/**
50
	 * @return Thread[]
51
	 */
52 3
	public function getThreads(): array
	{
		// The full list of threads added to the pool (see setThreads() / addThread()).
		$registeredThreads = $this->threads;

		return $registeredThreads;
	}
56
57
	/**
58
	 * @param Thread[] $threads
59
	 * @return ThreadPool
60
	 */
61 2
	public function setThreads(array $threads): self
	{
		// Replaces the whole list of registered threads; returns the pool for chaining.
		$this->threads = $threads;

		return $this;
	}
67
68
	/**
69
	 * @param Thread $thread
70
	 * @return ThreadPool
71
	 */
72 2
	public function addThread(Thread $thread): self
	{
		// Appends one thread to the registered list; returns the pool for chaining.
		$this->threads[] = $thread;

		return $this;
	}
78
79
	/**
80
	 * @return int
81
	 */
82 3
	public function getMaxRunningThreadNb(): int
	{
		// Upper bound on the number of threads kept in the running list at once.
		$limit = $this->maxRunningThreadNb;

		return $limit;
	}
86
87
	/**
88
	 * @param int $maxRunningThreadNb
89
	 * @return ThreadPool
90
	 */
91 2
	public function setMaxRunningThreadNb(int $maxRunningThreadNb): self
	{
		// Stores the concurrency limit used when forking queued threads; returns the pool for chaining.
		$this->maxRunningThreadNb = $maxRunningThreadNb;

		return $this;
	}
97
98
	/**
99
	 * @return Thread[]
100
	 */
101 2
	public function getToRunThreads(): array
	{
		// Threads still queued, i.e. not yet handed to createThreadProcess().
		$queuedThreads = $this->toRunThreads;

		return $queuedThreads;
	}
105
106
	/**
107
	 * @return Thread[]
108
	 */
109 2
	public function getRunningThreads(): array
	{
		// Threads that were forked and have not yet been removed by waitOnThreads().
		$activeThreads = $this->runningThreads;

		return $activeThreads;
	}
113
114
	/**
115
	 * @throws ThreadException
116
	 */
117 2
	public function run()
	{
		// Validate the environment first, then start from a clean run state.
		$this->checkEnv();
		$this->initRun();

		// isRunningThreads() also forks queued threads, so every pass both feeds and polls the pool.
		while (true) {
			if (false === $this->isRunningThreads()) {
				break;
			}

			$this->waitOnThreads();
		}

		// Emits the stop notification (when the pool was started) and re-queues the registered threads.
		$this->resetRun();
	}
128
129
	/**
130
	 * @return bool
131
	 * @throws ThreadException
132
	 */
133 1
	private function isRunningThreads(): bool
	{
		// Fork queued threads until the queue is empty or the concurrency limit is reached.
		// NOTE(review): with the limit left at its constructor default of 0 nothing is forked here,
		// so run() appears to end without executing any thread — confirm callers always set a limit.
		while ([] !== $this->toRunThreads && count($this->runningThreads) < $this->maxRunningThreadNb) {
			$nextThread = array_shift($this->toRunThreads);
			$this->createThreadProcess($nextThread);
		}

		// The pool is considered active while at least one forked thread is still tracked.
		return [] !== $this->runningThreads;
	}
143
144
	/**
145
	 * Some parts of this method can't be tested: unit tests can't be run in a web context, and the
146
	 * child branch (pid 0) is never reached while unit-testing, since coverage is only collected in the parent process
147
	 * @param Thread $thread
148
	 * @throws ThreadException
149
	 */
150 1
	private function createThreadProcess(Thread $thread)
	{
		$forkResult = pcntl_fork();

		if (-1 === $forkResult) {
			// The fork failed: no child process exists.
			// @codeCoverageIgnoreStart
			throw new ThreadException('Error while trying to fork. Check your server installation');
			// @codeCoverageIgnoreEnd
		}

		if (0 === $forkResult) {
			// Child process: execute the thread's work (processThread() ends with exit()).
			// @codeCoverageIgnoreStart
			$this->processThread($thread);

			return;
			// @codeCoverageIgnoreEnd
		}

		// Parent process: remember the child's pid and track the thread as running.
		$thread->setPid($forkResult);
		$this->runningThreads[] = $thread;
		$this->notify(Event::POOL_NEW_THREAD, $thread);
		$this->startRunStatus();
	}
171
172
	/**
173
	 *
174
	 */
175 1
	private function waitOnThreads()
	{
		$this->notify(Event::POOL_PRE_WAIT_TICK);

		foreach ($this->runningThreads as $index => $runningThread) {
			// WNOHANG: poll the child's state without blocking the pool.
			$waitResult = pcntl_waitpid($runningThread->getPid(), $status, WNOHANG);
			$this->notify(Event::POOL_WAIT_TICK_PID);

			// Neither -1 nor a positive pid was returned: keep tracking this thread.
			$finished = ($waitResult === -1 || $waitResult > 0);
			if (!$finished) {
				continue;
			}

			$this->notify(Event::POOL_WAIT_TICK_PID_REMOVED, $runningThread);
			unset($this->runningThreads[$index]);
		}

		$this->notify(Event::POOL_POST_WAIT_TICK);

		// Pause between two polling passes (usleep() takes microseconds).
		usleep(self::SLEEP_TIME_MS);
	}
193
194
	/**
195
	 * @codeCoverageIgnore Can't be tested since this only runs in a child process, which doesn't go through the
196
	 * unit-test coverage, as coverage is only collected in the main process
197
	 * @param Thread $thread
198
	 * @throws ThreadException
199
	 */
200
	private function processThread(Thread $thread)
	{
		$this->notify(Event::THREAD_PRE_PROCESS, $thread);
		$exitStatus = $thread->run($thread->getProcessName());
		$this->notify(Event::THREAD_POST_PROCESS, $thread);

		// Map the value returned by the thread onto the matching exit event.
		// A switch is kept on purpose: it compares loosely, exactly like the original dispatch.
		switch ($exitStatus) {
			case Thread::EXIT_STATUS_SUCCESS:
				$exitEvent = Event::THREAD_EXIT_SUCCESS;
				break;
			case Thread::EXIT_STATUS_ERROR:
				$exitEvent = Event::THREAD_EXIT_ERROR;
				break;
			default:
				$exitEvent = Event::THREAD_EXIT_UNKNOWN;
		}
		$this->notify($exitEvent, $thread);

		// Review note: static analysis reports that using exit here is not recommended; in general,
		// exit should be used with care and only in a scripting context such as a CLI script.
		// This method is only called from the child (pid 0) branch of createThreadProcess(), and
		// exit() ends that forked process, passing along the value returned by the thread.
		exit($exitStatus);
	}
219
220
	/**
221
	 * The "not in CLI" exception can't be tested since PHPUnit is only run in a CLI environment
222
	 * @throws ThreadException
223
	 */
224 2
	private function checkEnv()
	{
		// Forking is only allowed when PHP runs through the CLI SAPI.
		if (!$this->isCli()) {
			// @codeCoverageIgnoreStart
			throw new ThreadException('Error. It is not safe to use process forking in other way than php-cli');
			// @codeCoverageIgnoreEnd
		}

		// A run needs at least one registered thread.
		if (count($this->threads) < 1) {
			throw new ThreadException('Error. Can\'t run child threads processes without any added in the Pool');
		}
	}
235
236
	/**
237
	 *
238
	 */
239 1
	private function initRun()
	{
		// Starting a run only requires a clean state: resetRun() re-queues every registered thread.
		$this->resetRun();
	}
243
244
	/**
245
	 * @return bool
246
	 */
247 2
	private function isCli(): bool
	{
		// PHP_SAPI names the interface PHP is running under; only 'cli' is accepted.
		$runsInCli = ('cli' === PHP_SAPI);

		return $runsInCli;
	}
251
252
	/**
253
	 *
254
	 */
255 1
	private function startRunStatus()
	{
		// Already flagged as running: the start notification is only sent once per run.
		if (false !== $this->isRunning) {
			return;
		}

		$this->notify(Event::POOL_RUN_START);
		$this->isRunning = true;
	}
262
263
	/**
264
	 *
265
	 */
266 1
	private function resetRun()
	{
		// Send the stop notification before any state is reset, and only if a run was started.
		$wasRunning = (true === $this->isRunning);
		if ($wasRunning) {
			$this->notify(Event::POOL_RUN_STOP);
		}

		// Back to the idle state: every registered thread is queued again and nothing is running.
		$this->runningThreads = [];
		$this->toRunThreads = $this->threads;
		$this->isRunning = false;
	}
275
276
}
277