Completed
Push — master ( a3573e...7dad0f )
by Biao
05:09 queued 30s
created

Server::bindWebSocketEvent()   C

Complexity

Conditions 7
Paths 2

Size

Total Lines 29
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 21
nc 2
nop 0
dl 0
loc 29
rs 6.7272
c 0
b 0
f 0
1
<?php
2
3
namespace Hhxsv5\LaravelS\Swoole;
4
5
use Hhxsv5\LaravelS\Swoole\Socket\PortInterface;
6
use Hhxsv5\LaravelS\Swoole\Socket\TcpSocket;
7
use Hhxsv5\LaravelS\Swoole\Socket\UdpSocket;
8
use Hhxsv5\LaravelS\Swoole\Task\Event;
9
use Hhxsv5\LaravelS\Swoole\Task\Listener;
10
use Hhxsv5\LaravelS\Swoole\Task\Task;
11
use Hhxsv5\LaravelS\Swoole\Traits\LogTrait;
12
use Hhxsv5\LaravelS\Swoole\Traits\ProcessTitleTrait;
13
14
/**
 * Thin wrapper around a Swoole HTTP/WebSocket server.
 *
 * The constructor builds the underlying Swoole server from a configuration
 * array and wires up every event callback (base, HTTP, task, WebSocket,
 * attached sockets) as well as the configured Swoole tables. Most callbacks
 * are intentionally small or empty so that subclasses can override them.
 *
 * NOTE(review): all callbacks catch \Exception only; on PHP 7+ an \Error
 * thrown by user code is not caught here and will propagate to Swoole.
 */
class Server
{
    use LogTrait;
    use ProcessTitleTrait;

    /**
     * Raw configuration array passed to the constructor.
     *
     * @var array
     */
    protected $conf;

    /**
     * @var \swoole_http_server
     */
    protected $swoole;

    /**
     * Whether the WebSocket server class and callbacks are used.
     *
     * @var bool
     */
    protected $enableWebSocket = false;

    /**
     * Definitions of additional listening sockets, taken from $conf['sockets'].
     *
     * @var array
     */
    protected $attachedSockets = [];

    /**
     * Creates the Swoole server and binds all callbacks.
     *
     * Recognised keys read here: `listen_ip` (default 127.0.0.1),
     * `listen_port` (default 5200), `swoole` (settings passed to set()),
     * `handle_static`, `websocket.enable` and `sockets`.
     *
     * @param array $conf Server configuration.
     */
    protected function __construct(array $conf)
    {
        $this->conf = $conf;
        $this->enableWebSocket = !empty($this->conf['websocket']['enable']);
        $this->attachedSockets = empty($this->conf['sockets']) ? [] : $this->conf['sockets'];

        $ip = isset($conf['listen_ip']) ? $conf['listen_ip'] : '127.0.0.1';
        $port = isset($conf['listen_port']) ? $conf['listen_port'] : 5200;
        $settings = isset($conf['swoole']) ? $conf['swoole'] : [];
        $settings['enable_static_handler'] = !empty($conf['handle_static']);

        $serverClass = $this->enableWebSocket ? \swoole_websocket_server::class : \swoole_http_server::class;
        // SSL is switched on only when both the certificate and the key file are configured.
        if (isset($settings['ssl_cert_file'], $settings['ssl_key_file'])) {
            $this->swoole = new $serverClass($ip, $port, \SWOOLE_PROCESS, \SWOOLE_SOCK_TCP | \SWOOLE_SSL);
        } else {
            $this->swoole = new $serverClass($ip, $port, \SWOOLE_PROCESS);
        }

        $this->swoole->set($settings);

        $this->bindBaseEvent();
        $this->bindHttpEvent();
        $this->bindTaskEvent();
        $this->bindWebSocketEvent();
        $this->bindAttachedSockets();
        $this->bindSwooleTables();
    }

