Completed
Push — master ( 942adb...9601ee )
by Aurimas
01:59
created

Stream::setBufferSize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2
Metric Value
dl 0
loc 6
ccs 0
cts 3
cp 0
rs 9.4286
cc 1
eloc 3
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Thruster\Component\Stream;
4
5
use InvalidArgumentException;
6
use Thruster\Component\EventEmitter\EventEmitterInterface;
7
use Thruster\Component\EventEmitter\EventEmitterTrait;
8
use Thruster\Component\EventLoop\EventLoopInterface;
9
10
/**
11
 * Class Stream
12
 *
13
 * @package Thruster\Component\Stream
14
 * @author  Aurimas Niekis <[email protected]>
15
 */
16
class Stream implements DuplexStreamInterface
17
{
18
    use EventEmitterTrait;
19
    use UtilsTrait;
20
21
    /**
22
     * @var int
23
     */
24
    protected $bufferSize;
25
26
    /**
27
     * @var resource
28
     */
29
    protected $stream;
30
31
    /**
32
     * @var bool
33
     */
34
    protected $readable;
35
36
    /**
37
     * @var bool
38
     */
39
    protected $writable;
40
41
    /**
42
     * @var bool
43
     */
44
    protected $closing;
45
46
    /**
47
     * @var EventLoopInterface
48
     */
49
    protected $loop;
50
51
    /**
52
     * @var Buffer
53
     */
54
    protected $buffer;
55
56
    /**
57
     * Stream constructor.
58
     *
59
     * @param resource           $stream
60
     * @param EventLoopInterface $loop
61
     * @param int                $bufferSize
62
     */
63 7
    public function __construct($stream, EventLoopInterface $loop, int $bufferSize = 4096)
64
    {
65 7
        $this->stream = $stream;
66
67 7
        if (false === is_resource($this->stream) || "stream" !== get_resource_type($this->stream)) {
68 1
            throw new InvalidArgumentException('First parameter must be a valid stream resource');
69
        }
70
71 6
        $this->bufferSize = $bufferSize;
72 6
        $this->readable   = true;
73 6
        $this->writable   = true;
74 6
        $this->closing    = false;
75
76 6
        stream_set_blocking($this->stream, 0);
77 6
        stream_set_read_buffer($this->stream, 0);
78
79 6
        $this->loop   = $loop;
80 6
        $this->buffer = new Buffer($this->stream, $this->loop);
81
82
        $this->buffer->on('error', function ($error) {
83 1
            $this->emit('error', [$error, $this]);
84 1
            $this->close();
85 6
        });
86
87 6
        $this->buffer->on('drain', function () {
88 1
            $this->emit('drain', [$this]);
89 6
        });
90
91 6
        $this->resume();
92 6
    }
93
94
    /**
95
     * @inheritdoc
96
     */
97 6
    public function isReadable() : bool
98
    {
99 6
        return $this->readable;
100
    }
101
102
    /**
103
     * @inheritdoc
104
     */
105 3
    public function isWritable() : bool
106
    {
107 3
        return $this->writable;
108
    }
109
110
    /**
111
     * @inheritdoc
112
     */
113
    public function pause() : self
114
    {
115
        $this->loop->removeReadStream($this->stream);
116
117
        return $this;
118
    }
119
120
    /**
121
     * @inheritdoc
122
     */
123 6
    public function resume() : self
124
    {
125 6
        if ($this->isReadable()) {
126 6
            $this->loop->addReadStream($this->stream, [$this, 'handleData']);
127
        }
128
129 6
        return $this;
130
    }
131
132
    /**
133
     * @inheritdoc
134
     */
135 1
    public function write($data)
136
    {
137 1
        if (false === $this->isWritable()) {
138
            return;
139
        }
140
141 1
        return $this->buffer->write($data);
142
    }
143
144
    /**
145
     * @inheritdoc
146
     */
147 3
    public function close() : self
148
    {
149 3
        if (false === $this->isReadable() && false === $this->closing) {
150
            return $this;
151
        }
152
153 3
        $this->closing = false;
154
155 3
        $this->readable = false;
156 3
        $this->writable = false;
157
158 3
        $this->emit('end', [$this]);
159 3
        $this->emit('close', [$this]);
160 3
        $this->loop->removeStream($this->stream);
161 3
        $this->buffer->removeListeners();
162 3
        $this->removeListeners();
163
164 3
        $this->handleClose();
165
166 3
        return $this;
167
    }
168
169
    /**
170
     * @inheritdoc
171
     */
172 2
    public function end($data = null) : self
173
    {
174 2
        if (false === $this->isWritable()) {
175 1
            return $this;
176
        }
177
178 1
        $this->closing  = true;
179 1
        $this->readable = false;
180 1
        $this->writable = false;
181
182 1
        $this->buffer->on('close', [$this, 'close']);
183
184 1
        $this->buffer->end($data);
185
186 1
        return $this;
187
    }
188
189
    /**
190
     * @inheritdoc
191
     */
192
    public function pipe(WritableStreamInterface $destination, array $options = [])
193
    {
194
        $this->pipeAll($this, $destination, $options);
195
196
        return $destination;
197
    }
198
199
    /**
200
     * @param resource $stream
201
     */
202 2
    public function handleData($stream)
203
    {
204 2
        $data = fread($stream, $this->bufferSize);
205
206 2
        $this->emit('data', [$data, $this]);
207
208 2
        if (false === is_resource($stream) || feof($stream)) {
209 1
            $this->end();
210
        }
211 2
    }
212
213 3
    public function handleClose()
214
    {
215 3
        if (is_resource($this->stream)) {
216 3
            fclose($this->stream);
217
        }
218 3
    }
219
220
    /**
221
     * @return Buffer
222
     */
223 1
    public function getBuffer()
224
    {
225 1
        return $this->buffer;
226
    }
227
228
    /**
229
     * @return int
230
     */
231
    public function getBufferSize() : int
232
    {
233
        return $this->bufferSize;
234
    }
235
236
    /**
237
     * @param int $bufferSize
238
     *
239
     * @return $this
240
     */
241
    public function setBufferSize(int $bufferSize) : self
242
    {
243
        $this->bufferSize = $bufferSize;
244
245
        return $this;
246
    }
247
}
248