Completed
Push — master ( 1de5f0...bc31f8 )
by Anton
01:27
created

SocketRelay::__toString()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
c 0
b 0
f 0
rs 9.4285
cc 2
eloc 4
nc 2
nop 0
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
use Spiral\Goridge\Exceptions\TransportException;
11
use Spiral\Goridge\Exceptions\PrefixException;
12
use Spiral\Goridge\Exceptions\RelayException;
13
14
/**
15
 * Communicates with a remote server/client over a bi-directional socket using a byte payload:
16
 *
17
 * [ prefix     ][ payload        ]
18
 * [ 1+8 bytes  ][ message length ]
19
 *
20
 * prefix:
21
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
22
 */
23
class SocketRelay implements RelayInterface
24
{
25
    /** Supported socket types. */
26
    const SOCK_TPC = 0;
27
    const SOCK_UNIX = 1;
28
29
    /** @var string */
30
    private $address;
31
32
    /** @var int|null */
33
    private $port;
34
35
    /** @var int */
36
    private $type;
37
38
    /** @var resource|null */
39
    private $socket;
40
41
    /**
42
     * Example:
43
     * $relay = new SocketRelay("localhost", 7000);
44
     * $relay = new SocketRelay("/tmp/rpc.sock", null, SocketRelay::SOCK_UNIX);
45
     *
46
     * @param string   $address Localhost, ip address or hostname.
47
     * @param int|null $port    Ignored for UNIX sockets.
48
     * @param int      $type    Default: SocketRelay::SOCK_TPC
49
     *
50
     * @throws Exceptions\InvalidArgumentException
51
     */
52
    public function __construct(string $address, int $port = null, int $type = self::SOCK_TPC)
    {
        // Reject anything that is not one of the two supported socket kinds.
        if ($type != self::SOCK_TPC && $type != self::SOCK_UNIX) {
            throw new Exceptions\InvalidArgumentException(sprintf(
                "undefined connection type %s on '%s'",
                $type,
                $address
            ));
        }

        // A TCP endpoint cannot be addressed without a port.
        if ($type == self::SOCK_TPC && $port === null) {
            throw new Exceptions\InvalidArgumentException(sprintf(
                "no port given for TPC socket on '%s'",
                $address
            ));
        }

        $this->address = $address;
        // The port is ignored for UNIX sockets, so it is always stored as null there.
        $this->port = ($type == self::SOCK_UNIX) ? null : $port;
        $this->type = $type;
    }
78
79
    /**
80
     * {@inheritdoc}
81
     */
82
    public function send($payload, int $flags = null): self
    {
        // Sends a single frame: a 9-byte prefix (1 flag byte followed by the payload
        // size as an unsigned 64-bit little-endian integer) and then the payload,
        // which is omitted when the PAYLOAD_NONE flag is set.
        //
        // Returns $this. Throws TransportException when a non-empty payload is
        // combined with PAYLOAD_NONE or when the frame can not be written in full.
        $this->connect();

        $size = strlen($payload);
        if ($flags & self::PAYLOAD_NONE && $size != 0) {
            throw new TransportException("unable to send payload with PAYLOAD_NONE flag");
        }

        $package = pack('CP', $flags, $size);
        if (!($flags & self::PAYLOAD_NONE)) {
            $package .= $payload;
        }

        // socket_send() may accept fewer bytes than requested and reports failure
        // through its return value, so keep writing until the whole frame is out.
        $left = strlen($package);
        while ($left > 0) {
            $sent = socket_send($this->socket, $package, $left, 0);
            if ($sent === false || $sent === 0) {
                throw new TransportException(sprintf(
                    "unable to write payload to socket: %s",
                    socket_strerror(socket_last_error($this->socket))
                ));
            }

            $left -= $sent;
            if ($left > 0) {
                // Drop the part that has already been transmitted.
                $package = substr($package, $sent);
            }
        }

        return $this;
    }
99
100
    /**
101
     * {@inheritdoc}
102
     */
103
    public function receiveSync(int &$flags = null)
    {
        // Reads one frame from the socket: the prefix first (its flags are written
        // into $flags), then as many payload bytes as the prefix announces.
        //
        // Returns the payload string, or null when the announced size is zero.
        // Throws TransportException when the payload can not be read completely.
        $this->connect();

        $prefix = $this->fetchPrefix();
        $flags = $prefix['flags'];

        $result = null;
        $left = $prefix['size'];

        //Add ability to write to stream in a future
        while ($left > 0) {
            $buffer = null;
            $read = socket_recv(
                $this->socket,
                $buffer,
                min(self::BUFFER_SIZE, $left),
                MSG_WAITALL
            );

            // A false result is a socket error; without this check the counter
            // would never decrease and the loop would spin forever.
            if ($read === false) {
                throw new TransportException(sprintf(
                    "unable to read payload from socket: %s",
                    socket_strerror(socket_last_error($this->socket))
                ));
            }

            // Zero bytes means no more data will arrive (peer closed the connection).
            if ($read === 0) {
                throw new TransportException("unable to read payload from socket: connection closed");
            }

            $result .= $buffer;
            $left -= $read;
        }

        return $result;
    }
131
132
    /**
133
     * @return string
134
     */
135
    public function getAddress(): string
    {
        // Address exactly as it was given to the constructor.
        $address = $this->address;

        return $address;
    }
139
140
    /**
141
     * @return int|null
142
     */
143
    public function getPort()
    {
        // Null for UNIX sockets, the configured port otherwise.
        $port = $this->port;

        return $port;
    }
147
148
    /**
149
     * @return int
150
     */
151
    public function getType(): int
    {
        // One of the SOCK_* constants validated by the constructor.
        $type = $this->type;

        return $type;
    }
155
156
    /**
157
     * @return bool
158
     */
159
    public function isConnected(): bool
    {
        // The socket property is reset to null by close(), so any non-null
        // value is treated as an open connection.
        return !($this->socket == null);
    }
163
164
    /**
165
     * Ensure socket connection. Returns true if socket successfully connected
166
     * or has already been connected.
167
     *
168
     * @return bool
169
     *
170
     * @throws RelayException
171
     * @throws \Error When sockets are used in unsupported environment.
172
     */
173
    public function connect(): bool
    {
        // Opens the connection on first use; later calls are no-ops.
        //
        // Returns true once a connection is available. Throws RelayException when
        // the socket can not be created or connected; an \Error raised by
        // createSocket() is passed through untouched.
        if ($this->isConnected()) {
            return true;
        }

        $socket = $this->createSocket();
        if ($socket === false) {
            throw new RelayException("unable to create socket {$this}");
        }

        try {
            if (socket_connect($socket, $this->address, $this->port) === false) {
                throw new RelayException(socket_strerror(socket_last_error($socket)));
            }
        } catch (\Exception $e) {
            // Release the half-open handle; it is never stored on the object, so
            // isConnected() keeps returning false after a failed attempt.
            socket_close($socket);

            throw new RelayException("unable to establish connection {$this}", 0, $e);
        }

        // Only a successfully connected socket becomes the relay's socket.
        $this->socket = $socket;

        return true;
    }
190
191
    /**
192
     * Close connection.
193
     *
194
     * @throws RelayException
195
     */
196
    public function close()
    {
        if ($this->isConnected()) {
            socket_close($this->socket);
            // Forget the handle so isConnected() reports false from now on.
            $this->socket = null;

            return;
        }

        // Closing twice (or before connecting) is reported as an error.
        throw new RelayException("unable to close socket '{$this}', socket already closed");
    }
205
206
    /**
207
     * Destruct connection and disconnect.
208
     */
209
    public function __destruct()
    {
        // Nothing to release when no connection is open.
        if (!$this->isConnected()) {
            return;
        }

        $this->close();
    }
215
216
    /**
217
     * @return string
218
     */
219
    public function __toString(): string
    {
        // TCP relays are rendered with their port, everything else as a UNIX socket path.
        return $this->type == self::SOCK_TPC
            ? "tcp://{$this->address}:{$this->port}"
            : "unix://{$this->address}";
    }
227
228
    /**
229
     * @return array Prefix as ['flags' => int, 'size' => int].
230
     *
231
     * @throws PrefixException
232
     */
233
    private function fetchPrefix(): array
    {
        // Reads the 9-byte frame prefix and decodes it into
        // ['flags' => 1 unsigned byte, 'size' => unsigned 64-bit little-endian int].
        // Throws PrefixException when fewer than 9 bytes arrive or decoding fails.
        $prefixBody = null;
        $prefixLength = socket_recv($this->socket, $prefixBody, 9, MSG_WAITALL);

        // socket_recv() signals failure through its return value (false), not
        // through the buffer, so the length is the only thing worth checking:
        // anything other than exactly 9 bytes is an unusable prefix.
        if ($prefixLength !== 9) {
            throw new PrefixException(sprintf(
                "unable to read prefix from socket: %s",
                socket_strerror(socket_last_error($this->socket))
            ));
        }

        $result = unpack("Cflags/Psize", $prefixBody);
        if (!is_array($result)) {
            throw new PrefixException("invalid prefix");
        }

        return $result;
    }
250
251
    /**
252
     * @return resource
253
     * @throws \Error
254
     */
255
    private function createSocket()
    {
        // Creates (but does not connect) a stream socket matching the relay type.
        // Throws \Error for UNIX sockets on Windows and RelayException when the
        // socket can not be created.
        if ($this->type === self::SOCK_UNIX) {
            if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
                throw new \Error("socket {$this} unavailable on Windows");
            }

            // Protocol 0 lets the system pick the default protocol for the family.
            // SOL_SOCKET is a socket-option level, not a protocol number, and its
            // value differs between platforms.
            $socket = socket_create(AF_UNIX, SOCK_STREAM, 0);
        } else {
            $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        }

        // socket_create() returns false on failure; surface that immediately
        // instead of handing an unusable value to socket_connect().
        if ($socket === false) {
            throw new RelayException(sprintf(
                "unable to create socket %s: %s",
                $this,
                socket_strerror(socket_last_error())
            ));
        }

        return $socket;
    }
267
}
268