Completed
Push — master ( dea784...77743f )
by
unknown
21s queued 11s
created

SocketRelay::__construct()   B

Complexity

Conditions 7
Paths 6

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
nc 6
nop 3
dl 0
loc 37
rs 8.3946
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
5
 *
6
 * @author Wolfy-J
7
 */
8
9
declare(strict_types=1);
10
11
namespace Spiral\Goridge;
12
13
use Error;
14
use Exception;
15
16
/**
17
 * Communicates with remote server/client over bi-directional socket using byte payload:
18
 *
19
 * [ prefix       ][ payload                               ]
20
 * [ 1+8+8 bytes  ][ message length|LE ][message length|BE ]
21
 *
22
 * prefix:
23
 * [ flag, 1 byte ][ message length, unsigned int 64bits, LittleEndian ][ message length, unsigned int 64bits, BigEndian ]
24
 */
25
class SocketRelay implements RelayInterface
{
    /** Supported socket types. */
    public const SOCK_TCP  = 0;
    public const SOCK_UNIX = 1;

    /** @deprecated Misspelled alias kept for backward compatibility, use SOCK_TCP instead. */
    public const SOCK_TPC = self::SOCK_TCP;

    /** @var string */
    private $address;

    /** @var int|null */
    private $port;

    /** @var int */
    private $type;

    /**
     * Underlying socket handle; null while the relay is not connected.
     *
     * @var resource|\Socket|null
     */
    private $socket;

    /**
     * Example:
     * $relay = new SocketRelay("localhost", 7000);
     * $relay = new SocketRelay("/tmp/rpc.sock", null, SocketRelay::SOCK_UNIX);
     *
     * @param string   $address Localhost, ip address or hostname (socket file path for UNIX sockets).
     * @param int|null $port    Required for TCP sockets, ignored (reset to null) for UNIX sockets.
     * @param int      $type    One of the SOCK_* constants. Default: SOCK_TCP.
     *
     * @throws Exceptions\InvalidArgumentException When the sockets extension is missing, the TCP port is
     *                                             missing or out of range, or the type is unknown.
     */
    public function __construct(string $address, ?int $port = null, int $type = self::SOCK_TCP)
    {
        if (!extension_loaded('sockets')) {
            throw new Exceptions\InvalidArgumentException("'sockets' extension not loaded");
        }

        switch ($type) {
            case self::SOCK_TCP:
                if ($port === null) {
                    throw new Exceptions\InvalidArgumentException(sprintf(
                        "no port given for TCP socket on '%s'",
                        $address
                    ));
                }

                if ($port < 0 || $port > 65535) {
                    throw new Exceptions\InvalidArgumentException(sprintf(
                        "invalid port given for TCP socket on '%s'",
                        $address
                    ));
                }
                break;
            case self::SOCK_UNIX:
                // A port has no meaning for a UNIX socket, drop whatever was passed.
                $port = null;
                break;
            default:
                throw new Exceptions\InvalidArgumentException(sprintf(
                    "undefined connection type %s on '%s'",
                    $type,
                    $address
                ));
        }

        $this->address = $address;
        $this->port = $port;
        $this->type = $type;
    }

    /**
     * Destruct connection and disconnect.
     */
    public function __destruct()
    {
        if ($this->isConnected()) {
            $this->close();
        }
    }

    /**
     * Render the relay endpoint as "tcp://address:port" or "unix://address".
     *
     * @return string
     */
    public function __toString(): string
    {
        if ($this->type === self::SOCK_TCP) {
            return "tcp://{$this->address}:{$this->port}";
        }

        return "unix://{$this->address}";
    }

    /**
     * Send message package with header and body.
     *
     * Both frames are packed separately and written to the socket as one contiguous buffer.
     *
     * @param string   $headerPayload
     * @param int|null $headerFlags
     * @param string   $bodyPayload
     * @param int|null $bodyFlags
     * @return self
     *
     * @throws Exceptions\TransportException When a frame can not be packed or the write fails.
     */
    public function sendPackage(
        string $headerPayload,
        ?int $headerFlags,
        string $bodyPayload,
        ?int $bodyFlags = null
    ): self {
        $this->connect();

        $headerPackage = packMessage($headerPayload, $headerFlags);
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
        if ($headerPackage === null || $bodyPackage === null) {
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
        }

        // 34 = two 17-byte prefixes (one per frame) on top of both payload sizes.
        $this->sendAll(
            $headerPackage['body'] . $bodyPackage['body'],
            34 + $headerPackage['size'] + $bodyPackage['size']
        );

        return $this;
    }