    /**
     * Registers the process life-cycle callbacks (master, manager, worker).
     */
    protected function bindBaseEvent()
    {
        $this->swoole->on('Start', [$this, 'onStart']);
        $this->swoole->on('Shutdown', [$this, 'onShutdown']);
        $this->swoole->on('ManagerStart', [$this, 'onManagerStart']);
        $this->swoole->on('ManagerStop', [$this, 'onManagerStop']);
        $this->swoole->on('WorkerStart', [$this, 'onWorkerStart']);
        $this->swoole->on('WorkerStop', [$this, 'onWorkerStop']);
        $this->swoole->on('WorkerError', [$this, 'onWorkerError']);
    }

    /**
     * Registers the HTTP request callback.
     */
    protected function bindHttpEvent()
    {
        $this->swoole->on('Request', [$this, 'onRequest']);
    }

    /**
     * Registers the task callbacks, but only when task workers are configured.
     */
    protected function bindTaskEvent()
    {
        if (!empty($this->conf['swoole']['task_worker_num'])) {
            $this->swoole->on('Task', [$this, 'onTask']);
            $this->swoole->on('Finish', [$this, 'onFinish']);
        }
    }

    /**
     * Registers the WebSocket callbacks when WebSocket support is enabled.
     *
     * The handler is resolved inside each try block so that a misconfigured
     * handler class (see getWebSocketHandler()) is logged like any other
     * handler failure instead of escaping the Swoole callback uncaught.
     */
    protected function bindWebSocketEvent()
    {
        if (!$this->enableWebSocket) {
            return;
        }

        $this->swoole->on('Open', function (\swoole_websocket_server $server, \swoole_http_request $request) {
            try {
                $this->getWebSocketHandler()->onOpen($server, $request);
            } catch (\Exception $e) {
                $this->logException($e);
            }
        });

        $this->swoole->on('Message', function (\swoole_websocket_server $server, \swoole_websocket_frame $frame) {
            try {
                $this->getWebSocketHandler()->onMessage($server, $frame);
            } catch (\Exception $e) {
                $this->logException($e);
            }
        });

        $this->swoole->on('Close', function (\swoole_websocket_server $server, $fd, $reactorId) {
            $clientInfo = $server->getClientInfo($fd);
            $isWebSocket = isset($clientInfo['websocket_status'])
                && $clientInfo['websocket_status'] === \WEBSOCKET_STATUS_FRAME;
            if (!$isWebSocket) {
                // Ignore the close event for plain HTTP connections.
                return;
            }
            try {
                $this->getWebSocketHandler()->onClose($server, $fd, $reactorId);
            } catch (\Exception $e) {
                $this->logException($e);
            }
        });
    }

    /**
     * Adds every configured extra listener and forwards its events to the
     * socket's handler class.
     *
     * Each socket definition is read for the keys `host`, `port`, `type`,
     * `handler` and the optional `settings`. A listener that cannot be
     * created is logged and skipped.
     */
    protected function bindAttachedSockets()
    {
        // Events forwarded to the handler; a handler only receives those it implements.
        $events = [
            'Open',
            'Request',
            'Message',
            'Connect',
            'Close',
            'Receive',
            'Packet',
            'BufferFull',
            'BufferEmpty',
        ];

        foreach ($this->attachedSockets as $socket) {
            $port = $this->swoole->addListener($socket['host'], $socket['port'], $socket['type']);
            if (!($port instanceof \swoole_server_port)) {
                $errno = method_exists($this->swoole, 'getLastError') ? $this->swoole->getLastError() : 'unknown';
                $errstr = sprintf('listen %s:%s failed: errno=%s', $socket['host'], $socket['port'], $errno);
                $this->log($errstr, 'ERROR');
                continue;
            }

            $port->set(empty($socket['settings']) ? [] : $socket['settings']);

            $handlerClass = $socket['handler'];
            $eventHandler = function ($method, array $params) use ($port, $handlerClass) {
                try {
                    // Resolved inside the try so an invalid handler class is logged, not left uncaught.
                    $handler = $this->getSocketHandler($port, $handlerClass);
                    if (method_exists($handler, $method)) {
                        call_user_func_array([$handler, $method], $params);
                    }
                } catch (\Exception $e) {
                    $this->logException($e);
                }
            };

            foreach ($events as $event) {
                $port->on($event, function () use ($event, $eventHandler) {
                    $eventHandler('on' . $event, func_get_args());
                });
            }
        }
    }

