Completed
Push — master ( 4c1e33...23763c )
by Mariano
02:50
created

Socket::getType()   B

Complexity

Conditions 8
Paths 8

Size

Total Lines 21
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 8.0291

Importance

Changes 1
Bugs 1 Features 0
Metric Value
c 1
b 1
f 0
dl 0
loc 21
ccs 12
cts 13
cp 0.9231
rs 7.1428
cc 8
eloc 12
nc 8
nop 1
crap 8.0291
1
<?php
2
namespace Disque\Connection;
3
4
use Exception;
5
use Disque\Command\CommandInterface;
6
use Disque\Connection\Response\ResponseException;
7
use Disque\Connection\Response;
8
9
/**
10
 * This class is greatly inspired by `Predis\Connection\StreamConnection`,
11
 * which is part of [predis](https://github.com/nrk/predis) and was developed
12
 * by Daniele Alessandri. All credits go to him where
13
 * relevant.
14
 */
15
class Socket extends BaseConnection implements ConnectionInterface
{
    /**
     * Socket handle
     *
     * Holds the stream resource returned by `getSocket()` while connected and
     * is reset to `null` by `disconnect()`.
     *
     * @var resource|null
     */
    protected $socket;

    /**
     * Response handlers
     *
     * Maps the first byte of a reply (its type marker) to the class that
     * knows how to parse the rest of that reply.
     *
     * The characters used as keys are part of the Redis/Disque protocol.
     * Disque uses the same response protocol as Redis, therefore
     * @see http://redis.io/topics/protocol
     *
     * @var array
     */
    private $responseHandlers = [
        '+' => Response\StringResponse::class,
        '-' => Response\ErrorResponse::class,
        ':' => Response\IntResponse::class,
        '$' => Response\TextResponse::class,
        '*' => Response\ArrayResponse::class
    ];

    /**
     * @inheritdoc
     *
     * Opens the socket through `getSocket()`, switches it to blocking mode
     * and, when a response timeout is given, applies it as the stream read
     * timeout. Fractional timeouts (e.g. `0.5`) are honoured.
     *
     * @throws ConnectionException If no socket could be opened
     */
    public function connect($connectionTimeout = 0, $responseTimeout = null)
    {
        parent::connect($connectionTimeout, $responseTimeout);

        $this->socket = $this->getSocket(
            $this->host,
            $this->port,
            (float) $connectionTimeout
        );
        if (!is_resource($this->socket)) {
            throw new ConnectionException("Could not connect to {$this->host}:{$this->port}");
        }

        stream_set_blocking($this->socket, true);
        if (!is_null($responseTimeout)) {
            // stream_set_timeout() takes whole seconds plus microseconds, so
            // split the value instead of letting the fraction be truncated.
            $timeout = (float) $responseTimeout;
            $seconds = (int) floor($timeout);
            $microseconds = (int) round(($timeout - $seconds) * 1000000);
            if ($microseconds >= 1000000) {
                // Rounding pushed the fraction to a full second
                $seconds++;
                $microseconds = 0;
            }
            stream_set_timeout($this->socket, $seconds, $microseconds);
        }
    }

    /**
     * @inheritdoc
     *
     * Does nothing when there is no open socket.
     */
    public function disconnect()
    {
        if (!$this->isConnected()) {
            return;
        }
        fclose($this->socket);
        $this->socket = null;
    }

    /**
     * @inheritdoc
     *
     * @return bool `true` only while the socket property holds a live resource
     */
    public function isConnected()
    {
        return (isset($this->socket) && is_resource($this->socket));
    }

    /**
     * @inheritdoc
     *
     * Serializes the command as an array of bulk strings (`*<count>` followed
     * by `$<length>` / value pairs, each terminated by CRLF), sends it and
     * returns the parsed reply.
     */
    public function execute(CommandInterface $command)
    {
        $commandName = $command->getCommand();
        $arguments = $command->getArguments();

        $parts = [
            '*' . (count($arguments) + 1),
            '$' . strlen($commandName),
            $commandName
        ];

        // Iterate by value so that sparse or string-keyed argument arrays are
        // serialized as well, not only 0..n-1 lists.
        foreach ($arguments as $argument) {
            // Cast first so the announced length matches the bytes sent
            $argument = (string) $argument;
            $parts[] = '$' . strlen($argument);
            $parts[] = $argument;
        }

        $this->send(implode("\r\n", $parts)."\r\n");
        return $this->receive($command->isBlocking());
    }