    /**
     * {@inheritdoc}
     * @return self
     *
     * @throws Exceptions\TransportException When the frame can not be packed or the write fails.
     */
    public function send(string $payload, ?int $flags = null): self
    {
        $this->connect();

        $package = packMessage($payload, $flags);
        if ($package === null) {
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
        }

        // 17 = prefix length preceding the payload.
        $this->sendAll($package['body'], 17 + $package['size']);

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Reads the prefix first, then pulls the announced number of payload bytes in chunks of at most
     * BUFFER_SIZE. Returns null only when the payload is empty.
     *
     * @throws Exceptions\PrefixException When the prefix or the payload can not be read.
     */
    public function receiveSync(?int &$flags = null): ?string
    {
        $this->connect();

        $prefix = $this->fetchPrefix();
        $flags = $prefix['flags'];

        $result = '';
        if ($prefix['size'] !== 0) {
            $readBytes = $prefix['size'];

            //Add ability to write to stream in a future
            while ($readBytes > 0) {
                $bufferLength = socket_recv(
                    $this->socket,
                    $buffer,
                    min(self::BUFFER_SIZE, $readBytes),
                    MSG_WAITALL
                );

                // A zero-length read would never make progress, treat it like a failed read.
                // The exception type is kept as-is so existing catch blocks keep working.
                if ($bufferLength === false || $bufferLength === 0 || $buffer === null) {
                    throw new Exceptions\PrefixException(sprintf(
                        'unable to read payload from socket: %s',
                        socket_strerror(socket_last_error($this->socket))
                    ));
                }

                $result .= $buffer;
                $readBytes -= $bufferLength;
            }
        }

        // Strict comparison: a truthiness check would turn the valid payload "0" into null.
        return $result === '' ? null : $result;
    }

    /**
     * @return string
     */
    public function getAddress(): string
    {
        return $this->address;
    }

    /**
     * @return int|null
     */
    public function getPort(): ?int
    {
        return $this->port;
    }

    /**
     * @return int
     */
    public function getType(): int
    {
        return $this->type;
    }

    /**
     * @return bool
     */
    public function isConnected(): bool
    {
        return $this->socket !== null;
    }

    /**
     * Ensure socket connection. Returns true if socket successfully connected
     * or have already been connected.
     *
     * The socket is stored on the relay only after the connection succeeded, so a failed attempt
     * leaves the relay disconnected and a later call retries from scratch.
     *
     * @return bool
     *
     * @throws Exceptions\RelayException
     * @throws Error When sockets are used in unsupported environment.
     */
    public function connect(): bool
    {
        if ($this->isConnected()) {
            return true;
        }

        $socket = $this->createSocket();
        try {
            if (socket_connect($socket, $this->address, $this->port ?? 0) === false) {
                throw new Exceptions\RelayException(socket_strerror(socket_last_error($socket)));
            }
        } catch (Exception $e) {
            // Release the half-initialised socket instead of leaking it.
            socket_close($socket);

            throw new Exceptions\RelayException("unable to establish connection {$this}", 0, $e);
        }

        $this->socket = $socket;

        return true;
    }

    /**
     * Close connection.
     *
     * @throws Exceptions\RelayException When the relay is not connected.
     */
    public function close(): void
    {
        if (!$this->isConnected()) {
            throw new Exceptions\RelayException("unable to close socket '{$this}', socket already closed");
        }

        socket_close($this->socket);
        $this->socket = null;
    }

    /**
     * Read and validate the 17-byte frame prefix.
     *
     * The prefix carries the payload size twice (little-endian and big-endian); both copies must match.
     *
     * @return array Prefix [flags, size, revs]
     *
     * @throws Exceptions\PrefixException
     */
    private function fetchPrefix(): array
    {
        $prefixLength = socket_recv($this->socket, $prefixBody, 17, MSG_WAITALL);
        if ($prefixBody === null || $prefixLength !== 17) {
            throw new Exceptions\PrefixException(sprintf(
                'unable to read prefix from socket: %s',
                socket_strerror(socket_last_error($this->socket))
            ));
        }

        // C = unsigned char, P = unsigned 64-bit little-endian, J = unsigned 64-bit big-endian.
        $result = unpack('Cflags/Psize/Jrevs', $prefixBody);
        if (!is_array($result)) {
            throw new Exceptions\PrefixException('invalid prefix');
        }

        if ($result['size'] !== $result['revs']) {
            throw new Exceptions\PrefixException('invalid prefix (checksum)');
        }

        return $result;
    }

    /**
     * Create an unconnected stream socket matching the configured relay type.
     *
     * @return resource|\Socket
     *
     * @throws Exceptions\RelayException When the socket can not be created.
     */
    private function createSocket()
    {
        $socket = $this->type === self::SOCK_UNIX
            ? socket_create(AF_UNIX, SOCK_STREAM, 0)
            : socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

        if ($socket === false) {
            throw new Exceptions\RelayException(sprintf(
                "unable to create socket for '%s': %s",
                (string) $this,
                socket_strerror(socket_last_error())
            ));
        }

        return $socket;
    }

    /**
     * Write up to $length bytes of $data to the socket, retrying after short writes.
     *
     * @param string $data   Buffer to transmit.
     * @param int    $length Number of bytes to send; capped by the buffer length, as socket_send() does.
     *
     * @throws Exceptions\TransportException When the socket refuses the write.
     */
    private function sendAll(string $data, int $length): void
    {
        $remaining = min($length, strlen($data));
        $offset = 0;

        while ($remaining > 0) {
            // Only slice the buffer after a short write to avoid copying on the common path.
            $chunk = $offset === 0 ? $data : substr($data, $offset);

            $sent = socket_send($this->socket, $chunk, $remaining, 0);
            if ($sent === false || $sent === 0) {
                throw new Exceptions\TransportException('unable to write payload to the stream');
            }

            $offset += $sent;
            $remaining -= $sent;
        }
    }
}
325