StreamDecorator::pipe()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 2
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Nucleus - XMPP Library for PHP
4
 *
5
 * Copyright (C) 2016, Some rights reserved.
6
 *
7
 * @author Kacper "Kadet" Donat <[email protected]>
8
 *
9
 * Contact with author:
10
 * Xmpp: [email protected]
11
 * E-mail: [email protected]
12
 *
13
 * From Kadet with love.
14
 */
15
16
declare (strict_types = 1);
17
/**
18
 * XMPP Library
19
 *
20
 * Copyright (C) 2016, Some rights reserved.
21
 *
22
 * @author Kacper "Kadet" Donat <[email protected]>
23
 *
24
 * Contact with author:
25
 * Xmpp: [email protected]
26
 * E-mail: [email protected]
27
 *
28
 * From Kadet with love.
29
 */
30
31
namespace Kadet\Xmpp\Utils;
32
33
use React\Stream\DuplexStreamInterface;
34
use React\Stream\WritableStreamInterface;
35
36
/**
 * Base class for stream decorators.
 *
 * Wraps a duplex stream, forwards every stream operation to it and re-emits
 * the wrapped stream's events ('data', 'end', 'drain', 'error', 'close',
 * 'pipe') on the decorator itself. The wrapped stream can be swapped at any
 * time with exchangeStream().
 *
 * NOTE: the delegating methods do not check whether a stream has been set;
 * calling them before a stream was passed to the constructor or to
 * exchangeStream() fails because $_decorated is still null.
 */
abstract class StreamDecorator implements DuplexStreamInterface
{
    use BetterEmitter;

    /**
     * Event forwarders, indexed by event name. Each forwarder is created once
     * and reused for every decorated stream so it can be detached again with
     * removeListener().
     *
     * @var \Closure[]
     */
    private $_redirectors = [];

    /**
     * Currently decorated stream, or null when none has been set yet.
     *
     * @var DuplexStreamInterface|null
     */
    protected $_decorated = null;

    /**
     * StreamDecorator constructor.
     *
     * @param DuplexStreamInterface|null $decorated Stream to decorate; when
     *        omitted, one has to be provided later via exchangeStream().
     */
    public function __construct(DuplexStreamInterface $decorated = null)
    {
        if ($decorated !== null) {
            $this->exchangeStream($decorated);
        }
    }

    /**
     * Tells whether the decorated stream is readable.
     *
     * @return mixed Result of the decorated stream's isReadable().
     */
    public function isReadable()
    {
        return $this->_decorated->isReadable();
    }

    /**
     * Pauses the decorated stream.
     */
    public function pause()
    {
        $this->_decorated->pause();
    }

    /**
     * Resumes the decorated stream.
     */
    public function resume()
    {
        $this->_decorated->resume();
    }

    /**
     * Pipes the decorated stream into the given destination.
     *
     * The call is delegated to the decorated stream itself, so the piping is
     * set up on the wrapped stream rather than on this decorator.
     *
     * @param WritableStreamInterface $destination Stream to pipe into.
     * @param array                   $options     Options passed through unchanged.
     *
     * @return mixed Whatever the decorated stream's pipe() returns; ReactPHP
     *               documents this as the destination stream, which allows
     *               chaining of pipe() calls.
     */
    public function pipe(WritableStreamInterface $destination, array $options = [])
    {
        // Propagate the result instead of dropping it, otherwise
        // $decorator->pipe($a)->pipe($b) would be called on null.
        return $this->_decorated->pipe($destination, $options);
    }

    /**
     * Closes the decorated stream.
     */
    public function close()
    {
        $this->_decorated->close();
    }

    /**
     * Tells whether the decorated stream is writable.
     *
     * @return mixed Result of the decorated stream's isWritable().
     */
    public function isWritable()
    {
        return $this->_decorated->isWritable();
    }

    /**
     * Writes data to the decorated stream.
     *
     * @param mixed $data Data passed through unchanged.
     *
     * @return mixed Result of the decorated stream's write().
     */
    public function write($data)
    {
        return $this->_decorated->write($data);
    }

    /**
     * Ends the decorated stream, optionally passing final data to it.
     *
     * @param mixed $data Final data passed through unchanged, null by default.
     *
     * @return mixed Result of the decorated stream's end().
     */
    public function end($data = null)
    {
        return $this->_decorated->end($data);
    }

    /**
     * Replaces the decorated stream.
     *
     * Event forwarders are detached from the previous stream (if any) and
     * attached to the new one before it becomes the decorated stream.
     *
     * @param DuplexStreamInterface $stream New stream to decorate.
     */
    public function exchangeStream(DuplexStreamInterface $stream)
    {
        static $events = ['data', 'end', 'drain', 'error', 'close', 'pipe'];

        if ($this->_decorated !== null) {
            $this->unsubscribe($this->_decorated, $events);
        }

        $this->subscribe($stream, $events);
        $this->_decorated = $stream;
    }

    /**
     * Detaches this decorator's forwarders from the given stream.
     *
     * Events that have no forwarder registered are skipped.
     *
     * @param DuplexStreamInterface $stream Stream to detach from.
     * @param string[]              $events Names of events to stop forwarding.
     */
    private function unsubscribe(DuplexStreamInterface $stream, array $events)
    {
        foreach ($events as $event) {
            if (!isset($this->_redirectors[$event])) {
                continue;
            }

            $stream->removeListener($event, $this->_redirectors[$event]);
        }
    }

    /**
     * Attaches forwarders for the given events to the stream.
     *
     * A forwarder is created lazily on first use and cached, so the very same
     * closure instance can later be handed to removeListener().
     *
     * @param DuplexStreamInterface $stream Stream to listen on.
     * @param string[]              $events Names of events to forward.
     */
    private function subscribe(DuplexStreamInterface $stream, array $events)
    {
        foreach ($events as $event) {
            if (!isset($this->_redirectors[$event])) {
                // Re-emit the event on the decorator with the original arguments.
                $this->_redirectors[$event] = function (...$arguments) use ($event) {
                    $this->emit($event, $arguments);
                };
            }

            $stream->on($event, $this->_redirectors[$event]);
        }
    }

    /**
     * Returns the currently decorated stream.
     *
     * @return DuplexStreamInterface
     */
    public function getDecorated() : DuplexStreamInterface
    {
        return $this->_decorated;
    }
}
139