Test Failed
Push — main ( d01f4c...5af89f )
by Bingo
15:56
created

compareAndIncrementWorkerCount()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 1
dl 0
loc 3
rs 10
c 1
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace Jabe\Engine\Impl\Util\Concurrent;
4
5
class ProcessPoolExecutor implements ExecutorServiceInterface
6
{
7
    /**
     * Packed control word. The constructor initialises it to a
     * \Swoole\Atomic\Long holding ctlOf(RUNNING, 0): run state in the
     * high-order bits, worker count in the low COUNT_BITS bits.
     */
    private $ctl;

    /** Number of low-order bits of ctl that hold the worker count. */
    private const COUNT_BITS = ( PHP_INT_SIZE * 4 ) - 3;

    /** Bit mask of the worker-count field; also the largest representable count. */
    private const CAPACITY   = (1 << self::COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private const RUNNING    = -1 << self::COUNT_BITS;
    private const SHUTDOWN   =  0 << self::COUNT_BITS;
    private const STOP       =  1 << self::COUNT_BITS;
    private const TIDYING    =  2 << self::COUNT_BITS;
    private const TERMINATED =  3 << self::COUNT_BITS;

    /** Flag passed to interruptIdleWorkers() to stop after the first worker. */
    private const ONLY_ONE = true;

    /** Task queue handed to the constructor (a ProcessQueueInterface). */
    private $workQueue;

    /** Mutex created in the constructor; guards $workers and the pool statistics. */
    private $mainLock;

    /**
     * Set containing all worker processes in pool
     */
    private $workers = [];

    /**
     * Wait condition to support awaitTermination
     */
    //private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock. Starts at 0 so that the size comparison in addWorker()
     * is always int against int instead of int against null.
     */
    private $largestPoolSize = 0;

    /**
     * Counter for completed tasks
     */
    private $completedTaskCount = 0;

    /*
     * User control parameters. They are assigned once in the constructor.
     */

    /**
     * Handler called when saturated or shutdown in execute.
     *
     * NOTE(review): nothing in this class reads or writes this property yet
     * (reject() is an empty stub); static analysis reports it as unused.
     */
    private $handler;

    /**
     * Timeout in nanoseconds for idle processes
     */
    private $keepAliveTime;

    /**
     * Maximum pool size.
     */
    private $poolSize;
66
67
    // Packing and unpacking ctl

    /**
     * Extracts the run-state part of a packed ctl value.
     *
     * @param int $c packed ctl value
     * @return int $c with every bit of the CAPACITY mask cleared
     */
    private static function runStateOf(int $c): int
    {
        $stateMask = ~self::CAPACITY;

        return $c & $stateMask;
    }
72
73
    /**
     * Extracts the worker-count part of a packed ctl value.
     *
     * @param int $c packed ctl value
     * @return int the bits of $c selected by the CAPACITY mask
     */
    private static function workerCountOf(int $c): int
    {
        $countMask = self::CAPACITY;

        return $c & $countMask;
    }
77
78
    /**
     * Packs a run state and a worker count into a single ctl value.
     *
     * @param int $rs run-state bits (one of the run-state constants)
     * @param int $wc worker count
     * @return int bitwise OR of both fields
     */
    private static function ctlOf(int $rs, int $wc)
    {
        $packed = $rs;
        $packed |= $wc;

        return $packed;
    }
82
83
    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    /**
     * Tells whether ctl value $c is numerically below state $s.
     *
     * @param int $c packed ctl value
     * @param int $s run-state constant to compare against
     * @return bool true when $c < $s
     */
    private static function runStateLessThan(int $c, int $s): bool
    {
        return $s > $c;
    }
91
92
    /**
     * Tells whether ctl value $c is numerically at or above state $s.
     *
     * @param int $c packed ctl value
     * @param int $s run-state constant to compare against
     * @return bool true when $c >= $s
     */
    private static function runStateAtLeast(int $c, int $s): bool
    {
        return !($c < $s);
    }
96
97
    /**
     * Tells whether a ctl value denotes the RUNNING state, i.e. lies
     * below SHUTDOWN.
     *
     * @param int $c packed ctl value
     * @return bool true when $c < SHUTDOWN
     */
    private static function isRunning(int $c): bool
    {
        return self::SHUTDOWN > $c;
    }
101
102
    /**
     * Tries to bump the worker count by one with a compare-and-set on ctl.
     *
     * @param int $expect the ctl value the caller last observed
     * @return bool result of cmpset($expect, $expect + 1) on ctl
     */
    private function compareAndIncrementWorkerCount(int $expect): bool
    {
        $incremented = $expect + 1;

        return $this->ctl->cmpset($expect, $incremented);
    }
109
110
    /**
     * Tries to lower the worker count by one with a compare-and-set on ctl.
     *
     * @param int $expect the ctl value the caller last observed
     * @return bool result of cmpset($expect, $expect - 1) on ctl
     */
    private function compareAndDecrementWorkerCount(int $expect): bool
    {
        $decremented = $expect - 1;

        return $this->ctl->cmpset($expect, $decremented);
    }
117
118
    /**
     * Lowers the worker count by one, re-reading ctl and retrying the
     * compare-and-set until it succeeds.
     */
    private function decrementWorkerCount(): void
    {
        while (!$this->compareAndDecrementWorkerCount($this->ctl->get())) {
            // CAS lost against a concurrent update; try again with a fresh value.
        }
    }
126
127
    /**
     * Moves the run state forward to the given target, or leaves ctl
     * untouched when it is already at or beyond that target. The worker
     * count stored in ctl is carried over unchanged.
     *
     * @param int $targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private function advanceRunState(int $targetState): void
    {
        do {
            $current = $this->ctl->get();
            // Finished when already far enough, or when our CAS installed the new state.
            $settled = self::runStateAtLeast($current, $targetState)
                || $this->ctl->cmpset($current, self::ctlOf($targetState, self::workerCountOf($current)));
        } while (!$settled);
    }
146
147
    /**
     * Moves the pool to TERMINATED when that is possible.
     *
     * Returns without doing anything while the pool is still running, is
     * already TIDYING or later, or is in SHUTDOWN with tasks left in the
     * queue. When workers remain, one idle worker is interrupted and the
     * method returns. Otherwise ctl is switched to TIDYING under mainLock,
     * terminated() is invoked, and ctl is finally set to TERMINATED. A lost
     * compare-and-set restarts the whole check.
     */
    public function tryTerminate(): void
    {
        while (true) {
            $snapshot = $this->ctl->get();

            if (self::isRunning($snapshot)) {
                return;
            }
            if (self::runStateAtLeast($snapshot, self::TIDYING)) {
                return;
            }
            if (self::runStateOf($snapshot) == self::SHUTDOWN && !$this->workQueue->isEmpty()) {
                return;
            }

            // Eligible to terminate, but workers are still alive: nudge one of them.
            if (self::workerCountOf($snapshot) != 0) {
                $this->interruptIdleWorkers(self::ONLY_ONE);
                return;
            }

            $lock = $this->mainLock;
            $lock->lock();
            try {
                $tidying = self::ctlOf(self::TIDYING, 0);
                if ($this->ctl->cmpset($snapshot, $tidying)) {
                    try {
                        $this->terminated();
                    } finally {
                        // Reached even if the terminated() hook throws.
                        $this->ctl->set(self::ctlOf(self::TERMINATED, 0));
                        //termination.signalAll();
                    }
                    return;
                }
            } finally {
                $lock->unlock();
            }
            // CAS failed: somebody changed ctl in the meantime, so re-evaluate.
        }
    }
191
192
    /**
     * Hook invoked at the start of shutdown() and shutdownNow().
     * It currently performs no check.
     */
    private function checkShutdownAccess(): void
    {
        // Intentionally empty.
    }
195
196
    /**
     * Calls interrupt() on every registered worker while holding mainLock.
     * An \Exception thrown by one worker is swallowed so that the remaining
     * workers are still interrupted.
     */
    private function interruptWorkers(): void
    {
        $this->mainLock->lock();
        try {
            foreach ($this->workers as $worker) {
                try {
                    $worker->interrupt();
                } catch (\Exception $ignored) {
                    // Best effort: keep going with the next worker.
                }
            }
        } finally {
            $this->mainLock->unlock();
        }
    }
215
216
    /**
     * Interrupts the process of each worker whose process does not already
     * report isInterrupted(), holding mainLock for the whole scan.
     *
     * @param bool $onlyOne when true the scan stops after the first worker
     *        in the list, whether or not it was interrupted
     */
    private function interruptIdleWorkers(bool $onlyOne = false): void
    {
        $this->mainLock->lock();
        try {
            foreach ($this->workers as $worker) {
                $process = $worker->process;
                if (!$process->isInterrupted()) {
                    try {
                        $process->interrupt();
                    } catch (\Exception $ignored) {
                        // Best effort: a failed interrupt must not abort the scan.
                    } finally {
                        // NOTE(review): no matching lock()/tryLock() on the worker is
                        // visible in this class -- confirm Worker::unlock() tolerates that.
                        $worker->unlock();
                    }
                }
                if ($onlyOne) {
                    break;
                }
            }
        } finally {
            $this->mainLock->unlock();
        }
    }
242
243
    /**
     * Called by execute() for a command that could not be accepted.
     *
     * NOTE(review): the body is empty, so a rejected command is dropped
     * without any signal to the caller and $command is unused.
     *
     * @param RunnableInterface $command the command that was turned down
     */
    private function reject(RunnableInterface $command): void
    {
        // Intentionally empty.
    }
250
251
    /**
     * Empties the work queue and returns what was taken out of it.
     *
     * drainTo() is tried first; if the queue is still non-empty afterwards,
     * the elements listed by toArray() are removed one at a time and every
     * successfully removed element is appended to the result.
     *
     * @return array the collected tasks
     */
    private function drainQueue(): array
    {
        $queue = $this->workQueue;
        $drained = [];
        $queue->drainTo($drained);

        if ($queue->isEmpty()) {
            return $drained;
        }

        $buffer = [];
        foreach ($queue->toArray($buffer) as $leftover) {
            if ($queue->remove($leftover)) {
                $drained[] = $leftover;
            }
        }

        return $drained;
    }
266
267
    /**
     * Reserves a worker slot in ctl, creates a Worker for it, registers the
     * worker under mainLock and starts its process.
     *
     * The slot is reserved with a compare-and-set loop that gives up when the
     * run state forbids new workers or the pool is already at CAPACITY or
     * $poolSize. The run state is checked again under mainLock; if the worker
     * may no longer be added, the reservation is rolled back.
     *
     * The rollback runs after mainLock has been released, because
     * tryTerminate() acquires mainLock itself and the lock is a plain
     * Swoole mutex, which is assumed not to be re-entrant.
     *
     * @param RunnableInterface|null $firstTask task the new worker runs first, or null
     * @return bool true when the worker was registered and started
     */
    private function addWorker(?RunnableInterface $firstTask)
    {
        for (;;) {
            $c = $this->ctl->get();
            $rs = self::runStateOf($c);

            // Check if queue empty only if necessary.
            if (
                $rs >= self::SHUTDOWN &&
                ! (
                    $rs == self::SHUTDOWN &&
                    $firstTask == null &&
                    !$this->workQueue->isEmpty()
                )
            ) {
                return false;
            }

            for (;;) {
                $wc = self::workerCountOf($c);
                if (
                    $wc >= self::CAPACITY ||
                    $wc >= $this->poolSize
                ) {
                    return false;
                }
                if ($this->compareAndIncrementWorkerCount($c)) {
                    break 2;
                }
                $c = $this->ctl->get();  // Re-read ctl
                if (self::runStateOf($c) != $rs) {
                    continue 2;
                }
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        $w = new Worker($firstTask, $this);
        $t = $w->process;

        $registered = false;
        $this->mainLock->lock();
        try {
            // Recheck while holding lock: back out if the worker has no
            // process or the pool was shut down before the lock was acquired.
            $c = $this->ctl->get();
            $rs = self::runStateOf($c);

            $mustBackOut = $t == null ||
                ($rs >= self::SHUTDOWN &&
                 !($rs == self::SHUTDOWN && $firstTask == null));

            if (!$mustBackOut) {
                $this->workers[] = $w;

                $s = count($this->workers);
                if ($s > $this->largestPoolSize) {
                    $this->largestPoolSize = $s;
                }
                $registered = true;
            }
        } finally {
            $this->mainLock->unlock();
        }

        if (!$registered) {
            // Undo the slot reservation only after unlocking (see method comment).
            $this->decrementWorkerCount();
            $this->tryTerminate();
            return false;
        }

        $t->start();
        return true;
    }
337
338
    /**
     * Bookkeeping for a worker that is leaving runWorker().
     *
     * Adds the worker's completed-task count to the pool total, removes the
     * worker from $workers, calls tryTerminate(), and while the pool is below
     * STOP starts a replacement worker when one is needed.
     *
     * The worker is located by identity (strict comparison), so another
     * worker that merely has equal property values is never removed by
     * mistake. The list is reindexed afterwards so its keys stay contiguous.
     *
     * @param Worker $w the exiting worker
     * @param bool $completedAbruptly true when the worker left its loop through an exception
     */
    private function processWorkerExit(Worker $w, bool $completedAbruptly): void
    {
        if ($completedAbruptly) {// If abrupt, then workerCount wasn't adjusted
            $this->decrementWorkerCount();
        }

        $this->mainLock->lock();
        try {
            $this->completedTaskCount += $w->completedTasks;
            $key = array_search($w, $this->workers, true);
            if ($key !== false) {
                unset($this->workers[$key]);
                // unset() leaves a gap in the keys; close it again.
                $this->workers = array_values($this->workers);
            }
        } finally {
            $this->mainLock->unlock();
        }

        $this->tryTerminate();

        $c = $this->ctl->get();
        if (self::runStateLessThan($c, self::STOP)) {
            if (!$completedAbruptly) {
                $min = $this->poolSize;
                if ($min == 0 && !$this->workQueue->isEmpty()) {
                    $min = 1;
                }
                if (self::workerCountOf($c) >= $min) {
                    return; // replacement not needed
                }
            }
            $this->addWorker(null);
        }
    }
373
374
    /**
     * Fetches the next task for a worker, or returns null when the worker
     * has to exit. Every null return is preceded by a decrement of the
     * worker count in ctl.
     *
     * The worker exits when the pool is in STOP or later, when it is in
     * SHUTDOWN with an empty queue, or when the worker count exceeds
     * $poolSize. Otherwise the queue is asked for a task; an empty answer or
     * an \Exception leads to another pass through the loop instead of
     * ending the worker with its slot still counted.
     *
     * NOTE(review): the retry assumes workQueue->take() blocks until a task
     * arrives or the process is interrupted -- confirm against the queue
     * implementation, otherwise this loop would spin.
     * NOTE(review): unserialize() is only safe as long as nothing untrusted
     * can write to the queue.
     *
     * @param InterruptibleProcess $process the process of the asking worker
     * @return RunnableInterface|null the unserialized task, or null to stop the worker
     */
    private function getTask(InterruptibleProcess $process): ?RunnableInterface
    {
        $timedOut = false; // Did the last timed poll() come back empty?
        for (;;) {
            $c = $this->ctl->get();
            $rs = self::runStateOf($c);

            // Check if queue empty only if necessary.
            if ($rs >= self::SHUTDOWN && ($rs >= self::STOP || $this->workQueue->isEmpty())) {
                $this->decrementWorkerCount();
                return null;
            }

            $timed = false;

            for (;;) {
                $wc = self::workerCountOf($c);
                $timed = $wc > $this->poolSize;

                if ($wc <= $this->poolSize && !($timedOut && $timed)) {
                    break;
                }
                if ($this->compareAndDecrementWorkerCount($c)) {
                    return null;
                }
                $c = $this->ctl->get();  // Re-read ctl
                if (self::runStateOf($c) != $rs) {
                    continue 2;
                }
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                $r = $timed ?
                    $this->workQueue->poll($this->keepAliveTime, TimeUnit::NANOSECONDS, $process) :
                    $this->workQueue->take($process);
                if ($r != null) {
                    return unserialize($r);
                }
                $timedOut = true;
            } catch (\Exception $retry) {
                $timedOut = false;
            }
            // No task obtained: loop again so the run state is re-examined.
        }
    }
420
421
    /**
     * Main loop of a worker: runs its first task, then keeps running tasks
     * obtained from getTask() until that returns null.
     *
     * After each task, whether it succeeded or threw, the worker's
     * completedTasks counter is increased. processWorkerExit() is always
     * called at the end; it is told that the worker completed abruptly when
     * the loop was left through an exception.
     *
     * @param Worker $w the worker whose loop is being run
     */
    public function runWorker(Worker $w): void
    {
        $task = $w->firstTask;
        $w->firstTask = null;
        $completedAbruptly = true;
        try {
            while ($task != null || ($task = $this->getTask($w->process)) != null) {
                //$w->lock();
                try {
                    // An exception from run() propagates to the outer finally unchanged.
                    $task->run();
                } finally {
                    $task = null;
                    $w->firstTask = null;
                    $w->completedTasks += 1;
                    //$w->unlock();
                }
            }
            $completedAbruptly = false;
        } finally {
            $this->processWorkerExit($w, $completedAbruptly);
        }
    }
449
450
    /**
     * Creates a pool in the RUNNING state with zero workers.
     *
     * Arguments are validated before the Swoole atomic and mutex are
     * allocated, so an invalid call does not create those resources.
     *
     * @param int $poolSize maximum number of workers; must be greater than zero
     * @param int $keepAliveTime idle timeout expressed in $unit; must not be negative
     * @param string $unit time unit of $keepAliveTime, as understood by TimeUnit::toNanos()
     * @param ProcessQueueInterface $workQueue queue that holds submitted tasks
     * @throws \InvalidArgumentException when $poolSize <= 0 or $keepAliveTime < 0
     */
    public function __construct(
        int $poolSize,
        int $keepAliveTime,
        string $unit,
        ProcessQueueInterface $workQueue
    ) {
        if (
            $poolSize <= 0 || $keepAliveTime < 0
        ) {
            throw new \InvalidArgumentException("Illegal argument");
        }
        $this->ctl = new \Swoole\Atomic\Long(self::ctlOf(self::RUNNING, 0));
        $this->mainLock = new \Swoole\Lock(SWOOLE_MUTEX);
        $this->poolSize = $poolSize;
        $this->workQueue = $workQueue;
        $this->keepAliveTime = TimeUnit::toNanos($keepAliveTime, $unit);
    }
467
468
    /**
     * Submits a command to the pool.
     *
     * While fewer than $poolSize workers are counted, a new worker is started
     * with the command as its first task. Otherwise, if the pool is running,
     * the command is offered to the queue for a randomly chosen worker
     * process. If the pool is not running, one more attempt to add a worker
     * is made and the command is rejected when that fails.
     *
     * The receiving worker is chosen with array_rand(), which picks an
     * existing key and therefore also works when the worker list has gaps.
     * When no worker is registered at all, the command is given to a new
     * worker or rejected.
     *
     * @param RunnableInterface $command the task to run
     */
    public function execute(RunnableInterface $command): void
    {
        $c = $this->ctl->get();
        if (self::workerCountOf($c) < $this->poolSize) {
            if ($this->addWorker($command)) {
                return;
            }
            $c = $this->ctl->get();
        }
        if (self::isRunning($c)) {
            if (count($this->workers) === 0) {
                // There is no process the command could be offered to.
                if (!$this->addWorker($command)) {
                    $this->reject($command);
                }
                return;
            }
            $receiver = $this->workers[array_rand($this->workers)]->process;
            if ($this->workQueue->offer($command, $receiver)) {
                $recheck = $this->ctl->get();
                if (!self::isRunning($recheck) && $this->remove($command)) {
                    $this->reject($command);
                } elseif (self::workerCountOf($recheck) == 0) {
                    $this->addWorker(null);
                }
            }
        } elseif (!$this->addWorker($command)) {
            $this->reject($command);
        }
    }
491
492
    /**
     * Removes a task from the work queue and then gives the pool a chance
     * to terminate, in case it is shutting down and the queue is now empty.
     *
     * @param RunnableInterface $task the task to take out of the queue
     * @return bool whatever the queue's remove() reported
     */
    public function remove(RunnableInterface $task): bool
    {
        $wasRemoved = $this->workQueue->remove($task);

        $this->tryTerminate();

        return $wasRemoved;
    }
498
499
    /**
     * Hook called by shutdown() after the run state has been advanced and
     * idle workers have been interrupted. It does nothing in this class.
     */
    public function onShutdown(): void
    {
        // Intentionally empty.
    }
507
508
    /**
     * Reports whether the pool is RUNNING, or is in SHUTDOWN while the
     * caller accepts that state.
     *
     * @param bool $shutdownOK true if SHUTDOWN should also count as acceptable
     * @return bool true for RUNNING, or for SHUTDOWN when $shutdownOK is true
     */
    public function isRunningOrShutdown(bool $shutdownOK): bool
    {
        $state = self::runStateOf($this->ctl->get());

        if ($state == self::RUNNING) {
            return true;
        }

        return $state == self::SHUTDOWN && $shutdownOK;
    }
519
520
    /**
     * Starts an orderly shutdown: advances the run state to SHUTDOWN,
     * interrupts idle workers, runs the onShutdown() hook and finally tries
     * to terminate the pool.
     *
     * mainLock is deliberately not held around these steps.
     * interruptIdleWorkers() acquires mainLock itself, and the lock is a
     * plain Swoole mutex, which is assumed not to be re-entrant; taking it
     * here as well would block this call on its own lock. advanceRunState()
     * works on the atomic ctl and needs no lock.
     */
    public function shutdown(): void
    {
        $this->checkShutdownAccess();
        $this->advanceRunState(self::SHUTDOWN);
        $this->interruptIdleWorkers();
        $this->onShutdown();
        $this->tryTerminate();
    }
533
534
    /**
     * Stops the pool immediately: advances the run state to STOP, interrupts
     * every worker, drains the work queue and finally tries to terminate
     * the pool.
     *
     * mainLock is deliberately not held around these steps.
     * interruptWorkers() acquires mainLock itself, and the lock is a plain
     * Swoole mutex, which is assumed not to be re-entrant; taking it here
     * as well would block this call on its own lock.
     *
     * @return array the tasks that were still queued
     */
    public function shutdownNow(): array
    {
        $this->checkShutdownAccess();
        $this->advanceRunState(self::STOP);
        $this->interruptWorkers();
        $tasks = $this->drainQueue();
        $this->tryTerminate();
        return $tasks;
    }
549
550
    /**
     * Reports whether the pool has left the RUNNING state.
     *
     * @return bool true when the current ctl value is not RUNNING
     */
    public function isShutdown(): bool
    {
        $stillRunning = self::isRunning($this->ctl->get());

        return !$stillRunning;
    }
554
555
    /**
     * Reports whether the pool has left RUNNING but has not yet reached
     * TERMINATED.
     *
     * @return bool true while the pool is between RUNNING and TERMINATED
     */
    public function isTerminating(): bool
    {
        $snapshot = $this->ctl->get();

        if (self::isRunning($snapshot)) {
            return false;
        }

        return self::runStateLessThan($snapshot, self::TERMINATED);
    }
560
561
    /**
     * Reports whether the pool has reached the TERMINATED state.
     *
     * @return bool true when the current ctl value is at least TERMINATED
     */
    public function isTerminated(): bool
    {
        $snapshot = $this->ctl->get();

        return self::runStateAtLeast($snapshot, self::TERMINATED);
    }
565
566
    /**
     * Waits until the pool reaches TERMINATED or the timeout runs out.
     *
     * The state is polled against a deadline in short sleeps. mainLock is
     * not held while waiting, because tryTerminate() needs that lock to move
     * the pool to TERMINATED. Each sleep is capped, so timeouts of a second
     * or more are handled as well.
     *
     * @param int $timeout maximum time to wait, expressed in $unit
     * @param string $unit time unit of $timeout, as understood by TimeUnit::toNanos()
     * @return bool true if the pool terminated, false if the timeout elapsed first
     */
    public function awaitTermination(int $timeout, string $unit)
    {
        $nanos = TimeUnit::toNanos($timeout, $unit);
        $deadline = microtime(true) + $nanos / 1e9;
        $pollIntervalMicros = 10000; // re-check the state every 10 ms at most

        for (;;) {
            if (self::runStateAtLeast($this->ctl->get(), self::TERMINATED)) {
                return true;
            }
            $remainingSeconds = $deadline - microtime(true);
            if ($remainingSeconds <= 0) {
                return false;
            }
            // Never sleep past the deadline, and always sleep at least 1 microsecond.
            $sleepMicros = (int) min($pollIntervalMicros, max(1, $remainingSeconds * 1e6));
            usleep($sleepMicros);
        }
    }
584
585
    /**
     * Hook invoked by tryTerminate() after ctl has been switched to TIDYING
     * and before it is set to TERMINATED. It does nothing in this class.
     */
    protected function terminated(): void
    {
        // Intentionally empty.
    }
588
589
    /**
     * Returns the pool-wide completed-task counter, which is increased in
     * processWorkerExit() by each exiting worker's own count.
     *
     * @return int the current value of the counter
     */
    public function completedTaskCount(): int
    {
        $total = $this->completedTaskCount;

        return $total;
    }
593
}
594