    /**
     * Send raw data over the connection
     *
     * Keeps writing until the whole payload has been accepted by the socket,
     * since a single `fwrite()` may write only part of it.
     *
     * @param string $data Data to send
     * @throws ConnectionException If not connected, or if a write fails or
     *         writes nothing (this includes an empty `$data`)
     */
    public function send($data)
    {
        $this->shouldBeConnected();

        while (true) {
            $length = strlen($data);
            $bytes = fwrite($this->socket, $data);
            if ($bytes === false || $bytes === 0) {
                throw new ConnectionException("Could not write {$length} bytes to client");
            }
            if ($bytes === $length) {
                return;
            }

            // Partial write: retry with whatever has not been sent yet
            $data = substr($data, $bytes);
        }
    }

    /**
     * Read data from connection
     *
     * Reads the type marker, picks the matching response handler, hands it
     * the rest of the first line and lets it parse the reply. The handler
     * gets a reader (to pull a fixed number of bytes) and a receiver (to
     * parse a nested reply through this same method).
     *
     * @param bool $keepWaiting If `true`, timeouts on stream read will be ignored
     * @return mixed Data received
     *
     * @throws ConnectionException
     * @throws ResponseException
     */
    public function receive($keepWaiting = false)
    {
        $this->shouldBeConnected();

        $type = $this->getType($keepWaiting);
        if (!array_key_exists($type, $this->responseHandlers)) {
            throw new ResponseException("Don't know how to handle a response of type {$type}");
        }

        $responseHandlerClass = $this->responseHandlers[$type];
        $responseHandler = new $responseHandlerClass($this->getData());

        $responseHandler->setReader(function ($bytes) {
            return $this->readBytes($bytes);
        });
        $responseHandler->setReceiver(function () use ($keepWaiting) {
            return $this->receive($keepWaiting);
        });
        $response = $responseHandler->parse();

        /**
         * If Disque returned an error, raise it in form of an exception
         * @see Disque\Connection\Response\ErrorResponse::parse()
         */
        if ($response instanceof ResponseException) {
            throw $response;
        }

        return $response;
    }

    /**
     * Build actual socket
     *
     * Opens a persistent TCP client stream. Kept as a separate protected
     * method so the socket creation can be replaced by a subclass.
     *
     * @param string $host Host
     * @param int $port Port
     * @param float $timeout Timeout
     * @return resource|false Socket, or `false` if the connection failed
     */
    protected function getSocket($host, $port, $timeout)
    {
        // $errorCode / $errorMessage are filled by reference but not used here
        return stream_socket_client(
            "tcp://{$host}:{$port}",
            $errorCode,
            $errorMessage,
            $timeout,
            STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT
        );
    }

    /**
     * Get the first byte from Disque, which contains the data type
     *
     * While `$keepWaiting` is `true`, reads that end in a stream timeout are
     * retried; any other empty read ends the wait.
     *
     * @param bool $keepWaiting If `true`, timeouts on stream read will be ignored
     * @return string A single char
     * @throws ConnectionException If nothing could be read, including when
     *         the stream is already at end-of-file
     */
    private function getType($keepWaiting = false)
    {
        $type = null;
        while (!feof($this->socket)) {
            $type = fgetc($this->socket);
            if ($type !== false && $type !== '') {
                break;
            }

            $info = stream_get_meta_data($this->socket);
            if (!$keepWaiting || !$info['timed_out']) {
                break;
            }
        }

        // $type is still null when the stream was at EOF before the first
        // read; that is a connection problem, not an unknown response type.
        if ($type === null || $type === false || $type === '') {
            throw new ConnectionException('Nothing received while reading from client');
        }

        return $type;
    }

    /**
     * Get a line of data
     *
     * @return string Line of data
     * @throws ConnectionException If the read fails or yields nothing
     */
    private function getData()
    {
        $data = fgets($this->socket);
        if ($data === false || $data === '') {
            throw new ConnectionException('Nothing received while reading from client');
        }
        return $data;
    }

    /**
     * Read up to the given number of bytes from the socket
     *
     * A single `fread()` on a network stream may return fewer bytes than
     * requested, so this keeps reading until the count is satisfied or the
     * stream stops delivering data (error, timeout or end-of-file).
     *
     * @param int $bytes Number of bytes wanted
     * @return string|false Bytes read (possibly fewer than requested), or
     *         `false` if the very first read failed
     */
    private function readBytes($bytes)
    {
        $buffer = '';
        $remaining = (int) $bytes;
        while ($remaining > 0) {
            $chunk = fread($this->socket, $remaining);
            if ($chunk === false || $chunk === '') {
                if ($chunk === false && $buffer === '') {
                    // Same failure value a bare fread() would have returned
                    return false;
                }
                break;
            }
            $buffer .= $chunk;
            $remaining -= strlen($chunk);
        }

        return $buffer;
    }

    /**
     * We should be connected
     *
     * @return void
     * @throws ConnectionException If there is no open socket
     */
    private function shouldBeConnected()
    {
        if (!$this->isConnected()) {
            throw new ConnectionException('No connection established');
        }
    }
}