Completed
Pull Request — master (#51)
by
unknown
02:42
created

SocketRelay::sendPackage()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
nc 2
nop 4
dl 0
loc 23
rs 9.552
c 0
b 0
f 0
1
<?php
2
/**
3
 * Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies
4
 *
5
 * @author Wolfy-J
6
 */
7
8
namespace Spiral\Goridge;
9
10
use Spiral\Goridge\Exceptions\GoridgeException;
11
use Spiral\Goridge\Exceptions\PrefixException;
12
use Spiral\Goridge\Exceptions\RelayException;
13
use Spiral\Goridge\Exceptions\TransportException;
14
15
/**
 * Relay that talks to a remote peer over a bi-directional TCP or UNIX stream socket.
 *
 * Every message on the wire is a 17-byte prefix followed by the payload:
 *
 * [ flag, 1 byte ][ payload length, uint64 LE ][ payload length, uint64 BE ][ payload ]
 *
 * The payload length is transmitted twice, once little-endian and once big-endian;
 * the receiver compares both copies as a cheap integrity check of the prefix.
 */
class SocketRelay implements RelayInterface
{
    /** Supported socket types. */
    public const SOCK_TCP  = 0;
    public const SOCK_UNIX = 1;

    /** @deprecated Misspelled alias, use SOCK_TCP instead. */
    public const SOCK_TPC = self::SOCK_TCP;

    /** @var string Host name / IP address (TCP) or socket file path (UNIX). */
    private $address;

    /** @var int|null Always null for UNIX sockets. */
    private $port;

    /** @var int One of the SOCK_* constants. */
    private $type;

    /** @var resource|\Socket|null Connected socket, null while disconnected. */
    private $socket;

    /**
     * Example:
     * $relay = new SocketRelay("localhost", 7000);
     * $relay = new SocketRelay("/tmp/rpc.sock", null, SocketRelay::SOCK_UNIX);
     *
     * @param string   $address Localhost, ip address or hostname (TCP), or socket path (UNIX).
     * @param int|null $port    Required for TCP sockets, ignored (reset to null) for UNIX sockets.
     * @param int      $type    One of the SOCK_* constants. Default: SOCK_TCP
     *
     * @throws Exceptions\InvalidArgumentException When a TCP relay has no port or $type is unknown.
     */
    public function __construct(string $address, ?int $port = null, int $type = self::SOCK_TCP)
    {
        switch ($type) {
            case self::SOCK_TCP:
                if ($port === null) {
                    throw new Exceptions\InvalidArgumentException(sprintf(
                        "no port given for TPC socket on '%s'",
                        $address
                    ));
                }
                break;
            case self::SOCK_UNIX:
                // A port has no meaning for a UNIX socket; drop whatever was passed.
                $port = null;
                break;
            default:
                throw new Exceptions\InvalidArgumentException(sprintf(
                    "undefined connection type %s on '%s'",
                    $type,
                    $address
                ));
        }

        $this->address = $address;
        $this->port = $port;
        $this->type = $type;
    }

    /**
     * Pack header and body as two consecutive frames and push them to the socket
     * with a single write. Connects first if the relay is not connected yet.
     *
     * {@inheritdoc}
     *
     * @throws TransportException When either frame cannot be packed or the write fails.
     * @throws RelayException     When the connection cannot be established.
     */
    public function sendPackage(
        string $headerPayload,
        ?int $headerFlags,
        string $bodyPayload,
        ?int $bodyFlags = null
    ) {
        $this->connect();

        $headerPackage = packMessage($headerPayload, $headerFlags);
        $bodyPackage = packMessage($bodyPayload, $bodyFlags);
        if ($headerPackage === null || $bodyPackage === null) {
            throw new TransportException('unable to send payload with PAYLOAD_NONE flag');
        }

        // 34 = two 17-byte prefixes, one in front of each payload.
        $this->write(
            $headerPackage['body'] . $bodyPackage['body'],
            34 + $headerPackage['size'] + $bodyPackage['size']
        );

        return $this;
    }

    /**
     * Pack a single payload into a frame and write it to the socket.
     * Connects first if the relay is not connected yet.
     *
     * {@inheritdoc}
     *
     * @throws TransportException When the payload cannot be packed or the write fails.
     * @throws RelayException     When the connection cannot be established.
     */
    public function send($payload, ?int $flags = null)
    {
        $this->connect();

        $package = packMessage($payload, $flags);
        if ($package === null) {
            throw new TransportException('unable to send payload with PAYLOAD_NONE flag');
        }

        // 17 = size of the prefix that precedes the payload.
        $this->write($package['body'], 17 + $package['size']);

        return $this;
    }

    /**
     * Block until one complete frame has been read from the socket.
     * Connects first if the relay is not connected yet.
     *
     * {@inheritdoc}
     *
     * @param int|null $flags Receives the flag byte of the frame prefix.
     *
     * @return string|null Payload of the frame, null when the prefix announces no payload.
     *
     * @throws PrefixException    When the prefix cannot be read or is malformed.
     * @throws TransportException When the payload cannot be read completely.
     * @throws RelayException     When the connection cannot be established.
     */
    public function receiveSync(?int &$flags = null)
    {
        $this->connect();

        $prefix = $this->fetchPrefix();
        $flags = $prefix['flags'];
        $result = null;

        if ($prefix['size'] !== 0) {
            $readBytes = $prefix['size'];
            $buffer = null;

            //Add ability to write to stream in a future
            while ($readBytes > 0) {
                $bufferLength = socket_recv(
                    $this->socket,
                    $buffer,
                    min(self::BUFFER_SIZE, $readBytes),
                    MSG_WAITALL
                );

                // false signals an error, 0 means the peer closed the connection.
                // Without this guard $readBytes would never shrink and the loop
                // would spin forever.
                if ($bufferLength === false || $bufferLength === 0) {
                    throw new TransportException(sprintf(
                        'unable to read payload from socket: %s',
                        socket_strerror(socket_last_error($this->socket))
                    ));
                }

                $result .= $buffer;
                $readBytes -= $bufferLength;
            }
        }

        return $result;
    }

    /**
     * @return string Address given to the constructor.
     */
    public function getAddress(): string
    {
        return $this->address;
    }

    /**
     * @return int|null Port given to the constructor, null for UNIX sockets.
     */
    public function getPort()
    {
        return $this->port;
    }

    /**
     * @return int One of the SOCK_* constants.
     */
    public function getType(): int
    {
        return $this->type;
    }

    /**
     * @return bool True while the relay holds a connected socket.
     */
    public function isConnected(): bool
    {
        return $this->socket !== null;
    }

    /**
     * Ensure socket connection. Returns true if socket successfully connected
     * or have already been connected.
     *
     * The socket is stored on the relay only after the connection succeeded, so a
     * failed attempt leaves the relay disconnected and can be retried.
     *
     * @return bool
     *
     * @throws RelayException When the socket cannot be created or connected.
     * @throws \Error When sockets are used in unsupported environment.
     */
    public function connect(): bool
    {
        if ($this->isConnected()) {
            return true;
        }

        $socket = $this->createSocket();

        try {
            if (socket_connect($socket, $this->address, $this->port) === false) {
                throw new RelayException(socket_strerror(socket_last_error($socket)));
            }
        } catch (\Exception $e) {
            // Release the half-open socket instead of keeping it around.
            socket_close($socket);

            throw new RelayException("unable to establish connection {$this}", 0, $e);
        }

        $this->socket = $socket;

        return true;
    }

    /**
     * Close connection.
     *
     * @throws RelayException When the relay is not connected.
     */
    public function close()
    {
        if (!$this->isConnected()) {
            throw new RelayException("unable to close socket '{$this}', socket already closed");
        }

        socket_close($this->socket);
        $this->socket = null;
    }

    /**
     * Destruct connection and disconnect.
     */
    public function __destruct()
    {
        if ($this->isConnected()) {
            $this->close();
        }
    }

    /**
     * @return string "tcp://address:port" or "unix://address", depending on the socket type.
     */
    public function __toString(): string
    {
        if ($this->type === self::SOCK_TCP) {
            return "tcp://{$this->address}:{$this->port}";
        }

        return "unix://{$this->address}";
    }

    /**
     * Read and validate the 17-byte frame prefix.
     *
     * @return array Prefix with keys "flags", "size" (length, little-endian copy)
     *               and "revs" (length, big-endian copy).
     *
     * @throws PrefixException When the prefix cannot be read completely or both
     *                         length copies disagree.
     */
    private function fetchPrefix(): array
    {
        $prefixLength = socket_recv($this->socket, $prefixBody, 17, MSG_WAITALL);

        // socket_recv() yields false on error and a smaller count on a short read;
        // both are covered by the strict comparison.
        if ($prefixLength !== 17) {
            throw new PrefixException(sprintf(
                "unable to read prefix from socket: %s",
                socket_strerror(socket_last_error($this->socket))
            ));
        }

        // C = unsigned char, P = uint64 little-endian, J = uint64 big-endian.
        $result = unpack("Cflags/Psize/Jrevs", $prefixBody);
        if (!is_array($result)) {
            throw new PrefixException("invalid prefix");
        }

        if ($result['size'] !== $result['revs']) {
            throw new PrefixException("invalid prefix (checksum)");
        }

        return $result;
    }

    /**
     * Write the given bytes to the connected socket.
     *
     * @param string $data   Packed frame(s).
     * @param int    $length Number of bytes to send.
     *
     * @throws TransportException When the socket reports a write error.
     */
    private function write(string $data, int $length): void
    {
        if (socket_send($this->socket, $data, $length, 0) === false) {
            throw new TransportException(sprintf(
                'unable to write payload to socket: %s',
                socket_strerror(socket_last_error($this->socket))
            ));
        }
    }

    /**
     * Create an unconnected stream socket matching the relay type.
     *
     * @return resource|\Socket
     *
     * @throws RelayException When the socket cannot be created.
     */
    private function createSocket()
    {
        $socket = $this->type === self::SOCK_UNIX
            ? socket_create(AF_UNIX, SOCK_STREAM, 0)
            : socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

        if ($socket === false) {
            throw new RelayException(sprintf(
                'unable to create socket %s: %s',
                $this,
                socket_strerror(socket_last_error())
            ));
        }

        return $socket;
    }
}
295