SocketRelay::getAddress()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
3
/**
4
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
5
 *
6
 * @author Wolfy-J
7
 */
8
9
declare(strict_types=1);
10
11
namespace Spiral\Goridge;
12
13
use Error;
14
use Exception;
15
16
/**
17
 * Communicates with remote server/client over be-directional socket using byte payload:
18
 *
19
 * [ prefix       ][ payload                               ]
20
 * [ 1+8+8 bytes  ][ message length|LE ][message length|BE ]
21
 *
22
 * prefix:
23
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
24
 */
25
class SocketRelay implements RelayInterface, SendPackageRelayInterface, StringableRelayInterface
26
{
27
    /** Supported socket types. */
28
    public const SOCK_TCP  = 0;
29
    public const SOCK_UNIX = 1;
30
31
    // @deprecated
32
    public const SOCK_TPC = self::SOCK_TCP;
33
34
    /** @var string */
35
    private $address;
36
37
    /** @var int|null */
38
    private $port;
39
40
    /** @var int */
41
    private $type;
42
43
    /** @var resource */
44
    private $socket;
45
46
    /** @var bool */
47
    private $connected = false;
48
49
    /**
50
     * Example:
51
     * $relay = new SocketRelay("localhost", 7000);
52
     * $relay = new SocketRelay("/tmp/rpc.sock", null, Socket::UNIX_SOCKET);
53
     *
54
     * @param string   $address Localhost, ip address or hostname.
55
     * @param int|null $port    Ignored for UNIX sockets.
56
     * @param int      $type    Default: TCP_SOCKET
57
     *
58
     * @throws Exceptions\InvalidArgumentException
59
     */
60
    public function __construct(string $address, ?int $port = null, int $type = self::SOCK_TCP)
61
    {
62
        if (!extension_loaded('sockets')) {
63
            throw new Exceptions\InvalidArgumentException("'sockets' extension not loaded");
64
        }
65
66
        switch ($type) {
67
            case self::SOCK_TCP:
68
                if ($port === null) {
69
                    throw new Exceptions\InvalidArgumentException(sprintf(
70
                        "no port given for TPC socket on '%s'",
71
                        $address
72
                    ));
73
                }
74
75
                if ($port < 0 || $port > 65535) {
76
                    throw new Exceptions\InvalidArgumentException(sprintf(
77
                        "invalid port given for TPC socket on '%s'",
78
                        $address
79
                    ));
80
                }
81
                break;
82
            case self::SOCK_UNIX:
83
                $port = null;
84
                break;
85
            default:
86
                throw new Exceptions\InvalidArgumentException(sprintf(
87
                    "undefined connection type %s on '%s'",
88
                    $type,
89
                    $address
90
                ));
91
        }
92
93
        $this->address = $address;
94
        $this->port = $port;
95
        $this->type = $type;
96
    }
97
98
    /**
99
     * Destruct connection and disconnect.
100
     */
101
    public function __destruct()
102
    {
103
        if ($this->isConnected()) {
104
            $this->close();
105
        }
106
    }
107
108
    /**
109
     * @return string
110
     */
111
    public function __toString(): string
112
    {
113
        if ($this->type === self::SOCK_TCP) {
114
            return "tcp://{$this->address}:{$this->port}";
115
        }
116
117
        return "unix://{$this->address}";
118
    }
119
120
    /**
121
     * Send message package with header and body.
122
     *
123
     * @param string   $headerPayload
124
     * @param int|null $headerFlags
125
     * @param string   $bodyPayload
126
     * @param int|null $bodyFlags
127
     * @return self
128
     */
129
    public function sendPackage(
130
        string $headerPayload,
131
        ?int $headerFlags,
132
        string $bodyPayload,
133
        ?int $bodyFlags = null
134
    ): self {
135
        $this->connect();
136
137
        $headerPackage = packMessage($headerPayload, $headerFlags);
138
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
139
        if ($headerPackage === null || $bodyPackage === null) {
140
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
141
        }
142
143
        if (
144
            socket_send(
145
                $this->socket,
146
                $headerPackage['body'] . $bodyPackage['body'],
147
                34 + $headerPackage['size'] + $bodyPackage['size'],
148
                0
149
            ) === false
150
        ) {
151
            throw new Exceptions\TransportException('unable to write payload to the stream');
152
        }
153
154
        return $this;
155
    }
156
157
    /**
158
     * {@inheritdoc}
159
     * @return self
160
     */
161
    public function send(string $payload, ?int $flags = null): self
162
    {
163
        $this->connect();
164
165
        $package = packMessage($payload, $flags);
166
        if ($package === null) {
167
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
168
        }
169
170
        if (socket_send($this->socket, $package['body'], 17 + $package['size'], 0) === false) {
171
            throw new Exceptions\TransportException('unable to write payload to the stream');
172
        }
173
174
        return $this;
175
    }
176
177
    /**
178
     * {@inheritdoc}
179
     */
180
    public function receiveSync(?int &$flags = null): ?string
181
    {
182
        $this->connect();
183
184
        $prefix = $this->fetchPrefix();
185
        $flags = $prefix['flags'];
186
187
        $result = '';
188
        if ($prefix['size'] !== 0) {
189
            $readBytes = $prefix['size'];
190
191
            //Add ability to write to stream in a future
192
            while ($readBytes > 0) {
193
                $bufferLength = socket_recv(
194
                    $this->socket,
195
                    $buffer,
196
                    min(self::BUFFER_SIZE, $readBytes),
197
                    MSG_WAITALL
198
                );
199
                if ($bufferLength === false || $buffer === null) {
200
                    throw new Exceptions\PrefixException(sprintf(
201
                        'unable to read prefix from socket: %s',
202
                        socket_strerror(socket_last_error($this->socket))
203
                    ));
204
                }
205
206
                $result .= $buffer;
207
                $readBytes -= $bufferLength;
208
            }
209
        }
210
211
        return ($result !== '') ? $result : null;
212
    }
213
214
    /**
215
     * @return string
216
     */
217
    public function getAddress(): string
218
    {
219
        return $this->address;
220
    }
221
222
    /**
223
     * @return int|null
224
     */
225
    public function getPort(): ?int
226
    {
227
        return $this->port;
228
    }
229
230
    /**
231
     * @return int
232
     */
233
    public function getType(): int
234
    {
235
        return $this->type;
236
    }
237
238
    /**
239
     * @return bool
240
     */
241
    public function isConnected(): bool
242
    {
243
        return $this->connected;
244
    }
245
246
    /**
247
     * Ensure socket connection. Returns true if socket successfully connected
248
     * or have already been connected.
249
     *
250
     * @return bool
251
     *
252
     * @throws Exceptions\RelayException
253
     * @throws Error When sockets are used in unsupported environment.
254
     */
255
    public function connect(): bool
256
    {
257
        if ($this->isConnected()) {
258
            return true;
259
        }
260
261
        $socket = $this->createSocket();
262
        if ($socket === false) {
263
            throw new Exceptions\RelayException("unable to create socket {$this}");
264
        }
265
266
        try {
267
            if (socket_connect($socket, $this->address, $this->port ?? 0) === false) {
268
                throw new Exceptions\RelayException(socket_strerror(socket_last_error($socket)));
269
            }
270
        } catch (Exception $e) {
271
            throw new Exceptions\RelayException("unable to establish connection {$this}", 0, $e);
272
        }
273
274
        $this->socket = $socket;
275
        $this->connected = true;
276
277
        return true;
278
    }
279
280
    /**
281
     * Close connection.
282
     *
283
     * @throws Exceptions\RelayException
284
     */
285
    public function close(): void
286
    {
287
        if (!$this->isConnected()) {
288
            throw new Exceptions\RelayException("unable to close socket '{$this}', socket already closed");
289
        }
290
291
        socket_close($this->socket);
292
        $this->connected = false;
293
        unset($this->socket);
294
    }
295
296
    /**
297
     * @return array Prefix [flag, length]
298
     *
299
     * @throws Exceptions\PrefixException
300
     */
301
    private function fetchPrefix(): array
302
    {
303
        $prefixLength = socket_recv($this->socket, $prefixBody, 17, MSG_WAITALL);
304
        if ($prefixBody === null || $prefixLength !== 17) {
305
            throw new Exceptions\PrefixException(sprintf(
306
                'unable to read prefix from socket: %s',
307
                socket_strerror(socket_last_error($this->socket))
308
            ));
309
        }
310
311
        $result = unpack('Cflags/Psize/Jrevs', $prefixBody);
312
        if (!is_array($result)) {
313
            throw new Exceptions\PrefixException('invalid prefix');
314
        }
315
316
        if ($result['size'] !== $result['revs']) {
317
            throw new Exceptions\PrefixException('invalid prefix (checksum)');
318
        }
319
320
        return $result;
321
    }
322
323
    /**
324
     * @return resource|false
325
     * @throws Exceptions\GoridgeException
326
     */
327
    private function createSocket()
328
    {
329
        if ($this->type === self::SOCK_UNIX) {
330
            return socket_create(AF_UNIX, SOCK_STREAM, 0);
331
        }
332
333
        return socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
334
    }
335
}
336