| Total Complexity | 108 |
| Total Lines | 587 |
| Duplicated Lines | 0 % |
| Changes | 1 | ||
| Bugs | 0 | Features | 0 |
Complex classes like ProcessPoolExecutor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use ProcessPoolExecutor and, based on these observations, to apply Extract Interface as well.
| 1 | <?php |
||
| 5 | class ProcessPoolExecutor implements ExecutorServiceInterface |
||
| 6 | { |
||
/**
 * Packed control word. The constructor creates it as a \Swoole\Atomic\Long;
 * the run state lives in the high-order bits and the worker count in the
 * low-order COUNT_BITS bits.
 */
private $ctl;
/** Number of low-order bits of ctl reserved for the worker count. */
private const COUNT_BITS = ( PHP_INT_SIZE * 4 ) - 3;
/** Bit mask covering the worker-count bits; also used as the upper bound on workers. */
private const CAPACITY = (1 << self::COUNT_BITS) - 1;

// runState is stored in the high-order bits.
// The values are numerically ordered: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,
// which is what the plain integer comparisons on ctl rely on.
private const RUNNING = -1 << self::COUNT_BITS;
private const SHUTDOWN = 0 << self::COUNT_BITS;
private const STOP = 1 << self::COUNT_BITS;
private const TIDYING = 2 << self::COUNT_BITS;
private const TERMINATED = 3 << self::COUNT_BITS;

/** Argument passed to interruptIdleWorkers() to stop after the first worker. */
private const ONLY_ONE = true;

/** Queue holding submitted tasks (a ProcessQueueInterface supplied to the constructor). */
private $workQueue;

/** Mutex created in the constructor (\Swoole\Lock, SWOOLE_MUTEX); guards $workers and the counters below. */
private $mainLock;

/**
 * Set containing all worker processes in pool
 */
private $workers = [];

/**
 * Wait condition to support awaitTermination
 * (left commented out; awaitTermination() does not use a condition variable).
 */
//private final Condition termination = mainLock.newCondition();

/**
 * Tracks largest attained pool size. Accessed only under
 * mainLock.
 */
private $largestPoolSize;

/**
 * Counter for completed tasks
 */
private $completedTaskCount = 0;

/*
 * User control parameters.
 * NOTE(review): the original wording described these as "volatiles"; PHP has
 * no such modifier, so that guarantee does not carry over — confirm how
 * cross-process visibility is meant to work for these fields.
 */

/**
 * Handler called when saturated or shutdown in execute.
 * NOTE(review): nothing in the visible code assigns or reads this property.
 */
private $handler;

/**
 * Timeout in nanoseconds for idle processes
 * (the constructor stores TimeUnit::toNanos($keepAliveTime, $unit) here).
 */
private $keepAliveTime;

/**
 * Maximum pool size.
 */
private $poolSize;
||
| 66 | |||
| 67 | // Packing and unpacking ctl |
||
/**
 * Extracts the run-state portion of a packed ctl value by clearing the
 * bits covered by CAPACITY (the worker-count bits).
 *
 * @param int $c packed ctl value
 * @return int the run-state bits of $c
 */
private static function runStateOf(int $c): int
{
    $runStateMask = ~self::CAPACITY;

    return $runStateMask & $c;
}
||
| 72 | |||
/**
 * Extracts the worker-count portion of a packed ctl value, i.e. the bits
 * covered by CAPACITY.
 *
 * @param int $c packed ctl value
 * @return int the worker count stored in $c
 */
private static function workerCountOf(int $c): int
{
    $countMask = self::CAPACITY;

    return $countMask & $c;
}
||
| 77 | |||
/**
 * Packs a run state and a worker count into a single ctl value by OR-ing
 * the two together.
 *
 * @param int $rs run-state bits
 * @param int $wc worker count
 * @return int packed ctl value
 */
private static function ctlOf(int $rs, int $wc)
{
    $packed = $wc | $rs;

    return $packed;
}
||
| 82 | |||
| 83 | /* |
||
| 84 | * Bit field accessors that don't require unpacking ctl. |
||
| 85 | * These depend on the bit layout and on workerCount being never negative. |
||
| 86 | */ |
||
/**
 * Tells whether the packed ctl value is numerically below the given state.
 *
 * @param int $c packed ctl value
 * @param int $s run-state constant to compare against
 * @return bool true when $c is strictly less than $s
 */
private static function runStateLessThan(int $c, int $s): bool
{
    return $s > $c;
}
||
| 91 | |||
/**
 * Tells whether the packed ctl value is numerically at or above the given
 * state.
 *
 * @param int $c packed ctl value
 * @param int $s run-state constant to compare against
 * @return bool true when $c is greater than or equal to $s
 */
private static function runStateAtLeast(int $c, int $s): bool
{
    return $s <= $c;
}
||
| 96 | |||
/**
 * Tells whether the packed ctl value represents the RUNNING state, i.e. is
 * numerically below SHUTDOWN.
 *
 * @param int $c packed ctl value
 * @return bool true when $c is below SHUTDOWN
 */
private static function isRunning(int $c): bool
{
    return self::SHUTDOWN > $c;
}
||
| 101 | |||
/**
 * Tries to raise the worker count by one with a single compare-and-set on
 * ctl.
 *
 * @param int $expect the ctl value the caller last observed
 * @return bool result of cmpset(): whether ctl was swapped to $expect + 1
 */
private function compareAndIncrementWorkerCount(int $expect): bool
{
    $incremented = $expect + 1;

    return $this->ctl->cmpset($expect, $incremented);
}
||
| 109 | |||
/**
 * Tries to lower the worker count by one with a single compare-and-set on
 * ctl.
 *
 * @param int $expect the ctl value the caller last observed
 * @return bool result of cmpset(): whether ctl was swapped to $expect - 1
 */
private function compareAndDecrementWorkerCount(int $expect): bool
{
    $decremented = $expect - 1;

    return $this->ctl->cmpset($expect, $decremented);
}
||
| 117 | |||
/**
 * Lowers the worker count stored in ctl by one, re-reading ctl and retrying
 * the compare-and-set until it succeeds.
 */
private function decrementWorkerCount(): void
{
    for (;;) {
        $observed = $this->ctl->get();
        if ($this->compareAndDecrementWorkerCount($observed)) {
            return;
        }
    }
}
||
| 126 | |||
/**
 * Moves the run state forward to $targetState while keeping the current
 * worker count, or does nothing when ctl is already at or beyond it.
 * Retries until one of the two conditions holds.
 *
 * @param int $targetState the desired state, either SHUTDOWN or STOP
 *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
 */
private function advanceRunState(int $targetState): void
{
    while (true) {
        $current = $this->ctl->get();
        if (self::runStateAtLeast($current, $targetState)) {
            return;
        }

        // Keep the worker-count bits, replace only the run-state bits.
        $next = self::ctlOf($targetState, self::workerCountOf($current));
        if ($this->ctl->cmpset($current, $next)) {
            return;
        }
    }
}
||
| 146 | |||
/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty). If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker (interruptIdleWorkers(ONLY_ONE)) and returns. This
 * method is called after actions that might make termination
 * possible -- reducing worker count or removing tasks from the
 * queue during shutdown.
 *
 * The transition itself is TIDYING -> terminated() hook -> TERMINATED and
 * is performed while holding mainLock.
 */
