Completed
Pull Request — master (#54)
by Valentin
01:16
created

SocketRelay::connect()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 17
rs 9.7
c 0
b 0
f 0
cc 4
nc 4
nop 0
1
<?php
2
3
/**
4
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
5
 *
6
 * @author Wolfy-J
7
 */
8
9
declare(strict_types=1);
10
11
namespace Spiral\Goridge;
12
13
use Error;
14
15
/**
16
 * Communicates with remote server/client over be-directional socket using byte payload:
17
 *
18
 * [ prefix       ][ payload                               ]
19
 * [ 1+8+8 bytes  ][ message length|LE ][message length|BE ]
20
 *
21
 * prefix:
22
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
23
 */
24
class SocketRelay implements RelayInterface
25
{
26
    /** Supported socket types. */
27
    public const SOCK_TCP  = 0;
28
    public const SOCK_UNIX = 1;
29
30
    // @deprecated
31
    public const SOCK_TPC = self::SOCK_TCP;
32
33
    /** @var string */
34
    private $address;
35
36
    /** @var int|null */
37
    private $port;
38
39
    /** @var int */
40
    private $type;
41
42
    /** @var resource|null */
43
    private $socket;
44
45
    /**
46
     * Example:
47
     * $relay = new SocketRelay("localhost", 7000);
48
     * $relay = new SocketRelay("/tmp/rpc.sock", null, Socket::UNIX_SOCKET);
49
     *
50
     * @param string   $address Localhost, ip address or hostname.
51
     * @param int|null $port    Ignored for UNIX sockets.
52
     * @param int      $type    Default: TCP_SOCKET
53
     *
54
     * @throws Exceptions\InvalidArgumentException
55
     */
56
    public function __construct(string $address, ?int $port = null, int $type = self::SOCK_TCP)
57
    {
58
        if (!extension_loaded('sockets')) {
59
            throw new Exceptions\InvalidArgumentException("'sockets' extension not loaded");
60
        }
61
62
        switch ($type) {
63
            case self::SOCK_TCP:
64
                if ($port === null) {
65
                    throw new Exceptions\InvalidArgumentException(sprintf(
66
                        "no port given for TPC socket on '%s'",
67
                        $address
68
                    ));
69
                }
70
                break;
71
            case self::SOCK_UNIX:
72
                $port = null;
73
                break;
74
            default:
75
                throw new Exceptions\InvalidArgumentException(sprintf(
76
                    "undefined connection type %s on '%s'",
77
                    $type,
78
                    $address
79
                ));
80
        }
81
82
        $this->address = $address;
83
        $this->port = $port;
84
        $this->type = $type;
85
    }
86
87
    /**
88
     * Destruct connection and disconnect.
89
     */
90
    public function __destruct()
91
    {
92
        if ($this->isConnected()) {
93
            $this->close();
94
        }
95
    }
96
97
    /**
98
     * @return string
99
     */
100
    public function __toString(): string
101
    {
102
        if ($this->type === self::SOCK_TCP) {
103
            return "tcp://{$this->address}:{$this->port}";
104
        }
105
106
        return "unix://{$this->address}";
107
    }
108
109
    /**
110
     * {@inheritdoc}
111
     * @return self
112
     */
113
    public function sendPackage(
114
        string $headerPayload,
115
        ?int $headerFlags,
116
        string $bodyPayload,
117
        ?int $bodyFlags = null
118
    ): self {
119
        $this->connect();
120
121
        $headerPackage = packMessage($headerPayload, $headerFlags);
122
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
123
        if ($headerPackage === null || $bodyPackage === null) {
124
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
125
        }
126
127
        socket_send(
128
            $this->socket,
129
            $headerPackage['body'] . $bodyPackage['body'],
130
            34 + $headerPackage['size'] + $bodyPackage['size'],
131
            0
132
        );
133
134
        return $this;
135
    }
136
137
    /**
138
     * {@inheritdoc}
139
     * @return self
140
     */
141
    public function send(string $payload, ?int $flags = null): self
142
    {
143
        $this->connect();
144
145
        $package = packMessage($payload, $flags);
146
        if ($package === null) {
147
            throw new Exceptions\TransportException('unable to send payload with PAYLOAD_NONE flag');
148
        }
149
150
        socket_send($this->socket, $package['body'], 17 + $package['size'], 0);
151
152
        return $this;
153
    }
154
155
    /**
156
     * {@inheritdoc}
157
     */
158
    public function receiveSync(?int &$flags = null): ?string
159
    {
160
        $this->connect();
161
162
        $prefix = $this->fetchPrefix();
163
        $flags = $prefix['flags'];
164
165
        $result = '';
166
        if ($prefix['size'] !== 0) {
167
            $readBytes = $prefix['size'];
168
169
            //Add ability to write to stream in a future
170
            while ($readBytes > 0) {
171
                $bufferLength = socket_recv(
172
                    $this->socket,
173
                    $buffer,
174
                    min(self::BUFFER_SIZE, $readBytes),
175
                    MSG_WAITALL
176
                );
177
                if ($bufferLength === false || $buffer === null) {
178
                    throw new Exceptions\PrefixException(sprintf(
179
                        'unable to read prefix from socket: %s',
180
                        socket_strerror(socket_last_error($this->socket))
181
                    ));
182
                }
183
184
                $result .= $buffer;
185
                $readBytes -= $bufferLength;
186
            }
187
        }
188
189
        return $result ?: null;
190
    }
191
192
    /**
193
     * @return string
194
     */
195
    public function getAddress(): string
196
    {
197
        return $this->address;
198
    }
199
200
    /**
201
     * @return int|null
202
     */
203
    public function getPort(): ?int
204
    {
205
        return $this->port;
206
    }
207
208
    /**
209
     * @return int
210
     */
211
    public function getType(): int
212
    {
213
        return $this->type;
214
    }
215
216
    /**
217
     * @return bool
218
     */
219
    public function isConnected(): bool
220
    {
221
        return $this->socket !== null;
222
    }
223
224
    /**
225
     * Ensure socket connection. Returns true if socket successfully connected
226
     * or have already been connected.
227
     *
228
     * @return bool
229
     *
230
     * @throws Exceptions\RelayException
231
     * @throws Error When sockets are used in unsupported environment.
232
     */
233
    public function connect(): bool
234
    {
235
        if ($this->isConnected()) {
236
            return true;
237
        }
238
239
        $this->socket = $this->createSocket();
240
        try {
241
            if (socket_connect($this->socket, $this->address, $this->port) === false) {
242
                throw new Exceptions\RelayException(socket_strerror(socket_last_error($this->socket)));
243
            }
244
        } catch (\Exception $e) {
245
            throw new Exceptions\RelayException("unable to establish connection {$this}", 0, $e);
246
        }
247
248
        return true;
249
    }
250
251
    /**
252
     * Close connection.
253
     *
254
     * @throws Exceptions\RelayException
255
     */
256
    public function close(): void
257
    {
258
        if (!$this->isConnected()) {
259
            throw new Exceptions\RelayException("unable to close socket '{$this}', socket already closed");
260
        }
261
262
        socket_close($this->socket);
263
        $this->socket = null;
264
    }
265
266
    /**
267
     * @return array Prefix [flag, length]
268
     *
269
     * @throws Exceptions\PrefixException
270
     */
271
    private function fetchPrefix(): array
272
    {
273
        $prefixLength = socket_recv($this->socket, $prefixBody, 17, MSG_WAITALL);
274
        if ($prefixBody === null || $prefixLength !== 17) {
275
            throw new Exceptions\PrefixException(sprintf(
276
                'unable to read prefix from socket: %s',
277
                socket_strerror(socket_last_error($this->socket))
278
            ));
279
        }
280
281
        $result = unpack('Cflags/Psize/Jrevs', $prefixBody);
282
        if (!is_array($result)) {
283
            throw new Exceptions\PrefixException('invalid prefix');
284
        }
285
286
        if ($result['size'] !== $result['revs']) {
287
            throw new Exceptions\PrefixException('invalid prefix (checksum)');
288
        }
289
290
        return $result;
291
    }
292
293
    /**
294
     * @return resource
295
     * @throws Exceptions\GoridgeException
296
     */
297
    private function createSocket()
298
    {
299
        if ($this->type === self::SOCK_UNIX) {
300
            return socket_create(AF_UNIX, SOCK_STREAM, 0);
301
        }
302
303
        return socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
304
    }
305
}
306