Stream::setBufferSize()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Dazzle\Stream;
4
5
use Dazzle\Throwable\Exception\Runtime\ReadException;
6
use Dazzle\Throwable\Exception\Runtime\WriteException;
7
use Dazzle\Throwable\Exception\Logic\InvalidArgumentException;
8
use Dazzle\Loop\LoopAwareTrait;
9
use Dazzle\Loop\LoopInterface;
10
use Dazzle\Util\Buffer\Buffer;
11
use Dazzle\Util\Buffer\BufferInterface;
12
use Error;
13
use Exception;
14
15
class Stream extends Sync\Stream implements StreamInterface
16
{
17
    use LoopAwareTrait;
18
19
    /**
20
     * @var bool
21
     */
22
    protected $writing;
23
24
    /**
25
     * @var bool
26
     */
27
    protected $reading;
28
29
    /**
30
     * @var bool
31
     */
32
    protected $readingStarted;
33
34
    /**
35
     * @var bool
36
     */
37
    protected $paused;
38
39
    /**
40
     * @var BufferInterface
41
     */
42
    protected $buffer;
43
44
    /**
45
     * @param resource $resource
46
     * @param LoopInterface $loop
47
     * @param bool $autoClose
48
     * @throws InvalidArgumentException
49
     */
50 17
    public function __construct($resource, LoopInterface $loop, $autoClose = true)
51
    {
52 17
        parent::__construct($resource, $autoClose);
53
54 16
        if (function_exists('stream_set_read_buffer'))
55
        {
56 16
            stream_set_read_buffer($this->resource, 0);
57
        }
58
59 16
        if (function_exists('stream_set_write_buffer'))
60
        {
61 16
            stream_set_write_buffer($this->resource, 0);
62
        }
63
64 16
        $this->loop = $loop;
65 16
        $this->writing = false;
66 16
        $this->reading = false;
67 16
        $this->readingStarted = false;
68 16
        $this->paused = true;
69 16
        $this->buffer = new Buffer();
70
71 16
        $this->resume();
72 16
    }
73
74
    /**
75
     *
76
     */
77 13
    public function __destruct()
78
    {
79 13
        parent::__destruct();
80
81 13
        unset($this->loop);
82 13
        unset($this->writing);
83 13
        unset($this->reading);
84 13
        unset($this->readingStarted);
85 13
        unset($this->paused);
86 13
        unset($this->buffer);
87 13
    }
88
89
    /**
90
     * @override
91
     * @inheritDoc
92
     */
93 1
    public function isPaused()
94
    {
95 1
        return $this->paused;
96
    }
97
98
    /**
99
     * @override
100
     * @inheritDoc
101
     */
102
    public function setBufferSize($bufferSize)
103
    {
104
        $this->bufferSize = $bufferSize;
105
    }
106
107
    /**
108
     * @override
109
     * @inheritDoc
110
     */
111
    public function getBufferSize()
112
    {
113
        return $this->bufferSize;
114
    }
115
116
    /**
117
     * @override
118
     * @inheritDoc
119
     */
120 15
    public function pause()
121
    {
122 15
        if (!$this->paused)
123
        {
124 15
            $this->paused = true;
125 15
            $this->writing = false;
126 15
            $this->reading = false;
127 15
            $this->loop->removeWriteStream($this->resource);
128 15
            $this->loop->removeReadStream($this->resource);
129
        }
130 15
    }
131
132
    /**
133
     * @override
134
     * @inheritDoc
135
     */
136 16
    public function resume()
137
    {
138 16
        if (($this->writable || $this->readable) && $this->paused)
139
        {
140 16
            $this->paused = false;
141
142 16
            if ($this->readable && $this->readingStarted)
143
            {
144
                $this->reading = true;
145
                $this->loop->addReadStream($this->resource, $this->getHandleReadFunction());
146
            }
147
148 16 View Code Duplication
            if ($this->writable && $this->buffer->isEmpty() === false)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
149
            {
150
                $this->writing = true;
151
                $this->loop->addWriteStream($this->resource, $this->getHandleWriteFunction());
152
            }
153
        }
154 16
    }
155
156
    /**
157
     * @override
158
     * @inheritDoc
159
     */
160 2 View Code Duplication
    public function write($text = '')
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
161
    {
162 2
        if (!$this->writable)
163
        {
164
            return $this->throwAndEmitException(
165
                new WriteException('Stream is no longer writable.')
166
            );
167
        }
168
169 2
        $this->buffer->push($text);
170
171 2
        if (!$this->writing && !$this->paused)
172
        {
173 2
            $this->writing = true;
174 2
            $this->loop->addWriteStream($this->resource, $this->getHandleWriteFunction());
175
        }
176
177 2
        return $this->buffer->length() < $this->bufferSize;
178
    }
179
180
    /**
181
     * @override
182
     * @inheritDoc
183
     */
184 1 View Code Duplication
    public function read($length = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
185
    {
186 1
        if (!$this->readable)
187
        {
188
            return $this->throwAndEmitException(
189
                new ReadException('Stream is no longer readable.')
190
            );
191
        }
192
193 1
        if (!$this->reading && !$this->paused)
194
        {
195 1
            $this->reading = true;
196 1
            $this->readingStarted = true;
197 1
            $this->loop->addReadStream($this->resource, $this->getHandleReadFunction());
198
        }
199
200 1
        return '';
201
    }
202
203
    /**
204
     * @override
205
     * @inheritDoc
206
     */
207 3 View Code Duplication
    public function close()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
208
    {
209 3
        if ($this->closing)
210
        {
211
            return;
212
        }
213
214 3
        $this->closing = true;
215 3
        $this->readable = false;
216 3
        $this->writable = false;
217
218 3
        if ($this->buffer->isEmpty() === false)
219
        {
220
            $this->writeEnd();
221
        }
222
223 3
        $this->emit('close', [ $this ]);
224 3
        $this->handleClose();
225 3
        $this->emit('done', [ $this ]);
226 3
    }
227
228
    /**
229
     * Handle the outcoming stream.
230
     *
231
     * @internal
232
     */
233 2 View Code Duplication
    public function handleWrite()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
234
    {
235 2
        $text = $this->buffer->peek();
236 2
        $sent = fwrite($this->resource, $text, $this->bufferSize);
237
238 2
        if ($sent === false)
239
        {
240
            $this->emit('error', [ $this, new WriteException('Error occurred while writing to the stream resource.') ]);
241
            return;
242
        }
243
244 2
        $lenBefore = strlen($text);
245 2
        $lenAfter = $lenBefore - $sent;
246 2
        $this->buffer->remove($sent);
247
248 2
        if ($lenAfter > 0 && $lenBefore >= $this->bufferSize && $lenAfter < $this->bufferSize)
249
        {
250
            $this->emit('drain', [ $this ]);
251
        }
252 2
        else if ($lenAfter === 0)
253
        {
254 2
            $this->loop->removeWriteStream($this->resource);
255 2
            $this->writing = false;
256 2
            $this->emit('drain', [ $this ]);
257 2
            $this->emit('finish', [ $this ]);
258
        }
259 2
    }
260
261
    /**
262
     * Handle the incoming stream.
263
     *
264
     * @internal
265
     */
266 2 View Code Duplication
    public function handleRead()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
267
    {
268 2
        $length = $this->bufferSize;
269 2
        $ret = fread($this->resource, $length);
270
271 2
        if ($ret === false)
272
        {
273
            $this->emit('error', [ $this, new ReadException('Error occurred while reading from the stream resource.') ]);
274
            return;
275
        }
276
277 2
        if ($ret !== '')
278
        {
279 2
            $this->emit('data', [ $this, $ret ]);
280
281 2
            if (strlen($ret) < $length)
282
            {
283 2
                $this->loop->removeReadStream($this->resource);
284 2
                $this->reading = false;
285 2
                $this->emit('end', [ $this ]);
286
            }
287
        }
288 2
    }
289
290
    /**
291
     * Handle close.
292
     *
293
     * @internal
294
     */
295 15
    public function handleClose()
296
    {
297 15
        $this->pause();
298
299 15
        parent::handleClose();
300 15
    }
301
302
    /**
303
     * Get function that should be invoked on read event.
304
     *
305
     * @return callable
306
     */
307 2
    protected function getHandleReadFunction()
308
    {
309 2
        return [ $this, 'handleRead' ];
310
    }
311
312
    /**
313
     * Get function that should be invoked on write event.
314
     *
315
     * @return callable
316
     */
317 3
    protected function getHandleWriteFunction()
318
    {
319 3
        return [ $this, 'handleWrite' ];
320
    }
321
322
    /**
323
     *
324
     */
325 View Code Duplication
    private function writeEnd()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
326
    {
327
        do
328
        {
329
            try
330
            {
331
                $sent = @fwrite($this->resource, $this->buffer->peek());
332
                $this->buffer->remove($sent);
333
            }
334
            catch (Error $ex)
335
            {
336
                $sent = 0;
337
            }
338
            catch (Exception $ex)
339
            {
340
                $sent = 0;
341
            }
342
        }
343
        while (is_resource($this->resource) && $sent > 0 && !$this->buffer->isEmpty());
344
    }
345
}
346