Completed
Push — master ( e54134...681657 )
by Anton
06:19
created

SocketRelay   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 252
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 33
lcom 1
cbo 5
dl 0
loc 252
rs 9.3999
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
B __construct() 0 26 4
A send() 0 17 4
B receiveSync() 0 28 3
A getAddress() 0 4 1
A getPort() 0 4 1
A getType() 0 4 1
A isConnected() 0 4 1
A connect() 0 17 4
A close() 0 9 2
A __destruct() 0 6 2
A __toString() 0 8 2
B fetchPrefix() 0 21 5
A createSocket() 0 12 3
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
use Spiral\Goridge\Exceptions\GoridgeException;
11
use Spiral\Goridge\Exceptions\TransportException;
12
use Spiral\Goridge\Exceptions\PrefixException;
13
use Spiral\Goridge\Exceptions\RelayException;
14
15
/**
16
 * Communicates with remote server/client over be-directional socket using byte payload:
17
 *
18
 * [ prefix       ][ payload                               ]
19
 * [ 1+8+8 bytes  ][ message length|LE ][message length|BE ]
20
 *
21
 * prefix:
22
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
23
 */
24
class SocketRelay implements RelayInterface
25
{
26
    /** Supported socket types. */
27
    const SOCK_TCP = 0;
28
    const SOCK_UNIX = 1;
29
    
30
    // @deprecated
31
    const SOCK_TPC = self::SOCK_TCP;
32
    
33
    /** @var string */
34
    private $address;
35
36
    /** @var int|null */
37
    private $port;
38
39
    /** @var int */
40
    private $type;
41
42
    /** @var resource|null */
43
    private $socket;
44
45
    /**
46
     * Example:
47
     * $relay = new SocketRelay("localhost", 7000);
48
     * $relay = new SocketRelay("/tmp/rpc.sock", null, Socket::UNIX_SOCKET);
49
     *
50
     * @param string   $address Localhost, ip address or hostname.
51
     * @param int|null $port    Ignored for UNIX sockets.
52
     * @param int      $type    Default: TCP_SOCKET
53
     *
54
     * @throws Exceptions\InvalidArgumentException
55
     */
56
    public function __construct(string $address, int $port = null, int $type = self::SOCK_TCP)
57
    {
58
        switch ($type) {
59
            case self::SOCK_TCP:
60
                if ($port === null) {
61
                    throw new Exceptions\InvalidArgumentException(sprintf(
62
                        "no port given for TPC socket on '%s'",
63
                        $address
64
                    ));
65
                }
66
                break;
67
            case self::SOCK_UNIX:
68
                $port = null;
69
                break;
70
            default:
71
                throw new Exceptions\InvalidArgumentException(sprintf(
72
                    "undefined connection type %s on '%s'",
73
                    $type,
74
                    $address
75
                ));
76
        }
77
78
        $this->address = $address;
79
        $this->port = $port;
80
        $this->type = $type;
81
    }
82
83
    /**
84
     * {@inheritdoc}
85
     */
86
    public function send($payload, int $flags = null): self
87
    {
88
        $this->connect();
89
90
        $size = strlen($payload);
91
        if ($flags & self::PAYLOAD_NONE && $size != 0) {
92
            throw new TransportException("unable to send payload with PAYLOAD_NONE flag");
93
        }
94
95
        socket_send($this->socket, pack('CPJ', $flags, $size, $size), 17, 0);
96
97
        if (!($flags & self::PAYLOAD_NONE)) {
98
            socket_send($this->socket, $payload, $size, 0);
99
        }
100
101
        return $this;
102
    }
103
104
    /**
105
     * {@inheritdoc}
106
     */
107
    public function receiveSync(int &$flags = null)
108
    {
109
        $this->connect();
110
111
        $prefix = $this->fetchPrefix();
112
        $flags = $prefix['flags'];
113
        $result = null;
114
115
        if ($prefix['size'] !== 0) {
116
            $readBytes = $prefix['size'];
117
            $buffer = null;
118
119
            //Add ability to write to stream in a future
120
            while ($readBytes > 0) {
121
                $bufferLength = socket_recv(
122
                    $this->socket,
123
                    $buffer,
124
                    min(self::BUFFER_SIZE, $readBytes),
125
                    MSG_WAITALL
126
                );
127
128
                $result .= $buffer;
129
                $readBytes -= $bufferLength;
130
            }
131
        }
132
133
        return $result;
134
    }
135
136
    /**
137
     * @return string
138
     */
139
    public function getAddress(): string
140
    {
141
        return $this->address;
142
    }
143
144
    /**
145
     * @return int|null
146
     */
147
    public function getPort()
148
    {
149
        return $this->port;
150
    }
151
152
    /**
153
     * @return int
154
     */
155
    public function getType(): int
156
    {
157
        return $this->type;
158
    }
159
160
    /**
161
     * @return bool
162
     */
163
    public function isConnected(): bool
164
    {
165
        return $this->socket != null;
166
    }
167
168
    /**
169
     * Ensure socket connection. Returns true if socket successfully connected
170
     * or have already been connected.
171
     *
172
     * @return bool
173
     *
174
     * @throws RelayException
175
     * @throws \Error When sockets are used in unsupported environment.
176
     */
177
    public function connect(): bool
178
    {
179
        if ($this->isConnected()) {
180
            return true;
181
        }
182
183
        $this->socket = $this->createSocket();
184
        try {
185
            if (socket_connect($this->socket, $this->address, $this->port) === false) {
186
                throw new RelayException(socket_strerror(socket_last_error($this->socket)));
187
            }
188
        } catch (\Exception $e) {
189
            throw new RelayException("unable to establish connection {$this}", 0, $e);
190
        }
191
192
        return true;
193
    }
194
195
    /**
196
     * Close connection.
197
     *
198
     * @throws RelayException
199
     */
200
    public function close()
201
    {
202
        if (!$this->isConnected()) {
203
            throw new RelayException("unable to close socket '{$this}', socket already closed");
204
        }
205
206
        socket_close($this->socket);
207
        $this->socket = null;
208
    }
209
210
    /**
211
     * Destruct connection and disconnect.
212
     */
213
    public function __destruct()
214
    {
215
        if ($this->isConnected()) {
216
            $this->close();
217
        }
218
    }
219
220
    /**
221
     * @return string
222
     */
223
    public function __toString(): string
224
    {
225
        if ($this->type == self::SOCK_TCP) {
226
            return "tcp://{$this->address}:{$this->port}";
227
        }
228
229
        return "unix://{$this->address}";
230
    }
231
232
    /**
233
     * @return array Prefix [flag, length]
234
     *
235
     * @throws PrefixException
236
     */
237
    private function fetchPrefix(): array
238
    {
239
        $prefixLength = socket_recv($this->socket, $prefixBody, 17, MSG_WAITALL);
240
        if ($prefixBody === false || $prefixLength !== 17) {
241
            throw new PrefixException(sprintf(
242
                "unable to read prefix from socket: %s",
243
                socket_strerror(socket_last_error($this->socket))
244
            ));
245
        }
246
247
        $result = unpack("Cflags/Psize/Jrevs", $prefixBody);
248
        if (!is_array($result)) {
249
            throw new Exceptions\PrefixException("invalid prefix");
250
        }
251
252
        if ($result['size'] != $result['revs']) {
253
            throw new Exceptions\PrefixException("invalid prefix (checksum)");
254
        }
255
256
        return $result;
257
    }
258
259
    /**
260
     * @return resource
261
     * @throws GoridgeException
262
     */
263
    private function createSocket()
264
    {
265
        if ($this->type === self::SOCK_UNIX) {
266
            if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
267
                throw new GoridgeException("socket {$this} unavailable on Windows");
268
            }
269
270
            return socket_create(AF_UNIX, SOCK_STREAM, 0);
271
        }
272
273
        return socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
274
    }
275
}
276