public function tryTerminate(): void
{
    for (;;) {
        $c = $this->ctl->get();
        // Nothing to do while still running, when another caller already
        // reached TIDYING/TERMINATED, or when SHUTDOWN still has queued work.
        if (
            self::isRunning($c) ||
            self::runStateAtLeast($c, self::TIDYING) ||
            (self::runStateOf($c) == self::SHUTDOWN && !$this->workQueue->isEmpty())
        ) {
            return;
        }
        if (self::workerCountOf($c) != 0) { // Eligible to terminate
            $this->interruptIdleWorkers(self::ONLY_ONE);
            return;
        }

        $mainLock = $this->mainLock;
        $mainLock->lock();
        try {
            // Only the caller whose CAS to TIDYING succeeds runs the hook.
            if ($this->ctl->cmpset($c, self::ctlOf(self::TIDYING, 0))) {
                try {
                    $this->terminated();
                } finally {
                    // TERMINATED is set even if the terminated() hook throws.
                    $this->ctl->set(self::ctlOf(self::TERMINATED, 0));
                    //termination.signalAll();
                }
                return;
            }
        } finally {
            $mainLock->unlock();
        }
        // else retry on failed CAS
    }
}
||
| 191 | |||
/**
 * Placeholder invoked at the start of shutdown() and shutdownNow().
 * Currently performs no check.
 */
