1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dazzle\Loop\Model; |
4
|
|
|
|
5
|
|
|
use Dazzle\Loop\Flow\FlowController; |
6
|
|
|
use Dazzle\Loop\Tick\TickContinousQueue; |
7
|
|
|
use Dazzle\Loop\Tick\TickFiniteQueue; |
8
|
|
|
use Dazzle\Loop\Timer\Timer; |
9
|
|
|
use Dazzle\Loop\Timer\TimerBox; |
10
|
|
|
use Dazzle\Loop\Timer\TimerInterface; |
11
|
|
|
use Dazzle\Loop\LoopModelInterface; |
12
|
|
|
|
13
|
|
|
class SelectLoop implements LoopModelInterface |
14
|
|
|
{ |
15
|
|
|
/** |
16
|
|
|
* @var int |
17
|
|
|
*/ |
18
|
|
|
const MICROSECONDS_PER_SECOND = 1e6; |
19
|
|
|
|
20
|
|
|
/** |
21
|
|
|
* @var TickContinousQueue |
22
|
|
|
*/ |
23
|
|
|
protected $startTickQueue; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* @var TickContinousQueue |
27
|
|
|
*/ |
28
|
|
|
protected $stopTickQueue; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* @var TickContinousQueue |
32
|
|
|
*/ |
33
|
|
|
protected $nextTickQueue; |
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* @var TickFiniteQueue |
37
|
|
|
*/ |
38
|
|
|
protected $futureTickQueue; |
39
|
|
|
|
40
|
|
|
/** |
41
|
|
|
* @var FlowController |
42
|
|
|
*/ |
43
|
|
|
protected $flowController; |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* @var TimerBox |
47
|
|
|
*/ |
48
|
|
|
protected $timers; |
49
|
|
|
|
50
|
|
|
/** |
51
|
|
|
* @var resource[] |
52
|
|
|
*/ |
53
|
|
|
protected $readStreams = []; |
54
|
|
|
|
55
|
|
|
/** |
56
|
|
|
* @var callable[] |
57
|
|
|
*/ |
58
|
|
|
protected $readListeners = []; |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* @var resource[] |
62
|
|
|
*/ |
63
|
|
|
protected $writeStreams = []; |
64
|
|
|
|
65
|
|
|
/** |
66
|
|
|
* @var callable[] |
67
|
|
|
*/ |
68
|
|
|
protected $writeListeners = []; |
69
|
|
|
|
70
|
|
|
/** |
71
|
|
|
* |
72
|
|
|
*/ |
73
|
55 |
|
public function __construct() |
74
|
|
|
{ |
75
|
55 |
|
$this->startTickQueue = new TickContinousQueue($this); |
76
|
55 |
|
$this->stopTickQueue = new TickContinousQueue($this); |
77
|
55 |
|
$this->nextTickQueue = new TickContinousQueue($this); |
78
|
55 |
|
$this->futureTickQueue = new TickFiniteQueue($this); |
79
|
55 |
|
$this->flowController = new FlowController(); |
80
|
55 |
|
$this->timers = new TimerBox(); |
81
|
55 |
|
} |
82
|
|
|
|
83
|
|
|
/** |
84
|
|
|
* |
85
|
|
|
*/ |
86
|
3 |
|
public function __destruct() |
87
|
|
|
{ |
88
|
3 |
|
unset($this->startTickQueue); |
89
|
3 |
|
unset($this->stopTickQueue); |
90
|
3 |
|
unset($this->nextTickQueue); |
91
|
3 |
|
unset($this->futureTickQueue); |
92
|
3 |
|
unset($this->flowController); |
93
|
3 |
|
unset($this->timers); |
94
|
3 |
|
unset($this->readStreams); |
95
|
3 |
|
unset($this->readListeners); |
96
|
3 |
|
unset($this->writeStreams); |
97
|
3 |
|
unset($this->writeListeners); |
98
|
3 |
|
} |
99
|
|
|
|
100
|
|
|
/** |
101
|
|
|
* @override |
102
|
|
|
* @inheritDoc |
103
|
|
|
*/ |
104
|
12 |
|
public function isRunning() |
105
|
|
|
{ |
106
|
12 |
|
return isset($this->flowController->isRunning) ? $this->flowController->isRunning : false; |
107
|
|
|
} |
108
|
|
|
|
109
|
|
|
/** |
110
|
|
|
* @override |
111
|
|
|
* @inheritDoc |
112
|
|
|
*/ |
113
|
14 |
View Code Duplication |
public function addReadStream($stream, callable $listener) |
|
|
|
|
114
|
|
|
{ |
115
|
14 |
|
$key = (int) $stream; |
116
|
|
|
|
117
|
14 |
|
if (!isset($this->readStreams[$key])) |
118
|
|
|
{ |
119
|
14 |
|
$this->readStreams[$key] = $stream; |
120
|
14 |
|
$this->readListeners[$key] = $listener; |
121
|
|
|
} |
122
|
14 |
|
} |
123
|
|
|
|
124
|
|
|
/** |
125
|
|
|
* @override |
126
|
|
|
* @inheritDoc |
127
|
|
|
*/ |
128
|
14 |
View Code Duplication |
public function addWriteStream($stream, callable $listener) |
|
|
|
|
129
|
|
|
{ |
130
|
14 |
|
$key = (int) $stream; |
131
|
|
|
|
132
|
14 |
|
if (!isset($this->writeStreams[$key])) |
133
|
|
|
{ |
134
|
14 |
|
$this->writeStreams[$key] = $stream; |
135
|
14 |
|
$this->writeListeners[$key] = $listener; |
136
|
|
|
} |
137
|
14 |
|
} |
138
|
|
|
|
139
|
|
|
/** |
140
|
|
|
* @override |
141
|
|
|
* @inheritDoc |
142
|
|
|
*/ |
143
|
10 |
|
public function removeReadStream($stream) |
144
|
|
|
{ |
145
|
10 |
|
$key = (int) $stream; |
146
|
|
|
|
147
|
|
|
unset( |
148
|
10 |
|
$this->readStreams[$key], |
149
|
10 |
|
$this->readListeners[$key] |
150
|
|
|
); |
151
|
10 |
|
} |
152
|
|
|
|
153
|
|
|
/** |
154
|
|
|
* @override |
155
|
|
|
* @inheritDoc |
156
|
|
|
*/ |
157
|
14 |
|
public function removeWriteStream($stream) |
158
|
|
|
{ |
159
|
14 |
|
$key = (int) $stream; |
160
|
|
|
|
161
|
|
|
unset( |
162
|
14 |
|
$this->writeStreams[$key], |
163
|
14 |
|
$this->writeListeners[$key] |
164
|
|
|
); |
165
|
14 |
|
} |
166
|
|
|
|
167
|
|
|
/** |
168
|
|
|
* @override |
169
|
|
|
* @inheritDoc |
170
|
|
|
*/ |
171
|
6 |
|
public function removeStream($stream) |
172
|
|
|
{ |
173
|
6 |
|
$this->removeReadStream($stream); |
174
|
6 |
|
$this->removeWriteStream($stream); |
175
|
6 |
|
} |
176
|
|
|
|
177
|
|
|
/** |
178
|
|
|
* @override |
179
|
|
|
* @inheritDoc |
180
|
|
|
*/ |
181
|
8 |
|
public function addTimer($interval, callable $callback) |
182
|
|
|
{ |
183
|
8 |
|
$timer = new Timer($this, $interval, $callback, false); |
184
|
|
|
|
185
|
8 |
|
$this->timers->add($timer); |
186
|
|
|
|
187
|
8 |
|
return $timer; |
188
|
|
|
} |
189
|
|
|
|
190
|
|
|
/** |
191
|
|
|
* @override |
192
|
|
|
* @inheritDoc |
193
|
|
|
*/ |
194
|
10 |
|
public function addPeriodicTimer($interval, callable $callback) |
195
|
|
|
{ |
196
|
10 |
|
$timer = new Timer($this, $interval, $callback, true); |
197
|
|
|
|
198
|
10 |
|
$this->timers->add($timer); |
199
|
|
|
|
200
|
10 |
|
return $timer; |
201
|
|
|
} |
202
|
|
|
|
203
|
|
|
/** |
204
|
|
|
* @override |
205
|
|
|
* @inheritDoc |
206
|
|
|
*/ |
207
|
4 |
|
public function cancelTimer(TimerInterface $timer) |
208
|
|
|
{ |
209
|
4 |
|
if (isset($this->timers)) |
210
|
|
|
{ |
211
|
4 |
|
$this->timers->remove($timer); |
212
|
|
|
} |
213
|
4 |
|
} |
214
|
|
|
|
215
|
|
|
/** |
216
|
|
|
* @override |
217
|
|
|
* @inheritDoc |
218
|
|
|
*/ |
219
|
4 |
|
public function isTimerActive(TimerInterface $timer) |
220
|
|
|
{ |
221
|
4 |
|
return $this->timers->contains($timer); |
222
|
|
|
} |
223
|
|
|
|
224
|
|
|
/** |
225
|
|
|
* @override |
226
|
|
|
* @inheritDoc |
227
|
|
|
*/ |
228
|
2 |
|
public function onStart(callable $listener) |
229
|
|
|
{ |
230
|
2 |
|
$this->startTickQueue->add($listener); |
231
|
2 |
|
} |
232
|
|
|
|
233
|
|
|
/** |
234
|
|
|
* @override |
235
|
|
|
* @inheritDoc |
236
|
|
|
*/ |
237
|
2 |
|
public function onStop(callable $listener) |
238
|
|
|
{ |
239
|
2 |
|
$this->stopTickQueue->add($listener); |
240
|
2 |
|
} |
241
|
|
|
|
242
|
|
|
/** |
243
|
|
|
* @override |
244
|
|
|
* @inheritDoc |
245
|
|
|
*/ |
246
|
8 |
|
public function onBeforeTick(callable $listener) |
247
|
|
|
{ |
248
|
8 |
|
$this->nextTickQueue->add($listener); |
249
|
8 |
|
} |
250
|
|
|
|
251
|
|
|
/** |
252
|
|
|
* @override |
253
|
|
|
* @inheritDoc |
254
|
|
|
*/ |
255
|
14 |
|
public function onAfterTick(callable $listener) |
256
|
|
|
{ |
257
|
14 |
|
$this->futureTickQueue->add($listener); |
258
|
14 |
|
} |
259
|
|
|
|
260
|
|
|
/** |
261
|
|
|
* @override |
262
|
|
|
* @inheritDoc |
263
|
|
|
*/ |
264
|
28 |
|
public function tick() |
265
|
|
|
{ |
266
|
28 |
|
$this->flowController->isRunning = true; |
267
|
|
|
|
268
|
28 |
|
$this->nextTickQueue->tick(); |
269
|
28 |
|
$this->futureTickQueue->tick(); |
270
|
28 |
|
$this->timers->tick(); |
271
|
28 |
|
$this->waitForStreamActivity(0); |
272
|
|
|
|
273
|
28 |
|
$this->flowController->isRunning = false; |
274
|
28 |
|
} |
275
|
|
|
|
276
|
|
|
/** |
277
|
|
|
* @override |
278
|
|
|
* @inheritDoc |
279
|
|
|
*/ |
280
|
10 |
|
public function start() |
281
|
|
|
{ |
282
|
10 |
|
if ($this->flowController->isRunning) |
283
|
|
|
{ |
284
|
|
|
return; |
285
|
|
|
} |
286
|
|
|
|
287
|
|
|
// TODO KRF-107 |
288
|
10 |
|
$this->addPeriodicTimer(1, function() { |
289
|
|
|
usleep(1); |
290
|
10 |
|
}); |
291
|
|
|
|
292
|
10 |
|
$this->flowController->isRunning = true; |
293
|
10 |
|
$this->startTickQueue->tick(); |
294
|
|
|
|
295
|
10 |
|
while ($this->flowController->isRunning) |
296
|
|
|
{ |
297
|
10 |
|
$this->nextTickQueue->tick(); |
298
|
|
|
|
299
|
10 |
|
$this->futureTickQueue->tick(); |
300
|
|
|
|
301
|
10 |
|
$this->timers->tick(); |
302
|
|
|
|
303
|
|
|
// Next-tick or future-tick queues have pending callbacks ... |
304
|
10 |
|
if (!$this->flowController->isRunning || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) |
305
|
|
|
{ |
306
|
10 |
|
$timeout = 0; |
307
|
|
|
} |
308
|
|
|
// There is a pending timer, only block until it is due ... |
309
|
4 |
|
else if ($scheduledAt = $this->timers->getFirst()) |
310
|
|
|
{ |
311
|
4 |
|
$timeout = $scheduledAt - $this->timers->getTime(); |
312
|
4 |
|
$timeout = ($timeout < 0) ? 0 : $timeout * self::MICROSECONDS_PER_SECOND; |
313
|
|
|
} |
314
|
|
|
// The only possible event is stream activity, so wait forever ... |
315
|
|
|
else if ($this->readStreams || $this->writeStreams) |
|
|
|
|
316
|
|
|
{ |
317
|
|
|
$timeout = null; |
318
|
|
|
} |
319
|
|
|
// There's nothing left to do ... |
320
|
|
|
else |
321
|
|
|
{ |
322
|
|
|
break; |
323
|
|
|
} |
324
|
|
|
|
325
|
10 |
|
$this->waitForStreamActivity($timeout); |
326
|
|
|
} |
327
|
10 |
|
} |
328
|
|
|
|
329
|
|
|
/** |
330
|
|
|
* @override |
331
|
|
|
* @inheritDoc |
332
|
|
|
*/ |
333
|
14 |
|
public function stop() |
334
|
|
|
{ |
335
|
14 |
|
if (!$this->flowController->isRunning) |
336
|
|
|
{ |
337
|
4 |
|
return; |
338
|
|
|
} |
339
|
|
|
|
340
|
10 |
|
$this->stopTickQueue->tick(); |
341
|
10 |
|
$this->flowController->isRunning = false; |
342
|
10 |
|
} |
343
|
|
|
|
344
|
|
|
/** |
345
|
|
|
* @override |
346
|
|
|
* @inheritDoc |
347
|
|
|
*/ |
348
|
2 |
|
public function setFlowController($flowController) |
349
|
|
|
{ |
350
|
2 |
|
$this->flowController = $flowController; |
351
|
2 |
|
} |
352
|
|
|
|
353
|
|
|
/** |
354
|
|
|
* @override |
355
|
|
|
* @inheritDoc |
356
|
|
|
*/ |
357
|
4 |
|
public function getFlowController() |
358
|
|
|
{ |
359
|
4 |
|
return $this->flowController; |
360
|
|
|
} |
361
|
|
|
|
362
|
|
|
/** |
363
|
|
|
* @override |
364
|
|
|
* @inheritDoc |
365
|
|
|
*/ |
366
|
4 |
|
public function erase($all = false) |
367
|
|
|
{ |
368
|
4 |
|
$this->stop(); |
369
|
4 |
|
$loop = new static(); |
370
|
|
|
|
371
|
4 |
|
$list = $all === true ? $this : $this->getTransferableProperties(); |
372
|
4 |
|
foreach ($list as $key=>$val) |
|
|
|
|
373
|
|
|
{ |
374
|
4 |
|
$this->$key = $loop->$key; |
375
|
|
|
} |
376
|
|
|
|
377
|
4 |
|
$this->flowController->isRunning = false; |
378
|
|
|
|
379
|
4 |
|
return $this; |
380
|
|
|
} |
381
|
|
|
|
382
|
|
|
/** |
383
|
|
|
* @override |
384
|
|
|
* @inheritDoc |
385
|
|
|
*/ |
386
|
|
View Code Duplication |
public function export(LoopModelInterface $loop, $all = false) |
|
|
|
|
387
|
|
|
{ |
388
|
|
|
$this->stop(); |
389
|
|
|
$loop->stop(); |
390
|
|
|
|
391
|
|
|
$list = $all === true ? $this : $this->getTransferableProperties(); |
392
|
|
|
foreach ($list as $key=>$val) |
|
|
|
|
393
|
|
|
{ |
394
|
|
|
$loop->$key = $this->$key; |
395
|
|
|
} |
396
|
|
|
|
397
|
|
|
return $this; |
398
|
|
|
} |
399
|
|
|
|
400
|
|
|
/** |
401
|
|
|
* @override |
402
|
|
|
* @inheritDoc |
403
|
|
|
*/ |
404
|
|
View Code Duplication |
public function import(LoopModelInterface $loop, $all = false) |
|
|
|
|
405
|
|
|
{ |
406
|
|
|
$this->stop(); |
407
|
|
|
$loop->stop(); |
408
|
|
|
|
409
|
|
|
$list = $all === true ? $this : $this->getTransferableProperties(); |
410
|
|
|
foreach ($list as $key=>$val) |
|
|
|
|
411
|
|
|
{ |
412
|
|
|
$this->$key = $loop->$key; |
413
|
|
|
} |
414
|
|
|
|
415
|
|
|
return $this; |
416
|
|
|
} |
417
|
|
|
|
418
|
|
|
/** |
419
|
|
|
* @override |
420
|
|
|
* @inheritDoc |
421
|
|
|
*/ |
422
|
|
|
public function swap(LoopModelInterface $loop, $all = false) |
423
|
|
|
{ |
424
|
|
|
$this->stop(); |
425
|
|
|
$loop->stop(); |
426
|
|
|
|
427
|
|
|
$list = $all === true ? $this : $this->getTransferableProperties(); |
428
|
|
|
foreach ($list as $key=>$val) |
|
|
|
|
429
|
|
|
{ |
430
|
|
|
$tmp = $loop->$key; |
431
|
|
|
$loop->$key = $this->$key; |
432
|
|
|
$this->$key = $tmp; |
433
|
|
|
} |
434
|
|
|
|
435
|
|
|
return $this; |
436
|
|
|
} |
437
|
|
|
|
438
|
|
|
/** |
439
|
|
|
* Wait/check for stream activity, or until the next timer is due. |
440
|
|
|
* |
441
|
|
|
* @param float $timeout |
442
|
|
|
*/ |
443
|
38 |
|
private function waitForStreamActivity($timeout) |
444
|
|
|
{ |
445
|
38 |
|
$read = $this->readStreams; |
446
|
38 |
|
$write = $this->writeStreams; |
447
|
|
|
|
448
|
38 |
|
if ($this->streamSelect($read, $write, $timeout) === false) |
449
|
|
|
{ |
450
|
|
|
return; |
451
|
|
|
} |
452
|
|
|
|
453
|
38 |
View Code Duplication |
foreach ($read as $stream) |
|
|
|
|
454
|
|
|
{ |
455
|
8 |
|
$key = (int) $stream; |
456
|
|
|
|
457
|
8 |
|
if (isset($this->readListeners[$key])) |
458
|
|
|
{ |
459
|
8 |
|
$callable = $this->readListeners[$key]; |
460
|
8 |
|
$callable($stream, $this); |
461
|
|
|
} |
462
|
|
|
} |
463
|
|
|
|
464
|
38 |
View Code Duplication |
foreach ($write as $stream) |
|
|
|
|
465
|
|
|
{ |
466
|
8 |
|
$key = (int) $stream; |
467
|
|
|
|
468
|
8 |
|
if (isset($this->writeListeners[$key])) |
469
|
|
|
{ |
470
|
8 |
|
$callable = $this->writeListeners[$key]; |
471
|
8 |
|
$callable($stream, $this); |
472
|
|
|
} |
473
|
|
|
} |
474
|
38 |
|
} |
475
|
|
|
|
476
|
|
|
/** |
477
|
|
|
* Emulate a stream_select() implementation that does not break when passed empty stream arrays. |
478
|
|
|
* |
479
|
|
|
* @param array &$read |
480
|
|
|
* @param array &$write |
481
|
|
|
* @param integer|null $timeout |
482
|
|
|
* |
483
|
|
|
* @return integer The total number of streams that are ready for read/write. |
484
|
|
|
*/ |
485
|
38 |
|
private function streamSelect(array &$read, array &$write, $timeout) |
486
|
|
|
{ |
487
|
38 |
|
if ($read || $write) |
|
|
|
|
488
|
|
|
{ |
489
|
12 |
|
$except = null; |
490
|
|
|
|
491
|
12 |
|
return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout); |
492
|
|
|
} |
493
|
|
|
|
494
|
32 |
|
usleep($timeout); |
495
|
|
|
|
496
|
32 |
|
return 0; |
497
|
|
|
} |
498
|
|
|
|
499
|
|
|
/** |
500
|
|
|
* Get list of properties that can be exported/imported safely. |
501
|
|
|
* |
502
|
|
|
* @return array |
503
|
|
|
*/ |
504
|
2 |
|
private function getTransferableProperties() |
505
|
|
|
{ |
506
|
|
|
return [ |
507
|
2 |
|
'nextTickQueue' => null, |
508
|
|
|
'futureTickQueue' => null, |
509
|
|
|
'flowController' => null |
510
|
|
|
]; |
511
|
|
|
} |
512
|
|
|
} |
513
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.