Passed
Push — master ( d82381...085bb4 )
by Biao
04:48
created

Server::onStart()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 5
nc 4
nop 1
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Hhxsv5\LaravelS\Swoole;
4
5
use Hhxsv5\LaravelS\Swoole\Socket\PortInterface;
6
use Hhxsv5\LaravelS\Swoole\Task\Event;
7
use Hhxsv5\LaravelS\Swoole\Task\Listener;
8
use Hhxsv5\LaravelS\Swoole\Task\Task;
9
use Hhxsv5\LaravelS\Swoole\Traits\LogTrait;
10
use Hhxsv5\LaravelS\Swoole\Traits\ProcessTitleTrait;
11
12
class Server
13
{
14
    use LogTrait;
15
    use ProcessTitleTrait;
16
17
    protected $conf;
18
19
    /**
20
     * @var \swoole_http_server|\swoole_websocket_server
21
     */
22
    protected $swoole;
23
24
    protected $enableWebSocket = false;
25
26
    protected $attachedSockets = [];
27
28
    /**
     * Create the underlying Swoole server from the given configuration and register all event bindings.
     *
     * Configuration keys read here: 'websocket', 'sockets', 'event_handlers', 'listen_ip',
     * 'listen_port', 'socket_type', 'swoole' and 'handle_static'.
     *
     * @param array $conf Server configuration.
     */
    protected function __construct(array $conf)
    {
        $this->conf = $conf;
        $this->enableWebSocket = !empty($this->conf['websocket']['enable']);
        $this->attachedSockets = empty($this->conf['sockets']) ? [] : $this->conf['sockets'];
        if (isset($this->conf['event_handlers'])) {
            // fireEvent() looks handlers up by lower-cased event name, so normalise the keys once.
            $this->conf['event_handlers'] = array_change_key_case($this->conf['event_handlers'], CASE_LOWER);
        }

        $ip = isset($conf['listen_ip']) ? $conf['listen_ip'] : '127.0.0.1';
        $port = isset($conf['listen_port']) ? $conf['listen_port'] : 5200;
        $socketType = isset($conf['socket_type']) ? (int)$conf['socket_type'] : \SWOOLE_SOCK_TCP;

        if ($socketType === \SWOOLE_SOCK_UNIX_STREAM) {
            // For a UNIX stream socket the parent directory of the listen address is created up front.
            // The creation is recursive so a path with several missing levels does not fail.
            $socketDir = dirname($ip);
            if (!file_exists($socketDir)) {
                mkdir($socketDir, 0777, true);
            }
        }

        $settings = isset($conf['swoole']) ? $conf['swoole'] : [];
        $settings['enable_static_handler'] = !empty($conf['handle_static']);

        $serverClass = $this->enableWebSocket ? \swoole_websocket_server::class : \swoole_http_server::class;

        // Add the SSL flag to the socket type only when both certificate and key file are configured.
        $sockFlags = $socketType;
        if (isset($settings['ssl_cert_file'], $settings['ssl_key_file'])) {
            $sockFlags = $socketType | \SWOOLE_SSL;
        }
        $this->swoole = new $serverClass($ip, $port, \SWOOLE_PROCESS, $sockFlags);

        $this->swoole->set($settings);

        $this->bindBaseEvent();
        $this->bindHttpEvent();
        $this->bindTaskEvent();
        $this->bindWebSocketEvent();
        $this->bindAttachedSockets();
        $this->bindSwooleTables();
    }
67
68
    /**
     * Register the server life-cycle callbacks on the Swoole server.
     *
     * Each event name X is bound to this object's onX() method.
     */
    protected function bindBaseEvent()
    {
        $lifecycleEvents = [
            'Start',
            'Shutdown',
            'ManagerStart',
            'ManagerStop',
            'WorkerStart',
            'WorkerStop',
            'WorkerError',
            'PipeMessage',
        ];

        foreach ($lifecycleEvents as $name) {
            $this->swoole->on($name, [$this, 'on' . $name]);
        }
    }
79
80
    /**
     * Register onRequest() as the callback for the Swoole 'Request' event.
     */
    protected function bindHttpEvent()
    {
        $requestCallback = [$this, 'onRequest'];
        $this->swoole->on('Request', $requestCallback);
    }
84
85
    /**
     * Register the 'Task' and 'Finish' callbacks, but only when the configuration
     * sets a non-empty swoole.task_worker_num.
     */
    protected function bindTaskEvent()
    {
        if (empty($this->conf['swoole']['task_worker_num'])) {
            return;
        }

        $this->swoole->on('Task', [$this, 'onTask']);
        $this->swoole->on('Finish', [$this, 'onFinish']);
    }
92
93
    /**
     * Register the WebSocket callbacks ('Open', 'Message', 'Close') when WebSocket support is enabled.
     *
     * Every callback forwards its arguments to the matching onX() method of the handler
     * returned by getWebSocketHandler(), wrapped in callWithCatchException().
     */
    protected function bindWebSocketEvent()
    {
        if (!$this->enableWebSocket) {
            return;
        }

        $dispatch = function ($method, array $params) {
            $this->callWithCatchException(function () use ($method, $params) {
                call_user_func_array([$this->getWebSocketHandler(), $method], $params);
            });
        };

        // 'Open' and 'Message' are forwarded unconditionally.
        foreach (['Open', 'Message'] as $name) {
            $this->swoole->on($name, function () use ($name, $dispatch) {
                $dispatch('on' . $name, func_get_args());
            });
        }

        $this->swoole->on('Close', function (\swoole_websocket_server $server, $fd, $reactorId) use ($dispatch) {
            $info = $server->getClientInfo($fd);
            $isWebSocketClient = isset($info['websocket_status'])
                && $info['websocket_status'] === \WEBSOCKET_STATUS_FRAME;
            if (!$isWebSocketClient) {
                // Ignore the close event for plain HTTP connections.
                return;
            }
            $dispatch('onClose', func_get_args());
        });
    }
119
120
    /**
     * Add a listener for every configured attached socket and route its events to the socket's handler.
     *
     * Each socket entry is read for 'host', 'port', 'type', 'handler' and optional 'settings'.
     * When a listener cannot be added, the failure is logged via error() and that socket is skipped.
     */
    protected function bindAttachedSockets()
    {
        $portEvents = [
            'Open',
            'Request',
            'Message',
            'Connect',
            'Close',
            'Receive',
            'Packet',
            'BufferFull',
            'BufferEmpty',
        ];

        foreach ($this->attachedSockets as $socket) {
            $port = $this->swoole->addListener($socket['host'], $socket['port'], $socket['type']);
            if (!($port instanceof \swoole_server_port)) {
                $errno = 'unknown';
                if (method_exists($this->swoole, 'getLastError')) {
                    $errno = $this->swoole->getLastError();
                }
                $this->error(sprintf('listen %s:%s failed: errno=%s', $socket['host'], $socket['port'], $errno));
                continue;
            }

            $portSettings = empty($socket['settings']) ? [] : $socket['settings'];
            $port->set($portSettings);

            $handlerClass = $socket['handler'];
            $dispatch = function ($method, array $params) use ($port, $handlerClass) {
                $handler = $this->getSocketHandler($port, $handlerClass);
                if (!method_exists($handler, $method)) {
                    // The handler does not implement this event; nothing to call.
                    return;
                }
                $this->callWithCatchException(function () use ($handler, $method, $params) {
                    call_user_func_array([$handler, $method], $params);
                });
            };

            foreach ($portEvents as $name) {
                $port->on($name, function () use ($name, $dispatch) {
                    $dispatch('on' . $name, func_get_args());
                });
            }
        }
    }
160
161
    /**
     * Return the WebSocket handler, instantiating it from conf['websocket']['handler'] on first use.
     *
     * The instance is cached in a function-level static variable.
     *
     * @return WebSocketHandlerInterface
     * @throws \Exception When the configured class does not implement WebSocketHandlerInterface.
     */
    protected function getWebSocketHandler()
    {
        static $handler = null;

        if ($handler === null) {
            $class = $this->conf['websocket']['handler'];
            $candidate = new $class();
            if (!($candidate instanceof WebSocketHandlerInterface)) {
                throw new \Exception(sprintf('%s must implement the interface %s', get_class($candidate), WebSocketHandlerInterface::class));
            }
            $handler = $candidate;
        }

        return $handler;
    }
176
177
    /**
     * Return the handler object for an attached port, creating it on first use.
     *
     * Handlers are cached in a function-level static array keyed by spl_object_hash($port).
     *
     * @param \swoole_server_port $port         The listening port the handler is bound to.
     * @param string              $handlerClass Class to instantiate; receives $port as constructor argument.
     * @return PortInterface
     * @throws \Exception When the created object is not a PortInterface.
     */
    protected function getSocketHandler(\swoole_server_port $port, $handlerClass)
    {
        static $handlers = [];

        $key = spl_object_hash($port);
        if (!isset($handlers[$key])) {
            $candidate = new $handlerClass($port);
            if (!($candidate instanceof PortInterface)) {
                throw new \Exception(sprintf('%s must extend the abstract class TcpSocket/UdpSocket', get_class($candidate)));
            }
            $handlers[$key] = $candidate;
        }

        return $handlers[$key];
    }
191
192
    /**
     * Create the configured swoole_table instances and attach them to the Swoole server object.
     *
     * Each entry of conf['swoole_tables'] is read for 'size' and 'column'; every column is read
     * for 'name', 'type' and an optional 'size'. A table configured under key X is stored on the
     * server as the property "XTable".
     */
    protected function bindSwooleTables()
    {
        if (!isset($this->conf['swoole_tables'])) {
            return;
        }

        foreach ((array)$this->conf['swoole_tables'] as $name => $definition) {
            $table = new \swoole_table($definition['size']);

            foreach ($definition['column'] as $column) {
                // Pass the size argument only when the column definition provides one.
                $columnArgs = [$column['name'], $column['type']];
                if (isset($column['size'])) {
                    $columnArgs[] = $column['size'];
                }
                call_user_func_array([$table, 'column'], $columnArgs);
            }

            $table->create();

            // The "Table" suffix avoids naming conflicts with other server properties.
            $property = $name . 'Table';
            $this->swoole->$property = $table;
        }
    }
209
210
    /**
     * Callback for the Swoole 'Start' event (bound in bindBaseEvent()).
     *
     * Unregisters every SPL autoloader in this process, sets the process title and,
     * for Swoole versions older than 1.9.5, writes the master PID to the configured pid file.
     *
     * @param \swoole_http_server $server The server that was started.
     */
    public function onStart(\swoole_http_server $server)
    {
        // spl_autoload_functions() returns false instead of an array on PHP < 8.0 when no
        // autoload queue is active; iterating false would raise a warning.
        $autoloaders = spl_autoload_functions();
        if (is_array($autoloaders)) {
            foreach ($autoloaders as $autoloader) {
                spl_autoload_unregister($autoloader);
            }
        }

        $this->setProcessTitle(sprintf('%s laravels: master process', $this->conf['process_prefix']));

        if (version_compare(\swoole_version(), '1.9.5', '<')) {
            file_put_contents($this->conf['swoole']['pid_file'], $server->master_pid);
        }
    }
222
223
    /**
     * Callback for the Swoole 'Shutdown' event (bound in bindBaseEvent()).
     *
     * @param \swoole_http_server $server
     */
    public function onShutdown(\swoole_http_server $server)
    {
        // Intentionally empty: this class performs no work on shutdown.
    }
227
228
    /**
     * Callback for the Swoole 'ManagerStart' event: sets the title of the manager process.
     *
     * @param \swoole_http_server $server
     */
    public function onManagerStart(\swoole_http_server $server)
    {
        $title = sprintf('%s laravels: manager process', $this->conf['process_prefix']);
        $this->setProcessTitle($title);
    }
232
233
    /**
     * Callback for the Swoole 'ManagerStop' event (bound in bindBaseEvent()).
     *
     * @param \swoole_http_server $server
     */
    public function onManagerStop(\swoole_http_server $server)
    {
        // Intentionally empty: this class performs no work when the manager stops.
    }
237
238
    /**
     * Callback for the Swoole 'WorkerStart' event.
     *
     * Sets the process title, optionally enables the coroutine runtime for non-task workers,
     * and clears opcode/APC/stat caches.
     *
     * @param \swoole_http_server $server
     * @param int                 $workerId Ids at or above setting['worker_num'] are treated as task workers.
     */
    public function onWorkerStart(\swoole_http_server $server, $workerId)
    {
        $isTaskWorker = $workerId >= $server->setting['worker_num'];
        $role = $isTaskWorker ? 'task worker' : 'worker';

        // The coroutine runtime is enabled for regular workers only, and only when configured.
        if (!$isTaskWorker && !empty($this->conf['enable_coroutine_runtime'])) {
            \Swoole\Runtime::enableCoroutine();
        }

        $title = sprintf('%s laravels: %s process %d', $this->conf['process_prefix'], $role, $workerId);
        $this->setProcessTitle($title);

        // Reset whichever caches are available in this PHP build.
        if (function_exists('opcache_reset')) {
            opcache_reset();
        }
        if (function_exists('apc_clear_cache')) {
            apc_clear_cache();
        }
        clearstatcache();
    }
259
260
    /**
     * Callback for the Swoole 'WorkerStop' event (bound in bindBaseEvent()).
     *
     * @param \swoole_http_server $server
     * @param int                 $workerId
     */
    public function onWorkerStop(\swoole_http_server $server, $workerId)
    {
        // Intentionally empty: this class performs no work when a worker stops.
    }
264
265
    /**
     * Callback for the Swoole 'WorkerError' event: logs the worker id, exit code and signal.
     *
     * @param \swoole_http_server $server
     * @param int                 $workerId
     * @param int                 $workerPId Not used by this method.
     * @param int                 $exitCode
     * @param int                 $signal
     */
    public function onWorkerError(\swoole_http_server $server, $workerId, $workerPId, $exitCode, $signal)
    {
        $message = sprintf('worker[%d] error: exitCode=%s, signal=%s', $workerId, $exitCode, $signal);
        $this->error($message);
    }
269
270
    /**
     * Callback for the Swoole 'PipeMessage' event.
     *
     * A message that is a Task instance is passed to onTask() with a generated task id;
     * any other message is ignored.
     *
     * @param \swoole_http_server $server
     * @param int                 $srcWorkerId Id of the worker that sent the message.
     * @param mixed               $message
     */
    public function onPipeMessage(\swoole_http_server $server, $srcWorkerId, $message)
    {
        if (!($message instanceof Task)) {
            return;
        }

        $taskId = uniqid('', true);
        $this->onTask($server, $taskId, $srcWorkerId, $message);
    }
276
277
    /**
     * Callback for the Swoole 'Request' event (bound in bindHttpEvent()).
     *
     * @param \swoole_http_request  $request
     * @param \swoole_http_response $response
     */
    public function onRequest(\swoole_http_request $request, \swoole_http_response $response)
    {
        // Intentionally empty in this class.
        // NOTE(review): presumably overridden by a subclass that serves the request; confirm.
    }
281
282
    /**
     * Callback for the Swoole 'Task' event (also called directly from onPipeMessage()).
     *
     * Event payloads are passed to handleEvent(). Task payloads are passed to handleTask();
     * the task is returned only when handleTask() reports success and the task object has a
     * finish() method. In every other case nothing is returned.
     *
     * @param \swoole_http_server $server
     * @param mixed               $taskId
     * @param int                 $srcWorkerId
     * @param mixed               $data
     * @return Task|null
     */
    public function onTask(\swoole_http_server $server, $taskId, $srcWorkerId, $data)
    {
        if ($data instanceof Event) {
            $this->handleEvent($data);
            return;
        }

        if (!($data instanceof Task)) {
            return;
        }

        $handled = $this->handleTask($data);
        if ($handled && method_exists($data, 'finish')) {
            return $data;
        }
    }
292
293
    /**
     * Callback for the Swoole 'Finish' event: runs the task's finish() hook when it has one.
     *
     * Static analysis reports that finish() is not declared on the Task base class, and onTask()
     * only returns tasks for which method_exists($data, 'finish') holds. The same check is
     * applied here so a Task without finish() can never trigger a call to an undefined method.
     *
     * @param \swoole_http_server $server
     * @param mixed               $taskId
     * @param mixed               $data
     */
    public function onFinish(\swoole_http_server $server, $taskId, $data)
    {
        if ($data instanceof Task && method_exists($data, 'finish')) {
            $data->finish();
        }
    }
299
300
    /**
     * Invoke every listener configured for the given event's class.
     *
     * Listeners are looked up in conf['events'] under the event's class name; the entry may be a
     * single class name or an array of them. Each listener's handle() call is wrapped in
     * callWithCatchException().
     *
     * @param Event $event
     * @throws \Exception When a configured listener is not a Listener instance.
     */
    protected function handleEvent(Event $event)
    {
        $eventClass = get_class($event);
        if (!isset($this->conf['events'][$eventClass])) {
            return;
        }

        // Casting leaves an array untouched and wraps a single class name into a one-element array.
        $listenerClasses = (array)$this->conf['events'][$eventClass];

        foreach ($listenerClasses as $listenerClass) {
            /**
             * @var Listener $listener
             */
            $listener = new $listenerClass();
            if (!($listener instanceof Listener)) {
                throw new \Exception(sprintf('%s must extend the abstract class %s', $listenerClass, Listener::class));
            }

            $invoke = function () use ($listener, $event) {
                $listener->handle($event);
            };
            $this->callWithCatchException($invoke);
        }
    }
324
325
    /**
     * Run the task's handle() method inside callWithCatchException().
     *
     * @param Task $task
     * @return mixed The result of callWithCatchException(); the wrapped callback returns true
     *               after handle() completes. NOTE(review): the value returned when handle()
     *               throws depends on callWithCatchException(), which is not defined in this class.
     */
    protected function handleTask(Task $task)
    {
        $runner = function () use ($task) {
            $task->handle();
            return true;
        };

        return $this->callWithCatchException($runner);
    }
332
333
    /**
     * Call the handler configured for an event, if there is one.
     *
     * The handler class is looked up in conf['event_handlers'] by lower-cased event name,
     * instantiated, and its handle() method is called with $arguments inside
     * callWithCatchException().
     *
     * @param string $event     Event name (case-insensitive).
     * @param string $interface Interface the handler class must implement.
     * @param array  $arguments Arguments passed to the handler's handle() method.
     * @throws \Exception When the configured handler does not implement $interface.
     */
    protected function fireEvent($event, $interface, array $arguments)
    {
        $key = strtolower($event);
        if (!isset($this->conf['event_handlers'][$key])) {
            return;
        }

        $handlerClass = $this->conf['event_handlers'][$key];
        $implemented = class_implements($handlerClass);
        if (!isset($implemented[$interface])) {
            $message = sprintf(
                '%s must implement the interface %s',
                $handlerClass,
                $interface
            );
            throw new \Exception($message);
        }

        $this->callWithCatchException(function () use ($handlerClass, $arguments) {
            $instance = new $handlerClass;
            call_user_func_array([$instance, 'handle'], $arguments);
        });
    }
351
352
    /**
     * Start the underlying Swoole server.
     */
    public function run()
    {
        $server = $this->swoole;
        $server->start();
    }
356
}
357