private function checkShutdownAccess(): void
{
}
||
| 195 | |||
/**
 * Calls interrupt() on every registered worker while holding mainLock.
 * An \Exception thrown by one worker is ignored so the remaining workers
 * are still interrupted.
 */
private function interruptWorkers(): void
{
    $lock = $this->mainLock;
    $lock->lock();
    try {
        foreach ($this->workers as $worker) {
            try {
                $worker->interrupt();
            } catch (\Exception $ignored) {
                // best effort: keep going with the next worker
            }
        }
    } finally {
        $lock->unlock();
    }
}
||
| 215 | |||
/**
 * Interrupts worker processes that have not been interrupted yet, while
 * holding mainLock.
 *
 * NOTE(review): $w->unlock() is called after each interrupt attempt, but no
 * matching lock on the worker is taken in this method — confirm that
 * Worker::unlock() is safe to call without a prior lock.
 *
 * @param bool $onlyOne when true, stop after examining the first worker
 */
private function interruptIdleWorkers(bool $onlyOne = false): void
{
    $lock = $this->mainLock;
    $lock->lock();
    try {
        foreach ($this->workers as $worker) {
            $process = $worker->process;
            if (!$process->isInterrupted()) {
                try {
                    $process->interrupt();
                } catch (\Exception $ignored) {
                    // best effort: a failed interrupt is not reported
                } finally {
                    $worker->unlock();
                }
            }
            if ($onlyOne) {
                break;
            }
        }
    } finally {
        $lock->unlock();
    }
}
||
| 242 | |||
/**
 * Intended to invoke the rejected execution handler for the given command.
 *
 * NOTE(review): the body is empty, so a rejected command is currently
 * dropped without notifying $this->handler or the caller — confirm whether
 * that is intended.
 *
 * @param RunnableInterface $command the command that could not be accepted
 */
private function reject(RunnableInterface $command): void
{
}
||
| 250 | |||
/**
 * Empties the work queue and returns the tasks taken from it.
 *
 * First asks the queue to drain itself; if it still reports elements
 * afterwards, removes the remaining ones one at a time.
 *
 * NOTE(review): the returned list only receives the drained items if
 * drainTo() takes its argument by reference — confirm against the queue
 * implementation.
 *
 * @return array tasks removed from the queue
 */
private function drainQueue(): array
{
    $queue = $this->workQueue;
    $drained = [];
    $queue->drainTo($drained);

    if ($queue->isEmpty()) {
        return $drained;
    }

    $buffer = [];
    foreach ($queue->toArray($buffer) as $leftover) {
        if ($queue->remove($leftover)) {
            $drained[] = $leftover;
        }
    }

    return $drained;
}
||
| 266 | |||
/**
 * Tries to add and start one worker process.
 *
 * Phase 1 reserves a slot by CAS-incrementing the worker count in ctl.
 * Phase 2 creates the Worker, registers it in $workers under mainLock and
 * starts its process.
 *
 * NOTE(review): in the back-out branch tryTerminate() is called while
 * mainLock is still held, and tryTerminate() locks mainLock itself —
 * confirm the lock tolerates re-locking by the same holder.
 * NOTE(review): if $t->start() throws, the worker stays registered and the
 * reserved count is not released — confirm.
 *
 * @param RunnableInterface|null $firstTask task handed to the new worker, or null
 * @return bool true when a worker was registered and started; false when
 *         the run state or the pool size forbids it (no return type is declared)
 */
