AliceMajere /
wonderland-thread
| 1 | <?php |
||
| 2 | |||
| 3 | namespace Wonderland\Thread; |
||
| 4 | |||
| 5 | use Wonderland\Thread\Event\Event; |
||
| 6 | use Wonderland\Thread\Exception\ThreadException; |
||
| 7 | |||
/**
 * Runs a set of Thread objects as forked child processes, keeping at most
 * `maxRunningThreadNb` children alive at the same time.
 *
 * Lifecycle events are published through notify(), which is not defined in this
 * class (presumably inherited from AbstractThreadPoolMediator).
 */
class ThreadPool extends AbstractThreadPoolMediator
{
    /**
     * Pause between two polling ticks, expressed in microseconds because the value
     * is handed to usleep(): 50000 microseconds = 50 ms = 0.05 s.
     */
    private const SLEEP_TIME_US = 50000;

    /** @var Thread[] every thread registered in the pool */
    private $threads;

    /** @var Thread[] threads of the current run that have not been forked yet */
    private $toRunThreads;

    /** @var Thread[] threads that have been forked and not yet removed by waitOnThreads() */
    private $runningThreads;

    /** @var bool true once the first child of the current run has been forked */
    private $isRunning;

    /** @var int maximum number of children allowed to run at the same time */
    private $maxRunningThreadNb;

    /**
     * ThreadPool constructor.
     *
     * Starts with no registered thread, no run in progress and a concurrency
     * limit of 0; the limit must be raised with setMaxRunningThreadNb() before
     * calling run().
     */
    public function __construct()
    {
        parent::__construct();
        $this->threads = [];
        $this->runningThreads = [];
        $this->toRunThreads = [];
        $this->isRunning = false;
        $this->maxRunningThreadNb = 0;
    }

    /**
     * Makes one non-blocking attempt to reap a terminated child process.
     */
    public function __destruct()
    {
        // The pcntl extension may not be loaded (it is typically unavailable outside
        // php-cli). Without this guard the destructor would die on an undefined
        // function even though run() was never allowed to fork.
        if (!function_exists('pcntl_waitpid')) {
            return;
        }

        pcntl_waitpid(-1, $status, WNOHANG);
    }

    /**
     * @return Thread[] the threads registered in the pool
     */
    public function getThreads(): array
    {
        return $this->threads;
    }

    /**
     * Replaces the whole list of registered threads.
     *
     * @param Thread[] $threads
     * @return ThreadPool
     */
    public function setThreads(array $threads): self
    {
        $this->threads = $threads;

        return $this;
    }

    /**
     * Appends one thread to the list of registered threads.
     *
     * @param Thread $thread
     * @return ThreadPool
     */
    public function addThread(Thread $thread): self
    {
        $this->threads[] = $thread;

        return $this;
    }

    /**
     * @return int the maximum number of children allowed to run at the same time
     */
    public function getMaxRunningThreadNb(): int
    {
        return $this->maxRunningThreadNb;
    }

    /**
     * @param int $maxRunningThreadNb must be at least 1 for run() to accept it
     * @return ThreadPool
     */
    public function setMaxRunningThreadNb(int $maxRunningThreadNb): self
    {
        $this->maxRunningThreadNb = $maxRunningThreadNb;

        return $this;
    }

    /**
     * @return Thread[] the threads of the current run that are still waiting to be forked
     */
    public function getToRunThreads(): array
    {
        return $this->toRunThreads;
    }

    /**
     * @return Thread[] the threads currently tracked as running
     */
    public function getRunningThreads(): array
    {
        return $this->runningThreads;
    }

    /**
     * Forks the registered threads and blocks until none of them is running anymore.
     *
     * @throws ThreadException when the environment or the pool configuration does not
     *                         allow forking, or when a fork fails
     */
    public function run()
    {
        $this->checkEnv();
        $this->initRun();

        while ($this->isRunningThreads()) {
            $this->waitOnThreads();
        }

        $this->resetRun();
    }

    /**
     * Fills the free running slots with waiting threads, then reports whether at
     * least one child is still running.
     *
     * @return bool
     * @throws ThreadException
     */
    private function isRunningThreads(): bool
    {
        while (count($this->toRunThreads) > 0 && count($this->runningThreads) < $this->maxRunningThreadNb) {
            $this->createThreadProcess(array_shift($this->toRunThreads));
        }

        return count($this->runningThreads) > 0;
    }

