Completed
Push — master ( 1de5f0...bc31f8 )
by Anton
01:27
created

StreamRelay::__construct()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 13
c 0
b 0
f 0
rs 9.4285
cc 3
eloc 7
nc 3
nop 2
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
/**
11
 * Communicates with remote server/client over streams using byte payload:
12
 *
13
 * [ prefix     ][ payload        ]
14
 * [ 1+8 bytes  ][ message length ]
15
 *
16
 * prefix:
17
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
18
 */
19
class StreamRelay implements RelayInterface
{
    /** Size in bytes of the frame prefix: 1 flag byte + 8 length bytes. */
    private const PREFIX_SIZE = 9;

    /** @var resource Stream the incoming frames are read from. */
    private $in;

    /** @var resource Stream the outgoing frames are written to. */
    private $out;

    /**
     * Example:
     * $relay = new StreamRelay(STDIN, STDOUT);
     *
     * @param resource $in  Must be readable.
     * @param resource $out Must be writable.
     *
     * @throws Exceptions\InvalidArgumentException When a stream is not opened in a suitable mode.
     */
    public function __construct($in, $out)
    {
        if (!$this->assertMode($in, 'r')) {
            throw new Exceptions\InvalidArgumentException("resource `in` must be readable");
        }

        if (!$this->assertMode($out, 'w')) {
            // The original message said "readable" here, which pointed at the wrong problem.
            throw new Exceptions\InvalidArgumentException("resource `out` must be writable");
        }

        $this->in = $in;
        $this->out = $out;
    }

    /**
     * Writes one frame (prefix followed by the payload) to the output stream.
     *
     * {@inheritdoc}
     *
     * @throws Exceptions\TransportException When the payload contradicts the flags or the
     *                                       stream does not accept the whole frame.
     */
    public function send($payload, int $flags = null)
    {
        // A missing flag set is packed as 0, which is what pack() did with null before.
        $flags = (int)$flags;
        $size = strlen($payload);

        $withoutPayload = ($flags & self::PAYLOAD_NONE) !== 0;
        if ($withoutPayload && $size !== 0) {
            throw new Exceptions\TransportException("unable to send payload with PAYLOAD_NONE flag");
        }

        // Comparing against the expected byte count catches `false`, 0 and short writes alike.
        $prefix = pack('CP', $flags, $size);
        if (fwrite($this->out, $prefix, self::PREFIX_SIZE) !== self::PREFIX_SIZE) {
            throw new Exceptions\TransportException("unable to write prefix to the stream");
        }

        // An empty payload is valid and needs no write; previously the 0 returned by
        // fwrite() for it was mistaken for a failure.
        if (!$withoutPayload && $size !== 0 && fwrite($this->out, $payload, $size) !== $size) {
            throw new Exceptions\TransportException("unable to write payload to the stream");
        }

        return $this;
    }

    /**
     * Reads one frame from the input stream and returns its payload.
     *
     * {@inheritdoc}
     *
     * @throws Exceptions\PrefixException    When the frame prefix cannot be read or decoded.
     * @throws Exceptions\TransportException When the payload cannot be read completely.
     */
    public function receiveSync(int &$flags = null)
    {
        $prefix = $this->fetchPrefix();
        $flags = $prefix['flags'];

        if ($prefix['size'] === 0) {
            // No payload follows the prefix.
            return null;
        }

        $result = '';
        $leftBytes = $prefix['size'];

        //Add ability to write to stream in a future
        while ($leftBytes > 0) {
            $buffer = fread($this->in, min($leftBytes, self::BUFFER_SIZE));

            // fread() reports end-of-stream as an empty string rather than `false`; without
            // the feof() check a peer closing mid-payload would spin this loop forever.
            if ($buffer === false || ($buffer === '' && feof($this->in))) {
                throw new Exceptions\TransportException("error reading payload from the stream");
            }

            $result .= $buffer;
            $leftBytes -= strlen($buffer);
        }

        return $result;
    }

    /**
     * Reads and decodes the 9-byte frame prefix.
     *
     * @return array Prefix with keys `flags` (unsigned byte) and `size` (unsigned 64-bit, little endian).
     *
     * @throws Exceptions\PrefixException
     */
    private function fetchPrefix(): array
    {
        $prefixBody = fread($this->in, self::PREFIX_SIZE);
        if ($prefixBody === false) {
            throw new Exceptions\PrefixException("unable to read prefix from the stream");
        }

        // A truncated prefix used to reach unpack(), which raised a warning before failing.
        if (strlen($prefixBody) !== self::PREFIX_SIZE) {
            throw new Exceptions\PrefixException("invalid prefix");
        }

        $result = unpack("Cflags/Psize", $prefixBody);
        if (!is_array($result)) {
            throw new Exceptions\PrefixException("invalid prefix");
        }

        return $result;
    }

    /**
     * Checks if stream is writable or readable.
     *
     * A stream counts as readable when its mode contains `r` or `+`, and as writable when
     * it contains any of `w`, `a`, `x`, `c` or `+` (see the fopen() mode table). Looking only
     * for the literal letter rejected valid streams such as `r+` sockets used for output.
     *
     * @param resource $stream
     * @param string   $mode `r` to test readability, anything else to test writability.
     *
     * @return bool False when $stream is not an open stream resource or lacks the capability.
     */
    private function assertMode($stream, $mode): bool
    {
        // Guard first: stream_get_meta_data() rejects non-stream arguments.
        if (!is_resource($stream) || get_resource_type($stream) !== 'stream') {
            return false;
        }

        $meta = stream_get_meta_data($stream);
        $accepted = $mode === 'r' ? 'r+' : 'waxc+';

        return strpbrk($meta['mode'], $accepted) !== false;
    }
}
134