1
|
|
|
<?php |
2
|
|
|
namespace PHPDaemon\Clients\Redis; |
3
|
|
|
|
4
|
|
|
use PHPDaemon\Core\CallbackWrapper; |
5
|
|
|
use PHPDaemon\Core\Daemon; |
6
|
|
|
use PHPDaemon\Core\Debug; |
7
|
|
|
use PHPDaemon\Network\ClientConnection; |
8
|
|
|
|
9
|
|
|
/** |
10
|
|
|
* @package NetworkClients |
11
|
|
|
* @subpackage RedisClient |
12
|
|
|
* @author Vasily Zorin <[email protected]> |
13
|
|
|
*/ |
14
|
|
|
class Connection extends ClientConnection implements \Iterator |
15
|
|
|
{ |
16
|
|
|
/** |
17
|
|
|
* Default flag |
18
|
|
|
*/ |
19
|
|
|
const RESULT_TYPE_DEFAULT = 0; |
20
|
|
|
|
21
|
|
|
/** |
22
|
|
|
* Flag - parse message response |
23
|
|
|
*/ |
24
|
|
|
const RESULT_TYPE_MESSAGE = 1; |
25
|
|
|
|
26
|
|
|
/** |
27
|
|
|
* Flag - parse response with arguments |
28
|
|
|
*/ |
29
|
|
|
const RESULT_TYPE_ARGSVALS = 2; |
30
|
|
|
|
31
|
|
|
/** |
32
|
|
|
* Flag - parse response to associative array |
33
|
|
|
*/ |
34
|
|
|
const RESULT_TYPE_ASSOC = 3; |
35
|
|
|
|
36
|
|
|
/** |
37
|
|
|
* @var array|null Current result |
38
|
|
|
*/ |
39
|
|
|
public $result = null; |
40
|
|
|
|
41
|
|
|
/** |
42
|
|
|
* @var string Channel name |
43
|
|
|
*/ |
44
|
|
|
public $channel = null; |
45
|
|
|
|
46
|
|
|
/** |
47
|
|
|
* @var string Message |
48
|
|
|
*/ |
49
|
|
|
public $msg = null; |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* @var boolean Whether the current reply is an error reply (a line starting with "-") |
53
|
|
|
*/ |
54
|
|
|
public $error; |
55
|
|
|
|
56
|
|
|
/** |
57
|
|
|
* @var array Subscriptions |
58
|
|
|
*/ |
59
|
|
|
public $subscribeCb = []; |
60
|
|
|
|
61
|
|
|
public $psubscribeCb = []; |
62
|
|
|
|
63
|
|
|
/** |
64
|
|
|
* @var string Current incoming key |
65
|
|
|
*/ |
66
|
|
|
protected $key; |
67
|
|
|
|
68
|
|
|
protected $stack = []; |
69
|
|
|
|
70
|
|
|
protected $ptr; |
71
|
|
|
|
72
|
|
|
/** |
73
|
|
|
* @var integer Current value length |
74
|
|
|
*/ |
75
|
|
|
protected $valueLength = 0; |
76
|
|
|
|
77
|
|
|
/** |
78
|
|
|
* @var integer Current level length |
79
|
|
|
*/ |
80
|
|
|
protected $levelLength = null; |
81
|
|
|
|
82
|
|
|
/** |
83
|
|
|
* @var string |
84
|
|
|
*/ |
85
|
|
|
protected $EOL = "\r\n"; |
86
|
|
|
|
87
|
|
|
/** |
88
|
|
|
* @var boolean Is it a subscription connection? |
89
|
|
|
*/ |
90
|
|
|
protected $subscribed = false; |
91
|
|
|
|
92
|
|
|
/** |
93
|
|
|
* @var float Timeout |
94
|
|
|
*/ |
95
|
|
|
protected $timeoutRead = 120; |
96
|
|
|
|
97
|
|
|
/** |
98
|
|
|
* @var integer Iterator position |
99
|
|
|
*/ |
100
|
|
|
protected $pos = 0; |
101
|
|
|
|
102
|
|
|
/** |
103
|
|
|
* @var \SplStack Stack of results types |
104
|
|
|
*/ |
105
|
|
|
protected $resultTypeStack; |
106
|
|
|
|
107
|
|
|
/** |
108
|
|
|
* @var integer Current result type (one of the RESULT_TYPE_* constants) |
109
|
|
|
*/ |
110
|
|
|
protected $resultType = 0; |
111
|
|
|
|
112
|
|
|
/** |
113
|
|
|
* @var \SplStack Stack of commands arguments |
114
|
|
|
*/ |
115
|
|
|
protected $argsStack; |
116
|
|
|
|
117
|
|
|
/** |
118
|
|
|
* @var null|array Current arguments |
119
|
|
|
*/ |
120
|
|
|
protected $args = null; |
121
|
|
|
|
122
|
|
|
/** |
123
|
|
|
* @var null|array Associative result storage |
124
|
|
|
*/ |
125
|
|
|
protected $assocData = null; |
126
|
|
|
|
127
|
|
|
/** |
128
|
|
|
* In the middle of binary response part |
129
|
|
|
*/ |
130
|
|
|
const STATE_BINARY = 1; |
131
|
|
|
|
132
|
|
|
/**
 * Builds the connection and creates the two empty bookkeeping stacks
 * (expected result types and remembered command arguments).
 * @param  mixed $fd   Forwarded unchanged to the parent constructor
 * @param  mixed $pool Forwarded unchanged to the parent constructor
 */
public function __construct($fd, $pool = null)
{
    parent::__construct($fd, $pool);

    $this->argsStack       = new \SplStack();
    $this->resultTypeStack = new \SplStack();
}
138
|
|
|
|
139
|
|
|
/**
 * Iterator: resets the cursor to the first element of the current result.
 * @return void
 */
#[\ReturnTypeWillChange] // silences the PHP 8.1 Iterator signature deprecation; a plain comment on older PHP
public function rewind()
{
    $this->pos = 0;
}
143
|
|
|
|
144
|
|
View Code Duplication |
/**
 * Iterator: returns the value at the current cursor position.
 *
 * - non-array result: the result itself at position 0, null afterwards
 * - DEFAULT / ARGSVALS: the element at the cursor position
 * - MESSAGE: element 2 of the result (the message)
 * - ASSOC: the odd element of the pair the cursor points at (the value)
 *
 * @return mixed Null when the result type matches none of the cases above
 */
#[\ReturnTypeWillChange] // silences the PHP 8.1 Iterator signature deprecation; a plain comment on older PHP
public function current()
{
    if (!is_array($this->result)) {
        return $this->pos === 0 ? $this->result : null;
    }

    if ($this->resultType === static::RESULT_TYPE_DEFAULT
        || $this->resultType === static::RESULT_TYPE_ARGSVALS
    ) {
        return $this->result[$this->pos];
    }

    if ($this->resultType === static::RESULT_TYPE_MESSAGE) {
        // message
        return $this->result[2];
    }

    if ($this->resultType === static::RESULT_TYPE_ASSOC) {
        return $this->result[$this->pos * 2 + 1];
    }

    // Unknown result type: make the previously implicit null explicit
    return null;
}
157
|
|
|
|
158
|
|
View Code Duplication |
/**
 * Iterator: returns the key that belongs to the current cursor position.
 *
 * - non-array result: 0 at position 0, null afterwards
 * - DEFAULT: the numeric cursor position
 * - MESSAGE: element 1 of the result (the channel)
 * - ARGSVALS: the command argument stored at the cursor position
 * - ASSOC: the even element of the pair the cursor points at (the field)
 *
 * @return mixed Null when the result type matches none of the cases above
 */
#[\ReturnTypeWillChange] // silences the PHP 8.1 Iterator signature deprecation; a plain comment on older PHP
public function key()
{
    if (!is_array($this->result)) {
        return $this->pos === 0 ? 0 : null;
    }

    if ($this->resultType === static::RESULT_TYPE_DEFAULT) {
        return $this->pos;
    }

    if ($this->resultType === static::RESULT_TYPE_MESSAGE) {
        // channel
        return $this->result[1];
    }

    if ($this->resultType === static::RESULT_TYPE_ARGSVALS) {
        return $this->args[$this->pos];
    }

    if ($this->resultType === static::RESULT_TYPE_ASSOC) {
        return $this->result[$this->pos * 2];
    }

    // Unknown result type: make the previously implicit null explicit
    return null;
}
173
|
|
|
|
174
|
|
|
/**
 * Iterator: advances the cursor by one element.
 * @return void
 */
#[\ReturnTypeWillChange] // silences the PHP 8.1 Iterator signature deprecation; a plain comment on older PHP
public function next()
{
    ++$this->pos;
}
178
|
|
|
|
179
|
|
|
/**
 * Iterator: tells whether the cursor points at an existing element.
 *
 * Existence of result elements is tested with array_key_exists() rather than
 * isset(): pushValue() stores null elements in the result array, and isset()
 * would report such an element as missing and end the iteration early.
 *
 * A non-array result is never iterable (unchanged behaviour).
 *
 * @return boolean
 */
#[\ReturnTypeWillChange] // silences the PHP 8.1 Iterator signature deprecation; a plain comment on older PHP
public function valid()
{
    if (!is_array($this->result)) {
        return false;
    }

    if ($this->resultType === static::RESULT_TYPE_DEFAULT) {
        return array_key_exists($this->pos, $this->result);
    }

    if ($this->resultType === static::RESULT_TYPE_ARGSVALS) {
        // Both the remembered argument and the matching reply element must exist
        return isset($this->args[$this->pos]) && array_key_exists($this->pos, $this->result);
    }

    if ($this->resultType === static::RESULT_TYPE_MESSAGE
        || $this->resultType === static::RESULT_TYPE_ASSOC
    ) {
        // Pairs are laid out as [key, value, key, value, ...]; check the value slot
        return array_key_exists($this->pos * 2 + 1, $this->result);
    }

    // Unknown result type: nothing to iterate (previously an implicit null)
    return false;
}
191
|
|
|
|
192
|
|
|
/**
 * Magic getter. Only the virtual property "assoc" is handled: it exposes the
 * current result as an associative array, built once and cached in $assocData.
 *
 * - empty or non-array result: []
 * - MESSAGE: [result[1] => result[2]]
 * - ARGSVALS: remembered command arguments mapped to the reply elements
 * - ASSOC: flat [key, value, key, value, ...] list folded into key => value
 * - anything else: the result array as is
 *
 * @param  string $name Property name
 * @return array|null   Null for every name other than "assoc"
 */
public function __get($name)
{
    if ($name !== 'assoc') {
        return null;
    }

    if ($this->assocData !== null) {
        return $this->assocData;
    }

    if (!is_array($this->result) || empty($this->result)) {
        $map = [];
    } elseif ($this->resultType === static::RESULT_TYPE_MESSAGE) {
        $map = [$this->result[1] => $this->result[2]];
    } elseif ($this->resultType === static::RESULT_TYPE_ARGSVALS) {
        $map = [];
        $total = sizeof($this->result);
        for ($idx = 0; $idx < $total; ++$idx) {
            $map[$this->args[$idx]] = $this->result[$idx];
        }
    } elseif ($this->resultType === static::RESULT_TYPE_ASSOC) {
        $map = [];
        $total = sizeof($this->result);
        // Walk the list pair by pair; a trailing unpaired element is skipped
        for ($idx = 0; $idx + 1 < $total; $idx += 2) {
            $map[$this->result[$idx]] = $this->result[$idx + 1];
        }
    } else {
        $map = $this->result;
    }

    $this->assocData = $map;

    return $this->assocData;
}
224
|
|
|
|
225
|
|
|
/**
 * Folds a flat list [k0, v0, k1, v1, ...] into an associative array
 * [k0 => v0, k1 => v1, ...]. A trailing element without a partner is ignored.
 * @param  array $array Flat list of alternating keys and values
 * @return array $hash
 */
public function arrayToHash($array)
{
    $pairs = [];
    $lastIndex = sizeof($array) - 1;

    $idx = 0;
    while ($idx < $lastIndex) {
        $field = $array[$idx];
        $pairs[$field] = $array[$idx + 1];
        $idx += 2;
    }

    return $pairs;
}
238
|
|
|
|
239
|
|
|
/**
 * Creates a Lock object bound to this connection.
 * @param  string  $key     Forwarded to the Lock constructor
 * @param  integer $timeout Forwarded to the Lock constructor
 * @return Lock
 */
public function lock($key, $timeout)
{
    $lock = new Lock($key, $timeout, $this);

    return $lock;
}
249
|
|
|
|
250
|
|
|
/**
 * Easy wrapper for a queue of evals: creates a MultiEval bound to this connection.
 * @param  callable $cb Forwarded to the MultiEval constructor
 * @return MultiEval
 */
public function meval($cb = null)
{
    $queue = new MultiEval($cb, $this);

    return $queue;
}
259
|
|
|
|
260
|
|
|
/**
 * Wrapper for scan commands: creates an AutoScan bound to this connection.
 * @param  string   $cmd   Command
 * @param  array    $args  Arguments
 * @param  callable $cbEnd Callback
 * @param  integer  $limit Limit
 * @return AutoScan
 */
public function autoscan($cmd, $args = [], $cbEnd = null, $limit = null)
{
    $scanner = new AutoScan($this, $cmd, $args, $cbEnd, $limit);

    return $scanner;
}
272
|
|
|
|
273
|
|
|
/**
 * Counts the callbacks registered locally (on this connection) for a channel.
 * @param  string $chan Channel name
 * @return integer      0 when nothing is registered for the channel
 */
public function getLocalSubscribersCount($chan)
{
    return isset($this->subscribeCb[$chan])
        ? sizeof($this->subscribeCb[$chan])
        : 0;
}
285
|
|
|
|
286
|
|
|
/**
 * Called when the connection is handshaked (at low-level), and peer is ready to recv. data.
 *
 * Without a password the connection is activated right away. With a password
 * an AUTH command is sent first and activation happens in its callback.
 * If AUTH does not answer "OK" the connection is finished and is NOT
 * activated (previously execution fell through and the connection was still
 * selected and reported ready after finish()).
 *
 * @return void
 */
public function onReady()
{
    $this->ptr =& $this->result;

    // Shared activation tail: optional database selection, parent notification
    // and raising the read watermark to the configured packet limit.
    $activate = function () {
        if (isset($this->pool->config->select->value)) {
            $this->select($this->pool->config->select->value);
        }
        parent::onReady();
        $this->setWatermark(null, $this->pool->maxAllowedPacket + 2);
    };

    if (!isset($this->password)) {
        $activate();
        return;
    }

    $this->sendCommand('AUTH', [$this->password], function () use ($activate) {
        if ($this->result !== 'OK') {
            $this->log('Auth. error: ' . json_encode($this->result));
            $this->finish();
            return; // do not activate a connection that failed authentication
        }
        $activate();
    });
}
313
|
|
|
|
314
|
|
|
/**
 * Magic __call: turns an unknown method call into a Redis command.
 * Example:
 * $redis->lpush('mylist', microtime(true));
 *
 * The argument list is scanned from the end: trailing nulls are skipped, and
 * if the first non-null value met is a callable array/object it is wrapped as
 * the callback and everything from its position on is dropped from the
 * arguments. The command name is upper-cased before dispatch.
 *
 * @param  string $cmd
 * @param  array  $args
 * @return void
 */
public function __call($cmd, $args)
{
    $cb = null;

    $idx = sizeof($args);
    while (--$idx >= 0) {
        $candidate = $args[$idx];
        $isCallback = (is_array($candidate) || is_object($candidate)) && is_callable($candidate);

        if ($isCallback) {
            $cb = CallbackWrapper::wrap($candidate);
            $args = array_slice($args, 0, $idx);
            break;
        }

        if ($candidate !== null) {
            // First real (non-callback) argument reached: no callback supplied
            break;
        }
    }

    $this->command(strtoupper($cmd), $args, $cb);
}
338
|
|
|
|
339
|
|
|
/**
 * Dispatches a (upper-cased) Redis command, applying client-side bookkeeping first.
 *
 * - MULTI / WATCH: acquire() is called before the command is sent.
 * - *SUBSCRIBE commands: the connection is flagged as a subscription
 *   connection on first use, and a second trailing callable in $args (if any)
 *   becomes the message callback while the original $cb becomes the
 *   operation callback ($opcb).
 * - SUBSCRIBE / PSUBSCRIBE / UNSUBSCRIBE / PUNSUBSCRIBE keep the local
 *   callback registries in sync and only talk to the server for channels
 *   whose local registration actually appeared or disappeared.
 * - UNSUBSCRIBEREAL / PUNSUBSCRIBEREAL always send the command and clean the
 *   registries once the reply arrives.
 * - Everything else: the expected result type (and, for MGET/HMGET, the
 *   arguments) is queued for onPacket(), HMSET's array form is flattened,
 *   and release() is called after EXEC / DISCARD.
 *
 * @param  string   $name Upper-case command name
 * @param  array    $args Command arguments
 * @param  callable $cb   Callback
 * @callback $cb ( )
 * @return void
 */
public function command($name, $args, $cb = null)
{
    $opcb = null;

    if ($name === 'MULTI' || $name === 'WATCH') {
        $this->acquire();
    } elseif (mb_orig_substr($name, -9) === 'SUBSCRIBE') {
        // PUB/SUB handling
        if (!$this->subscribed) {
            $this->subscribed = true;
            $this->pool->servConnSub[$this->url] = $this;
            $this->acquire();
            $this->setTimeouts(86400, 86400); // @TODO: remove timeout
        }

        // Look for a second trailing callable: it is the message callback,
        // which turns the callback received so far into the operation callback.
        for ($i = sizeof($args) - 1; $i >= 0; --$i) {
            $a = $args[$i];
            if ((is_array($a) || is_object($a)) && is_callable($a)) {
                $opcb = $cb;
                $cb = CallbackWrapper::wrap($a);
                $args = array_slice($args, 0, $i);
                break;
            } elseif ($a !== null) {
                break;
            }
        }
    }

    // NOTE: the former `$this->subscribed()` calls in the SUBSCRIBE/PSUBSCRIBE
    // branches were dropped. No such method is defined in this class, so the
    // call was routed through __call() and sent a bogus "SUBSCRIBED" command;
    // the flag itself is already set above.
    // NOTE(review): confirm the parent class does not define subscribed().
    if ($name === 'SUBSCRIBE') {
        $this->pubsubAdd($name, $args, 'subscribeCb', $cb, $opcb);
        return;
    }
    if ($name === 'PSUBSCRIBE') {
        $this->pubsubAdd($name, $args, 'psubscribeCb', $cb, $opcb);
        return;
    }
    if ($name === 'UNSUBSCRIBE') {
        $this->pubsubRemove($name, $args, 'subscribeCb', $cb, $opcb);
        return;
    }
    if ($name === 'UNSUBSCRIBEREAL') {
        /* Race-condition-free UNSUBSCRIBE */
        $this->pubsubRemoveReal('UNSUBSCRIBE', $args, 'subscribeCb', $cb);
        return;
    }
    if ($name === 'PUNSUBSCRIBE') {
        $this->pubsubRemove($name, $args, 'psubscribeCb', $cb, $opcb);
        return;
    }
    if ($name === 'PUNSUBSCRIBEREAL') {
        /* Race-condition-free PUNSUBSCRIBE */
        $this->pubsubRemoveReal('PUNSUBSCRIBE', $args, 'psubscribeCb', $cb);
        return;
    }

    // Regular command: remember how onPacket() has to present the reply
    if ($name === 'MGET') {
        $this->resultTypeStack->push(static::RESULT_TYPE_ARGSVALS);
        $this->argsStack->push($args);
    } elseif ($name === 'HMGET') {
        $this->resultTypeStack->push(static::RESULT_TYPE_ARGSVALS);
        $fields = $args;
        array_shift($fields); // drop the hash key, keep the requested fields
        $this->argsStack->push($fields);
    } elseif ($name === 'HMSET') {
        // Accept HMSET key [field => value, ...] and flatten it.
        // As in the original, no result type is queued for HMSET.
        if (sizeof($args) === 2 && is_array($args[1])) {
            $newArgs = [$args[0]];
            foreach ($args[1] as $key => $value) {
                $newArgs[] = $key;
                $newArgs[] = $value;
            }
            $args = $newArgs;
        }
    } elseif ($name === 'HGETALL') {
        $this->resultTypeStack->push(static::RESULT_TYPE_ASSOC);
    } elseif (($name === 'ZRANGE' || $name === 'ZRANGEBYSCORE' || $name === 'ZREVRANGE' || $name === 'ZREVRANGEBYSCORE')
        // Anchored so that a key merely containing "withscores" does not match
        && preg_grep('/^WITHSCORES$/i', $args)
    ) {
        $this->resultTypeStack->push(static::RESULT_TYPE_ASSOC);
    } else {
        $this->resultTypeStack->push(static::RESULT_TYPE_DEFAULT);
    }

    $this->sendCommand($name, $args, $cb);

    if ($name === 'EXEC' || $name === 'DISCARD') {
        $this->release();
    }
}

/**
 * Registers $cb for every channel/pattern in $args and sends $name to the
 * server for those that had no local registration yet. For channels that
 * were already registered, $opcb (if any) is invoked immediately.
 *
 * @param  string        $name Command to send (SUBSCRIBE or PSUBSCRIBE)
 * @param  array         $args Channels; each item may itself be an array of channels
 * @param  string        $prop Name of the registry property (subscribeCb / psubscribeCb)
 * @param  callable|null $cb   Message callback
 * @param  callable|null $opcb Operation callback
 * @return void
 */
private function pubsubAdd($name, $args, $prop, $cb, $opcb)
{
    $channels = [];
    foreach ($args as $arg) {
        foreach (is_array($arg) ? $arg : [$arg] as $chan) {
            $isNew = !isset($this->{$prop}[$chan]);
            CallbackWrapper::addToArray($this->{$prop}[$chan], $cb);
            if ($isNew) {
                $channels[] = $chan;
            } elseif ($opcb !== null) {
                $opcb($this);
            }
        }
    }
    if (sizeof($channels)) {
        $this->sendCommand($name, $channels, $opcb);
    }
}

/**
 * Removes $cb from every channel/pattern in $args and sends $name to the
 * server for those left without any local callback. For channels that are
 * unknown locally or still have callbacks, $opcb (if any) is invoked
 * immediately.
 *
 * Unknown channels are skipped instead of aborting the whole call, so
 * channels already collected are still unsubscribed on the server.
 *
 * @param  string        $name Command to send (UNSUBSCRIBE or PUNSUBSCRIBE)
 * @param  array         $args Channels; each item may itself be an array of channels
 * @param  string        $prop Name of the registry property (subscribeCb / psubscribeCb)
 * @param  callable|null $cb   Message callback to remove
 * @param  callable|null $opcb Operation callback
 * @return void
 */
private function pubsubRemove($name, $args, $prop, $cb, $opcb)
{
    $channels = [];
    foreach ($args as $arg) {
        foreach (is_array($arg) ? $arg : [$arg] as $chan) {
            if (!isset($this->{$prop}[$chan])) {
                if ($opcb !== null) {
                    $opcb($this);
                }
                continue;
            }
            CallbackWrapper::removeFromArray($this->{$prop}[$chan], $cb);
            if (sizeof($this->{$prop}[$chan]) === 0) {
                $channels[] = $chan;
                unset($this->{$prop}[$chan]);
            } elseif ($opcb !== null) {
                $opcb($this);
            }
        }
    }
    if (sizeof($channels)) {
        $this->sendCommand($name, $channels, $opcb);
    }
}

/**
 * Sends $redisCmd unconditionally and, once the reply arrives, removes the
 * callbacks that were registered at the time of the call (callbacks added in
 * the meantime are kept).
 *
 * @param  string        $redisCmd Command to send (UNSUBSCRIBE or PUNSUBSCRIBE)
 * @param  array         $args     Flat list of channels/patterns
 * @param  string        $prop     Name of the registry property (subscribeCb / psubscribeCb)
 * @param  callable|null $cb       Invoked with the connection (or the falsy value received) when done
 * @return void
 */
private function pubsubRemoveReal($redisCmd, $args, $prop, $cb)
{
    $old = $this->{$prop};
    $this->sendCommand($redisCmd, $args, function ($redis) use ($cb, $args, $old, $prop) {
        if (!$redis) {
            if ($cb !== null) {
                $cb($redis);
            }
            return;
        }
        foreach ($args as $arg) {
            if (!isset($this->{$prop}[$arg])) {
                continue;
            }
            // The channel may have been registered only after the snapshot was taken
            $stale = isset($old[$arg]) ? $old[$arg] : [];
            foreach ($stale as $oldcb) {
                CallbackWrapper::removeFromArray($this->{$prop}[$arg], $oldcb);
            }
            if (!sizeof($this->{$prop}[$arg])) {
                unset($this->{$prop}[$arg]);
            }
        }
        if ($cb !== null) {
            $cb($this);
        }
    });
}
549
|
|
|
|
550
|
|
|
/**
 * Queues the response callback and writes the command to the connection:
 * a "*<count>" line followed by one "$<byte length>" + payload pair per
 * element (command name first).
 *
 * For MULTI a null response callback is queued and $cb is invoked right
 * after the command has been written.
 *
 * @param  string   $name Command name
 * @param  array    $args Arguments (a non-array value is treated as a single argument)
 * @param  callable $cb   Callback
 * @callback $cb ( )
 * @return void
 */
public function sendCommand($name, $args, $cb = null)
{
    $isMulti = ($name === 'MULTI');

    $this->onResponse($isMulti ? null : $cb);

    $parts = is_array($args) ? $args : [$args];
    array_unshift($parts, $name);

    $this->writeln('*' . sizeof($parts));
    foreach ($parts as $part) {
        $this->writeln('$' . mb_orig_strlen($part) . $this->EOL . $part);
    }

    if ($isMulti && $cb !== null) {
        $cb($this);
    }
}
579
|
|
|
|
580
|
|
|
/**
 * Called when connection finishes.
 *
 * Removes this connection from the pool's subscription-connection map (if it
 * was a subscription connection) and hands every locally registered channel
 * and pattern callback back to the pool.
 *
 * @return void
 */
public function onFinish()
{
    parent::onFinish();

    if ($this->subscribed) {
        unset($this->pool->servConnSub[$this->url]);
    }

    /* we should reassign subscriptions */
    foreach ($this->subscribeCb as $channel => $listeners) {
        foreach ($listeners as $listener) {
            $this->pool->subscribe($channel, $listener);
        }
    }

    foreach ($this->psubscribeCb as $pattern => $listeners) {
        foreach ($listeners as $listener) {
            $this->pool->psubscribe($pattern, $listener);
        }
    }
}
602
|
|
|
|
603
|
|
|
/**
 * Called by pushValue() when a complete reply has been assembled.
 *
 * A reply is a pushed pub/sub message only when this is a subscription
 * connection and the reply is an array whose first element is "message" or
 * "pmessage". Every other reply is a regular command reply: its result type
 * (and arguments for ARGSVALS) is taken from the queues and the next response
 * callback is executed.
 *
 * The is_array() guard avoids reading offset 0 of scalar/null replies on a
 * subscription connection; the dispatch result is unchanged.
 *
 * After dispatch all per-reply state is reset; checkFree() is called for
 * regular replies only.
 *
 * @return void
 */
protected function onPacket()
{
    $this->result = $this->ptr;

    $kind = null;
    if ($this->subscribed && is_array($this->result) && isset($this->result[0])) {
        $kind = $this->result[0];
    }

    $isPush = true;
    if ($kind === 'message') {
        $listeners = &$this->subscribeCb;
    } elseif ($kind === 'pmessage') {
        $listeners = &$this->psubscribeCb;
    } else {
        $isPush = false;
    }

    if (!$isPush) {
        $this->resultType = !$this->resultTypeStack->isEmpty()
            ? $this->resultTypeStack->shift()
            : static::RESULT_TYPE_DEFAULT;
        if ($this->resultType === static::RESULT_TYPE_ARGSVALS) {
            $this->args = !$this->argsStack->isEmpty() ? $this->argsStack->shift() : [];
        }
        $this->onResponse->executeOne($this);
    } elseif (isset($listeners[$this->result[1]])) {
        $this->resultType = static::RESULT_TYPE_MESSAGE;
        // NOTE(review): a Redis "pmessage" reply is [pmessage, pattern, channel, payload],
        // so for pattern subscriptions elements 1/2 are presumably the pattern and the
        // channel rather than channel and payload — confirm before relying on ->msg.
        $this->channel = $this->result[1];
        $this->msg = $this->result[2];
        foreach ($listeners[$this->result[1]] as $cb) {
            if (is_callable($cb)) {
                $cb($this);
            }
        }
    } elseif ($this->pool->config->logpubsubracecondition->value) {
        Daemon::log('[Redis client]' . ': PUB/SUB race condition at channel ' . Debug::json($this->result[1]));
    }

    // Reset per-reply state
    $this->args = null;
    $this->result = null;
    $this->channel = null;
    $this->msg = null;
    $this->error = false;
    $this->pos = 0;
    $this->resultType = static::RESULT_TYPE_DEFAULT;
    $this->assocData = null;

    if (!$isPush) {
        $this->checkFree();
    }
}
650
|
|
|
|
651
|
|
|
/**
 * Adds a parsed value to the reply under construction and closes every
 * array level that has become complete.
 *
 * $this->ptr refers to the array currently being filled (or holds the scalar
 * reply), $this->stack holds [reference to array, expected length] for every
 * open array level, and $this->levelLength is the expected length of the
 * current level. When the last open level is complete, onPacket() is called
 * and the pointer is detached and cleared.
 *
 * When a nested level is closed, the pointer is re-bound BY REFERENCE to the
 * parent array stored on the stack. The previous by-value list() assignment
 * made the pointer a copy of the parent, so anything attached after the first
 * nested array (for example a second nested array at the same level) went
 * into a copy that was discarded, and the parent never reached its expected
 * length.
 *
 * @param  mixed $val Parsed value (scalar, null or empty array)
 * @return void
 */
public function pushValue($val)
{
    if (is_array($this->ptr)) {
        $this->ptr[] = $val;
    } else {
        $this->ptr = $val;
    }

    while (true) {
        $size = is_array($this->ptr) ? sizeof($this->ptr) : 1;
        if ($size < $this->levelLength) {
            // Current level still expects more elements
            return;
        }

        // Current level is complete: drop it from the stack
        array_pop($this->stack);

        $depth = sizeof($this->stack);
        if ($depth === 0) {
            $this->levelLength = null;
            $this->onPacket();
            // Detach the pointer before clearing it so the delivered array is not touched
            $this->ptr =& $dummy;
            $this->ptr = null;
            return;
        }

        // Continue with the parent level, sharing (not copying) its array
        $this->ptr =& $this->stack[$depth - 1][0];
        $this->levelLength = $this->stack[$depth - 1][1];
    }
}
683
|
|
|
|
684
|
|
|
/**
 * Called when new data received. Parses the reply stream.
 *
 * In the standby state the input is consumed line by line; the first
 * character of a line selects its meaning:
 *   ":"  integer             -> pushed as int
 *   "+"  status, "-" error   -> pushed as string, $this->error set for "-"
 *   "*"  array header        -> opens a new array level (empty array if length <= 0)
 *   "$"  bulk string header  -> null if the length starts with "-", otherwise
 *                               switches to the binary state for <length> + 2 bytes
 * In the binary state the method waits until the whole block plus the
 * trailing EOL is buffered, then pushes it and returns to standby.
 *
 * After every pushed value parsing restarts from the top, so the state is
 * re-checked each time.
 *
 * @return void
 */
protected function onRead()
{
    start:
    if ($this->state === static::STATE_STANDBY) { // outside of packet
        while (($l = $this->readline()) !== null) {
            if ($l === '') {
                continue;
            }
            $char = $l[0];
            $val = mb_orig_substr($l, 1);
            if ($char === ':') { // inline integer
                $this->pushValue((int)$val);
                goto start;
            } elseif (($char === '+') || ($char === '-')) { // inline string
                $this->error = $char === '-';
                $this->pushValue($val);
                goto start;
            } elseif ($char === '*') { // defines number of elements of incoming array
                $length = (int)$val;
                if ($length <= 0) {
                    $this->pushValue([]);
                    goto start;
                }

                $ptr = [];

                // Attach the new array to its parent (if any) by reference
                if (is_array($this->ptr)) {
                    $this->ptr[] =& $ptr;
                }

                $this->ptr =& $ptr;
                $this->stack[] = [&$ptr, $length];
                $this->levelLength = $length;
                // Drop the local name only; the array lives on through the references above
                unset($ptr);

                goto start;
            } elseif ($char === '$') { // defines size of the data block
                // isset() guards against a bare "$" line (no second character)
                if (isset($l[1]) && $l[1] === '-') {
                    $this->pushValue(null);
                    goto start;
                }
                $this->valueLength = (int)$val;
                if ($this->valueLength + 2 > $this->pool->maxAllowedPacket) {
                    $this->log('max-allowed-packet (' . $this->pool->config->maxallowedpacket->getHumanValue() . ') exceed, aborting connection');
                    $this->finish();
                    return;
                }
                $this->setWatermark($this->valueLength + 2);
                $this->state = static::STATE_BINARY; // binary data block
                break; // stop reading line-by-line
            }
        }
    }

    if ($this->state === static::STATE_BINARY) { // inside of binary data block
        if ($this->getInputLength() < $this->valueLength + 2) {
            return; //we do not have a whole packet
        }
        $value = $this->read($this->valueLength);
        if ($this->read(2) !== $this->EOL) {
            // Block not terminated by EOL: stream is out of sync, drop the connection
            $this->finish();
            return;
        }
        $this->state = static::STATE_STANDBY;
        $this->setWatermark(3);
        $this->pushValue($value);
        goto start;
    }
}
757
|
|
|
} |
758
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.