    /**
     * Forks the current process for the given thread.
     *
     * Some parts can't be covered: unit tests don't run in a web context, and the
     * coverage is collected by the parent process, which never sees pid 0.
     *
     * @param Thread $thread
     * @throws ThreadException
     */
    private function createThreadProcess(Thread $thread): void
    {
        $pid = pcntl_fork();

        switch ($pid) {
            case -1: // error forking
                // @codeCoverageIgnoreStart
                throw new ThreadException('Error while trying to fork. Check your server installation');
                // @codeCoverageIgnoreEnd
            case 0: // child
                // @codeCoverageIgnoreStart
                // processThread() ends with exit(), so the child never leaves this case.
                $this->processThread($thread);
                break;
                // @codeCoverageIgnoreEnd
            default: // parent
                $thread->setPid($pid);
                $this->runningThreads[] = $thread;
                $this->notify(Event::POOL_NEW_THREAD, $thread);
                $this->startRunStatus();
        }
    }

    /**
     * Performs one polling tick: checks every running child without blocking and
     * stops tracking the ones that are done.
     */
    private function waitOnThreads(): void
    {
        $this->notify(Event::POOL_PRE_WAIT_TICK);

        foreach ($this->runningThreads as $k => $thread) {
            $res = pcntl_waitpid($thread->getPid(), $status, WNOHANG);
            $this->notify(Event::POOL_WAIT_TICK_PID);

            // > 0: the child was reaped; -1: waitpid failed. Either way, stop tracking it.
            // 0 means the child is still running.
            if ($res === -1 || $res > 0) {
                $this->notify(Event::POOL_WAIT_TICK_PID_REMOVED, $thread);
                unset($this->runningThreads[$k]);
            }
        }

        $this->notify(Event::POOL_POST_WAIT_TICK);

        // Only pause while children are still running; when the last one has just been
        // removed, sleeping would merely delay the next batch or the end of the run.
        if (count($this->runningThreads) > 0) {
            usleep(self::SLEEP_TIME_US);
        }
    }

    /**
     * Runs the thread's job, publishes the matching exit event and terminates the
     * current process with the job's response as exit status.
     *
     * @codeCoverageIgnore Can't be tested: this only runs in a child process, which doesn't go
     * through the unit-test coverage since that is only collected in the main process
     * @param Thread $thread
     * @throws ThreadException
     */
    private function processThread(Thread $thread): void
    {
        $this->notify(Event::THREAD_PRE_PROCESS, $thread);
        $response = $thread->run($thread->getProcessName());
        $this->notify(Event::THREAD_POST_PROCESS, $thread);

        switch ($response) {
            case Thread::EXIT_STATUS_SUCCESS:
                $this->notify(Event::THREAD_EXIT_SUCCESS, $thread);
                break;
            case Thread::EXIT_STATUS_ERROR:
                $this->notify(Event::THREAD_EXIT_ERROR, $thread);
                break;
            default:
                $this->notify(Event::THREAD_EXIT_UNKNOWN, $thread);
        }

        exit($response);
    }

    /**
     * Checks that the pool is allowed and able to run.
     *
     * The non-cli branch can't be tested since php-unit only runs in a cli environment.
     *
     * @throws ThreadException
     */
    private function checkEnv(): void
    {
        if (false === $this->isCli()) {
            // @codeCoverageIgnoreStart
            throw new ThreadException('Error. It is not safe to use process forking in other way than php-cli');
            // @codeCoverageIgnoreEnd
        }
        if (0 === count($this->threads)) {
            throw new ThreadException('Error. Can\'t run child threads processes without any added in the Pool');
        }
        // With a limit below 1 no child would ever be forked and run() would return
        // silently without doing anything: report the misconfiguration instead.
        if ($this->maxRunningThreadNb < 1) {
            throw new ThreadException('Error. Can\'t run child threads processes with a max running thread number lower than 1');
        }
    }

    /**
     * Prepares a new run by resetting the run state.
     */
    private function initRun(): void
    {
        $this->resetRun();
    }

    /**
     * @return bool true when PHP is executed through the cli SAPI
     */
    private function isCli(): bool
    {
        return PHP_SAPI === 'cli';
    }

    /**
     * Flags the pool as running and publishes POOL_RUN_START, once per run.
     */
    private function startRunStatus(): void
    {
        if (false === $this->isRunning) {
            $this->notify(Event::POOL_RUN_START);
            $this->isRunning = true;
        }
    }

    /**
     * Publishes POOL_RUN_STOP if a run was in progress, then restores the initial
     * run state: every registered thread is queued again and nothing is running.
     */
    private function resetRun(): void
    {
        if (true === $this->isRunning) {
            $this->notify(Event::POOL_RUN_STOP);
        }
        $this->isRunning = false;
        $this->toRunThreads = $this->threads;
        $this->runningThreads = [];
    }
}
||
| 277 |
In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.