1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dazzle\Stream; |
4
|
|
|
|
5
|
|
|
use Dazzle\Loop\LoopAwareTrait; |
6
|
|
|
use Dazzle\Loop\LoopInterface; |
7
|
|
|
use Dazzle\Throwable\Exception\Logic\InvalidArgumentException; |
8
|
|
|
use Dazzle\Throwable\Exception\Runtime\ReadException; |
9
|
|
|
use Error; |
10
|
|
|
use Exception; |
11
|
|
|
|
12
|
|
|
/**
 * Event-loop driven stream reader.
 *
 * Instead of returning data from read(), this reader registers itself with the
 * loop and delivers incoming chunks through 'data' events, followed by an 'end'
 * event once the stream is considered drained. Failures are reported through
 * 'error' events.
 *
 * NOTE(review): $resource, $readable, $bufferSize, emit() and
 * throwAndEmitException() are not declared in this file; they are presumably
 * inherited from Sync\StreamReader - confirm against the parent class.
 */
class StreamReader extends Sync\StreamReader implements StreamReaderInterface
{
    use LoopAwareTrait;

    /**
     * True while the read handler is registered with the loop.
     *
     * @var bool
     */
    protected $reading;

    /**
     * True once read() has registered the read handler for the first time.
     * resume() only re-registers the handler when this flag is set.
     *
     * @var bool
     */
    protected $readingStarted;

    /**
     * True while the reader is paused.
     *
     * @var bool
     */
    protected $paused;

    /**
     * @param resource $resource
     * @param LoopInterface $loop
     * @param bool $autoClose
     * @throws InvalidArgumentException
     */
    public function __construct($resource, LoopInterface $loop, $autoClose = true)
    {
        parent::__construct($resource, $autoClose);

        // Disable PHP-level read buffering where the runtime offers the function.
        if (function_exists('stream_set_read_buffer'))
        {
            stream_set_read_buffer($this->resource, 0);
        }

        $this->loop = $loop;
        $this->reading = false;
        $this->readingStarted = false;
        $this->paused = true;

        // Starts as "paused" so that resume() performs the transition to the active
        // state. Nothing is registered with the loop here, because readingStarted is
        // still false; registration happens on the first read() call.
        $this->resume();
    }

    /**
     * Run the parent destructor, then drop the loop reference and the state flags.
     */
    public function __destruct()
    {
        // The parent runs first, while loop and flags are still set.
        parent::__destruct();

        unset($this->loop);
        unset($this->reading);
        unset($this->readingStarted);
        unset($this->paused);
    }

    /**
     * @override
     * @inheritDoc
     */
    public function isPaused()
    {
        return $this->paused;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function setBufferSize($bufferSize)
    {
        $this->bufferSize = $bufferSize;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function getBufferSize()
    {
        return $this->bufferSize;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function pause()
    {
        if (!$this->paused)
        {
            $this->paused = true;
            $this->reading = false;
            $this->loop->removeReadStream($this->resource);
        }
    }

    /**
     * @override
     * @inheritDoc
     */
    public function resume()
    {
        if ($this->readable && $this->paused)
        {
            $this->paused = false;

            // Only re-attach to the loop if read() had already started reading.
            if ($this->readingStarted)
            {
                $this->reading = true;
                $this->loop->addReadStream($this->resource, $this->getHandleReadFunction());
            }
        }
    }

    /**
     * Start asynchronous reading.
     *
     * The $length argument is not used by this implementation; chunk size is taken
     * from the configured buffer size when the loop invokes handleRead(). The method
     * itself always returns an empty string on success, data is delivered through
     * 'data' events.
     *
     * @override
     * @inheritDoc
     */
    public function read($length = null)
    {
        if (!$this->readable)
        {
            return $this->throwAndEmitException(
                new ReadException('Stream is no longer readable.')
            );
        }

        // Register the handler only when not already registered and not paused.
        if (!$this->reading && !$this->paused)
        {
            $this->reading = true;
            $this->readingStarted = true;
            $this->loop->addReadStream($this->resource, $this->getHandleReadFunction());
        }

        return '';
    }

    /**
     * Handle the incoming stream.
     *
     * Reads up to bufferSize bytes and emits 'data' for a non-empty chunk. The
     * stream is unregistered from the loop and 'end' is emitted when either the
     * chunk is shorter than the requested length, or the read returned an empty
     * string and the resource reports end-of-file. The latter covers payloads
     * whose size is an exact multiple of the buffer size, where the final chunk
     * is a full-length read and the end would otherwise never be signalled while
     * the resource stays registered with the loop.
     *
     * @internal
     */
    public function handleRead()
    {
        $length = $this->bufferSize;
        $ret = fread($this->resource, $length);

        if ($ret === false)
        {
            $this->emit('error', [ $this, new ReadException('Error occurred while reading from the stream resource.') ]);
            return;
        }

        if ($ret !== '')
        {
            $this->emit('data', [ $this, $ret ]);
            $ended = strlen($ret) < $length;
        }
        else
        {
            // An empty read alone is ambiguous (no data available yet vs. finished),
            // so only treat it as the end when the resource itself reports EOF.
            $ended = feof($this->resource);
        }

        if ($ended)
        {
            $this->loop->removeReadStream($this->resource);
            $this->reading = false;
            $this->emit('end', [ $this ]);
        }
    }

    /**
     * Get function that should be invoked on read event.
     *
     * @return callable
     */
    protected function getHandleReadFunction()
    {
        return [ $this, 'handleRead' ];
    }

    /**
     * Handle close.
     *
     * Pauses the reader first, which detaches it from the loop if it was active,
     * and then delegates to the parent close handling.
     *
     * @internal
     */
    public function handleClose()
    {
        $this->pause();

        parent::handleClose();
    }
}
199
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.