1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dazzle\Stream; |
4
|
|
|
|
5
|
|
|
use Dazzle\Throwable\Exception\Runtime\WriteException; |
6
|
|
|
use Dazzle\Throwable\Exception\Logic\InvalidArgumentException; |
7
|
|
|
use Dazzle\Loop\LoopAwareTrait; |
8
|
|
|
use Dazzle\Loop\LoopInterface; |
9
|
|
|
use Dazzle\Util\Buffer\Buffer; |
10
|
|
|
use Dazzle\Util\Buffer\BufferInterface; |
11
|
|
|
use Error; |
12
|
|
|
use Exception; |
13
|
|
|
|
14
|
|
|
/**
 * Loop-driven stream writer.
 *
 * Data passed to write() is queued in an internal buffer and flushed to the
 * underlying resource from handleWrite(), which is registered on the loop as
 * the write-stream callback for that resource. While paused, data is still
 * queued but the callback is not registered.
 */
class StreamWriter extends Sync\StreamWriter implements StreamWriterInterface
{
    use LoopAwareTrait;

    /**
     * True while handleWrite() is registered on the loop for this resource.
     *
     * @var bool
     */
    protected $writing;

    /**
     * True while the writer is paused; see pause() and resume().
     *
     * @var bool
     */
    protected $paused;

    /**
     * Queue of data accepted by write() and not yet written to the resource.
     *
     * @var BufferInterface
     */
    protected $buffer;

    /**
     * Create the writer, disable PHP's own write buffering for the resource
     * (when stream_set_write_buffer() is available) and resume the writer.
     *
     * @param resource $resource
     * @param LoopInterface $loop
     * @param bool $autoClose
     * @throws InvalidArgumentException
     */
    public function __construct($resource, LoopInterface $loop, $autoClose = true)
    {
        parent::__construct($resource, $autoClose);

        if (function_exists('stream_set_write_buffer'))
        {
            stream_set_write_buffer($this->resource, 0);
        }

        $this->loop = $loop;
        $this->writing = false;
        // Start in the paused state so that resume() below performs the transition.
        $this->paused = true;
        $this->buffer = new Buffer();

        $this->resume();
    }

    /**
     * Run the parent destructor, then drop the references held by this class.
     */
    public function __destruct()
    {
        parent::__destruct();

        unset($this->loop);
        unset($this->writing);
        unset($this->paused);
        unset($this->buffer);
    }

    /**
     * Check whether the writer is currently paused.
     *
     * @override
     * @inheritDoc
     */
    public function isPaused()
    {
        return $this->paused;
    }

    /**
     * Set the buffer size used as the per-write limit in handleWrite() and as
     * the threshold reported by write().
     *
     * @override
     * @inheritDoc
     */
    public function setBufferSize($bufferSize)
    {
        $this->bufferSize = $bufferSize;
    }

    /**
     * Get the currently configured buffer size.
     *
     * @override
     * @inheritDoc
     */
    public function getBufferSize()
    {
        return $this->bufferSize;
    }

    /**
     * Pause the writer: mark it paused and unregister the resource from the
     * loop's write streams. Does nothing when already paused.
     *
     * @override
     * @inheritDoc
     */
    public function pause()
    {
        if (!$this->paused)
        {
            $this->paused = true;
            $this->writing = false;
            $this->loop->removeWriteStream($this->resource);
        }
    }

    /**
     * Resume a paused writer. Only has an effect while the stream is writable
     * and paused; the resource is registered on the loop again only if there
     * is queued data to flush.
     *
     * @override
     * @inheritDoc
     */
    public function resume()
    {
        if ($this->writable && $this->paused)
        {
            $this->paused = false;

            if ($this->buffer->isEmpty() === false)
            {
                $this->writing = true;
                $this->loop->addWriteStream($this->resource, $this->getHandleWriteFunction());
            }
        }
    }

