Completed
Push — master ( f0409c...877555 )
by Anton
01:16
created

Connection::createSocket()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 13
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 13
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 6
nc 3
nop 0
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
use Spiral\Goridge\Exceptions\InvalidArgumentException;
11
use Spiral\Goridge\Exceptions\MessageException;
12
use Spiral\Goridge\Exceptions\PrefixException;
13
use Spiral\Goridge\Exceptions\TransportException;
14
15
/**
 * Sends byte payload to RPC server using protocol:
 *
 * [ prefix     ][ payload        ]
 * [ 1+8 bytes  ][ message length ]
 *
 * prefix:
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
 *
 * flag options:
 * KEEP_CONNECTION  - keep socket connection.
 * CLOSE_CONNECTION - end socket connection.
 *
 * The flag constants (KEEP_CONNECTION, CLOSE_CONNECTION, NO_BODY) are not declared in this
 * class; they are presumably inherited from ConnectionInterface.
 */
class Connection implements ConnectionInterface
{
    /** Maximum number of payload bytes requested from the socket per read. */
    const CHUNK_SIZE = 65536;

    /** Supported socket types. */
    const SOCK_TCP  = 0;
    const SOCK_UNIX = 1;

    /** Misspelled alias of SOCK_TCP, kept so existing callers continue to work. */
    const SOCK_TPC = self::SOCK_TCP;

    /** @var string */
    private $address;

    /** @var int|null */
    private $port;

    /** @var int */
    private $type;

    /** @var resource|\Socket|null Socket handle, null while disconnected. */
    private $socket;

    /**
     * Example:
     * $conn = new Connection("localhost", 7000);
     *
     * $conn = new Connection("/tmp/rpc.sock", null, Connection::SOCK_UNIX);
     *
     * @param string   $address Localhost, ip address or hostname (socket path for UNIX sockets).
     * @param int|null $port    Ignored for UNIX sockets.
     * @param int      $type    Default: SOCK_TCP
     *
     * @throws \Spiral\Goridge\Exceptions\InvalidArgumentException When the port is missing for a
     *                                                             TCP socket or type is unknown.
     */
    public function __construct(string $address, ?int $port = null, int $type = self::SOCK_TCP)
    {
        switch ($type) {
            case self::SOCK_TCP:
                if ($port === null) {
                    throw new InvalidArgumentException(sprintf(
                        "no port given for TCP socket on '%s'",
                        $address
                    ));
                }
                break;
            case self::SOCK_UNIX:
                // Port has no meaning for UNIX sockets, drop whatever was given.
                $port = null;
                break;
            default:
                throw new InvalidArgumentException(sprintf(
                    "undefined connection type %s on '%s'",
                    $type,
                    $address
                ));
        }

        $this->address = $address;
        $this->port = $port;
        $this->type = $type;
    }

    /**
     * @return string
     */
    public function getAddress(): string
    {
        return $this->address;
    }

    /**
     * @return int|null Null for UNIX sockets.
     */
    public function getPort(): ?int
    {
        return $this->port;
    }

    /**
     * @return int One of the SOCK_* constants.
     */
    public function getType(): int
    {
        return $this->type;
    }

    /**
     * @return bool True while a connected socket handle is held.
     */
    public function isConnected(): bool
    {
        return $this->socket !== null;
    }

    /**
     * Send payload message to another party.
     *
     * @param string|binary $payload
     * @param int           $flags
     *
     * @return self
     *
     * @throws \Spiral\Goridge\Exceptions\TransportException When unable to connect.
     * @throws \Spiral\Goridge\Exceptions\MessageException When message can not be send.
     */
    public function send($payload, int $flags = self::KEEP_CONNECTION): self
    {
        $this->connect();

        $payload = (string)$payload;
        $size = strlen($payload);
        $noBody = ($flags & self::NO_BODY) !== 0;

        if ($noBody && $size !== 0) {
            throw new MessageException("unable to set body with NO_BODY flag");
        }

        // Prefix: 1 byte of flags followed by the payload size as 64bit little endian (9 bytes).
        $this->writeAll(pack('CP', $flags, $size));

        if (!$noBody) {
            $this->writeAll($payload);
        }

        return $this;
    }

    /**
     * Receive message from another party in sync/blocked mode. Message can be null.
     *
     * @param int $flags Response flags.
     *
     * @return null|string Null when the prefix announces an empty payload.
     *
     * @throws \Spiral\Goridge\Exceptions\TransportException When unable to connect or maintain
     *                                                         socket.
     * @throws \Spiral\Goridge\Exceptions\MessageException When messages can not be retrieved.
     */
    public function receiveSync(?int &$flags = null): ?string
    {
        $this->connect();

        $prefix = $this->fetchPrefixSync();
        $flags = $prefix['flags'];
        $result = null;

        //Add ability to write to stream in a future
        $remaining = $prefix['size'];
        while ($remaining > 0) {
            $chunk = null;
            $received = socket_recv(
                $this->socket,
                $chunk,
                min(self::CHUNK_SIZE, $remaining),
                MSG_WAITALL
            );

            // false is a socket error, 0 means the peer closed the connection; in both cases
            // no progress is possible and looping again would never terminate.
            if ($received === false || $received === 0) {
                throw new MessageException(sprintf(
                    "unable to read payload from socket: %s",
                    socket_strerror(socket_last_error($this->socket))
                ));
            }

            $result .= $chunk;
            $remaining -= $received;
        }

        if ($flags & self::CLOSE_CONNECTION) {
            $this->close();
        }

        return $result;
    }