    /**
     * Returns the WebSocket handler, creating it on first use.
     *
     * The instance is cached in a function-level static variable.
     *
     * @return WebsocketHandlerInterface
     * @throws \Exception When the configured class does not implement WebsocketHandlerInterface.
     */
    protected function getWebSocketHandler()
    {
        static $handler = null;
        if ($handler !== null) {
            return $handler;
        }

        $handlerClass = $this->conf['websocket']['handler'];
        $t = new $handlerClass();
        if (!($t instanceof WebsocketHandlerInterface)) {
            throw new \Exception(sprintf('%s must implement the interface %s', get_class($t), WebsocketHandlerInterface::class));
        }
        $handler = $t;
        return $handler;
    }

    /**
     * Returns the handler bound to the given port, creating it on first use.
     *
     * Handlers are cached per port object (keyed by spl_object_hash()).
     *
     * @param \swoole_server_port $port         Listener the handler serves.
     * @param string              $handlerClass Class name to instantiate with the port.
     * @return PortInterface
     * @throws \Exception When the class does not implement PortInterface.
     */
    protected function getSocketHandler(\swoole_server_port $port, $handlerClass)
    {
        static $handlers = [];
        $portHash = spl_object_hash($port);
        if (isset($handlers[$portHash])) {
            return $handlers[$portHash];
        }
        $t = new $handlerClass($port);
        if (!($t instanceof PortInterface)) {
            throw new \Exception(sprintf('%s must extend the abstract class TcpSocket/UdpSocket', get_class($t)));
        }
        $handlers[$portHash] = $t;
        return $handlers[$portHash];
    }

    /**
     * Creates the tables listed in $conf['swoole_tables'] and attaches each
     * one to the Swoole server as a property named "<name>Table".
     */
    protected function bindSwooleTables()
    {
        $tables = isset($this->conf['swoole_tables']) ? (array)$this->conf['swoole_tables'] : [];
        foreach ($tables as $name => $table) {
            $t = new \swoole_table($table['size']);
            foreach ($table['column'] as $column) {
                if (isset($column['size'])) {
                    $t->column($column['name'], $column['type'], $column['size']);
                } else {
                    $t->column($column['name'], $column['type']);
                }
            }
            $t->create();
            $name .= 'Table'; // Avoid naming conflicts
            $this->swoole->$name = $t;
        }
    }

    /**
     * Master process start callback.
     *
     * Unregisters every autoloader, sets the process title and, on Swoole
     * versions older than 1.9.5, writes the master PID to the configured file.
     *
     * @param \swoole_http_server $server
     */
    public function onStart(\swoole_http_server $server)
    {
        // spl_autoload_functions() may return false (PHP < 8) when no autoload queue is active.
        $autoloaders = spl_autoload_functions();
        if (is_array($autoloaders)) {
            foreach ($autoloaders as $function) {
                spl_autoload_unregister($function);
            }
        }

        $this->setProcessTitle(sprintf('%s laravels: master process', $this->conf['process_prefix']));

        if (version_compare(\swoole_version(), '1.9.5', '<')) {
            file_put_contents($this->conf['swoole']['pid_file'], $server->master_pid);
        }
    }

    /**
     * Master process shutdown callback; does nothing in this class.
     *
     * @param \swoole_http_server $server
     */
    public function onShutdown(\swoole_http_server $server)
    {

    }

    /**
     * Manager process start callback; only sets the process title.
     *
     * @param \swoole_http_server $server
     */
    public function onManagerStart(\swoole_http_server $server)
    {
        $this->setProcessTitle(sprintf('%s laravels: manager process', $this->conf['process_prefix']));
    }

    /**
     * Manager process stop callback; does nothing in this class.
     *
     * @param \swoole_http_server $server
     */
    public function onManagerStop(\swoole_http_server $server)
    {

    }

