Completed
Push — master ( 44a55c...67aefa )
by James
02:27
created

src/BufferedChannel.php (1 issue)

Labels
Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
declare (strict_types = 1); // @codeCoverageIgnore
4
5
namespace Recoil\Channel;
6
7
use Closure;
8
use Generator;
9
use Recoil\Channel\Exception\ChannelClosedException;
10
use Recoil\Kernel\Strand;
11
use Recoil\Recoil;
12
use SplFixedArray;
13
use SplQueue;
14
15
/**
 * A buffered, unidirectional data channel.
 *
 * Buffered data channels allow multiple values to be written to the channel
 * without causing write() to block. Once the buffer is full, calls to write()
 * will block until values are read from the channel.
 *
 * Internally the channel keeps a fixed-size circular buffer plus a single
 * queue of suspended strands. The signed $size counter tracks both: it is
 * negative while readers are waiting, and greater than the capacity while
 * writers are waiting.
 */
final class BufferedChannel implements ReadableChannel, WritableChannel
{
    /**
     * Create a buffered channel.
     *
     * @param int $capacity The maximum number of values to allow in the buffer
     *                      at any given time.
     */
    public static function create(int $capacity) : self
    {
        return new self($capacity);
    }

    /**
     * Get the channel's buffer capacity.
     */
    public function capacity() : int
    {
        return $this->capacity;
    }

    /**
     * Read a value from this channel.
     *
     * @recoil-coroutine
     *
     * Execution of the current strand is suspended until a value becomes
     * available.
     *
     * @return mixed                  The value read from the channel.
     * @throws ChannelClosedException The channel has been closed and the buffer is empty.
     */
    public function read() : Generator
    {
        // Channel is already closed and there is nothing buffered ...
        if ($this->isClosed && $this->size === 0) {
            throw ChannelClosedException::channelClosed();

        // A negative size indicates that there are more strands attempting to
        // read than are attempting to write. Suspend the current strand and add
        // it to the strand queue so that it may be resumed by a future write
        // operation, which passes the written value directly to this strand ...
        } elseif (--$this->size < 0) {
            return yield Recoil::suspend($this->onSuspend);
        }

        // Read the next value from the buffer and free the reference ...
        $value = $this->buffer[$this->readIndex];
        $this->buffer[$this->readIndex] = null;

        // Rewind the read index if we're at the end of the buffer ...
        if (++$this->readIndex === $this->capacity) {
            $this->readIndex = 0;
        }

        // If there are strands waiting to write to the buffer, resume the next
        // one allowing it to occupy the buffer slot that was just freed ...
        if ($this->size >= $this->capacity) {
            $this->strands->dequeue()->send();
        }

        return $value;
    }

    /**
     * Write a value to this channel.
     *
     * @recoil-coroutine
     *
     * Execution of the current strand is suspended if this call causes the
     * number of buffered values to exceed the channel's buffer capacity.
     *
     * @param mixed $value The value to write to the channel.
     *
     * @throws ChannelClosedException The channel has been closed.
     */
    public function write($value) : Generator
    {
        // Channel is already closed ...
        if ($this->isClosed) {
            throw ChannelClosedException::channelClosed();
        }

        // A non-positive size (after counting this write) indicates that there
        // is at least one reader already waiting for a value. Remove it from
        // the queue and resume it immediately with the value ...
        if (++$this->size <= 0) {
            $this->strands->dequeue()->send($value);

            // The value was delivered directly to the reader, so it must NOT
            // also be placed in the buffer: the reader never advances the read
            // index on this path, and storing the value would leave the read
            // and write indices out of step (a later read would then return
            // this already-delivered value).
            return;
        }

        // There is no buffer space left for the value. Suspend this strand
        // until it is resumed by a future read operation ...
        if ($this->size > $this->capacity) {
            yield Recoil::suspend($this->onSuspend);
        }

        // Add the value to the next available slot in the buffer ...
        $this->buffer[$this->writeIndex++] = $value;

        // Rewind the write index if we're at the end of the buffer ...
        if ($this->writeIndex === $this->capacity) {
            $this->writeIndex = 0;
        }
    }

    /**
     * Close this channel.
     *
     * @recoil-coroutine
     *
     * Closing an already-closed channel is a no-op. Otherwise every strand
     * that is currently suspended on the channel is resumed with a
     * ChannelClosedException. Values that are already buffered remain
     * available to subsequent read() calls.
     */
    public function close() : Generator
    {
        if ($this->isClosed) {
            return;
            yield; // @codeCoverageIgnore
        }

        $this->isClosed = true;

        // Adjust the size to exclude any suspended writers, but include the
        // buffered values ...
        if ($this->size > $this->capacity) {
            $this->size = $this->capacity;
        } elseif ($this->size < 0) {
            $this->size = 0;
        }

        // Drain the queue (rather than merely iterating it) so that the closed
        // channel does not keep references to the strands it has notified, and
        // so that the terminator callback can no longer find them and alter
        // the size computed above ...
        $exception = ChannelClosedException::channelClosed();
        while (!$this->strands->isEmpty()) {
            $this->strands->dequeue()->throw($exception);
        }
    }

    /**
     * Please note that this code is not part of the public API. It may be
     * changed or removed at any time without notice.
     *
     * @access private
     *
     * Use create() to obtain an instance.
     *
     * NOTE(review): an earlier version of this comment justified the public
     * visibility by referring to the `Exception` class; this class does not
     * extend `Exception`, so that rationale does not apply here.
     *
     * @param int $capacity The maximum number of values to allow in the buffer
     *                      at any given time. Must be greater than zero.
     */
    public function __construct(int $capacity)
    {
        assert($capacity > 0);

        $this->capacity = $capacity;
        $this->buffer = new SplFixedArray($capacity);
        $this->strands = new SplQueue();

        // Removes a strand from the wait queue and undoes its contribution to
        // the size counter: waiting readers make the size negative, waiting
        // writers push it above the capacity ...
        $this->onTerminate = function ($strand) {
            foreach ($this->strands as $key => $value) {
                if ($strand === $value) {
                    unset($this->strands[$key]);
                    if ($this->size < 0) {
                        ++$this->size;
                    } else {
                        --$this->size;
                    }

                    // A suspended strand is enqueued at most once, so stop
                    // here instead of continuing to iterate a queue that has
                    // just been modified ...
                    break;
                }
            }
        };

        // Registers the terminator on the suspending strand and appends the
        // strand to the wait queue ...
        $this->onSuspend = function ($strand) {
            $strand->setTerminator($this->onTerminate);
            $this->strands->enqueue($strand);
        };
    }

    /**
     * @var SplQueue<Strand> The queue of strands waiting to read or write.
     */
    private $strands;

    /**
     * @var Closure Callback used when suspending a strand.
     */
    private $onSuspend;

    /**
     * @var Closure Callback installed as the terminator of each suspended
     *              strand; removes that strand from the wait queue.
     */
    private $onTerminate;

    /**
     * @var int The maximum number of values to allow in the buffer at any given
     *          time.
     */
    private $capacity;

    /**
     * @var int The "size" of the channel. This number will be greater than the
     *          capacity when $this->strands contains writers, and negative
     *          when it contains readers.
     */
    private $size = 0;

    /**
     * @var SplFixedArray A "circular" buffer that represents the queue of
     *                    unread values.
     */
    private $buffer;

    /**
     * @var int The index of the next available value in the buffer.
     */
    private $readIndex = 0;

    /**
     * @var int The index of the next free slot in the buffer.
     */
    private $writeIndex = 0;

    /**
     * @var bool True if close() has been called.
     */
    private $isClosed = false;
}
234