    /**
     * Ensure socket connection. Returns true if socket successfully connected
     * or have already been connected.
     *
     * @return bool
     *
     * @throws \Spiral\Goridge\Exceptions\TransportException
     * @throws \Error When sockets are used in unsupported environment.
     */
    public function connect(): bool
    {
        if ($this->isConnected()) {
            return true;
        }

        $socket = null;

        try {
            $socket = $this->createSocket();
            // Port is null for UNIX sockets, where the value is not used.
            if (socket_connect($socket, $this->address, $this->port ?? 0) === false) {
                throw new \Exception(socket_strerror(socket_last_error($socket)));
            }
        } catch (\Exception $e) {
            // Release the half-open handle so a failed attempt neither leaks the socket nor
            // leaves the connection reporting itself as connected.
            if ($socket !== null) {
                socket_close($socket);
            }

            throw new TransportException("unable to establish connection {$this}", 0, $e);
        }

        // Only a successfully connected socket is stored.
        $this->socket = $socket;

        return true;
    }

    /**
     * Close connection.
     *
     * @throws \Spiral\Goridge\Exceptions\TransportException When the socket is already closed.
     */
    public function close()
    {
        if (!$this->isConnected()) {
            throw new TransportException("unable to close socket '{$this}', socket already closed");
        }

        socket_close($this->socket);
        $this->socket = null;
    }

    /**
     * Destruct connection and disconnect.
     */
    public function __destruct()
    {
        if ($this->isConnected()) {
            $this->close();
        }
    }

    /**
     * @return string Connection address in "tcp://host:port" or "unix://path" form.
     */
    public function __toString(): string
    {
        if ($this->type === self::SOCK_TCP) {
            return "tcp://{$this->address}:{$this->port}";
        }

        return "unix://{$this->address}";
    }

    /**
     * Read and decode the 9 byte message prefix.
     *
     * @return array Prefix ['flags' => int, 'size' => int]
     *
     * @throws \Spiral\Goridge\Exceptions\PrefixException
     */
    private function fetchPrefixSync(): array
    {
        $prefixBody = null;
        $prefixLength = socket_recv($this->socket, $prefixBody, 9, MSG_WAITALL);

        // socket_recv() reports failure through its return value (false), a short read
        // means the peer went away in the middle of the prefix.
        if ($prefixLength !== 9) {
            throw new PrefixException(sprintf(
                "unable to read prefix from socket: %s",
                socket_strerror(socket_last_error($this->socket))
            ));
        }

        $prefix = unpack("Cflags/Psize", $prefixBody);
        if (!is_array($prefix)) {
            throw new PrefixException("unable to read prefix from socket: invalid prefix format");
        }

        return $prefix;
    }

    /**
     * Write the whole buffer to the socket, continuing after partial writes.
     *
     * @param string $data
     *
     * @throws \Spiral\Goridge\Exceptions\MessageException When the socket rejects the data.
     */
    private function writeAll(string $data)
    {
        $left = strlen($data);

        while ($left > 0) {
            $sent = socket_send($this->socket, $data, $left, 0);

            if ($sent === false || $sent === 0) {
                throw new MessageException(sprintf(
                    "unable to send data to socket: %s",
                    socket_strerror(socket_last_error($this->socket))
                ));
            }

            $left -= $sent;
            if ($left > 0) {
                $data = substr($data, $sent);
            }
        }
    }

    /**
     * Create (but do not connect) a stream socket matching the connection type.
     *
     * No native return type is declared on purpose: "resource" is not a valid type
     * declaration, PHP would resolve it to a class named Spiral\Goridge\resource and reject
     * the value returned by socket_create().
     *
     * @return resource|\Socket
     *
     * @throws \Error When UNIX sockets are requested on Windows.
     * @throws \RuntimeException When the socket can not be created.
     */
    private function createSocket()
    {
        if ($this->type === self::SOCK_UNIX) {
            if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
                throw new \Error("socket {$this} unavailable in windows");
            }

            $socket = socket_create(AF_UNIX, SOCK_STREAM, 0);
        } else {
            $socket = socket_create(AF_INET, SOCK_STREAM, 0);
        }

        if ($socket === false) {
            throw new \RuntimeException(socket_strerror(socket_last_error()));
        }

        return $socket;
    }
}