    /**
     * Queue $text for writing and, unless paused or already registered,
     * register the write handler on the loop.
     *
     * When the stream is no longer writable, a WriteException is handed to
     * throwAndEmitException() and its result is returned instead.
     *
     * Returns true while the amount of queued data is below the configured
     * buffer size, false otherwise.
     *
     * @override
     * @inheritDoc
     */
    public function write($text = '')
    {
        if (!$this->writable)
        {
            return $this->throwAndEmitException(
                new WriteException('Stream is no longer writable.')
            );
        }

        $this->buffer->push($text);

        if (!$this->writing && !$this->paused)
        {
            $this->writing = true;
            $this->loop->addWriteStream($this->resource, $this->getHandleWriteFunction());
        }

        return $this->buffer->length() < $this->bufferSize;
    }

    /**
     * Close the writer. Repeated calls are ignored once closing has started.
     *
     * Any data still queued is flushed synchronously via writeEnd() before
     * the 'close' event is emitted, handleClose() is run and 'done' is emitted.
     *
     * @override
     * @inheritDoc
     */
    public function close()
    {
        if ($this->closing)
        {
            return;
        }

        $this->closing = true;
        $this->readable = false;
        $this->writable = false;

        if ($this->buffer->isEmpty() === false)
        {
            $this->writeEnd();
        }

        $this->emit('close', [ $this ]);
        $this->handleClose();
        $this->emit('done', [ $this ]);
    }

    /**
     * Handle the outcoming stream.
     *
     * Writes at most bufferSize bytes of the queued data to the resource and
     * removes the written part from the buffer. Emits 'error' with a
     * WriteException and returns when fwrite() reports failure. Emits 'drain'
     * when the pending length drops below bufferSize, and 'drain' followed by
     * 'finish' (after unregistering from the loop) when nothing is left.
     *
     * @internal
     */
    public function handleWrite()
    {
        $text = $this->buffer->peek();
        $sent = fwrite($this->resource, $text, $this->bufferSize);

        if ($sent === false)
        {
            $this->emit('error', [ $this, new WriteException('Error occurred while writing to the stream resource.') ]);
            return;
        }

        $lenBefore = strlen($text);
        $lenAfter = $lenBefore - $sent;
        $this->buffer->remove($sent);

        // Partial flush that brought the pending data back under the threshold.
        if ($lenAfter > 0 && $lenBefore >= $this->bufferSize && $lenAfter < $this->bufferSize)
        {
            $this->emit('drain', [ $this ]);
        }
        // Everything was written: stop listening for writability until new data arrives.
        else if ($lenAfter === 0)
        {
            $this->loop->removeWriteStream($this->resource);
            $this->writing = false;
            $this->emit('drain', [ $this ]);
            $this->emit('finish', [ $this ]);
        }
    }

    /**
     * Handle close.
     *
     * Pauses the writer (which unregisters it from the loop when it was not
     * already paused) before delegating to the parent implementation.
     *
     * @internal
     */
    public function handleClose()
    {
        $this->pause();

        parent::handleClose();
    }

    /**
     * Get function that should be invoked on write event.
     *
     * @return callable
     */
    protected function getHandleWriteFunction()
    {
        return [ $this, 'handleWrite' ];
    }

    /**
     * Best-effort synchronous flush of the remaining buffer, used by close().
     *
     * Keeps writing while the resource is still valid, the previous write
     * made progress and data remains. Warnings and thrown errors are
     * deliberately swallowed: the stream is being closed regardless.
     */
    private function writeEnd()
    {
        do
        {
            try
            {
                $sent = @fwrite($this->resource, $this->buffer->peek());

                if ($sent === false)
                {
                    // fwrite() failed: nothing was written, so leave the buffer
                    // untouched and let the loop condition terminate the flush.
                    $sent = 0;
                }
                else
                {
                    $this->buffer->remove($sent);
                }
            }
            catch (Error $ex)
            {
                $sent = 0;
            }
            catch (Exception $ex)
            {
                $sent = 0;
            }
        }
        while (is_resource($this->resource) && $sent > 0 && !$this->buffer->isEmpty());
    }
}
255
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.