1 | <?php |
||
2 | |||
3 | namespace Wonderland\Thread; |
||
4 | |||
5 | use Wonderland\Thread\Event\Event; |
||
6 | use Wonderland\Thread\Exception\ThreadException; |
||
7 | |||
8 | class ThreadPool extends AbstractThreadPoolMediator |
||
9 | { |
||
10 | // 0.2s |
||
11 | private const SLEEP_TIME_MS = 50000; |
||
12 | |||
13 | /** @var Thread[] $childs */ |
||
14 | private $threads; |
||
15 | |||
16 | /** @var Thread[] $toRunThreads */ |
||
17 | private $toRunThreads; |
||
18 | |||
19 | /** @var Thread[] $runningChilds */ |
||
20 | private $runningThreads; |
||
21 | |||
22 | /** @var bool $isRunning */ |
||
23 | private $isRunning; |
||
24 | |||
25 | /** @var int $maxRunningThreadNb */ |
||
26 | private $maxRunningThreadNb; |
||
27 | |||
28 | /** |
||
29 | * ThreadPool constructor. |
||
30 | */ |
||
31 | 9 | public function __construct() |
|
32 | { |
||
33 | 9 | parent::__construct(); |
|
34 | 9 | $this->threads = []; |
|
35 | 9 | $this->runningThreads = []; |
|
36 | 9 | $this->toRunThreads = []; |
|
37 | 9 | $this->isRunning = false; |
|
38 | 9 | $this->maxRunningThreadNb = 0; |
|
39 | 9 | } |
|
40 | |||
41 | /** |
||
42 | * |
||
43 | */ |
||
44 | 1 | public function __destruct() |
|
45 | { |
||
46 | 1 | pcntl_waitpid(-1, $status, WNOHANG); |
|
47 | 1 | } |
|
48 | |||
49 | /** |
||
50 | * @return Thread[] |
||
51 | */ |
||
52 | 3 | public function getThreads(): array |
|
53 | { |
||
54 | 3 | return $this->threads; |
|
55 | } |
||
56 | |||
57 | /** |
||
58 | * @param Thread[] $threads |
||
59 | * @return ThreadPool |
||
60 | */ |
||
61 | 2 | public function setThreads(array $threads): self |
|
62 | { |
||
63 | 2 | $this->threads = $threads; |
|
64 | |||
65 | 2 | return $this; |
|
66 | } |
||
67 | |||
68 | /** |
||
69 | * @param Thread $thread |
||
70 | * @return ThreadPool |
||
71 | */ |
||
72 | 2 | public function addThread(Thread $thread): self |
|
73 | { |
||
74 | 2 | $this->threads[] = $thread; |
|
75 | |||
76 | 2 | return $this; |
|
77 | } |
||
78 | |||
79 | /** |
||
80 | * @return int |
||
81 | */ |
||
82 | 3 | public function getMaxRunningThreadNb(): int |
|
83 | { |
||
84 | 3 | return $this->maxRunningThreadNb; |
|
85 | } |
||
86 | |||
87 | /** |
||
88 | * @param int $maxRunningThreadNb |
||
89 | * @return ThreadPool |
||
90 | */ |
||
91 | 2 | public function setMaxRunningThreadNb(int $maxRunningThreadNb): self |
|
92 | { |
||
93 | 2 | $this->maxRunningThreadNb = $maxRunningThreadNb; |
|
94 | |||
95 | 2 | return $this; |
|
96 | } |
||
97 | |||
98 | /** |
||
99 | * @return Thread[] |
||
100 | */ |
||
101 | 2 | public function getToRunThreads(): array |
|
102 | { |
||
103 | 2 | return $this->toRunThreads; |
|
104 | } |
||
105 | |||
106 | /** |
||
107 | * @return Thread[] |
||
108 | */ |
||
109 | 2 | public function getRunningThreads(): array |
|
110 | { |
||
111 | 2 | return $this->runningThreads; |
|
112 | } |
||
113 | |||
114 | /** |
||
115 | * @throws ThreadException |
||
116 | */ |
||
117 | 2 | public function run() |
|
118 | { |
||
119 | 2 | $this->checkEnv(); |
|
120 | 1 | $this->initRun(); |
|
121 | |||
122 | 1 | while ($this->isRunningThreads()) { |
|
123 | 1 | $this->waitOnThreads(); |
|
124 | } |
||
125 | |||
126 | 1 | $this->resetRun(); |
|
127 | 1 | } |
|
128 | |||
129 | /** |
||
130 | * @return bool |
||
131 | * @throws ThreadException |
||
132 | */ |
||
133 | 1 | private function isRunningThreads(): bool |
|
134 | { |
||
135 | 1 | if (count($this->toRunThreads) > 0) { |
|
136 | 1 | while (count($this->runningThreads) < $this->maxRunningThreadNb && count($this->toRunThreads) > 0) { |
|
137 | 1 | $this->createThreadProcess(array_shift($this->toRunThreads)); |
|
138 | } |
||
139 | } |
||
140 | |||
141 | 1 | return count($this->runningThreads) > 0; |
|
142 | } |
||
143 | |||
144 | /** |
||
145 | * can't test some part of it this since we can't unit-test in web and we're never in a child |
||
146 | * process when pid 0 when unit-testing since the coverage is done by the parent thread |
||
147 | * @param Thread $thread |
||
148 | * @throws ThreadException |
||
149 | */ |
||
150 | 1 | private function createThreadProcess(Thread $thread) |
|
151 | { |
||
152 | 1 | $pid = pcntl_fork(); |
|
153 | |||
154 | 1 | switch ($pid) { |
|
155 | case -1: //error forking |
||
156 | // @codeCoverageIgnoreStart |
||
157 | throw new ThreadException('Error while trying to fork. Check your server installation'); |
||
158 | // @codeCoverageIgnoreEnd |
||
159 | 1 | case 0: // child |
|
160 | // @codeCoverageIgnoreStart |
||
161 | $this->processThread($thread); |
||
162 | break; |
||
163 | // @codeCoverageIgnoreEnd |
||
164 | default: //parent |
||
165 | 1 | $thread->setPid($pid); |
|
166 | 1 | $this->runningThreads[] = $thread; |
|
167 | 1 | $this->notify(Event::POOL_NEW_THREAD, $thread); |
|
168 | 1 | $this->startRunStatus(); |
|
169 | } |
||
170 | 1 | } |
|
171 | |||
172 | /** |
||
173 | * |
||
174 | */ |
||
175 | 1 | private function waitOnThreads() |
|
176 | { |
||
177 | 1 | $this->notify(Event::POOL_PRE_WAIT_TICK); |
|
178 | 1 | foreach ($this->runningThreads as $k => $thread) { |
|
179 | |||
180 | 1 | $res = pcntl_waitpid($thread->getPid(), $status, WNOHANG); |
|
181 | 1 | $this->notify(Event::POOL_WAIT_TICK_PID); |
|
182 | |||
183 | 1 | if ($res === -1 || $res > 0) { |
|
184 | 1 | $this->notify(Event::POOL_WAIT_TICK_PID_REMOVED, $thread); |
|
185 | 1 | unset($this->runningThreads[$k]); |
|
186 | } |
||
187 | |||
188 | } |
||
189 | 1 | $this->notify(Event::POOL_POST_WAIT_TICK); |
|
190 | |||
191 | 1 | usleep(self::SLEEP_TIME_MS); |
|
192 | 1 | } |
|
193 | |||
194 | /** |
||
195 | * @codeCoverageIgnore Can't test since this is only run in a child thread.. which doesnt' go throug the |
||
196 | * unit-test coverage which is only done in the main process |
||
197 | * @param Thread $thread |
||
198 | * @throws ThreadException |
||
199 | */ |
||
200 | private function processThread(Thread $thread) |
||
201 | { |
||
202 | $this->notify(Event::THREAD_PRE_PROCESS, $thread); |
||
203 | $response = $thread->run($thread->getProcessName()); |
||
204 | $this->notify(Event::THREAD_POST_PROCESS, $thread); |
||
205 | |||
206 | switch ($response) { |
||
207 | case Thread::EXIT_STATUS_SUCCESS: |
||
208 | $this->notify(Event::THREAD_EXIT_SUCCESS, $thread); |
||
209 | break; |
||
210 | case Thread::EXIT_STATUS_ERROR: |
||
211 | $this->notify(Event::THREAD_EXIT_ERROR, $thread); |
||
212 | break; |
||
213 | default: |
||
214 | $this->notify(Event::THREAD_EXIT_UNKNOWN, $thread); |
||
215 | } |
||
216 | |||
217 | exit($response); |
||
0 ignored issues
–
show
|
|||
218 | } |
||
219 | |||
220 | /** |
||
221 | * Can't test the exception is not in cli since php-unit is only run in cli environment |
||
222 | * @throws ThreadException |
||
223 | */ |
||
224 | 2 | private function checkEnv() |
|
225 | { |
||
226 | 2 | if (false === $this->isCli()) { |
|
227 | // @codeCoverageIgnoreStart |
||
228 | throw new ThreadException('Error. It is not safe to use process forking in other way than php-cli'); |
||
229 | // @codeCoverageIgnoreEnd |
||
230 | } |
||
231 | 2 | if (0 === count($this->threads)) { |
|
232 | 1 | throw new ThreadException('Error. Can\'t run child threads processes without any added in the Pool'); |
|
233 | } |
||
234 | 1 | } |
|
235 | |||
236 | /** |
||
237 | * |
||
238 | */ |
||
239 | 1 | private function initRun() |
|
240 | { |
||
241 | 1 | $this->resetRun(); |
|
242 | 1 | } |
|
243 | |||
244 | /** |
||
245 | * @return bool |
||
246 | */ |
||
247 | 2 | private function isCli(): bool |
|
248 | { |
||
249 | 2 | return PHP_SAPI === 'cli'; |
|
250 | } |
||
251 | |||
252 | /** |
||
253 | * |
||
254 | */ |
||
255 | 1 | private function startRunStatus() |
|
256 | { |
||
257 | 1 | if (false === $this->isRunning) { |
|
258 | 1 | $this->notify(Event::POOL_RUN_START); |
|
259 | 1 | $this->isRunning = true; |
|
260 | } |
||
261 | 1 | } |
|
262 | |||
263 | /** |
||
264 | * |
||
265 | */ |
||
266 | 1 | private function resetRun() |
|
267 | { |
||
268 | 1 | if (true === $this->isRunning) { |
|
269 | 1 | $this->notify(Event::POOL_RUN_STOP); |
|
270 | } |
||
271 | 1 | $this->isRunning = false; |
|
272 | 1 | $this->toRunThreads = $this->threads; |
|
273 | 1 | $this->runningThreads = []; |
|
274 | 1 | } |
|
275 | |||
276 | } |
||
277 |
In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.