1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Thruster\Component\EventLoop; |
4
|
|
|
|
5
|
|
|
use Ev; |
6
|
|
|
use EvIo; |
7
|
|
|
use EvLoop; |
8
|
|
|
use EvTimer; |
9
|
|
|
use EvChild; |
10
|
|
|
use EvSignal; |
11
|
|
|
use SplObjectStorage; |
12
|
|
|
|
13
|
|
|
/** |
14
|
|
|
* Class EventLoop |
15
|
|
|
* |
16
|
|
|
* @package Thruster\Component\EventLoop |
17
|
|
|
* @author Aurimas Niekis <[email protected]> |
18
|
|
|
*/ |
19
|
|
|
class EventLoop implements EventLoopInterface
{
    /**
     * Underlying Ev loop on which every watcher of this class is created.
     *
     * @var EvLoop
     */
    protected $loop;

    /**
     * Active timers, keyed by Timer object; the attached data is the watcher
     * returned by the Ev loop for that timer.
     *
     * @var SplObjectStorage|EvTimer[]
     */
    protected $timers;

    /**
     * Active signal handlers, keyed by Signal object; the attached data is the
     * watcher returned by the Ev loop.
     *
     * @var SplObjectStorage|EvSignal[]
     */
    protected $signals;

    /**
     * Active child-process handlers, keyed by Child object; the attached data
     * is the watcher returned by the Ev loop.
     *
     * @var SplObjectStorage|EvChild[]
     */
    protected $children;

    /**
     * True while run() is iterating; stop() resets it to false.
     *
     * @var bool
     */
    protected $running;

    /**
     * Read watchers, indexed by the integer cast of the stream.
     *
     * @var EvIo[]
     */
    private $readEvents;

    /**
     * Write watchers, indexed by the integer cast of the stream.
     *
     * @var EvIo[]
     */
    private $writeEvents;

    /**
     * Binds this object to the default Ev loop and creates empty registries
     * for every watcher kind.
     */
    public function __construct()
    {
        $this->loop        = EvLoop::defaultLoop();
        $this->timers      = new SplObjectStorage();
        $this->signals     = new SplObjectStorage();
        $this->children    = new SplObjectStorage();
        $this->readEvents  = [];
        $this->writeEvents = [];
        $this->running     = false;
    }

    /**
     * Registers $listener to be called when $stream becomes readable.
     *
     * @inheritdoc
     */
    public function addReadStream($stream, callable $listener) : self
    {
        $this->addStream($stream, $listener, Ev::READ);

        return $this;
    }

    /**
     * Registers $listener to be called when $stream becomes writable.
     *
     * @inheritdoc
     */
    public function addWriteStream($stream, callable $listener) : self
    {
        $this->addStream($stream, $listener, Ev::WRITE);

        return $this;
    }

    /**
     * Stops and forgets the read watcher of $stream, if one is registered.
     *
     * @inheritdoc
     */
    public function removeReadStream($stream) : self
    {
        $key = (int) $stream;

        if (isset($this->readEvents[$key])) {
            $this->readEvents[$key]->stop();
            unset($this->readEvents[$key]);
        }

        return $this;
    }

    /**
     * Stops and forgets the write watcher of $stream, if one is registered.
     *
     * @inheritdoc
     */
    public function removeWriteStream($stream) : self
    {
        $key = (int) $stream;

        if (isset($this->writeEvents[$key])) {
            $this->writeEvents[$key]->stop();
            unset($this->writeEvents[$key]);
        }

        return $this;
    }

    /**
     * Removes both the read and the write watcher of $stream.
     *
     * @inheritdoc
     */
    public function removeStream($stream) : self
    {
        $this->removeReadStream($stream);
        $this->removeWriteStream($stream);

        return $this;
    }

    /**
     * Registers $listener for the directions selected in $flags.
     *
     * The listener is invoked as $listener($stream, $this). One watcher is
     * created per requested direction so that removeReadStream() and
     * removeWriteStream() can later stop them independently; a combined
     * Ev::READ | Ev::WRITE mask therefore registers two watchers. A watcher
     * already registered for the same stream and direction is stopped and
     * replaced. If $flags contains neither Ev::READ nor Ev::WRITE nothing is
     * registered.
     *
     * @param resource $stream   Stream to watch.
     * @param callable $listener Callback receiving the stream and this loop.
     * @param int      $flags    Bit mask of Ev::READ and/or Ev::WRITE.
     *
     * @inheritdoc
     */
    public function addStream($stream, callable $listener, $flags) : self
    {
        $key = (int) $stream;

        // Adapt the Ev callback signature (watcher first) to the listener
        // signature used by this component (stream, loop).
        $callback = function ($event) use ($stream, $listener) {
            call_user_func($listener, $stream, $this);
        };

        if (($flags & Ev::READ) === Ev::READ) {
            $this->removeReadStream($stream);
            $this->readEvents[$key] = $this->loop->io($stream, Ev::READ, $callback);
        }

        if (($flags & Ev::WRITE) === Ev::WRITE) {
            $this->removeWriteStream($stream);
            $this->writeEvents[$key] = $this->loop->io($stream, Ev::WRITE, $callback);
        }

        return $this;
    }

