Completed
Push — master ( 186cf3...8d9839 )
by Anton
02:02 queued 11s
created

StreamRelay::assertReadable()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
/**
 * Communicates with remote server/client over streams using a length-prefixed byte frame:
 *
 * [ prefix (17 bytes)                                          ][ payload        ]
 * [ flags: 1 byte ][ size: uint64 LE ][ size: uint64 BE ][ `size` bytes   ]
 *
 * The payload size is written twice, once little-endian and once big-endian; the
 * receiver compares both values to validate the prefix before reading the payload.
 */
class StreamRelay implements RelayInterface
{
    /** @var resource Stream the frames are read from. */
    private $in;

    /** @var resource Stream the frames are written to. */
    private $out;

    /**
     * Example:
     * $relay = new StreamRelay(STDIN, STDOUT);
     *
     * @param resource $in  Must be readable.
     * @param resource $out Must be writable.
     *
     * @throws Exceptions\InvalidArgumentException When either argument is not a stream
     *                                             resource or is opened in the wrong mode.
     */
    public function __construct($in, $out)
    {
        if (!is_resource($in) || get_resource_type($in) !== 'stream') {
            throw new Exceptions\InvalidArgumentException("expected a valid `in` stream resource");
        }

        if (!$this->assertReadable($in)) {
            throw new Exceptions\InvalidArgumentException("resource `in` must be readable");
        }

        if (!is_resource($out) || get_resource_type($out) !== 'stream') {
            throw new Exceptions\InvalidArgumentException("expected a valid `out` stream resource");
        }

        if (!$this->assertWritable($out)) {
            throw new Exceptions\InvalidArgumentException("resource `out` must be writable");
        }

        $this->in = $in;
        $this->out = $out;
    }

    /**
     * {@inheritdoc}
     *
     * Builds the 17-byte prefix followed by the payload and writes the whole frame
     * to the output stream.
     *
     * @throws Exceptions\TransportException When a non-empty payload is combined with
     *                                       PAYLOAD_NONE or the frame can not be written.
     */
    public function send($payload, int $flags = null)
    {
        // A null payload is treated as empty without tripping the strlen(null) deprecation.
        $size = $payload === null ? 0 : strlen($payload);
        if ($flags & self::PAYLOAD_NONE && $size != 0) {
            throw new Exceptions\TransportException("unable to send payload with PAYLOAD_NONE flag");
        }

        // C = flags (1 byte), P = size as uint64 LE, J = size as uint64 BE.
        $body = pack('CPJ', $flags, $size, $size);

        if (!($flags & self::PAYLOAD_NONE)) {
            $body .= $payload;
        }

        // fwrite() may accept only part of the buffer, so keep writing until the
        // whole frame is out; a false/zero result means no progress is possible.
        $length = strlen($body);
        $written = 0;
        while ($written < $length) {
            $bytes = fwrite($this->out, $written === 0 ? $body : substr($body, $written));
            if ($bytes === false || $bytes === 0) {
                throw new Exceptions\TransportException("unable to write payload to the stream");
            }

            $written += $bytes;
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Reads one frame from the input stream. Returns null when the prefix declares
     * an empty payload, otherwise the payload bytes.
     *
     * @throws Exceptions\PrefixException    When the prefix can not be read or validated.
     * @throws Exceptions\TransportException When the payload can not be read completely.
     */
    public function receiveSync(int &$flags = null)
    {
        $prefix = $this->fetchPrefix();
        $flags = $prefix['flags'];

        $result = null;
        if ($prefix['size'] !== 0) {
            $leftBytes = $prefix['size'];

            //Add ability to write to stream in a future
            while ($leftBytes > 0) {
                $buffer = fread($this->in, min($leftBytes, self::BUFFER_SIZE));

                // An empty read at end-of-stream would never reduce $leftBytes,
                // so it is reported instead of spinning forever.
                if ($buffer === false || ($buffer === '' && feof($this->in))) {
                    throw new Exceptions\TransportException("error reading payload from the stream");
                }

                $result .= $buffer;
                $leftBytes -= strlen($buffer);
            }
        }

        return $result;
    }

    /**
     * Reads and validates the 17-byte frame prefix.
     *
     * @return array Prefix with keys `flags`, `size` (little-endian value) and
     *               `revs` (the same size read as big-endian).
     *
     * @throws Exceptions\PrefixException
     */
    private function fetchPrefix(): array
    {
        $prefixBody = fread($this->in, 17);
        if ($prefixBody === false) {
            throw new Exceptions\PrefixException("unable to read prefix from the stream");
        }

        // A short read can not be unpacked; reject it up front so unpack() does
        // not raise a warning before the exception is thrown.
        if (strlen($prefixBody) !== 17) {
            throw new Exceptions\PrefixException("invalid prefix");
        }

        $result = unpack("Cflags/Psize/Jrevs", $prefixBody);
        if (!is_array($result)) {
            throw new Exceptions\PrefixException("invalid prefix");
        }

        // Both encodings of the size must agree.
        if ($result['size'] !== $result['revs']) {
            throw new Exceptions\PrefixException("invalid prefix (checksum)");
        }

        return $result;
    }

    /**
     * Checks if stream is readable.
     *
     * A mode is readable when it contains `r` or `+`, regardless of any extra
     * `b`/`t` flag (for example `rb`, `r+b`, `w+b`).
     *
     * @param resource $stream
     *
     * @return bool
     */
    private function assertReadable($stream): bool
    {
        $meta = stream_get_meta_data($stream);

        return strpbrk($meta['mode'], 'r+') !== false;
    }

    /**
     * Checks if stream is writable.
     *
     * A mode is writable when it contains `w`, `a`, `x`, `c` or `+`; read-only
     * modes such as `r`, `rb` and `rt` are rejected.
     *
     * @param resource $stream
     *
     * @return bool
     */
    private function assertWritable($stream): bool
    {
        $meta = stream_get_meta_data($stream);

        return strpbrk($meta['mode'], 'waxc+') !== false;
    }
}
160