1
|
|
|
<?php |
|
|
|
|
2
|
|
|
|
3
|
|
|
namespace Dazzle\Redis; |
4
|
|
|
|
5
|
|
|
use Clue\Redis\Protocol\Model\ErrorReply; |
6
|
|
|
use Clue\Redis\Protocol\Model\ModelInterface; |
7
|
|
|
use Clue\Redis\Protocol\Parser\ParserException; |
8
|
|
|
use Dazzle\Event\BaseEventEmitter; |
9
|
|
|
use Dazzle\Loop\LoopAwareTrait; |
10
|
|
|
use Dazzle\Loop\LoopInterface; |
11
|
|
|
use Dazzle\Promise\Deferred; |
12
|
|
|
use Dazzle\Promise\Promise; |
13
|
|
|
use Dazzle\Promise\PromiseInterface; |
14
|
|
|
use Dazzle\Redis\Driver\Request; |
15
|
|
|
use Dazzle\Redis\Driver\Driver; |
16
|
|
|
use Dazzle\Redis\Driver\DriverInterface; |
17
|
|
|
use Dazzle\Socket\Socket; |
18
|
|
|
use Dazzle\Socket\SocketInterface; |
19
|
|
|
use Dazzle\Throwable\Exception\Runtime\ExecutionException; |
20
|
|
|
use Dazzle\Throwable\Exception\Runtime\UnderflowException; |
21
|
|
|
use Dazzle\Throwable\Exception\Runtime\WriteException; |
22
|
|
|
use Error; |
23
|
|
|
use Exception; |
24
|
|
|
|
25
|
|
|
/**
 * Asynchronous Redis client.
 *
 * Commands are serialized by the driver and written to a socket stream. Every
 * dispatched command is represented by a deferred that is kept in a FIFO
 * queue; each reply parsed from the stream settles the oldest queued deferred.
 *
 * The client emits the following events:
 *  - `start` [ $this ]         after the connection stream has been created,
 *  - `stop`  [ $this ]         after the connection stream has been closed,
 *  - `error` [ $this, $error ] when a reply cannot be parsed or matched.
 */
class Redis extends BaseEventEmitter implements RedisInterface
{
    use LoopAwareTrait;
    use Command\Compose\ApiChannelTrait;
    use Command\Compose\ApiClusterTrait;
    use Command\Compose\ApiConnTrait;
    use Command\Compose\ApiCoreTrait;
    use Command\Compose\ApiGeospatialTrait;
    use Command\Compose\ApiHyperLogTrait;
    use Command\Compose\ApiKeyValTrait;
    use Command\Compose\ApiListTrait;
    use Command\Compose\ApiSetTrait;
    use Command\Compose\ApiSetHashTrait;
    use Command\Compose\ApiSetSortedTrait;
    use Command\Compose\ApiTransactionTrait;

    /**
     * Endpoint passed to the socket when the client is started.
     *
     * @var string
     */
    protected $endpoint;

    /**
     * Active connection stream, or null while the client is not started.
     *
     * @var SocketInterface|null
     */
    protected $stream;

    /**
     * Protocol driver used to serialize commands and parse replies.
     *
     * @var DriverInterface
     */
    protected $driver;

    /**
     * Whether the client currently holds an open connection stream.
     *
     * @var bool
     */
    protected $isConnected;

    /**
     * Whether the client is shutting down (or has been shut down) and must not accept new requests.
     *
     * @var bool
     */
    protected $isBeingDisconnected;

    /**
     * Promise returned by end(), resolved as soon as the client is stopped.
     *
     * @var PromiseInterface|null
     */
    protected $endPromise;

    /**
     * FIFO queue of deferreds awaiting a reply.
     *
     * @var array
     */
    private $reqs;

    /**
     * Create a client for the given endpoint. No connection is made until start() is called.
     *
     * @param string $endpoint
     * @param LoopInterface $loop
     */
    public function __construct($endpoint, LoopInterface $loop)
    {
        $this->endpoint = $endpoint;
        $this->loop = $loop;
        $this->stream = null;
        $this->driver = new Driver();

        $this->isConnected = false;
        $this->isBeingDisconnected = false;
        $this->endPromise = null;

        $this->reqs = [];
    }

    /**
     * Stop the client when the object is destroyed, so the stream is closed and pending requests are rejected.
     */
    public function __destruct()
    {
        $this->stop();
    }

    /**
     * @override
     * @inheritDoc
     */
    public function isPaused()
    {
        // Without a stream there is nothing that could be paused.
        return $this->stream === null ? false : $this->stream->isPaused();
    }

    /**
     * @override
     * @inheritDoc
     */
    public function pause()
    {
        if ($this->stream !== null)
        {
            $this->stream->pause();
        }
    }

    /**
     * @override
     * @inheritDoc
     */
    public function resume()
    {
        if ($this->stream !== null)
        {
            $this->stream->resume();
        }
    }

