Shaper::waits()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
c 0
b 0
f 0
rs 10
cc 1
nc 1
nop 0
1
<?php
2
/**
3
 * Shaper
4
 * User: moyo
5
 * Date: 19/08/2017
6
 * Time: 11:02 AM
7
 */
8
9
namespace Carno\Shaping;
10
11
use function Carno\Coroutine\race;
12
use function Carno\Coroutine\timeout;
13
use Carno\Promise\Promise;
14
use Carno\Promise\Promised;
15
use Carno\Shaping\Exception\AcquirePermitsDeniedException;
16
use Carno\Shaping\Exception\AcquireWaitTimeoutException;
17
use SplStack;
18
19
class Shaper
20
{
21
    /**
22
     * @var Options
23
     */
24
    private $options = null;
25
26
    /**
27
     * @var Bucket
28
     */
29
    private $bucket = null;
30
31
    /**
32
     * @var SplStack
33
     */
34
    private $waits = null;
35
36
    /**
37
     * Shaper constructor.
38
     * @param Options $options
39
     */
40
    public function __construct(Options $options)
41
    {
42
        $this->options = $options;
43
44
        $this->waits = new SplStack;
45
46
        $this->bucket = new Bucket($options, function (Bucket $bucket) {
47
            $this->continues($bucket);
48
        });
49
50
        Control::register($this);
51
    }
52
53
    /**
54
     */
55
    public function shutdown() : void
56
    {
57
        $this->bucket->stop();
58
59
        Control::deregister($this);
60
    }
61
62
    /**
63
     * @return int
64
     */
65
    public function tokens() : int
66
    {
67
        return $this->bucket->tokens();
68
    }
69
70
    /**
71
     * @return int
72
     */
73
    public function waits() : int
74
    {
75
        return $this->waits->count();
76
    }
77
78
    /**
79
     * @param Bucket $bucket
80
     */
81
    private function continues(Bucket $bucket) : void
82
    {
83
        if ($this->waits->isEmpty()) {
84
            return;
85
        }
86
87
        while (!$this->waits->isEmpty()) {
88
            /**
89
             * @var Promised $wait
90
             */
91
            list($permits, $wait) = $this->waits->shift();
92
            if ($wait->pended()) {
93
                if ($bucket->out($permits)) {
94
                    $wait->resolve();
95
                } else {
96
                    $this->waits->unshift([$permits, $wait]);
97
                    break;
98
                }
99
            }
100
        }
101
    }
102
103
    /**
104
     * @param int $permits
105
     * @return bool
106
     */
107
    public function acquired(int $permits = 1) : bool
108
    {
109
        return $this->bucket->out($permits) ? true : false;
110
    }
111
112
    /**
113
     * @param int $permits
114
     * @return Promised
115
     */
116
    public function queued(int $permits = 1) : Promised
117
    {
118
        if ($this->options->waitQMax && $this->waits->count() < $this->options->waitQMax) {
119
            $queued = Promise::deferred();
120
            $this->waits->push([$permits, $queued]);
121
            return race($queued, timeout($this->options->waitTimeout, AcquireWaitTimeoutException::class));
122
        } else {
123
            return Promise::rejected(new AcquirePermitsDeniedException);
124
        }
125
    }
126
}
127