Completed
Push — master ( 48e619...4ad90b )
by Aurimas
45:40 queued 41:42
created

BufferedSink::close()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4285
cc 2
eloc 5
nc 2
nop 0
crap 2
1
<?php
2
3
namespace Thruster\Component\Stream;
4
5
use Thruster\Component\Promise\Deferred;
6
use Thruster\Component\Promise\PromiseInterface;
7
use Thruster\Component\Promise\PromisorInterface;
8
9
/**
 * Class BufferedSink
 *
 * Writable stream that appends every written chunk to an in-memory string
 * and exposes the outcome through a promise: the promise is resolved with
 * the accumulated string when the sink is closed, and rejected with the
 * payload of any 'error' event emitted on the sink.
 *
 * @package Thruster\Component\Stream
 * @author  Aurimas Niekis <[email protected]>
 */
class BufferedSink extends WritableStream implements PromisorInterface
{
    use UtilsTrait;

    /**
     * @var string Concatenation of everything passed to write() so far
     */
    private $buffer = '';

    /**
     * @var Deferred Deferred settled by close() (resolve) or an 'error' event (reject)
     */
    private $deferred;

    /**
     * Creates the deferred and subscribes the sink to its own 'pipe' and
     * 'error' events before delegating to the parent constructor.
     */
    public function __construct()
    {
        $this->deferred = new Deferred();

        // Event name => handler method on this object; registration order
        // ('pipe' first, then 'error') and its position ahead of the parent
        // constructor call are kept deliberately.
        $listeners = [
            'pipe'  => 'handlePipeEvent',
            'error' => 'handleErrorEvent',
        ];

        foreach ($listeners as $event => $method) {
            $this->on($event, [$this, $method]);
        }

        parent::__construct();
    }

    /**
     * Listener for the 'pipe' event: asks forwardEvents() to relay the
     * source's 'error' events to this sink.
     *
     * forwardEvents() is not defined in this class; presumably it comes
     * from UtilsTrait - confirm there.
     *
     * @param mixed $source Emitter that was piped into this sink
     */
    public function handlePipeEvent($source)
    {
        $relayedEvents = ['error'];

        $this->forwardEvents($source, $this, $relayedEvents);
    }

    /**
     * Listener for the 'error' event: rejects the deferred with the payload.
     *
     * @param mixed $e Value received with the 'error' event
     */
    public function handleErrorEvent($e)
    {
        $this->deferred->reject($e);
    }

    /**
     * Appends a chunk to the internal buffer.
     *
     * @param mixed $data Chunk to append (string concatenation is applied)
     */
    public function write($data)
    {
        $this->buffer = $this->buffer . $data;
    }

    /**
     * Closes the sink and resolves the deferred with the buffered data.
     *
     * Does nothing when the stream is already flagged as closed; the
     * $closed flag is not declared here (presumably inherited from
     * WritableStream - confirm there).
     */
    public function close()
    {
        if (true !== $this->closed) {
            parent::close();

            $this->deferred->resolve($this->buffer);
        }
    }

    /**
     * Returns the promise tied to this sink's deferred.
     *
     * @return PromiseInterface
     */
    public function promise() : PromiseInterface
    {
        $promise = $this->deferred->promise();

        return $promise;
    }

    /**
     * Pipes the given stream into a fresh sink and returns that sink's promise.
     *
     * The sink is created with late static binding, so a subclass calling
     * this method gets an instance of itself.
     *
     * @param ReadableStreamInterface $stream Stream to collect data from
     *
     * @return PromiseInterface
     */
    public static function createPromise(ReadableStreamInterface $stream) : PromiseInterface
    {
        $collector = new static();

        $stream->pipe($collector);

        return $collector->promise();
    }
}
72