Test Failed
Push — main ( c76fcf...a2096a )
by Bingo
14:02
created

ProcessPoolExecutor::runWorker()   B

Complexity

Conditions 6
Paths 30

Size

Total Lines 33
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 26
c 1
b 0
f 0
dl 0
loc 33
rs 8.8817
cc 6
nc 30
nop 1
1
<?php
2
3
namespace Jabe\Engine\Impl\Util\Concurrent;
4
5
class ProcessPoolExecutor implements ExecutorServiceInterface
6
{
7
    /**
     * Packed control word: the run state lives in the bits above COUNT_BITS,
     * the worker count in the low COUNT_BITS bits. Assigned in the constructor.
     */
    private $ctl;

    /** Number of low-order bits of the control word reserved for the worker count. */
    private const COUNT_BITS = (PHP_INT_SIZE * 4) - 3;

    /** Mask selecting the worker-count bits; also the largest storable worker count. */
    private const CAPACITY = (1 << self::COUNT_BITS) - 1;

    // Run states occupy the high-order bits and are numerically ordered:
    // RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED.
    private const RUNNING = -1 << self::COUNT_BITS;
    private const SHUTDOWN = 0 << self::COUNT_BITS;
    private const STOP = 1 << self::COUNT_BITS;
    private const TIDYING = 2 << self::COUNT_BITS;
    private const TERMINATED = 3 << self::COUNT_BITS;

    /** Readable argument for interruptIdleWorkers(): stop after the first worker. */
    private const ONLY_ONE = true;

    /** Queue that receives submitted tasks (injected through the constructor). */
    private $workQueue;

    /** Atomic counter tracking the number of queued tasks. Assigned in the constructor. */
    private $queueSize;

    /** Mutex used around changes to the worker list. Assigned in the constructor. */
    private $mainLock;

    /**
     * All Worker objects currently registered with this pool.
     */
    private $workers = [];

    /**
     * Largest value count($this->workers) has reached so far.
     */
    private $largestPoolSize;

    /**
     * Intended handler for saturated / shut-down submissions.
     * Not read or written anywhere in this class at the moment.
     */
    private $handler;
0 ignored issues
show
introduced by
The private property $handler is not used, and could be removed.
Loading history...
46
47
    /**
     * Idle timeout handed to the queue's timed poll, expressed in nanoseconds
     * (the constructor converts the caller's value with TimeUnit::toNanos()).
     */
    private $keepAliveTime;

    /**
     * Upper bound on the number of workers in the pool.
     */
    private $poolSize;
56
57
    // Packing and unpacking ctl
58
    /**
     * Extracts the run-state bits from a packed control word.
     *
     * @param int $c packed control word
     * @return int $c with the worker-count bits cleared
     */
    private static function runStateOf(int $c): int
    {
        $stateMask = ~self::CAPACITY;

        return $c & $stateMask;
    }
62
63
    /**
     * Extracts the worker count from a packed control word.
     *
     * @param int $c packed control word
     * @return int the low COUNT_BITS bits of $c
     */
    private static function workerCountOf(int $c): int
    {
        $countMask = self::CAPACITY;

        return $c & $countMask;
    }
67
68
    /**
     * Packs a run state and a worker count into a single control word.
     *
     * @param int $rs run state (one of the state constants)
     * @param int $wc worker count
     * @return int bitwise OR of both fields
     */
    private static function ctlOf(int $rs, int $wc)
    {
        $packed = $rs | $wc;

        return $packed;
    }
72
73
    /*
74
     * Bit field accessors that don't require unpacking ctl.
75
     * These depend on the bit layout and on workerCount being never negative.
76
     */
77
    /**
     * Tells whether control word $c is numerically below state $s.
     * Works on the packed word directly, without unpacking the state bits.
     *
     * @param int $c packed control word
     * @param int $s run-state constant to compare against
     */
    private static function runStateLessThan(int $c, int $s): bool
    {
        return $s > $c;
    }
81
82
    /**
     * Tells whether control word $c is numerically at or above state $s.
     *
     * @param int $c packed control word
     * @param int $s run-state constant to compare against
     */
    private static function runStateAtLeast(int $c, int $s): bool
    {
        return !($c < $s);
    }
86
87
    /**
     * Tells whether the control word is still in the RUNNING state,
     * i.e. numerically below SHUTDOWN.
     *
     * @param int $c packed control word
     */
    private static function isRunning(int $c): bool
    {
        return self::SHUTDOWN > $c;
    }
91
92
    /**
     * Single compare-and-set attempt that adds one to the control word.
     *
     * @param int $expect value the control word must currently hold
     * @return bool result of the cmpset() call
     */
    private function compareAndIncrementWorkerCount(int $expect): bool
    {
        $incremented = $expect + 1;

        return $this->ctl->cmpset($expect, $incremented);
    }
99
100
    /**
     * Single compare-and-set attempt that subtracts one from the control word.
     *
     * @param int $expect value the control word must currently hold
     * @return bool result of the cmpset() call
     */
    private function compareAndDecrementWorkerCount(int $expect): bool
    {
        $decremented = $expect - 1;

        return $this->ctl->cmpset($expect, $decremented);
    }
107
108
    /**
     * Subtracts one from the worker count, retrying the compare-and-set
     * with a freshly read control word until it succeeds.
     */
    private function decrementWorkerCount(): void
    {
        while (true) {
            $current = $this->ctl->get();
            if ($this->compareAndDecrementWorkerCount($current)) {
                return;
            }
        }
    }
116
117
    /**
     * Moves the run state forward to $targetState, keeping the worker count.
     * Does nothing when the pool is already at or beyond that state.
     *
     * @param int $targetState desired state; callers in this class pass
     *                         SHUTDOWN or STOP (TIDYING / TERMINATED are set
     *                         by tryTerminate())
     */
    private function advanceRunState(int $targetState): void
    {
        while (true) {
            $current = $this->ctl->get();

            if (self::runStateAtLeast($current, $targetState)) {
                return;
            }

            $next = self::ctlOf($targetState, self::workerCountOf($current));
            if ($this->ctl->cmpset($current, $next)) {
                return;
            }
            // The control word changed underneath us; read it again and retry.
        }
    }
136
137
    /**
     * Single compare-and-set attempt that adds one to the queue-size counter.
     *
     * @param int $expect value the counter must currently hold
     * @return bool result of the cmpset() call
     */
    private function compareAndIncrementQueueSize(int $expect): bool
    {
        $grown = $expect + 1;

        return $this->queueSize->cmpset($expect, $grown);
    }
144
145
    /**
     * Single compare-and-set attempt that subtracts one from the queue-size
     * counter. Never takes the counter below zero.
     *
     * @param int $expect value the counter must currently hold
     * @return bool false when $expect is not positive, otherwise the result
     *              of the cmpset() call
     */
    private function compareAndDecrementQueueSize(int $expect): bool
    {
        if ($expect <= 0) {
            return false;
        }

        return $this->queueSize->cmpset($expect, $expect - 1);
    }
155
156
    /**
     * Moves the pool to TERMINATED when it is eligible: the state is SHUTDOWN
     * with an empty work queue, or STOP, and no workers remain. When workers
     * remain, one idle worker is interrupted instead so that the shutdown
     * signal keeps propagating.
     *
     * Call this after any action that may make termination possible
     * (a worker leaving the pool, a task being removed during shutdown).
     */
    public function tryTerminate(): void
    {
        for (;;) {
            $c = $this->ctl->get();
            if (
                self::isRunning($c) ||
                self::runStateAtLeast($c, self::TIDYING) ||
                (self::runStateOf($c) == self::SHUTDOWN && !$this->workQueue->isEmpty())
            ) {
                return;
            }
            if (self::workerCountOf($c) != 0) { // Eligible to terminate
                $this->interruptIdleWorkers(self::ONLY_ONE);
                return;
            }

            $mainLock = $this->mainLock;
            // trylock() does not block and reports whether the lock was taken.
            // Several callers invoke this method while already holding
            // mainLock, so a failed attempt is tolerated; in that case the
            // lock is not ours and must not be released here.
            $locked = $mainLock->trylock();
            try {
                if ($this->ctl->cmpset($c, self::ctlOf(self::TIDYING, 0))) {
                    try {
                        $this->terminated();
                    } finally {
                        // Always finish the transition, even if the hook throws.
                        $this->ctl->set(self::ctlOf(self::TERMINATED, 0));
                    }
                    return;
                }
            } finally {
                if ($locked) {
                    $mainLock->unlock();
                }
            }
            // else retry on failed CAS
        }
    }
200
201
    /**
     * Permission hook invoked at the start of shutdown() and shutdownNow().
     * Performs no check in this implementation.
     */
    private function checkShutdownAccess(): void
    {
        // Intentionally empty.
    }
204
205
    /**
     * Calls interrupt() on every registered worker, whether or not it is
     * currently busy. Failures of individual workers are ignored so that the
     * remaining workers are still interrupted.
     */
    private function interruptWorkers(): void
    {
        $mainLock = $this->mainLock;
        // Non-blocking attempt; shutdownNow() calls this while it already
        // holds mainLock, so failure to acquire is tolerated. Remember the
        // outcome so that only a lock taken here is released here.
        $locked = $mainLock->trylock();
        try {
            foreach ($this->workers as $w) {
                try {
                    $w->interrupt();
                } catch (\Exception $ignore) {
                    // Best effort: keep going with the other workers.
                }
            }
        } finally {
            if ($locked) {
                $mainLock->unlock();
            }
        }
    }
224
225
    /**
     * Interrupts the process of each worker whose own lock can be taken and
     * that is not already interrupted.
     * NOTE(review): a successful $w->trylock() presumably means the worker is
     * not running a task (runWorker() locks the worker around each task) —
     * confirm against Worker.
     *
     * @param bool $onlyOne when true, stop after examining the first worker
     *                      (whether or not it was actually interrupted)
     */
    private function interruptIdleWorkers(bool $onlyOne = false): void
    {
        $mainLock = $this->mainLock;
        // Non-blocking attempt; shutdown() calls this while it already holds
        // mainLock, so failure to acquire is tolerated. Only a lock taken
        // here may be released here.
        $locked = $mainLock->trylock();
        try {
            foreach ($this->workers as $w) {
                $t = $w->process;
                if (!$t->isInterrupted() && $w->trylock()) {
                    try {
                        $t->interrupt();
                    } finally {
                        $w->unlock();
                    }
                }
                if ($onlyOne) {
                    break;
                }
            }
        } finally {
            if ($locked) {
                $mainLock->unlock();
            }
        }
    }
250
251
    /**
     * Hook called by execute() for a command the pool will not run.
     * Currently a no-op: the command is dropped without notifying anyone.
     *
     * @param RunnableInterface $command the command that was refused (unused)
     */
    private function reject(RunnableInterface $command): void
    {
        // Intentionally empty.
    }
258
259
    /**
     * Empties the work queue and returns the tasks taken from it.
     *
     * First asks the queue to drain itself; if the queue-size counter is
     * still non-zero afterwards, the remaining entries are removed one by one
     * and the counter is decremented for each successful removal.
     *
     * @return array tasks that were still queued
     */
    private function drainQueue(): array
    {
        $queue = $this->workQueue;
        $drained = [];
        $queue->drainTo($drained);

        if ($this->queueSize->get() == 0) {
            return $drained;
        }

        $snapshot = [];
        foreach ($queue->toArray($snapshot) as $task) {
            if (!$queue->remove($task)) {
                continue;
            }
            // size() is read after the removal, hence the +1 for the CAS expectation.
            $this->compareAndDecrementQueueSize($queue->size() + 1);
            $drained[] = $task;
        }

        return $drained;
    }
275
276
    /**
     * Tries to add one worker to the pool and start its process.
     *
     * Phase 1 reserves a slot by CAS-incrementing the worker count, giving up
     * when the pool is shutting down or already at its size limit.
     * Phase 2 creates the Worker, re-checks the run state, registers the
     * worker and starts its process; if the re-check fails the reserved slot
     * is given back.
     *
     * @param RunnableInterface|null $firstTask task the new worker runs first,
     *                                          or null to serve the queue only
     * @return bool true when a worker was added and started, false otherwise
     */
    private function addWorker(?RunnableInterface $firstTask)
    {
        for (;;) {
            $c = $this->ctl->get();
            $rs = self::runStateOf($c);

            // While shutting down, a worker is only allowed in state SHUTDOWN,
            // without a first task, and when queued work remains.
            if (
                $rs >= self::SHUTDOWN &&
                ! (
                    $rs == self::SHUTDOWN &&
                    $firstTask == null &&
                    $this->queueSize->get() != 0
                )
            ) {
                return false;
            }

            for (;;) {
                $wc = self::workerCountOf($c);
                if (
                    $wc >= self::CAPACITY ||
                    $wc >= $this->poolSize
                ) {
                    return false;
                }
                if ($this->compareAndIncrementWorkerCount($c)) {
                    break 2;
                }
                $c = $this->ctl->get();  // Re-read ctl
                if (self::runStateOf($c) != $rs) {
                    continue 2; // run state changed: redo the outer checks
                }
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        $w = new Worker($firstTask, $this);
        $t = $w->process;

        // trylock() does not block and reports whether the lock was taken.
        // Only a lock taken here may be released here; releasing
        // unconditionally could free a lock held by someone else.
        $locked = $this->mainLock->trylock();
        try {
            // Re-check the state now that the worker exists: back out when no
            // process was created or the pool shut down in the meantime.
            $c = $this->ctl->get();
            $rs = self::runStateOf($c);

            if (
                $t == null ||
                ($rs >= self::SHUTDOWN &&
                 !($rs == self::SHUTDOWN && $firstTask == null))
            ) {
                $this->decrementWorkerCount();
                $this->tryTerminate();
                return false;
            }

            $this->workers[] = $w;

            $s = count($this->workers);
            if ($s > $this->largestPoolSize) {
                $this->largestPoolSize = $s;
            }
        } finally {
            if ($locked) {
                $this->mainLock->unlock();
            }
        }
        $t->start();
        return true;
    }
346
347
    /**
     * Bookkeeping for a worker that is leaving the pool: removes it from the
     * worker list, attempts termination, and starts a replacement worker
     * when the pool is still below STOP and needs one.
     *
     * @param Worker $w                 the worker that is exiting
     * @param bool   $completedAbruptly true when the worker left because of an
     *                                  exception; its slot in the worker count
     *                                  has then not been released yet
     */
    private function processWorkerExit(Worker $w, bool $completedAbruptly): void
    {
        if ($completedAbruptly) {// If abrupt, then workerCount wasn't adjusted
            $this->decrementWorkerCount();
        }

        // trylock() does not block and reports whether the lock was taken.
        // Only a lock taken here may be released here.
        $locked = $this->mainLock->trylock();
        try {
            foreach ($this->workers as $key => $val) {
                if ($val == $w) {
                    unset($this->workers[$key]);
                    break;
                }
            }
        } finally {
            if ($locked) {
                $this->mainLock->unlock();
            }
        }

        $this->tryTerminate();

        $c = $this->ctl->get();
        if (self::runStateLessThan($c, self::STOP)) {
            if (!$completedAbruptly) {
                $min = $this->poolSize;
                if ($min == 0 && $this->queueSize->get() != 0) {
                    $min = 1;
                }
                if (self::workerCountOf($c) >= $min) {
                    return; // replacement not needed
                }
            }
            $this->addWorker(null);
        }
    }
381
382
    /**
     * Fetches the next task for a worker from the work queue.
     *
     * Returns null - after releasing the worker's slot in the worker count -
     * when the worker should exit: the pool is at STOP or beyond, the pool is
     * in SHUTDOWN with a zero queue-size counter, or the worker count exceeds
     * the pool size.
     *
     * NOTE(review): the payload is passed to unserialize(); this is only safe
     * if nothing untrusted can write to the queue — confirm.
     *
     * @param InterruptibleProcess $process process handed to the queue's
     *                                      take() / poll() calls
     * @return RunnableInterface|null the unserialized task, or null to exit
     */
    private function getTask(InterruptibleProcess $process): ?RunnableInterface
    {
        $idleTimedOut = false;

        while (true) {
            $ctl = $this->ctl->get();
            $state = self::runStateOf($ctl);

            // The shared counter is consulted instead of workQueue->isEmpty(),
            // because workQueue is not shared.
            if ($state >= self::SHUTDOWN) {
                if ($state >= self::STOP || $this->queueSize->get() == 0) {
                    $this->decrementWorkerCount();
                    return null;
                }
            }

            $useTimeout = false;

            do {
                $count = self::workerCountOf($ctl);
                $useTimeout = $count > $this->poolSize;

                if ($count <= $this->poolSize && (!$idleTimedOut || !$useTimeout)) {
                    break;
                }
                if ($this->compareAndDecrementWorkerCount($ctl)) {
                    return null;
                }
                $ctl = $this->ctl->get();
                if (self::runStateOf($ctl) != $state) {
                    // Run state moved on: start over from the outer checks.
                    continue 2;
                }
                // Otherwise only the worker count changed; try the CAS again.
            } while (true);

            try {
                if ($useTimeout) {
                    $payload = $this->workQueue->poll($this->keepAliveTime, TimeUnit::NANOSECONDS, $process);
                } else {
                    $payload = $this->workQueue->take($process);
                }

                if ($payload == null) {
                    $idleTimedOut = true;
                } else {
                    return unserialize($payload);
                }
            } catch (\Exception $retry) {
                $idleTimedOut = false;
            }
        }
    }
426
427
    /**
     * Main loop of a worker: runs the worker's first task (if any) and then
     * tasks obtained from getTask() until getTask() returns null or a task
     * throws. processWorkerExit() is always called on the way out.
     *
     * An exception thrown by a task propagates to the caller; the worker is
     * then reported to processWorkerExit() as having completed abruptly.
     *
     * @param Worker $w the worker whose loop is being run
     */
    public function runWorker(Worker $w): void
    {
        $firstTask = $w->firstTask;
        $queuedTask = null;
        $w->firstTask = null;
        $completedAbruptly = true;
        try {
            while ($firstTask != null || ($queuedTask = $this->getTask($w->process)) != null) {
                // NOTE(review): the result of the worker's trylock() is not
                // checked here — confirm Worker's lock semantics.
                $w->trylock();
                try {
                    if ($firstTask != null) {
                        $firstTask->run();
                    } elseif ($queuedTask != null) {
                        // Account for the task just taken from the queue.
                        // A single CAS attempt can lose against a concurrent
                        // update and leave the counter too high, so retry
                        // until it succeeds or the counter is already zero.
                        do {
                            $queued = $this->queueSize->get();
                        } while ($queued > 0 && !$this->compareAndDecrementQueueSize($queued));
                        $queuedTask->run();
                    }
                } finally {
                    $firstTask = null;
                    $queuedTask = null;
                    $w->firstTask = null;
                    $w->unlock();
                }
            }
            $completedAbruptly = false;
        } finally {
            $this->processWorkerExit($w, $completedAbruptly);
        }
    }
462
463
    /**
     * Creates a pool in the RUNNING state with no workers.
     *
     * @param int                    $poolSize      maximum number of workers; must be positive
     * @param int                    $keepAliveTime idle timeout, in $unit; must not be negative
     * @param string                 $unit          time unit of $keepAliveTime, as understood
     *                                              by TimeUnit::toNanos()
     * @param BlockingQueueInterface $workQueue     queue that receives submitted tasks
     *
     * @throws \InvalidArgumentException when $poolSize or $keepAliveTime is out of range
     */
    public function __construct(
        int $poolSize,
        int $keepAliveTime,
        string $unit,
        BlockingQueueInterface $workQueue
    ) {
        // Validate first so that no shared-memory objects are allocated for
        // an instance that is never usable.
        if ($poolSize <= 0 || $keepAliveTime < 0) {
            throw new \InvalidArgumentException("Illegal argument");
        }

        $this->ctl = new \Swoole\Atomic\Long(self::ctlOf(self::RUNNING, 0));
        $this->mainLock = new \Swoole\Lock(SWOOLE_MUTEX);
        $this->queueSize = new \Swoole\Atomic\Long(0);

        $this->poolSize = $poolSize;
        $this->workQueue = $workQueue;
        $this->keepAliveTime = TimeUnit::toNanos($keepAliveTime, $unit);
    }
482
483
    /**
     * Submits a command for execution.
     *
     * While the pool has fewer workers than its size limit, a new worker is
     * started with the command as its first task. Otherwise, if the pool is
     * running, the command is offered to the work queue. A command that can
     * neither be queued while running nor handed to a new worker is passed
     * to reject().
     *
     * @param RunnableInterface $command the task to run
     */
    public function execute(RunnableInterface $command): void
    {
        $c = $this->ctl->get();
        if (self::workerCountOf($c) < $this->poolSize) {
            if ($this->addWorker($command)) {
                return;
            }
            $c = $this->ctl->get();
        }
        if (self::isRunning($c)) {
            // Pick the process of a random registered worker. array_rand()
            // returns an existing key, so gaps left by unset() in
            // processWorkerExit() cannot produce an undefined index; an empty
            // worker list yields null instead of a warning.
            // NOTE(review): confirm that offer() accepts a null process.
            $process = $this->workers === []
                ? null
                : $this->workers[array_rand($this->workers)]->process;
            if ($this->workQueue->offer($command, $process)) {
                // size() already includes the new entry, hence the -1.
                $this->compareAndIncrementQueueSize($this->workQueue->size() - 1);
                $recheck = $this->ctl->get();
                if (!self::isRunning($recheck) && $this->remove($command)) {
                    $this->reject($command);
                } elseif (self::workerCountOf($recheck) == 0) {
                    $this->addWorker(null);
                }
            }
        } elseif (!$this->addWorker($command)) {
            $this->reject($command);
        }
    }
507
508
    /**
     * Removes a task from the work queue if it is present, keeps the
     * queue-size counter in step, and then attempts termination.
     *
     * @param RunnableInterface $task the task to remove
     * @return bool whatever the queue's remove() reported
     */
    public function remove(RunnableInterface $task): bool
    {
        $wasRemoved = $this->workQueue->remove($task);

        if ($wasRemoved) {
            // size() is read after the removal, hence the +1 for the CAS expectation.
            $sizeBefore = $this->workQueue->size() + 1;
            $this->compareAndDecrementQueueSize($sizeBefore);
        }

        // The pool may be in SHUTDOWN and have just become empty.
        $this->tryTerminate();

        return $wasRemoved;
    }
517
518
    /**
     * Hook called by shutdown() after the run state has been advanced and
     * idle workers have been interrupted. Does nothing in this class.
     */
    public function onShutdown(): void
    {
        // Intentionally empty.
    }
526
527
    /**
     * Reports whether the pool is RUNNING, or - if the caller allows it -
     * in the SHUTDOWN state.
     *
     * @param bool $shutdownOK whether SHUTDOWN should also count as acceptable
     * @return bool true for RUNNING, or for SHUTDOWN when $shutdownOK is true
     */
    public function isRunningOrShutdown(bool $shutdownOK): bool
    {
        $state = self::runStateOf($this->ctl->get());

        if ($state == self::RUNNING) {
            return true;
        }

        return $shutdownOK && $state == self::SHUTDOWN;
    }
538
539
    /**
     * Starts an orderly shutdown: advances the run state to SHUTDOWN,
     * interrupts idle workers, runs the onShutdown() hook and finally
     * attempts termination.
     */
    public function shutdown(): void
    {
        // trylock() does not block and reports whether the lock was taken.
        // Only a lock taken here may be released here.
        $locked = $this->mainLock->trylock();
        try {
            $this->checkShutdownAccess();
            $this->advanceRunState(self::SHUTDOWN);
            $this->interruptIdleWorkers();
            $this->onShutdown();
        } finally {
            if ($locked) {
                $this->mainLock->unlock();
            }
        }
        $this->tryTerminate();
    }
552
553
    /**
     * Starts an immediate shutdown: advances the run state to STOP,
     * interrupts every worker, empties the work queue and finally attempts
     * termination.
     *
     * @return array the tasks that were still queued and never started
     */
    public function shutdownNow(): array
    {
        $tasks = [];
        // trylock() does not block and reports whether the lock was taken.
        // Only a lock taken here may be released here.
        $locked = $this->mainLock->trylock();
        try {
            $this->checkShutdownAccess();
            $this->advanceRunState(self::STOP);
            $this->interruptWorkers();
            $tasks = $this->drainQueue();
        } finally {
            if ($locked) {
                $this->mainLock->unlock();
            }
        }
        $this->tryTerminate();
        return $tasks;
    }
568
569
    /**
     * Reports whether the pool has left the RUNNING state.
     */
    public function isShutdown(): bool
    {
        $state = $this->ctl->get();

        return !self::isRunning($state);
    }
573
574
    /**
     * Reports whether the pool has left RUNNING but has not yet reached
     * TERMINATED.
     */
    public function isTerminating(): bool
    {
        $state = $this->ctl->get();

        if (self::isRunning($state)) {
            return false;
        }

        return self::runStateLessThan($state, self::TERMINATED);
    }
579
580
    /**
     * Reports whether the pool has reached the TERMINATED state.
     */
    public function isTerminated(): bool
    {
        $state = $this->ctl->get();

        return self::runStateAtLeast($state, self::TERMINATED);
    }
584
585
    /**
     * Waits until the pool reaches TERMINATED or the timeout elapses,
     * whichever comes first.
     *
     * The state is polled in short sleep slices, so the call returns soon
     * after termination instead of sleeping for the whole timeout. Each slice
     * is below one second because time_nanosleep() only accepts a nanosecond
     * argument smaller than 1e9. No lock is held while waiting: the loop only
     * reads the atomic control word, and holding mainLock across the sleep
     * would merely make concurrent trylock() attempts fail.
     *
     * @param int    $timeout maximum time to wait, in $unit
     * @param string $unit    time unit of $timeout, as understood by TimeUnit::toNanos()
     * @return bool true if the pool terminated, false if the timeout elapsed first
     */
    public function awaitTermination(int $timeout, string $unit)
    {
        $remaining = TimeUnit::toNanos($timeout, $unit);
        $slice = 10000000; // 10 ms per poll

        for (;;) {
            if (self::runStateAtLeast($this->ctl->get(), self::TERMINATED)) {
                return true;
            }
            if ($remaining <= 0) {
                return false;
            }

            $step = $remaining < $slice ? $remaining : $slice;
            $outcome = time_nanosleep(0, $step);
            if (is_array($outcome)) {
                // Interrupted by a signal: the array holds the time that was
                // NOT slept, so only charge the part that actually elapsed.
                $step -= $outcome['nanoseconds'];
            }
            $remaining -= $step;
        }
    }
604
605
    /**
     * Hook invoked by tryTerminate() once the pool has entered TIDYING,
     * right before the state is set to TERMINATED. Empty here; subclasses
     * may override it.
     */
    protected function terminated(): void
    {
        // Intentionally empty.
    }
608
}
609