Completed
Push — master ( bd49b2...323004 )
by
unknown
18s queued 11s
created

StreamRelay::sendPackage()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
nc 3
nop 4
dl 0
loc 22
rs 9.568
c 0
b 0
f 0
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
use Spiral\Goridge\Exceptions\TransportException;
11
12
/**
13
 * Communicates with remote server/client over streams using byte payload:
14
 *
15
 * [ prefix       ][ payload                               ]
16
 * [ 1+8+8 bytes  ][ message length|LE ][message length|BE ]
17
 *
18
 * prefix:
19
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
20
 */
21
class StreamRelay implements RelayInterface
22
{
23
    /** @var resource */
24
    private $in;
25
26
    /** @var resource */
27
    private $out;
28
29
    /**
30
     * Example:
31
     * $relay = new StreamRelay(STDIN, STDOUT);
32
     *
33
     * @param resource $in  Must be readable.
34
     * @param resource $out Must be writable.
35
     *
36
     * @throws Exceptions\InvalidArgumentException
37
     */
38
    public function __construct($in, $out)
39
    {
40
        if (!is_resource($in) || get_resource_type($in) !== 'stream') {
41
            throw new Exceptions\InvalidArgumentException("expected a valid `in` stream resource");
42
        }
43
44
        if (!$this->assertReadable($in)) {
45
            throw new Exceptions\InvalidArgumentException("resource `in` must be readable");
46
        }
47
48
        if (!is_resource($out) || get_resource_type($out) !== 'stream') {
49
            throw new Exceptions\InvalidArgumentException("expected a valid `out` stream resource");
50
        }
51
52
        if (!$this->assertWritable($out)) {
53
            throw new Exceptions\InvalidArgumentException("resource `out` must be writable");
54
        }
55
56
        $this->in = $in;
57
        $this->out = $out;
58
    }
59
60
    /**
61
     * {@inheritdoc}
62
     */
63
    public function sendPackage(
64
        string $headerPayload,
65
        ?int $headerFlags,
66
        string $bodyPayload,
67
        ?int $bodyFlags = null
68
    ) {
69
        $headerPackage = packMessage($headerPayload, $headerFlags);
70
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
71
        if ($headerPackage === null || $bodyPackage === null) {
72
            throw new TransportException('unable to send payload with PAYLOAD_NONE flag');
73
        }
74
75
        if (fwrite(
76
                $this->out,
77
                $headerPackage['body'] . $bodyPackage['body'],
78
                34 + $headerPackage['size'] + $bodyPackage['size']
79
            ) === false) {
80
            throw new TransportException('unable to write payload to the stream');
81
        }
82
83
        return $this;
84
    }
85
86
    /**
87
     * {@inheritdoc}
88
     */
89
    public function send($payload, int $flags = null)
90
    {
91
        $package = packMessage($payload, $flags);
92
        if ($package === null) {
93
            throw new TransportException('unable to send payload with PAYLOAD_NONE flag');
94
        }
95
96
        if (fwrite($this->out, $package['body'], 17 + $package['size']) === false) {
97
            throw new TransportException('unable to write payload to the stream');
98
        }
99
100
        return $this;
101
    }
102
103
    /**
104
     * {@inheritdoc}
105
     */
106
    public function receiveSync(int &$flags = null)
107
    {
108
        $prefix = $this->fetchPrefix();
109
        $flags = $prefix['flags'];
110
111
        $result = null;
112
        if ($prefix['size'] !== 0) {
113
            $leftBytes = $prefix['size'];
114
115
            //Add ability to write to stream in a future
116
            while ($leftBytes > 0) {
117
                $buffer = fread($this->in, min($leftBytes, self::BUFFER_SIZE));
118
                if ($buffer === false) {
119
                    throw new Exceptions\TransportException("error reading payload from the stream");
120
                }
121
122
                $result .= $buffer;
123
                $leftBytes -= strlen($buffer);
124
            }
125
        }
126
127
        return $result;
128
    }
129
130
    /**
131
     * @return array Prefix [flag, length]
132
     *
133
     * @throws Exceptions\PrefixException
134
     */
135
    private function fetchPrefix(): array
136
    {
137
        $prefixBody = fread($this->in, 17);
138
        if ($prefixBody === false) {
139
            throw new Exceptions\PrefixException("unable to read prefix from the stream");
140
        }
141
142
        $result = unpack("Cflags/Psize/Jrevs", $prefixBody);
143
        if (!is_array($result)) {
144
            throw new Exceptions\PrefixException("invalid prefix");
145
        }
146
147
        if ($result['size'] != $result['revs']) {
148
            throw new Exceptions\PrefixException("invalid prefix (checksum)");
149
        }
150
151
        return $result;
152
    }
153
154
    /**
155
     * Checks if stream is readable.
156
     *
157
     * @param resource $stream
158
     *
159
     * @return bool
160
     */
161
    private function assertReadable($stream): bool
162
    {
163
        $meta = stream_get_meta_data($stream);
164
165
        return in_array($meta['mode'], ['r', 'rb', 'r+', 'rb+', 'w+', 'wb+', 'a+', 'ab+', 'x+', 'c+', 'cb+'], true);
166
    }
167
168
    /**
169
     * Checks if stream is writable.
170
     *
171
     * @param resource $stream
172
     *
173
     * @return bool
174
     */
175
    private function assertWritable($stream): bool
176
    {
177
        $meta = stream_get_meta_data($stream);
178
179
        return $meta['mode'] !== 'r';
180
    }
181
}
182