    /**
     * @override
     * @inheritDoc
     */
    public function isStarted()
    {
        return $this->isConnected;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function isBusy()
    {
        return !empty($this->reqs);
    }

    /**
     * @override
     * @inheritDoc
     */
    public function start()
    {
        if ($this->isStarted())
        {
            return Promise::doResolve($this);
        }

        // Any failure while creating the stream is reported through a rejected promise, never thrown.
        try
        {
            $stream = $this->createClient($this->endpoint);
        }
        catch (Error $ex)
        {
            return Promise::doReject($ex);
        }
        catch (Exception $ex)
        {
            return Promise::doReject($ex);
        }

        $this->isConnected = true;
        $this->isBeingDisconnected = false;
        $this->stream = $stream;
        $this->handleStart();
        $this->emit('start', [ $this ]);

        return Promise::doResolve($this);
    }

    /**
     * @override
     * @inheritDoc
     */
    public function stop()
    {
        if (!$this->isStarted())
        {
            return Promise::doResolve($this);
        }

        $this->isBeingDisconnected = true;
        $this->isConnected = false;

        // Listeners have to be detached while the stream reference is still set; handleStop()
        // is a no-op once $this->stream is null.
        $this->handleStop();

        $this->stream->close();
        $this->stream = null;

        foreach ($this->reqs as $req)
        {
            $req->reject(new ExecutionException('Connection has been closed!'));
        }

        $this->reqs = [];
        $this->emit('stop', [ $this ]);

        if ($this->endPromise !== null)
        {
            // Clear the reference before resolving so callbacks observe a fully stopped client.
            $promise = $this->endPromise;
            $this->endPromise = null;
            $promise->resolve($this);
        }

        return Promise::doResolve($this);
    }

    /**
     * @override
     * @inheritDoc
     */
    public function end()
    {
        if (!$this->isStarted())
        {
            return Promise::doResolve($this);
        }
        if ($this->isBeingDisconnected)
        {
            return Promise::doReject(new WriteException('Tried to double end same connection.'));
        }

        $promise = new Promise();
        $this->isBeingDisconnected = true;
        $this->endPromise = $promise;

        // With pending requests the client is stopped by handleMessage() once the last reply
        // arrives. With an empty queue no reply will ever arrive, so stop right away; stop()
        // resolves the promise stored above.
        if (!$this->isBusy())
        {
            $this->stop();
        }

        return $promise;
    }

    /**
     * Dispatch Redis request.
     *
     * The returned promise is rejected with ExecutionException, without writing anything,
     * when the client is being stopped or has no connection stream.
     *
     * @param Request $command
     * @return PromiseInterface
     */
    protected function dispatch(Request $command)
    {
        $request = new Deferred();
        $promise = $request->getPromise();

        if ($this->isBeingDisconnected)
        {
            $request->reject(new ExecutionException('Redis client connection is being stopped now.'));
        }
        else if ($this->stream === null)
        {
            // Client has not been started yet; there is no stream to write to.
            $request->reject(new ExecutionException('Redis client connection has not been started.'));
        }
        else
        {
            $this->stream->write($this->driver->commands($command));
            $this->reqs[] = $request;
        }

        return $promise;
    }

    /**
     * Attach the data and close listeners to the current stream, if there is one.
     *
     * @internal
     */
    protected function handleStart()
    {
        if ($this->stream !== null)
        {
            $this->stream->on('data', [ $this, 'handleData' ]);
            $this->stream->on('close', [ $this, 'stop' ]);
        }
    }

    /**
     * Detach the listeners attached by handleStart() from the current stream, if there is one.
     *
     * @internal
     */
    protected function handleStop()
    {
        if ($this->stream !== null)
        {
            $this->stream->removeListener('data', [ $this, 'handleData' ]);
            $this->stream->removeListener('close', [ $this, 'stop' ]);
        }
    }

    /**
     * Parse a chunk received from the stream and handle every reply it contains.
     *
     * A parser error, or a reply without a matching request, emits `error` and stops the client.
     *
     * @internal
     * @param SocketInterface $stream
     * @param string $chunk
     */
    public function handleData($stream, $chunk)
    {
        try
        {
            $models = $this->driver->parseResponse($chunk);
        }
        catch (ParserException $error)
        {
            $this->emit('error', [ $this, $error ]);
            $this->stop();
            return;
        }

        foreach ($models as $data)
        {
            try
            {
                $this->handleMessage($data);
            }
            catch (UnderflowException $error)
            {
                $this->emit('error', [ $this, $error ]);
                $this->stop();
                return;
            }
        }
    }

    /**
     * Settle the oldest pending request with the given reply.
     *
     * @internal
     * @param ModelInterface $message
     * @throws UnderflowException when there is no pending request for the reply
     */
    protected function handleMessage(ModelInterface $message)
    {
        if (empty($this->reqs))
        {
            throw new UnderflowException('Unexpected reply received, no matching request found');
        }

        // Replies are matched to requests strictly in dispatch order.
        $request = array_shift($this->reqs);

        if ($message instanceof ErrorReply)
        {
            $request->reject($message);
        }
        else
        {
            $request->resolve($message->getValueNative());
        }

        // Complete a shutdown requested by end() once the last pending reply has been handled.
        if ($this->isBeingDisconnected && !$this->isBusy())
        {
            $this->stop();
        }
    }

    /**
     * Create socket client with connection to Redis database.
     *
     * @param string $endpoint
     * @return SocketInterface
     * @throws ExecutionException
     */
    protected function createClient($endpoint)
    {
        $ex = null;

        // Error and Exception are both caught so that any failure raised while constructing
        // the socket is wrapped into the ExecutionException thrown below.
        try
        {
            return new Socket($endpoint, $this->loop);
        }
        catch (Error $ex)
        {}
        catch (Exception $ex)
        {}

        throw new ExecutionException('Redis connection socket could not be created!', 0, $ex);
    }
}
380
|
|
|
|
The PSR-1: Basic Coding Standard recommends that a file should either introduce new symbols, that is classes, functions, constants or similar, or have side effects. Side effects are anything that executes logic, like for example printing output, changing ini settings or writing to a file.
The idea behind this recommendation is that merely auto-loading a class should not change the state of an application. It also promotes a cleaner style of programming and makes your code less prone to errors, because the logic is not spread out all over the place.
To learn more about the PSR-1, please see the PHP-FIG site on the PSR-1.