1 | <?php |
||
13 | class CompositeStream implements DuplexStreamInterface |
||
14 | { |
||
15 | use EventEmitterTrait; |
||
16 | use UtilsTrait; |
||
17 | |||
18 | /** |
||
19 | * @var ReadableStreamInterface |
||
20 | */ |
||
21 | protected $readable; |
||
22 | |||
23 | /** |
||
24 | * @var WritableStreamInterface |
||
25 | */ |
||
26 | protected $writable; |
||
27 | |||
28 | protected $pipeSource; |
||
29 | |||
30 | /** |
||
31 | * @param ReadableStreamInterface $readable |
||
32 | * @param WritableStreamInterface $writable |
||
33 | */ |
||
34 | 19 | public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable) |
|
35 | { |
||
36 | 19 | $this->readable = $readable; |
|
37 | 19 | $this->writable = $writable; |
|
38 | |||
39 | 19 | $this->forwardEvents($this->readable, $this, ['data', 'end', 'error', 'close']); |
|
40 | 19 | $this->forwardEvents($this->writable, $this, ['drain', 'error', 'close', 'pipe']); |
|
41 | |||
42 | 19 | $this->readable->on('close', [$this, 'close']); |
|
43 | 19 | $this->writable->on('close', [$this, 'close']); |
|
44 | |||
45 | 19 | $this->on('pipe', [$this, 'handlePipeEvent']); |
|
46 | 19 | } |
|
47 | |||
48 | /** |
||
49 | * @param $source |
||
50 | * |
||
51 | * @return CompositeStream |
||
52 | */ |
||
53 | 5 | public function handlePipeEvent($source) : self |
|
54 | { |
||
55 | 5 | $this->pipeSource = $source; |
|
56 | |||
57 | 5 | return $this; |
|
58 | } |
||
59 | |||
60 | /** |
||
61 | * {@inheritDoc} |
||
62 | */ |
||
63 | 6 | public function isReadable() : bool |
|
67 | |||
68 | /** |
||
69 | * {@inheritDoc} |
||
70 | */ |
||
71 | 2 | public function pause() : self |
|
72 | { |
||
73 | 2 | if ($this->pipeSource) { |
|
74 | 1 | $this->pipeSource->pause(); |
|
75 | } |
||
76 | |||
77 | 2 | $this->readable->pause(); |
|
78 | |||
79 | 2 | return $this; |
|
80 | } |
||
81 | |||
82 | /** |
||
83 | * {@inheritDoc} |
||
84 | */ |
||
85 | 2 | public function resume() : self |
|
86 | { |
||
87 | 2 | if ($this->pipeSource) { |
|
88 | 1 | $this->pipeSource->resume(); |
|
89 | } |
||
90 | |||
91 | 2 | $this->readable->resume(); |
|
92 | |||
93 | 2 | return $this; |
|
94 | } |
||
95 | |||
96 | /** |
||
97 | * {@inheritDoc} |
||
98 | */ |
||
99 | 2 | public function pipe(WritableStreamInterface $dest, array $options = []) : WritableStreamInterface |
|
100 | { |
||
101 | 2 | $this->pipeAll($this, $dest, $options); |
|
102 | |||
103 | 2 | return $dest; |
|
104 | } |
||
105 | |||
106 | /** |
||
107 | * {@inheritDoc} |
||
108 | */ |
||
109 | 6 | public function isWritable() : bool |
|
113 | |||
114 | /** |
||
115 | * {@inheritDoc} |
||
116 | */ |
||
117 | 3 | public function write($data) |
|
121 | |||
122 | /** |
||
123 | * {@inheritDoc} |
||
124 | */ |
||
125 | 1 | public function end($data = null) : self |
|
126 | { |
||
131 | |||
132 | /** |
||
133 | * {@inheritDoc} |
||
134 | */ |
||
135 | 5 | public function close() : self |
|
144 | } |
||
145 |