SftpResource::setBufferSize()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 0
cts 3
cp 0
crap 2
1
<?php
2
3
namespace Dazzle\SSH\Driver\Sftp;
4
5
use Dazzle\Loop\LoopAwareTrait;
6
use Dazzle\SSH\SSH2DriverInterface;
7
use Dazzle\SSH\SSH2ResourceInterface;
8
use Dazzle\Stream\Sync\Stream;
9
use Dazzle\Throwable\Exception\Runtime\ReadException;
10
use Dazzle\Throwable\Exception\Runtime\WriteException;
11
use Dazzle\Util\Buffer\Buffer;
12
use Dazzle\Util\Buffer\BufferInterface;
13
use Error;
14
use Exception;
15
16
class SftpResource extends Stream implements SSH2ResourceInterface
0 ignored issues
show
Bug introduced by
There is at least one abstract method in this class. Maybe declare it as abstract, or implement the remaining methods: copyEvent, copyEvents, delay, delayOnce, delayTimes, discardEvents, emit, findListener, flushListeners, forwardEvents, getMetadata, getMode, getResource, getResourceId, getStreamType, getWrapperType, isOpen, isReadable, isSeekable, isWritable, on, once, removeListener, removeListeners, rewind, seek, setMode, tell, times
Loading history...
17
{
18
    use LoopAwareTrait;
19
20
    /**
21
     * @var bool
22
     */
23
    protected $writing;
24
25
    /**
26
     * @var bool
27
     */
28
    protected $reading;
29
30
    /**
31
     * @var bool
32
     */
33
    protected $readingStarted;
34
35
    /**
36
     * @var bool
37
     */
38
    protected $paused;
39
40
    /**
41
     * @var BufferInterface
42
     */
43
    protected $buffer;
44
45
    /**
46
     * @param SSH2DriverInterface $driver
47
     * @param resource $resource
48
     */
49
    public function __construct(SSH2DriverInterface $driver, $resource)
50
    {
51
        parent::__construct($resource);
52
53
        $this->loop = $driver->getLoop();
54
        $this->writing = false;
55
        $this->reading = false;
56
        $this->readingStarted = false;
57
        $this->paused = false;
58
        $this->buffer = new Buffer();
59
    }
60
61
    /**
62
     * @return string
63
     */
64
    public function getId()
65
    {
66
        return (string) $this->getResourceId();
67
    }
68
69
    /**
70
     * @override
71
     * @inheritDoc
72
     */
73
    public function isPaused()
74
    {
75
        return $this->paused;
76
    }
77
78
    /**
79
     * @override
80
     * @inheritDoc
81
     */
82
    public function setBufferSize($bufferSize)
83
    {
84
        $this->bufferSize = $bufferSize;
85
    }
86
87
    /**
88
     * @override
89
     * @inheritDoc
90
     */
91
    public function getBufferSize()
92
    {
93
        return $this->bufferSize;
94
    }
95
96
    /**
97
     * @override
98
     * @inheritDoc
99
     */
100
    public function pause()
101
    {
102
        if (!$this->paused)
103
        {
104
            $this->paused = true;
105
            $this->writing = false;
106
            $this->reading = false;
107
        }
108
    }
109
110
    /**
111
     * @override
112
     * @inheritDoc
113
     */
114
    public function resume()
115
    {
116
        if (($this->writable || $this->readable) && $this->paused)
117
        {
118
            $this->paused = false;
119
120
            if ($this->readable && $this->readingStarted)
121
            {
122
                $this->reading = true;
123
            }
124
125
            if ($this->writable && $this->buffer->isEmpty() === false)
126
            {
127
                $this->writing = true;
128
            }
129
        }
130
    }
131
132
    /**
133
     * @override
134
     * @inheritDoc
135
     */
136
    public function write($text = '')
137
    {
138
        if (!$this->writable)
139
        {
140
            return $this->throwAndEmitException(
141
                new WriteException('Stream is no longer writable.')
142
            );
143
        }
144
145
        $this->buffer->push($text);
146
147
        if (!$this->writing && !$this->paused)
148
        {
149
            $this->writing = true;
150
        }
151
152
        return $this->buffer->length() < $this->bufferSize;
153
    }
154
155
    /**
156
     * @override
157
     * @inheritDoc
158
     */
159
    public function read($length = null)
160
    {
161
        if (!$this->readable)
162
        {
163
            return $this->throwAndEmitException(
164
                new ReadException('Stream is no longer readable.')
165
            );
166
        }
167
168
        if (!$this->reading && !$this->paused)
169
        {
170
            $this->reading = true;
171
            $this->readingStarted = true;
172
        }
173
174
        return '';
175
    }
176
177
    /**
178
     * @override
179
     * @inheritDoc
180
     */
181
    public function close()
182
    {
183
        if ($this->closing)
184
        {
185
            return;
186
        }
187
188
        $this->closing = true;
189
        $this->readable = false;
190
        $this->writable = false;
191
192
        if ($this->buffer->isEmpty() === false)
193
        {
194
            $this->writeEnd();
195
        }
196
197
        $this->emit('close', [ $this ]);
198
        $this->handleClose();
199
        $this->emit('done', [ $this ]);
200
    }
201
202
    /**
203
     * Handle the outcoming stream.
204
     *
205
     * @internal
206
     */
207
    public function handleWrite()
208
    {
209
        try
210
        {
211
            if (!$this->writing)
212
            {
213
                return;
214
            }
215
216
            $text = $this->buffer->peek();
217
218
            if ($text !== '')
219
            {
220
                $sent = fwrite($this->resource, $text);
221
            }
222
            else
223
            {
224
                $sent = 0;
225
            }
226
227
            if ($text !== '' && !$sent)
228
            {
229
                $this->emit('error', [ $this, new WriteException('Error occurred while writing to the stream resource.') ]);
230
                return;
231
            }
232
233
            $lenBefore = strlen($text);
234
            $lenAfter = $lenBefore - $sent;
235
            $this->buffer->remove($sent);
236
237
            $this->emit('drain', [ $this ]);
238
239
            if ($lenAfter === 0 && $this->buffer->isEmpty())
240
            {
241
                $this->writing = false;
242
                $this->emit('finish', [ $this ]);
243
            }
244
        }
245
        catch (Error $ex)
246
        {
247
            $this->emit('error', [ $this, $ex ]);
248
        }
249
        catch (Exception $ex)
250
        {
251
            $this->emit('error', [ $this, $ex ]);
252
        }
253
    }
254
255
    /**
256
     * Handle the incoming stream.
257
     *
258
     * @internal
259
     */
260
    public function handleRead()
261
    {
262
        try
263
        {
264
            if (!$this->reading)
265
            {
266
                return;
267
            }
268
269
            $length = $this->bufferSize;
270
            $ret = fread($this->resource, $length);
271
272
            if ($ret === false)
273
            {
274
                $this->emit('error', [ $this, new ReadException('Error occurred while reading from the stream resource.') ]);
275
                return;
276
            }
277
278 View Code Duplication
            if ($ret !== '')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
279
            {
280
                $this->emit('data', [ $this, $ret ]);
281
282
                if (strlen($ret) < $length)
283
                {
284
                    $this->reading = false;
285
                    $this->emit('end', [ $this ]);
286
                }
287
            }
288
        }
289
        catch (Error $ex)
290
        {
291
            $this->emit('error', [ $this, $ex ]);
292
        }
293
        catch (Exception $ex)
294
        {
295
            $this->emit('error', [ $this, $ex ]);
296
        }
297
    }
298
299
    /**
300
     * Handle close.
301
     *
302
     * @internal
303
     */
304
    public function handleClose()
305
    {
306
        $this->pause();
307
308
        parent::handleClose();
309
    }
310
311
    /**
312
     *
313
     */
314
    private function writeEnd()
315
    {
316
        do
317
        {
318
            try
319
            {
320
                $sent = fwrite($this->resource, $this->buffer->peek());
321
                $this->buffer->remove($sent);
322
            }
323
            catch (Error $ex)
324
            {
325
                $sent = 0;
326
            }
327
            catch (Exception $ex)
328
            {
329
                $sent = 0;
330
            }
331
        }
332
        while (is_resource($this->resource) && $sent > 0 && !$this->buffer->isEmpty());
333
    }
334
}
335