1 | <?php |
||
15 | class Socket extends BaseConnection implements ConnectionInterface |
||
16 | { |
||
17 | /** |
||
18 | * Socket handle |
||
19 | * |
||
20 | * @var resource |
||
21 | */ |
||
22 | protected $socket; |
||
23 | |||
24 | /** |
||
25 | * Response handlers |
||
26 | * |
||
27 | * The characters used as keys are part of the Redis/Disque protocol. |
||
28 | * Disque uses the same response protocol as Redis, therefore |
||
29 | * @see http://redis.io/topics/protocol |
||
30 | * |
||
31 | * @var array |
||
32 | */ |
||
33 | private $responseHandlers = [ |
||
34 | '+' => Response\StringResponse::class, |
||
35 | '-' => Response\ErrorResponse::class, |
||
36 | ':' => Response\IntResponse::class, |
||
37 | '$' => Response\TextResponse::class, |
||
38 | '*' => Response\ArrayResponse::class |
||
39 | ]; |
||
40 | |||
/**
 * Open the socket to the configured host and port and apply the timeouts
 *
 * Delegates to the parent implementation first, then obtains the stream
 * from getSocket(), puts it in blocking mode and, when a response timeout
 * is given, applies it as the stream read timeout.
 *
 * @inheritdoc
 *
 * @throws ConnectionException If getSocket() does not return a resource
 */
public function connect($connectionTimeout = 0, $responseTimeout = null)
{
    parent::connect($connectionTimeout, $responseTimeout);

    $this->socket = $this->getSocket(
        $this->host,
        $this->port,
        (float) $connectionTimeout
    );
    if (!is_resource($this->socket)) {
        throw new ConnectionException("Could not connect to {$this->host}:{$this->port}");
    }

    stream_set_blocking($this->socket, true);
    if (!is_null($responseTimeout)) {
        // stream_set_timeout() takes whole seconds and microseconds as two
        // separate integers; split the value so a fractional timeout such
        // as 1.5 is honoured instead of being truncated to 1
        $seconds = (int) floor($responseTimeout);
        $microseconds = (int) round(($responseTimeout - $seconds) * 1000000);
        if ($microseconds >= 1000000) {
            // Rounding pushed the fraction up to a full second
            $seconds++;
            $microseconds = 0;
        }
        stream_set_timeout($this->socket, $seconds, $microseconds);
    }
}
62 | |||
63 | /** |
||
64 | * @inheritdoc |
||
65 | */ |
||
66 | 41 | public function disconnect() |
|
74 | |||
75 | /** |
||
76 | * @inheritdoc |
||
77 | */ |
||
78 | 41 | public function isConnected() |
|
82 | |||
/**
 * Serialize a command, send it over the connection and read the reply
 *
 * The request is built as an array header (`*<element count>`) followed by
 * the command name and each argument as a length-prefixed string
 * (`$<byte length>` then the value), every part terminated by CRLF.
 *
 * @inheritdoc
 */
public function execute(CommandInterface $command)
{
    $commandName = $command->getCommand();
    $arguments = $command->getArguments();

    $parts = [
        // +1 accounts for the command name itself
        '*' . (count($arguments) + 1),
        '$' . strlen($commandName),
        $commandName
    ];

    // Iterate by value rather than by numeric index so that argument
    // arrays with gaps or string keys are still sent in full and stay
    // consistent with the element count announced above
    foreach ($arguments as $argument) {
        // Cast once so the byte-length prefix always matches the payload
        $argument = (string) $argument;
        $parts[] = '$' . strlen($argument);
        $parts[] = $argument;
    }

    $this->send(implode("\r\n", $parts) . "\r\n");
    return $this->receive($command->isBlocking());
}
||
107 | |||
108 | /** |
||
109 | * Execute a command on the connection |
||
110 | * |
||
111 | * @param string $data Data to send |
||
112 | * @throws ConnectionException |
||
113 | */ |
||
114 | 2 | public function send($data) |
|
130 | |||
131 | /** |
||
132 | * Read data from connection |
||
133 | * |
||
134 | * @param bool $keepWaiting If `true`, timeouts on stream read will be ignored |
||
135 | * @return mixed Data received |
||
136 | * |
||
137 | * @throws ConnectionException |
||
138 | * @throws ResponseException |
||
139 | */ |
||
140 | 27 | public function receive($keepWaiting = false) |
|
169 | |||
170 | /** |
||
171 | * Build actual socket |
||
172 | * |
||
173 | * @param string $host Host |
||
174 | * @param int $port Port |
||
175 | * @param float $timeout Timeout |
||
176 | * @return resource Socket |
||
177 | */ |
||
178 | protected function getSocket($host, $port, $timeout) |
||
182 | |||
/**
 * Get the first byte from Disque, which contains the data type
 *
 * Reads one character at a time until a non-empty byte is obtained. A
 * failed read is retried only when `$keepWaiting` is set and the stream
 * reports that the read timed out; any other failure, or a stream that is
 * already at end-of-file, results in an exception.
 *
 * @param bool $keepWaiting If `true`, timeouts on stream read will be ignored
 * @return string A single char
 * @throws ConnectionException If no byte could be read
 */
private function getType($keepWaiting = false)
{
    while (!feof($this->socket)) {
        $type = fgetc($this->socket);
        if ($type !== false && $type !== '') {
            return $type;
        }

        // Nothing was read: keep looping only for a tolerated timeout
        $info = stream_get_meta_data($this->socket);
        if (!$keepWaiting || !$info['timed_out']) {
            break;
        }
    }

    // Reached on a failed read and also when the stream was already at EOF
    // before the first read, so callers never receive a non-string type
    throw new ConnectionException('Nothing received while reading from client');
}
||
211 | |||
212 | /** |
||
213 | * Get a line of data |
||
214 | * |
||
215 | * @return string Line of data |
||
216 | * @throws ConnectionException |
||
217 | */ |
||
218 | 24 | private function getData() |
|
226 | |||
227 | /** |
||
228 | * We should be connected |
||
229 | * |
||
230 | * @return void |
||
231 | * @throws ConnectionException |
||
232 | */ |
||
233 | 29 | private function shouldBeConnected() |
|
239 | } |