    /**
     * Worker start callback: sets the process title and clears opcode, APC
     * and stat caches.
     *
     * @param \swoole_http_server $server
     * @param int                 $workerId Ids at or above `worker_num` are labelled as task workers.
     */
    public function onWorkerStart(\swoole_http_server $server, $workerId)
    {
        $process = $workerId >= $server->setting['worker_num'] ? 'task worker' : 'worker';
        $this->setProcessTitle(sprintf('%s laravels: %s process %d', $this->conf['process_prefix'], $process, $workerId));

        if (function_exists('opcache_reset')) {
            opcache_reset();
        }
        if (function_exists('apc_clear_cache')) {
            apc_clear_cache();
        }

        clearstatcache();
    }

    /**
     * Worker stop callback; does nothing in this class.
     *
     * @param \swoole_http_server $server
     * @param int                 $workerId
     */
    public function onWorkerStop(\swoole_http_server $server, $workerId)
    {

    }

    /**
     * Worker error callback; logs the exit code and signal of the failed worker.
     *
     * @param \swoole_http_server $server
     * @param int                 $workerId
     * @param int                 $workerPId
     * @param int                 $exitCode
     * @param int                 $signal
     */
    public function onWorkerError(\swoole_http_server $server, $workerId, $workerPId, $exitCode, $signal)
    {
        $this->log(sprintf('worker[%d] error: exitCode=%s, signal=%s', $workerId, $exitCode, $signal), 'ERROR');
    }

    /**
     * HTTP request callback; empty here (presumably overridden by subclasses).
     *
     * @param \swoole_http_request  $request
     * @param \swoole_http_response $response
     */
    public function onRequest(\swoole_http_request $request, \swoole_http_response $response)
    {

    }

    /**
     * Task callback: dispatches Event and Task payloads.
     *
     * @param \swoole_http_server $server
     * @param int                 $taskId
     * @param int                 $srcWorkerId
     * @param mixed               $data Payload delivered to the task worker.
     * @return Task|null The task itself when it defines finish(), so that onFinish() receives it;
     *                   nothing otherwise.
     */
    public function onTask(\swoole_http_server $server, $taskId, $srcWorkerId, $data)
    {
        if ($data instanceof Event) {
            $this->handleEvent($data);
        } elseif ($data instanceof Task) {
            $this->handleTask($data);
            if (method_exists($data, 'finish')) {
                return $data;
            }
        }
    }

    /**
     * Finish callback: invokes finish() on a returned Task.
     *
     * @param \swoole_http_server $server
     * @param int                 $taskId
     * @param mixed               $data Value returned by onTask().
     */
    public function onFinish(\swoole_http_server $server, $taskId, $data)
    {
        if ($data instanceof Task) {
            $data->/** @scrutinizer ignore-call */
            finish();
        }
    }

    /**
     * Runs every listener configured for the event's class.
     *
     * Listeners are looked up in $conf['events'] by the event's class name; a
     * single class name is accepted as well as a list. Exceptions thrown by a
     * listener's handle() are logged and do not stop the remaining listeners.
     *
     * @param Event $event
     * @throws \Exception When a configured listener class does not extend Listener.
     */
    protected function handleEvent(Event $event)
    {
        $eventClass = get_class($event);
        if (!isset($this->conf['events'][$eventClass])) {
            return;
        }

        // The cast turns a single class name into a one-element list and leaves arrays untouched.
        $listenerClasses = (array)$this->conf['events'][$eventClass];
        foreach ($listenerClasses as $listenerClass) {
            /**
             * @var Listener $listener
             */
            $listener = new $listenerClass();
            if (!($listener instanceof Listener)) {
                throw new \Exception(sprintf('%s must extend the abstract class %s', $listenerClass, Listener::class));
            }
            try {
                $listener->handle($event);
            } catch (\Exception $e) {
                $this->logException($e);
            }
        }
    }

    /**
     * Executes a task, logging any exception it throws.
     *
     * @param Task $task
     */
    protected function handleTask(Task $task)
    {
        try {
            $task->handle();
        } catch (\Exception $e) {
            $this->logException($e);
        }
    }

    /**
     * Starts the underlying Swoole server.
     */
    public function run()
    {
        $this->swoole->start();
    }
}
333