Completed
Push — master ( bd49b2...323004 )
by
unknown
18s queued 11s
created

SocketRelay   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 275
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 0
Metric Value
dl 0
loc 275
rs 9.68
c 0
b 0
f 0
wmc 34
lcom 1
cbo 4

14 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 30 5
A sendPackage() 0 23 3
A send() 0 13 2
A receiveSync() 0 28 3
A getAddress() 0 4 1
A getPort() 0 4 1
A getType() 0 4 1
A isConnected() 0 4 1
A connect() 0 17 4
A close() 0 9 2
A __destruct() 0 6 2
A __toString() 0 8 2
A fetchPrefix() 0 21 5
A createSocket() 0 8 2
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
use Spiral\Goridge\Exceptions\GoridgeException;
11
use Spiral\Goridge\Exceptions\PrefixException;
12
use Spiral\Goridge\Exceptions\RelayException;
13
use Spiral\Goridge\Exceptions\TransportException;
14
15
/**
16
 * Communicates with remote server/client over be-directional socket using byte payload:
17
 *
18
 * [ prefix       ][ payload                               ]
19
 * [ 1+8+8 bytes  ][ message length|LE ][message length|BE ]
20
 *
21
 * prefix:
22
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
23
 */
24
class SocketRelay implements RelayInterface
25
{
26
    /** Supported socket types. */
27
    const SOCK_TCP  = 0;
28
    const SOCK_UNIX = 1;
29
30
    // @deprecated
31
    const SOCK_TPC = self::SOCK_TCP;
32
33
    /** @var string */
34
    private $address;
35
36
    /** @var int|null */
37
    private $port;
38
39
    /** @var int */
40
    private $type;
41
42
    /** @var resource|null */
43
    private $socket;
44
45
    /**
46
     * Example:
47
     * $relay = new SocketRelay("localhost", 7000);
48
     * $relay = new SocketRelay("/tmp/rpc.sock", null, Socket::UNIX_SOCKET);
49
     *
50
     * @param string   $address Localhost, ip address or hostname.
51
     * @param int|null $port    Ignored for UNIX sockets.
52
     * @param int      $type    Default: TCP_SOCKET
53
     *
54
     * @throws Exceptions\InvalidArgumentException
55
     */
56
    public function __construct(string $address, int $port = null, int $type = self::SOCK_TCP)
57
    {
58
        if (!extension_loaded('sockets')) {
59
            throw new Exceptions\InvalidArgumentException("'sockets' extension not loaded");
60
        }
61
62
        switch ($type) {
63
            case self::SOCK_TCP:
64
                if ($port === null) {
65
                    throw new Exceptions\InvalidArgumentException(sprintf(
66
                        "no port given for TPC socket on '%s'",
67
                        $address
68
                    ));
69
                }
70
                break;
71
            case self::SOCK_UNIX:
72
                $port = null;
73
                break;
74
            default:
75
                throw new Exceptions\InvalidArgumentException(sprintf(
76
                    "undefined connection type %s on '%s'",
77
                    $type,
78
                    $address
79
                ));
80
        }
81
82
        $this->address = $address;
83
        $this->port = $port;
84
        $this->type = $type;
85
    }
86
87
    /**
88
     * {@inheritdoc}
89
     */
90
    public function sendPackage(
91
        string $headerPayload,
92
        ?int $headerFlags,
93
        string $bodyPayload,
94
        ?int $bodyFlags = null
95
    ) {
96
        $this->connect();
97
98
        $headerPackage = packMessage($headerPayload, $headerFlags);
99
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
100
        if ($headerPackage === null || $bodyPackage === null) {
101
            throw new TransportException('unable to send payload with PAYLOAD_NONE flag');
102
        }
103
104
        socket_send(
105
            $this->socket,
106
            $headerPackage['body'] . $bodyPackage['body'],
107
            34 + $headerPackage['size'] + $bodyPackage['size'],
108
            0
109
        );
110
111
        return $this;
112
    }
113
114
    /**
115
     * {@inheritdoc}
116
     */
117
    public function send($payload, int $flags = null)
118
    {
119
        $this->connect();
120
121
        $package = packMessage($payload, $flags);
122
        if ($package === null) {
123
            throw new TransportException('unable to send payload with PAYLOAD_NONE flag');
124
        }
125
126
        socket_send($this->socket, $package['body'], 17 + $package['size'], 0);
127
128
        return $this;
129
    }
130
131
    /**
132
     * {@inheritdoc}
133
     */
134
    public function receiveSync(int &$flags = null)
135
    {
136
        $this->connect();
137
138
        $prefix = $this->fetchPrefix();
139
        $flags = $prefix['flags'];
140
        $result = null;
141
142
        if ($prefix['size'] !== 0) {
143
            $readBytes = $prefix['size'];
144
            $buffer = null;
145
146
            //Add ability to write to stream in a future
147
            while ($readBytes > 0) {
148
                $bufferLength = socket_recv(
149
                    $this->socket,
150
                    $buffer,
151
                    min(self::BUFFER_SIZE, $readBytes),
152
                    MSG_WAITALL
153
                );
154
155
                $result .= $buffer;
156
                $readBytes -= $bufferLength;
157
            }
158
        }
159
160
        return $result;
161
    }
162
163
    /**
164
     * @return string
165
     */
166
    public function getAddress(): string
167
    {
168
        return $this->address;
169
    }
170
171
    /**
172
     * @return int|null
173
     */
174
    public function getPort()
175
    {
176
        return $this->port;
177
    }
178
179
    /**
180
     * @return int
181
     */
182
    public function getType(): int
183
    {
184
        return $this->type;
185
    }
186
187
    /**
188
     * @return bool
189
     */
190
    public function isConnected(): bool
191
    {
192
        return $this->socket != null;
193
    }
194
195
    /**
196
     * Ensure socket connection. Returns true if socket successfully connected
197
     * or have already been connected.
198
     *
199
     * @return bool
200
     *
201
     * @throws RelayException
202
     * @throws \Error When sockets are used in unsupported environment.
203
     */
204
    public function connect(): bool
205
    {
206
        if ($this->isConnected()) {
207
            return true;
208
        }
209
210
        $this->socket = $this->createSocket();
211
        try {
212
            if (socket_connect($this->socket, $this->address, $this->port) === false) {
213
                throw new RelayException(socket_strerror(socket_last_error($this->socket)));
214
            }
215
        } catch (\Exception $e) {
216
            throw new RelayException("unable to establish connection {$this}", 0, $e);
217
        }
218
219
        return true;
220
    }
221
222
    /**
223
     * Close connection.
224
     *
225
     * @throws RelayException
226
     */
227
    public function close()
228
    {
229
        if (!$this->isConnected()) {
230
            throw new RelayException("unable to close socket '{$this}', socket already closed");
231
        }
232
233
        socket_close($this->socket);
234
        $this->socket = null;
235
    }
236
237
    /**
238
     * Destruct connection and disconnect.
239
     */
240
    public function __destruct()
241
    {
242
        if ($this->isConnected()) {
243
            $this->close();
244
        }
245
    }
246
247
    /**
248
     * @return string
249
     */
250
    public function __toString(): string
251
    {
252
        if ($this->type == self::SOCK_TCP) {
253
            return "tcp://{$this->address}:{$this->port}";
254
        }
255
256
        return "unix://{$this->address}";
257
    }
258
259
    /**
260
     * @return array Prefix [flag, length]
261
     *
262
     * @throws PrefixException
263
     */
264
    private function fetchPrefix(): array
265
    {
266
        $prefixLength = socket_recv($this->socket, $prefixBody, 17, MSG_WAITALL);
267
        if ($prefixBody === false || $prefixLength !== 17) {
268
            throw new PrefixException(sprintf(
269
                "unable to read prefix from socket: %s",
270
                socket_strerror(socket_last_error($this->socket))
271
            ));
272
        }
273
274
        $result = unpack("Cflags/Psize/Jrevs", $prefixBody);
275
        if (!is_array($result)) {
276
            throw new Exceptions\PrefixException("invalid prefix");
277
        }
278
279
        if ($result['size'] != $result['revs']) {
280
            throw new Exceptions\PrefixException("invalid prefix (checksum)");
281
        }
282
283
        return $result;
284
    }
285
286
    /**
287
     * @return resource
288
     * @throws GoridgeException
289
     */
290
    private function createSocket()
291
    {
292
        if ($this->type === self::SOCK_UNIX) {
293
            return socket_create(AF_UNIX, SOCK_STREAM, 0);
294
        }
295
296
        return socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
297
    }
298
}
299