StreamRelay::receiveSync()   A
last analyzed

Complexity

Conditions 5
Paths 5

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
nc 5
nop 1
dl 0
loc 23
rs 9.2408
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
5
 *
6
 * @author Wolfy-J
7
 */
8
9
declare(strict_types=1);
10
11
namespace Spiral\Goridge;
12
13
/**
 * Relay that exchanges framed messages with a remote peer over a pair of PHP streams.
 *
 * Every frame is a 17 byte prefix followed by the payload:
 *
 * [ flags  ][ payload size           ][ payload size        ][ payload ]
 * [ 1 byte ][ 8 bytes, little endian ][ 8 bytes, big endian ][ N bytes ]
 *
 * The payload size is transmitted twice, once per byte order; a prefix whose two size values
 * differ is rejected as corrupted.
 */
class StreamRelay implements RelayInterface, SendPackageRelayInterface
{
    /** Size of the frame prefix in bytes: 1 (flags) + 8 (size, LE) + 8 (size, BE). */
    private const PREFIX_SIZE = 17;

    /** @var resource */
    private $in;

    /** @var resource */
    private $out;

    /**
     * Example:
     * $relay = new StreamRelay(STDIN, STDOUT);
     *
     * @param resource $in  Must be readable.
     * @param resource $out Must be writable.
     *
     * @throws Exceptions\InvalidArgumentException When either argument is not a stream resource
     *                                             or was opened in an unsuitable mode.
     */
    public function __construct($in, $out)
    {
        if (!is_resource($in) || get_resource_type($in) !== 'stream') {
            throw new Exceptions\InvalidArgumentException('expected a valid `in` stream resource');
        }

        if (!$this->assertReadable($in)) {
            throw new Exceptions\InvalidArgumentException('resource `in` must be readable');
        }

        if (!is_resource($out) || get_resource_type($out) !== 'stream') {
            throw new Exceptions\InvalidArgumentException('expected a valid `out` stream resource');
        }

        if (!$this->assertWritable($out)) {
            throw new Exceptions\InvalidArgumentException('resource `out` must be writable');
        }

        $this->in = $in;
        $this->out = $out;
    }

    /**
     * Send message package with header and body.
     *
     * Both frames are concatenated and handed to the output stream as a single buffer.
     *
     * @param string   $headerPayload
     * @param int|null $headerFlags
     * @param string   $bodyPayload
     * @param int|null $bodyFlags
     * @return self
     *
     * @throws Exceptions\TransportException When a frame cannot be packed or the stream
     *                                       does not accept the whole buffer.
     */
    public function sendPackage(
        string $headerPayload,
        ?int $headerFlags,
        string $bodyPayload,
        ?int $bodyFlags = null
    ): self {
        $headerPackage = packMessage($headerPayload, $headerFlags);
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
        if ($headerPackage === null || $bodyPackage === null) {
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
        }

        // Two frames, hence two prefixes in front of the two payloads.
        $this->writeBytes(
            $headerPackage['body'] . $bodyPackage['body'],
            2 * self::PREFIX_SIZE + $headerPackage['size'] + $bodyPackage['size']
        );

        return $this;
    }

    /**
     * {@inheritdoc}
     * @return self
     *
     * @throws Exceptions\TransportException When the frame cannot be packed or the stream
     *                                       does not accept the whole buffer.
     */
    public function send(string $payload, ?int $flags = null): self
    {
        $package = packMessage($payload, $flags);
        if ($package === null) {
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
        }

        $this->writeBytes($package['body'], self::PREFIX_SIZE + $package['size']);

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Reads one frame: the prefix first, then exactly as many payload bytes as the prefix announces.
     * Returns null when the frame carries no payload.
     *
     * @throws Exceptions\PrefixException    When the prefix cannot be read or is malformed.
     * @throws Exceptions\TransportException When the stream fails or ends before the payload is complete.
     */
    public function receiveSync(?int &$flags = null): ?string
    {
        $prefix = $this->fetchPrefix();
        $flags = $prefix['flags'];

        $result = '';
        $leftBytes = $prefix['size'];

        // Read in chunks of at most BUFFER_SIZE bytes (constant is not declared in this class;
        // presumably provided by one of the implemented interfaces).
        while ($leftBytes > 0) {
            $buffer = fread($this->in, min($leftBytes, self::BUFFER_SIZE));

            // fread() reports end-of-stream as an empty string, not as false; without the feof()
            // check a peer that disconnects mid-payload would keep this loop spinning forever.
            if ($buffer === false || ($buffer === '' && feof($this->in))) {
                throw new Exceptions\TransportException('error reading payload from the stream');
            }

            $result .= $buffer;
            $leftBytes -= strlen($buffer);
        }

        return ($result !== '') ? $result : null;
    }

    /**
     * Reads and validates the 17 byte frame prefix.
     *
     * @return array Prefix with keys `flags`, `size` (little endian length) and `revs` (big endian length).
     *
     * @throws Exceptions\PrefixException
     */
    private function fetchPrefix(): array
    {
        $prefixBody = '';

        // A single fread() may legitimately return fewer bytes than requested (pipes, sockets),
        // so keep reading while progress is made. A failed or empty read aborts immediately.
        while (($missing = self::PREFIX_SIZE - strlen($prefixBody)) > 0) {
            $chunk = fread($this->in, $missing);
            if ($chunk === false || $chunk === '') {
                throw new Exceptions\PrefixException('unable to read prefix from the stream');
            }

            $prefixBody .= $chunk;
        }

        // C = unsigned char, P = unsigned 64 bit little endian, J = unsigned 64 bit big endian.
        $result = unpack('Cflags/Psize/Jrevs', $prefixBody);
        if (!is_array($result)) {
            throw new Exceptions\PrefixException('invalid prefix');
        }

        // Both encodings of the length must decode to the same number.
        if ($result['size'] !== $result['revs']) {
            throw new Exceptions\PrefixException('invalid prefix (checksum)');
        }

        return $result;
    }

    /**
     * Writes the first $length bytes of $data to the output stream, retrying after partial writes.
     *
     * @param string $data
     * @param int    $length
     *
     * @throws Exceptions\TransportException When the stream fails or stops accepting data.
     */
    private function writeBytes(string $data, int $length): void
    {
        $length = min($length, strlen($data));
        $offset = 0;

        while ($offset < $length) {
            $written = fwrite($this->out, substr($data, $offset, $length - $offset));

            // Zero bytes written means no progress; older PHP versions also report failure this way.
            if ($written === false || $written === 0) {
                throw new Exceptions\TransportException('unable to write payload to the stream');
            }

            $offset += $written;
        }
    }

    /**
     * Checks if stream is readable.
     *
     * A stream mode allows reading when it is an `r` mode or carries the `+` (read/write) flag,
     * regardless of additional flags such as `b` or `t` and of their position (`r+b`, `w+b`, `xb+`, ...).
     *
     * @param resource $stream
     *
     * @return bool
     */
    private function assertReadable($stream): bool
    {
        $meta = stream_get_meta_data($stream);

        return strpbrk($meta['mode'], 'r+') !== false;
    }

    /**
     * Checks if stream is writable.
     *
     * Only `r` modes without the `+` flag (`r`, `rb`, `rt`, ...) are read-only; every other mode
     * is accepted.
     *
     * @param resource $stream
     *
     * @return bool
     */
    private function assertWritable($stream): bool
    {
        $meta = stream_get_meta_data($stream);
        $mode = $meta['mode'];

        return strpos($mode, 'r') !== 0 || strpos($mode, '+') !== false;
    }
}
192