Completed
Push — master ( 3fef96...48e619 )
by Aurimas
52:25
created

BufferedSink::write()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Thruster\Component\Stream;
4
5
use Thruster\Component\Promise\Deferred;
6
use Thruster\Component\Promise\PromiseInterface;
7
use Thruster\Component\Promise\PromisorInterface;
8
9
/**
 * Class BufferedSink
 *
 * Writable stream that collects every chunk written to it into a single
 * in-memory string and exposes a promise for that string. The promise is
 * resolved with the collected data when the sink is closed, and rejected
 * with the event payload when an 'error' event is emitted on the sink.
 *
 * @package Thruster\Component\Stream
 * @author  Aurimas Niekis <[email protected]>
 */
class BufferedSink extends WritableStream implements PromisorInterface
{
    use UtilsTrait;

    /**
     * Concatenation of all data passed to write() so far.
     *
     * @var string
     */
    private $buffer = '';

    /**
     * Deferred whose promise is handed out by promise().
     *
     * @var Deferred
     */
    private $deferred;

    /**
     * Creates the deferred, subscribes the sink's own 'pipe' and 'error'
     * handlers, and then runs the parent constructor.
     */
    public function __construct()
    {
        $this->deferred = new Deferred();

        // Listeners are registered before the parent constructor runs;
        // this ordering is kept exactly as in the original implementation.
        $this->on('pipe', [$this, 'handlePipeEvent']);
        $this->on('error', [$this, 'handleErrorEvent']);

        parent::__construct();
    }

    /**
     * Listener for the 'pipe' event.
     *
     * Delegates to forwardEvents() for the 'error' event, with the piping
     * source as origin and this sink as target.
     * NOTE(review): forwardEvents() is not defined in this class (presumably
     * supplied by UtilsTrait) and is assumed to re-emit the source's errors
     * on this sink - confirm against the helper.
     *
     * @param mixed $source Value delivered with the 'pipe' event.
     */
    public function handlePipeEvent($source)
    {
        $forwarded = ['error'];

        $this->forwardEvents($source, $this, $forwarded);
    }

    /**
     * Listener for the 'error' event: rejects the deferred with the payload.
     *
     * @param mixed $e Value delivered with the 'error' event.
     */
    public function handleErrorEvent($e)
    {
        $this->deferred->reject($e);
    }

    /**
     * Appends the given chunk to the internal buffer.
     *
     * @param mixed $data Chunk to append (concatenated as a string).
     */
    public function write($data)
    {
        $this->buffer = $this->buffer . $data;
    }

    /**
     * Closes the sink and resolves the deferred with the buffered data.
     *
     * Does nothing when the sink is already flagged as closed.
     * NOTE(review): $this->closed is not declared in this class; it is
     * assumed to be maintained by the parent stream - confirm.
     */
    public function close()
    {
        if (true !== $this->closed) {
            parent::close();

            $this->deferred->resolve($this->buffer);
        }
    }

    /**
     * Returns the promise of the internal deferred.
     *
     * @return PromiseInterface
     */
    public function promise() : PromiseInterface
    {
        $promise = $this->deferred->promise();

        return $promise;
    }

    /**
     * Pipes the given readable stream into a freshly created sink and
     * returns that sink's promise.
     *
     * @param ReadableStreamInterface $stream Stream to pipe into the new sink.
     *
     * @return PromiseInterface
     */
    public static function createPromise(ReadableStreamInterface $stream) : PromiseInterface
    {
        // "new static" keeps late static binding, so a subclass calling this
        // factory gets an instance of itself.
        $collector = new static();

        $stream->pipe($collector);

        return $collector->promise();
    }
}
72