private function addWorker(?RunnableInterface $firstTask)
{
    for (;;) {
        $c = $this->ctl->get();
        $rs = self::runStateOf($c);

        // Check if queue empty only if necessary.
        // Refuse at SHUTDOWN or later, except for SHUTDOWN with no first
        // task while the queue still holds work.
        if (
            $rs >= self::SHUTDOWN &&
            ! (
                $rs == self::SHUTDOWN &&
                $firstTask == null &&
                !$this->workQueue->isEmpty()
            )
        ) {
            return false;
        }

        for (;;) {
            $wc = self::workerCountOf($c);
            if (
                $wc >= self::CAPACITY ||
                $wc >= $this->poolSize
            ) {
                return false;
            }
            if ($this->compareAndIncrementWorkerCount($c)) {
                // Slot reserved: leave both loops.
                break 2;
            }
            $c = $this->ctl->get(); // Re-read ctl
            if (self::runStateOf($c) != $rs) {
                // Run state changed: restart from the outer state check.
                continue 2;
            }
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    $w = new Worker($firstTask, $this);
    $t = $w->process;

    $this->mainLock->lock();
    try {
        // Recheck while holding lock.
        // Back out if the worker has no process or if the pool was
        // shut down before the lock was acquired.
        $c = $this->ctl->get();
        $rs = self::runStateOf($c);

        if (
            $t == null ||
            ($rs >= self::SHUTDOWN &&
            !($rs == self::SHUTDOWN && $firstTask == null))
        ) {
            // Release the slot reserved above.
            $this->decrementWorkerCount();
            $this->tryTerminate();
            return false;
        }

        $this->workers[] = $w;

        $s = count($this->workers);
        if ($s > $this->largestPoolSize) {
            $this->largestPoolSize = $s;
        }
    } finally {
        $this->mainLock->unlock();
    }
    // Started only after mainLock has been released.
    $t->start();
    return true;
}
||
| 337 | |||
/**
 * Performs bookkeeping for a worker that is leaving runWorker(): adds its
 * completed-task count to the pool total, removes it from $workers, gives
 * the pool a chance to terminate and, while the pool is below STOP, starts
 * a replacement worker when needed.
 *
 * The worker is located by identity (strict comparison). A loose `==`
 * between objects compares their properties recursively, which can match a
 * different worker and can recurse through the worker -> executor ->
 * workers cycle. After removal the array is re-indexed so $workers stays a
 * dense list.
 *
 * @param Worker $w                 the exiting worker
 * @param bool   $completedAbruptly true when the worker left because of an
 *                                  exception; in that case the worker count
 *                                  has not been decremented yet
 */
private function processWorkerExit(Worker $w, bool $completedAbruptly): void
{
    if ($completedAbruptly) { // If abrupt, then workerCount wasn't adjusted
        $this->decrementWorkerCount();
    }

    $this->mainLock->lock();
    try {
        $this->completedTaskCount += $w->completedTasks;

        $key = array_search($w, $this->workers, true);
        if ($key !== false) {
            unset($this->workers[$key]);
            // Keep the list dense so positional access elsewhere stays valid.
            $this->workers = array_values($this->workers);
        }
    } finally {
        $this->mainLock->unlock();
    }

    $this->tryTerminate();

    $c = $this->ctl->get();
    if (self::runStateLessThan($c, self::STOP)) {
        if (!$completedAbruptly) {
            $min = $this->poolSize;
            if ($min == 0 && !$this->workQueue->isEmpty()) {
                $min = 1;
            }
            if (self::workerCountOf($c) >= $min) {
                return; // replacement not needed
            }
        }
        $this->addWorker(null);
    }
}
||
| 373 | |||
/**
 * Fetches the next task for a worker from the work queue.
 *
 * Returns null when the worker should stop: the pool is at STOP or later,
 * the pool is at SHUTDOWN or later with an empty queue, the worker count
 * exceeds poolSize and the CAS decrement succeeds, or the queue read
 * produced nothing or threw.
 *
 * NOTE(review): the `return null` after the try/catch ends the method after
 * the first queue read. $timedOut is therefore never seen by a later
 * iteration, and on that path the worker count is not decremented here —
 * confirm this is intended rather than a retry loop.
 * NOTE(review): the inner loop only breaks when $wc <= poolSize, at which
 * point $timed is false, so the poll() branch looks unreachable — confirm.
 *
 * @param InterruptibleProcess $process process handle forwarded to the queue's poll()/take()
 * @return RunnableInterface|null the unserialized task, or null as described above
 */
private function getTask(InterruptibleProcess $process): ?RunnableInterface
{
    $timedOut = false;
    for (;;) {
        $c = $this->ctl->get();
        $rs = self::runStateOf($c);

        // Check if queue empty only if necessary.
        if ($rs >= self::SHUTDOWN && ($rs >= self::STOP || $this->workQueue->isEmpty())) {
            $this->decrementWorkerCount();
            return null;
        }

        $timed = false;

        for (;;) {
            $wc = self::workerCountOf($c);
            $timed = $wc > $this->poolSize;

            if ($wc <= $this->poolSize && !($timedOut && $timed)) {
                break;
            }
            // Over the pool size: give up this worker's slot if the CAS wins.
            if ($this->compareAndDecrementWorkerCount($c)) {
                return null;
            }
            $c = $this->ctl->get(); // Re-read ctl
            if (self::runStateOf($c) != $rs) {
                continue 2;
            }
            // else CAS failed due to workerCount change; retry inner loop
        }
        try {
            $r = $timed ?
                $this->workQueue->poll($this->keepAliveTime, TimeUnit::NANOSECONDS, $process) :
                $this->workQueue->take($process);
            if ($r != null) {
                // NOTE(review): unserialize() instantiates arbitrary classes;
                // make sure the queue only ever carries trusted payloads.
                return unserialize($r);
            }
            $timedOut = true;
        } catch (\Exception $retry) {
            $timedOut = false;
        }
        return null;
    }
    // Not reachable: the loop above only exits through return statements.
    return null;
}
||
| 420 | |||
/**
 * Main loop of a worker: runs the worker's first task (if any) and then
 * keeps running tasks obtained from getTask() until it returns null.
 *
 * After every task — whether it returned or threw — the worker's
 * completedTasks counter is incremented. An exception from a task leaves
 * the loop and propagates to the caller; in every case
 * processWorkerExit() is called, with $completedAbruptly set to true only
 * when the loop was left by an exception.
 *
 * @param Worker $w the worker whose loop is being run
 */
public function runWorker(Worker $w): void
{
    $task = $w->firstTask;
    $w->firstTask = null;
    $completedAbruptly = true;
    try {
        for (;;) {
            if ($task == null) {
                $task = $this->getTask($w->process);
                if ($task == null) {
                    break;
                }
            }
            //$w->lock();
            try {
                $task->run();
            } finally {
                $task = null;
                $w->firstTask = null;
                $w->completedTasks += 1;
                //$w->unlock();
            }
        }
        $completedAbruptly = false;
    } finally {
        $this->processWorkerExit($w, $completedAbruptly);
    }
}
||
| 449 | |||
/**
 * Creates an executor in the RUNNING state with zero workers.
 *
 * Arguments are validated before the shared Swoole primitives are
 * allocated, so an invalid call allocates nothing.
 *
 * @param int                   $poolSize      maximum number of workers; must be > 0
 * @param int                   $keepAliveTime idle timeout, expressed in $unit; must be >= 0
 * @param string                $unit          time unit understood by TimeUnit::toNanos()
 * @param ProcessQueueInterface $workQueue     queue that holds submitted tasks
 *
 * @throws \InvalidArgumentException (a subclass of \Exception) when
 *         $poolSize <= 0 or $keepAliveTime < 0
 */
public function __construct(
    int $poolSize,
    int $keepAliveTime,
    string $unit,
    ProcessQueueInterface $workQueue
) {
    if ($poolSize <= 0 || $keepAliveTime < 0) {
        throw new \InvalidArgumentException("Illegal argument");
    }

    $this->ctl = new \Swoole\Atomic\Long(self::ctlOf(self::RUNNING, 0));
    $this->mainLock = new \Swoole\Lock(SWOOLE_MUTEX);
    $this->poolSize = $poolSize;
    $this->workQueue = $workQueue;
    $this->keepAliveTime = TimeUnit::toNanos($keepAliveTime, $unit);
    // Start the high-water mark at 0 instead of leaving it null.
    $this->largestPoolSize = 0;
}
||
| 467 | |||
/**
 * Submits a command for execution.
 *
 * 1. If fewer than poolSize workers exist, tries to start a new worker with
 *    the command as its first task.
 * 2. Otherwise, while the pool is running, offers the command to the work
 *    queue, addressed to the process of a randomly chosen registered
 *    worker. After a successful offer the state is re-checked: if the pool
 *    stopped running and the command can be removed again it is rejected,
 *    and if no worker is counted a new one is started.
 * 3. If the pool is not running, tries to add a worker and rejects the
 *    command if that fails.
 *
 * The receiver is picked with array_rand() so that only existing keys of
 * $workers are used; a positional rand(0, count - 1) breaks once the list
 * has gaps or is empty. When no worker is registered at all, the command
 * falls back to addWorker()/reject() instead of dereferencing a missing
 * entry.
 *
 * @param RunnableInterface $command the task to run
 */
public function execute(RunnableInterface $command): void
{
    $c = $this->ctl->get();
    if (self::workerCountOf($c) < $this->poolSize) {
        if ($this->addWorker($command)) {
            return;
        }
        $c = $this->ctl->get();
    }
    if (self::isRunning($c)) {
        if (count($this->workers) === 0) {
            // Nobody to address the command to: try to start a worker for
            // it, otherwise treat the pool as saturated.
            if (!$this->addWorker($command)) {
                $this->reject($command);
            }
            return;
        }
        $receiver = $this->workers[array_rand($this->workers)]->process;
        if ($this->workQueue->offer($command, $receiver)) {
            $recheck = $this->ctl->get();
            if (!self::isRunning($recheck) && $this->remove($command)) {
                $this->reject($command);
            } elseif (self::workerCountOf($recheck) == 0) {
                $this->addWorker(null);
            }
        }
    } elseif (!$this->addWorker($command)) {
        $this->reject($command);
    }
}
||
| 491 | |||
/**
 * Removes a task from the work queue and then calls tryTerminate(), since
 * emptying the queue during SHUTDOWN may allow the pool to terminate.
 *
 * @param RunnableInterface $task the task to remove
 * @return bool the value returned by the queue's remove()
 */
public function remove(RunnableInterface $task): bool
{
    $wasRemoved = $this->workQueue->remove($task);

    // In case SHUTDOWN and now empty
    $this->tryTerminate();

    return $wasRemoved;
}
||
| 498 | |||
/**
 * Hook called by shutdown() after the run state has been advanced and idle
 * workers have been interrupted. A no-op here.
 */
public function onShutdown(): void
{
}
||
| 507 | |||
/**
 * State check: true while the pool is RUNNING, and also while it is in
 * SHUTDOWN when the caller allows that.
 *
 * @param bool $shutdownOK whether SHUTDOWN should count as acceptable
 * @return bool true if RUNNING, or if SHUTDOWN and $shutdownOK is true
 */
public function isRunningOrShutdown(bool $shutdownOK): bool
{
    $state = self::runStateOf($this->ctl->get());
    if ($state == self::RUNNING) {
        return true;
    }

    return $state == self::SHUTDOWN && $shutdownOK;
}
||
| 519 | |||
/**
 * Starts an orderly shutdown: under mainLock the run state is advanced to
 * SHUTDOWN, idle workers are interrupted and the onShutdown() hook runs;
 * afterwards tryTerminate() is called.
 *
 * NOTE(review): interruptIdleWorkers() locks mainLock again while it is
 * already held here — confirm the Swoole mutex tolerates re-locking by the
 * same holder.
 */
public function shutdown(): void
{
    $lock = $this->mainLock;
    $lock->lock();
    try {
        $this->checkShutdownAccess();
        $this->advanceRunState(self::SHUTDOWN);
        $this->interruptIdleWorkers();
        $this->onShutdown();
    } finally {
        $lock->unlock();
    }

    $this->tryTerminate();
}
||
| 533 | |||
/**
 * Stops the pool immediately: under mainLock the run state is advanced to
 * STOP, all workers are interrupted and the work queue is drained;
 * afterwards tryTerminate() is called.
 *
 * NOTE(review): interruptWorkers() locks mainLock again while it is already
 * held here — confirm the Swoole mutex tolerates re-locking by the same
 * holder.
 *
 * @return array the tasks that were drained from the queue
 */
public function shutdownNow(): array
{
    $pending = [];
    $lock = $this->mainLock;
    $lock->lock();
    try {
        $this->checkShutdownAccess();
        $this->advanceRunState(self::STOP);
        $this->interruptWorkers();
        $pending = $this->drainQueue();
    } finally {
        $lock->unlock();
    }

    $this->tryTerminate();

    return $pending;
}
||
| 549 | |||
/**
 * Tells whether the pool has left the RUNNING state.
 *
 * @return bool true when ctl no longer represents RUNNING
 */
public function isShutdown(): bool
{
    $current = $this->ctl->get();

    return !self::isRunning($current);
}
||
| 554 | |||
| 555 | public function isTerminating(): bool |
||
| 559 | } |
||
| 560 | |||
/**
 * Tells whether the pool has reached the TERMINATED state.
 *
 * @return bool true when ctl is at least TERMINATED
 */
public function isTerminated(): bool
{
    $current = $this->ctl->get();

    return self::runStateAtLeast($current, self::TERMINATED);
}
||
| 565 | |||
/**
 * Blocks until the pool reaches TERMINATED or the timeout elapses.
 *
 * The state is polled in short sleep slices against a wall-clock deadline.
 * mainLock is deliberately not held while waiting: tryTerminate() needs
 * that lock to move the pool to TERMINATED, so holding it across the sleep
 * would prevent the very transition being awaited. Reading ctl needs no
 * lock because it is an atomic value.
 *
 * Each sleep is capped at one millisecond, which also keeps the
 * nanosecond argument of time_nanosleep() below its one-second limit.
 *
 * @param int    $timeout maximum time to wait, expressed in $unit
 * @param string $unit    time unit understood by TimeUnit::toNanos()
 * @return bool true if the pool terminated, false if the timeout elapsed first
 */
public function awaitTermination(int $timeout, string $unit)
{
    $nanos = TimeUnit::toNanos($timeout, $unit);
    $deadline = microtime(true) + ($nanos / 1e9);
    $maxSliceNanos = 1000000.0; // 1 ms between state checks

    for (;;) {
        if (self::runStateAtLeast($this->ctl->get(), self::TERMINATED)) {
            return true;
        }

        $remainingSeconds = $deadline - microtime(true);
        if ($nanos <= 0 || $remainingSeconds <= 0) {
            return false;
        }

        $sliceNanos = (int) min($remainingSeconds * 1e9, $maxSliceNanos);
        time_nanosleep(0, $sliceNanos);
    }
}
||
| 584 | |||
/**
 * Hook invoked by tryTerminate() once the pool has entered TIDYING and
 * before ctl is set to TERMINATED. Does nothing here; being protected, it
 * can be overridden by subclasses.
 */
protected function terminated(): void
{
}
||
| 588 | |||
| 589 | public function completedTaskCount(): int |
||
| 592 | } |
||
| 593 | } |
||
| 594 |