Completed
Push — master ( 48e619...4ad90b )
by Aurimas
45:40 queued 41:42
created

BufferedSink::handlePipeEvent()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Thruster\Component\Stream;
4
5
use Thruster\Component\Promise\Deferred;
6
use Thruster\Component\Promise\PromiseInterface;
7
use Thruster\Component\Promise\PromisorInterface;
8
9
/**
10
 * Class BufferedSink
11
 *
12
 * @package Thruster\Component\Stream
13
 * @author  Aurimas Niekis <[email protected]>
14
 */
15
class BufferedSink extends WritableStream implements PromisorInterface
16
{
17
    use UtilsTrait;
18
19
    private $buffer = '';
20
    private $deferred;
21
22 11
    public function __construct()
23
    {
24 11
        $this->deferred = new Deferred();
25
26 11
        $this->on('pipe', [$this, 'handlePipeEvent']);
27 11
        $this->on('error', [$this, 'handleErrorEvent']);
28
29 11
        parent::__construct();
30 11
    }
31
32 3
    public function handlePipeEvent($source)
33
    {
34 3
        $this->forwardEvents($source, $this, ['error']);
35 3
    }
36
37 2
    public function handleErrorEvent($e)
38
    {
39 2
        $this->deferred->reject($e);
40 2
    }
41
42 6
    public function write($data)
43
    {
44 6
        $this->buffer .= $data;
45 6
    }
46
47 7
    public function close()
48
    {
49 7
        if (true === $this->closed) {
50 2
            return;
51
        }
52
53 7
        parent::close();
54
55 7
        $this->deferred->resolve($this->buffer);
56 7
    }
57
58 11
    public function promise() : PromiseInterface
59
    {
60 11
        return $this->deferred->promise();
61
    }
62
63 1
    public static function createPromise(ReadableStreamInterface $stream) : PromiseInterface
64
    {
65 1
        $sink = new static();
66
67 1
        $stream->pipe($sink);
68
69 1
        return $sink->promise();
70
    }
71
}
72