    /**
     * Schedules a one-shot timer.
     *
     * The user callback receives the Timer and the underlying Ev watcher.
     * After it has run, the timer is removed from the registry unless the
     * callback already cancelled it.
     *
     * @inheritdoc
     */
    public function addTimer($interval, callable $callback, int $priority = 0) : Timer
    {
        $timer = new Timer($this, $interval, $callback, false, $priority);

        $internalCallback = function ($evTimer) use ($timer) {
            call_user_func($timer->getCallback(), $timer, $evTimer);

            // One-shot: drop the bookkeeping entry so run() can terminate.
            if ($this->isTimerActive($timer)) {
                $this->cancelTimer($timer);
            }
        };

        // Repeat value 0 is passed so the watcher is not re-armed by the loop.
        $event = $this->loop->timer($interval, 0, $internalCallback, null, $priority);

        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Schedules a repeating timer that stays registered until cancelTimer()
     * is called for it.
     *
     * The user callback receives the Timer and the underlying Ev watcher.
     *
     * @inheritdoc
     */
    public function addPeriodicTimer($interval, callable $callback, int $priority = 0) : Timer
    {
        $timer = new Timer($this, $interval, $callback, true, $priority);

        $internalCallback = function ($evTimer) use ($timer) {
            call_user_func($timer->getCallback(), $timer, $evTimer);
        };

        // The timer interval is used both as offset and as interval; no
        // reschedule callback is supplied.
        $event = $this->loop->periodic(
            $timer->getInterval(),
            $timer->getInterval(),
            null,
            $internalCallback,
            null,
            $priority
        );

        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Stops the watcher of $timer and removes it from the registry; does
     * nothing when the timer is not registered.
     *
     * @inheritdoc
     */
    public function cancelTimer(Timer $timer) : self
    {
        if (isset($this->timers[$timer])) {
            $this->timers[$timer]->stop();
            $this->timers->detach($timer);
        }

        return $this;
    }

    /**
     * Tells whether $timer is currently registered on this loop.
     *
     * @inheritdoc
     */
    public function isTimerActive(Timer $timer) : bool
    {
        return $this->timers->contains($timer);
    }

    /**
     * Registers a handler for signal number $signalNo.
     *
     * The user callback receives the Signal and the underlying Ev watcher.
     *
     * @return Signal Handle that can be passed to cancelSignal().
     *
     * @inheritdoc
     */
    public function addSignal(int $signalNo, callable $callback, int $priority = 0) : Signal
    {
        $signal = new Signal($this, $signalNo, $callback, $priority);

        $internalCallback = function ($evSignal) use ($signal) {
            call_user_func($signal->getCallback(), $signal, $evSignal);
        };

        $event = $this->loop->signal($signalNo, $internalCallback, null, $priority);

        $this->signals->attach($signal, $event);

        return $signal;
    }

    /**
     * Stops the watcher of $signal and removes it from the registry; does
     * nothing when the signal is not registered.
     *
     * @inheritdoc
     */
    public function cancelSignal(Signal $signal) : self
    {
        if (isset($this->signals[$signal])) {
            $this->signals[$signal]->stop();
            $this->signals->detach($signal);
        }

        return $this;
    }

    /**
     * Tells whether $signal is currently registered on this loop.
     *
     * @inheritdoc
     */
    public function isSignalActive(Signal $signal) : bool
    {
        return $this->signals->contains($signal);
    }

    /**
     * Registers a handler for status changes of child process $pid.
     *
     * The user callback receives the Child and the underlying Ev watcher.
     * The watcher is created with the trace argument set to false.
     *
     * @return Child Handle that can be passed to cancelChild().
     *
     * @inheritdoc
     */
    public function addChild(callable $callback, int $pid = 0, int $priority = 0) : Child
    {
        $child = new Child($this, $pid, $callback, $priority);

        $internalCallback = function ($evChild) use ($child) {
            call_user_func($child->getCallback(), $child, $evChild);
        };

        $event = $this->loop->child($pid, false, $internalCallback, null, $priority);

        // Must live in the children registry: cancelChild(), isChildActive()
        // and run() all look there.
        $this->children->attach($child, $event);

        return $child;
    }

    /**
     * Stops the watcher of $child and removes it from the registry; does
     * nothing when the child is not registered.
     *
     * @inheritdoc
     */
    public function cancelChild(Child $child) : self
    {
        if (isset($this->children[$child])) {
            $this->children[$child]->stop();
            $this->children->detach($child);
        }

        return $this;
    }

    /**
     * Tells whether $child is currently registered on this loop.
     *
     * @inheritdoc
     */
    public function isChildActive(Child $child) : bool
    {
        return $this->children->contains($child);
    }

    /**
     * Forwards to EvLoop::loopFork(); intended to be called after forking.
     *
     * @inheritdoc
     */
    public function afterFork() : self
    {
        $this->loop->loopFork();

        return $this;
    }

    /**
     * Runs a single loop iteration with Ev::RUN_ONCE | Ev::RUN_NOWAIT.
     *
     * {@inheritDoc}
     */
    public function tick() : self
    {
        $this->loop->run(Ev::RUN_ONCE | Ev::RUN_NOWAIT);

        return $this;
    }

    /**
     * Iterates the loop until stop() is called or no timer, signal, child or
     * stream watcher remains registered.
     *
     * @inheritdoc
     */
    public function run() : self
    {
        $this->running = true;

        while ($this->running && $this->hasWatchers()) {
            $this->loop->run(Ev::RUN_ONCE);
        }

        return $this;
    }

    /**
     * Requests run() to return after the current iteration.
     *
     * @inheritdoc
     */
    public function stop() : self
    {
        $this->running = false;

        return $this;
    }

    /**
     * Tells whether at least one watcher of any kind is still registered.
     *
     * @return bool
     */
    private function hasWatchers() : bool
    {
        return $this->timers->count() > 0
            || $this->signals->count() > 0
            || $this->children->count() > 0
            || count($this->readEvents) > 0
            || count($this->writeEvents) > 0;
    }
}
338
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.