Completed
Pull Request — master (#54)
by Valentin
01:19
created

SocketRelay::receiveSync()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
nc 5
nop 1
dl 0
loc 33
rs 8.7697
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
5
 *
6
 * @author Wolfy-J
7
 */
8
9
declare(strict_types=1);
10
11
namespace Spiral\Goridge;
12
13
use Error;
14
use Exception;
15
16
/**
17
 * Communicates with remote server/client over be-directional socket using byte payload:
18
 *
19
 * [ prefix       ][ payload                               ]
20
 * [ 1+8+8 bytes  ][ message length|LE ][message length|BE ]
21
 *
22
 * prefix:
23
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
24
 */
25
class SocketRelay implements RelayInterface
26
{
27
    /** Supported socket types. */
28
    public const SOCK_TCP  = 0;
29
    public const SOCK_UNIX = 1;
30
31
    // @deprecated
32
    public const SOCK_TPC = self::SOCK_TCP;
33
34
    /** @var string */
35
    private $address;
36
37
    /** @var int|null */
38
    private $port;
39
40
    /** @var int */
41
    private $type;
42
43
    /** @var resource|null */
44
    private $socket;
45
46
    /**
47
     * Example:
48
     * $relay = new SocketRelay("localhost", 7000);
49
     * $relay = new SocketRelay("/tmp/rpc.sock", null, Socket::UNIX_SOCKET);
50
     *
51
     * @param string   $address Localhost, ip address or hostname.
52
     * @param int|null $port    Ignored for UNIX sockets.
53
     * @param int      $type    Default: TCP_SOCKET
54
     *
55
     * @throws Exceptions\InvalidArgumentException
56
     */
57
    public function __construct(string $address, ?int $port = null, int $type = self::SOCK_TCP)
58
    {
59
        if (!extension_loaded('sockets')) {
60
            throw new Exceptions\InvalidArgumentException("'sockets' extension not loaded");
61
        }
62
63
        switch ($type) {
64
            case self::SOCK_TCP:
65
                if ($port === null) {
66
                    throw new Exceptions\InvalidArgumentException(sprintf(
67
                        "no port given for TPC socket on '%s'",
68
                        $address
69
                    ));
70
                }
71
72
                if ($port < 0 || $port > 65535) {
73
                    throw new Exceptions\InvalidArgumentException(sprintf(
74
                        "invalid port given for TPC socket on '%s'",
75
                        $address
76
                    ));
77
                }
78
                break;
79
            case self::SOCK_UNIX:
80
                $port = null;
81
                break;
82
            default:
83
                throw new Exceptions\InvalidArgumentException(sprintf(
84
                    "undefined connection type %s on '%s'",
85
                    $type,
86
                    $address
87
                ));
88
        }
89
90
        $this->address = $address;
91
        $this->port = $port;
92
        $this->type = $type;
93
    }
94
95
    /**
96
     * Destruct connection and disconnect.
97
     */
98
    public function __destruct()
99
    {
100
        if ($this->isConnected()) {
101
            $this->close();
102
        }
103
    }
104
105
    /**
106
     * @return string
107
     */
108
    public function __toString(): string
109
    {
110
        if ($this->type === self::SOCK_TCP) {
111
            return "tcp://{$this->address}:{$this->port}";
112
        }
113
114
        return "unix://{$this->address}";
115
    }
116
117
    /**
118
     * {@inheritdoc}
119
     * @return self
120
     */
121
    public function sendPackage(
122
        string $headerPayload,
123
        ?int $headerFlags,
124
        string $bodyPayload,
125
        ?int $bodyFlags = null
126
    ): self {
127
        $this->connect();
128
129
        $headerPackage = packMessage($headerPayload, $headerFlags);
130
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
131
        if ($headerPackage === null || $bodyPackage === null) {
132
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
133
        }
134
135
        if (
136
            socket_send(
137
                $this->socket,
138
                $headerPackage['body'] . $bodyPackage['body'],
139
                34 + $headerPackage['size'] + $bodyPackage['size'],
140
                0
141
            ) === false
142
        ) {
143
            throw new Exceptions\TransportException('unable to write payload to the stream');
144
        }
145
146
        return $this;
147
    }
148
149
    /**
150
     * {@inheritdoc}
151
     * @return self
152
     */
153
    public function send(string $payload, ?int $flags = null): self
154
    {
155
        $this->connect();
156
157
        $package = packMessage($payload, $flags);
158
        if ($package === null) {
159
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
160
        }
161
162
        if (socket_send($this->socket, $package['body'], 17 + $package['size'], 0) === false) {
163
            throw new Exceptions\TransportException('unable to write payload to the stream');
164
        }
165
166
        return $this;
167
    }
168
169
    /**
170
     * {@inheritdoc}
171
     */
172
    public function receiveSync(?int &$flags = null): ?string
173
    {
174
        $this->connect();
175
176
        $prefix = $this->fetchPrefix();
177
        $flags = $prefix['flags'];
178
179
        $result = '';
180
        if ($prefix['size'] !== 0) {
181
            $readBytes = $prefix['size'];
182
183
            //Add ability to write to stream in a future
184
            while ($readBytes > 0) {
185
                $bufferLength = socket_recv(
186
                    $this->socket,
187
                    $buffer,
188
                    min(self::BUFFER_SIZE, $readBytes),
189
                    MSG_WAITALL
190
                );
191
                if ($bufferLength === false || $buffer === null) {
192
                    throw new Exceptions\PrefixException(sprintf(
193
                        'unable to read prefix from socket: %s',
194
                        socket_strerror(socket_last_error($this->socket))
195
                    ));
196
                }
197
198
                $result .= $buffer;
199
                $readBytes -= $bufferLength;
200
            }
201
        }
202
203
        return $result ?: null;
204
    }
205
206
    /**
207
     * @return string
208
     */
209
    public function getAddress(): string
210
    {
211
        return $this->address;
212
    }
213
214
    /**
215
     * @return int|null
216
     */
217
    public function getPort(): ?int
218
    {
219
        return $this->port;
220
    }
221
222
    /**
223
     * @return int
224
     */
225
    public function getType(): int
226
    {
227
        return $this->type;
228
    }
229
230
    /**
231
     * @return bool
232
     */
233
    public function isConnected(): bool
234
    {
235
        return $this->socket !== null;
236
    }
237
238
    /**
239
     * Ensure socket connection. Returns true if socket successfully connected
240
     * or have already been connected.
241
     *
242
     * @return bool
243
     *
244
     * @throws Exceptions\RelayException
245
     * @throws Error When sockets are used in unsupported environment.
246
     */
247
    public function connect(): bool
248
    {
249
        if ($this->isConnected()) {
250
            return true;
251
        }
252
253
        $this->socket = $this->createSocket();
254
        try {
255
            if (socket_connect($this->socket, $this->address, $this->port ?? 0) === false) {
256
                throw new Exceptions\RelayException(socket_strerror(socket_last_error($this->socket)));
257
            }
258
        } catch (Exception $e) {
259
            throw new Exceptions\RelayException("unable to establish connection {$this}", 0, $e);
260
        }
261
262
        return true;
263
    }
264
265
    /**
266
     * Close connection.
267
     *
268
     * @throws Exceptions\RelayException
269
     */
270
    public function close(): void
271
    {
272
        if (!$this->isConnected()) {
273
            throw new Exceptions\RelayException("unable to close socket '{$this}', socket already closed");
274
        }
275
276
        socket_close($this->socket);
277
        $this->socket = null;
278
    }
279
280
    /**
281
     * @return array Prefix [flag, length]
282
     *
283
     * @throws Exceptions\PrefixException
284
     */
285
    private function fetchPrefix(): array
286
    {
287
        $prefixLength = socket_recv($this->socket, $prefixBody, 17, MSG_WAITALL);
288
        if ($prefixBody === null || $prefixLength !== 17) {
289
            throw new Exceptions\PrefixException(sprintf(
290
                'unable to read prefix from socket: %s',
291
                socket_strerror(socket_last_error($this->socket))
292
            ));
293
        }
294
295
        $result = unpack('Cflags/Psize/Jrevs', $prefixBody);
296
        if (!is_array($result)) {
297
            throw new Exceptions\PrefixException('invalid prefix');
298
        }
299
300
        if ($result['size'] !== $result['revs']) {
301
            throw new Exceptions\PrefixException('invalid prefix (checksum)');
302
        }
303
304
        return $result;
305
    }
306
307
    /**
308
     * @return resource
309
     * @throws Exceptions\GoridgeException
310
     */
311
    private function createSocket()
312
    {
313
        if ($this->type === self::SOCK_UNIX) {
314
            return socket_create(AF_UNIX, SOCK_STREAM, 0);
315
        }
316
317
        return socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
318
    }
319
}
320