CompositeStream   A
last analyzed

Complexity

Total Complexity 12

Size/Duplication

Total Lines 132
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 100%
Metric Value
wmc 12
lcom 1
cbo 4
dl 0
loc 132
ccs 39
cts 39
cp 1
rs 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A isReadable() 0 4 1
A isWritable() 0 4 1
A write() 0 4 1
A __construct() 0 13 1
A handlePipeEvent() 0 6 1
A pause() 0 10 2
A resume() 0 10 2
A pipe() 0 6 1
A end() 0 6 1
A close() 0 9 1
1
<?php
2
3
namespace Thruster\Component\Stream;
4
5
use Thruster\Component\EventEmitter\EventEmitterTrait;
6
7
/**
8
 * Class CompositeStream
9
 *
10
 * @package Thruster\Component\Stream
11
 * @author  Aurimas Niekis <[email protected]>
12
 */
13
class CompositeStream implements DuplexStreamInterface
14
{
15
    use EventEmitterTrait;
16
    use UtilsTrait;
17
18
    /**
19
     * @var ReadableStreamInterface
20
     */
21
    protected $readable;
22
23
    /**
24
     * @var WritableStreamInterface
25
     */
26
    protected $writable;
27
28
    protected $pipeSource;
29
30
    /**
31
     * @param ReadableStreamInterface $readable
32
     * @param WritableStreamInterface $writable
33
     */
34 19
    public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
35
    {
36 19
        $this->readable = $readable;
37 19
        $this->writable = $writable;
38
39 19
        $this->forwardEvents($this->readable, $this, ['data', 'end', 'error', 'close']);
40 19
        $this->forwardEvents($this->writable, $this, ['drain', 'error', 'close', 'pipe']);
41
42 19
        $this->readable->on('close', [$this, 'close']);
43 19
        $this->writable->on('close', [$this, 'close']);
44
45 19
        $this->on('pipe', [$this, 'handlePipeEvent']);
46 19
    }
47
48
    /**
49
     * @param $source
50
     *
51
     * @return CompositeStream
52
     */
53 5
    public function handlePipeEvent($source) : self
54
    {
55 5
        $this->pipeSource = $source;
56
57 5
        return $this;
58
    }
59
60
    /**
61
     * {@inheritDoc}
62
     */
63 6
    public function isReadable() : bool
64
    {
65 6
        return $this->readable->isReadable();
66
    }
67
68
    /**
69
     * {@inheritDoc}
70
     */
71 2
    public function pause() : self
72
    {
73 2
        if ($this->pipeSource) {
74 1
            $this->pipeSource->pause();
75
        }
76
77 2
        $this->readable->pause();
78
79 2
        return $this;
80
    }
81
82
    /**
83
     * {@inheritDoc}
84
     */
85 2
    public function resume() : self
86
    {
87 2
        if ($this->pipeSource) {
88 1
            $this->pipeSource->resume();
89
        }
90
91 2
        $this->readable->resume();
92
93 2
        return $this;
94
    }
95
96
    /**
97
     * {@inheritDoc}
98
     */
99 2
    public function pipe(WritableStreamInterface $dest, array $options = []) : WritableStreamInterface
100
    {
101 2
        $this->pipeAll($this, $dest, $options);
102
103 2
        return $dest;
104
    }
105
106
    /**
107
     * {@inheritDoc}
108
     */
109 6
    public function isWritable() : bool
110
    {
111 6
        return $this->writable->isWritable();
112
    }
113
114
    /**
115
     * {@inheritDoc}
116
     */
117 3
    public function write($data)
118
    {
119 3
        return $this->writable->write($data);
120
    }
121
122
    /**
123
     * {@inheritDoc}
124
     */
125 1
    public function end($data = null) : self
126
    {
127 1
        $this->writable->end($data);
128
129 1
        return $this;
130
    }
131
132
    /**
133
     * {@inheritDoc}
134
     */
135 5
    public function close() : self
136
    {
137 5
        $this->pipeSource = null;
138
139 5
        $this->readable->close();
140 5
        $this->writable->close();
141
142 5
        return $this;
143
    }
144
}
145