Completed
Push — master ( 96c5d9...aaff1c )
by Biao
04:31
created

Server::bindHttpEvent()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Hhxsv5\LaravelS\Swoole;
4
5
use Hhxsv5\LaravelS\Swoole\Socket\PortInterface;
6
use Hhxsv5\LaravelS\Swoole\Task\Event;
7
use Hhxsv5\LaravelS\Swoole\Task\Listener;
8
use Hhxsv5\LaravelS\Swoole\Task\Task;
9
use Hhxsv5\LaravelS\Swoole\Traits\LogTrait;
10
use Hhxsv5\LaravelS\Swoole\Traits\ProcessTitleTrait;
11
12
class Server
13
{
14
    use LogTrait;
15
    use ProcessTitleTrait;
16
17
    protected $conf;
18
19
    /**
20
     * @var \swoole_http_server|\swoole_websocket_server
21
     */
22
    protected $swoole;
23
24
    protected $enableWebSocket = false;
25
26
    protected $attachedSockets = [];
27
28
    protected function __construct(array $conf)
29
    {
30
        $this->conf = $conf;
31
        $this->enableWebSocket = !empty($this->conf['websocket']['enable']);
32
        $this->attachedSockets = empty($this->conf['sockets']) ? [] : $this->conf['sockets'];
33
34
        $ip = isset($conf['listen_ip']) ? $conf['listen_ip'] : '127.0.0.1';
35
        $port = isset($conf['listen_port']) ? $conf['listen_port'] : 5200;
36
        $socketType = isset($conf['socket_type']) ? $conf['socket_type'] : \SWOOLE_SOCK_TCP;
37
38
        if ($socketType === \SWOOLE_SOCK_UNIX_STREAM) {
39
            $socketDir = dirname($ip);
40
            if (!file_exists($socketDir)) {
41
                mkdir($socketDir);
42
            }
43
        }
44
45
        $settings = isset($conf['swoole']) ? $conf['swoole'] : [];
46
        $settings['enable_static_handler'] = !empty($conf['handle_static']);
47
48
        $serverClass = $this->enableWebSocket ? \swoole_websocket_server::class : \swoole_http_server::class;
49
        if (isset($settings['ssl_cert_file'], $settings['ssl_key_file'])) {
50
            $this->swoole = new $serverClass($ip, $port, \SWOOLE_PROCESS, $socketType | \SWOOLE_SSL);
51
        } else {
52
            $this->swoole = new $serverClass($ip, $port, \SWOOLE_PROCESS, $socketType);
53
        }
54
55
        $this->swoole->set($settings);
56
57
        $this->bindBaseEvent();
58
        $this->bindHttpEvent();
59
        $this->bindTaskEvent();
60
        $this->bindWebSocketEvent();
61
        $this->bindAttachedSockets();
62
        $this->bindSwooleTables();
63
64
        if (!empty($conf['enable_coroutine'])) {
65
            \Swoole\Runtime::enableCoroutine();
0 ignored issues
show
Bug introduced by
The type Swoole\Runtime was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
66
        }
67
    }
68
69
    protected function bindBaseEvent()
70
    {
71
        $this->swoole->on('Start', [$this, 'onStart']);
72
        $this->swoole->on('Shutdown', [$this, 'onShutdown']);
73
        $this->swoole->on('ManagerStart', [$this, 'onManagerStart']);
74
        $this->swoole->on('ManagerStop', [$this, 'onManagerStop']);
75
        $this->swoole->on('WorkerStart', [$this, 'onWorkerStart']);
76
        $this->swoole->on('WorkerStop', [$this, 'onWorkerStop']);
77
        $this->swoole->on('WorkerError', [$this, 'onWorkerError']);
78
        $this->swoole->on('PipeMessage', [$this, 'onPipeMessage']);
79
    }
80
81
    protected function bindHttpEvent()
82
    {
83
        $this->swoole->on('Request', [$this, 'onRequest']);
84
    }
85
86
    protected function bindTaskEvent()
87
    {
88
        if (!empty($this->conf['swoole']['task_worker_num'])) {
89
            $this->swoole->on('Task', [$this, 'onTask']);
90
            $this->swoole->on('Finish', [$this, 'onFinish']);
91
        }
92
    }
93
94
    protected function bindWebSocketEvent()
95
    {
96
        if ($this->enableWebSocket) {
97
            $eventHandler = function ($method, array $params) {
98
                try {
99
                    call_user_func_array([$this->getWebSocketHandler(), $method], $params);
100
                } catch (\Exception $e) {
101
                    $this->logException($e);
102
                }
103
            };
104
105
            $this->swoole->on('Open', function () use ($eventHandler) {
106
                $eventHandler('onOpen', func_get_args());
107
            });
108
109
            $this->swoole->on('Message', function () use ($eventHandler) {
110
                $eventHandler('onMessage', func_get_args());
111
            });
112
113
            $this->swoole->on('Close', function (\swoole_websocket_server $server, $fd, $reactorId) use ($eventHandler) {
114
                $clientInfo = $server->getClientInfo($fd);
115
                if (isset($clientInfo['websocket_status']) && $clientInfo['websocket_status'] === \WEBSOCKET_STATUS_FRAME) {
116
                    $eventHandler('onClose', func_get_args());
117
                }
118
                // else ignore the close event for http server
119
            });
120
        }
121
    }
122
123
    protected function bindAttachedSockets()
124
    {
125
        foreach ($this->attachedSockets as $socket) {
126
            $port = $this->swoole->addListener($socket['host'], $socket['port'], $socket['type']);
127
            if (!($port instanceof \swoole_server_port)) {
128
                $errno = method_exists($this->swoole, 'getLastError') ? $this->swoole->getLastError() : 'unknown';
129
                $errstr = sprintf('listen %s:%s failed: errno=%s', $socket['host'], $socket['port'], $errno);
130
                $this->log($errstr, 'ERROR');
131
                continue;
132
            }
133
134
            $port->set(empty($socket['settings']) ? [] : $socket['settings']);
135
136
            $handlerClass = $socket['handler'];
137
            $eventHandler = function ($method, array $params) use ($port, $handlerClass) {
138
                $handler = $this->getSocketHandler($port, $handlerClass);
139
                if (method_exists($handler, $method)) {
140
                    try {
141
                        call_user_func_array([$handler, $method], $params);
142
                    } catch (\Exception $e) {
143
                        $this->logException($e);
144
                    }
145
                }
146
            };
147
            static $events = [
148
                'Open',
149
                'Request',
150
                'Message',
151
                'Connect',
152
                'Close',
153
                'Receive',
154
                'Packet',
155
                'BufferFull',
156
                'BufferEmpty',
157
            ];
158
            foreach ($events as $event) {
159
                $port->on($event, function () use ($event, $eventHandler) {
160
                    $eventHandler('on' . $event, func_get_args());
161
                });
162
            }
163
        }
164
    }
165
166
    protected function getWebSocketHandler()
167
    {
168
        static $handler = null;
169
        if ($handler !== null) {
170
            return $handler;
171
        }
172
173
        $handlerClass = $this->conf['websocket']['handler'];
174
        $t = new $handlerClass();
175
        if (!($t instanceof WebSocketHandlerInterface)) {
176
            throw new \Exception(sprintf('%s must implement the interface %s', get_class($t), WebSocketHandlerInterface::class));
177
        }
178
        $handler = $t;
179
        return $handler;
180
    }
181
182
    protected function getSocketHandler(\swoole_server_port $port, $handlerClass)
183
    {
184
        static $handlers = [];
185
        $portHash = spl_object_hash($port);
186
        if (isset($handlers[$portHash])) {
187
            return $handlers[$portHash];
188
        }
189
        $t = new $handlerClass($port);
190
        if (!($t instanceof PortInterface)) {
191
            throw new \Exception(sprintf('%s must extend the abstract class TcpSocket/UdpSocket', get_class($t)));
192
        }
193
        $handlers[$portHash] = $t;
194
        return $handlers[$portHash];
195
    }
196
197
    protected function bindSwooleTables()
198
    {
199
        $tables = isset($this->conf['swoole_tables']) ? (array)$this->conf['swoole_tables'] : [];
200
        foreach ($tables as $name => $table) {
201
            $t = new \swoole_table($table['size']);
202
            foreach ($table['column'] as $column) {
203
                if (isset($column['size'])) {
204
                    $t->column($column['name'], $column['type'], $column['size']);
205
                } else {
206
                    $t->column($column['name'], $column['type']);
207
                }
208
            }
209
            $t->create();
210
            $name .= 'Table'; // Avoid naming conflicts
211
            $this->swoole->$name = $t;
212
        }
213
    }
214
215
    public function onStart(\swoole_http_server $server)
216
    {
217
        foreach (spl_autoload_functions() as $function) {
218
            spl_autoload_unregister($function);
219
        }
220
221
        $this->setProcessTitle(sprintf('%s laravels: master process', $this->conf['process_prefix']));
222
223
        if (version_compare(\swoole_version(), '1.9.5', '<')) {
224
            file_put_contents($this->conf['swoole']['pid_file'], $server->master_pid);
225
        }
226
    }
227
228
    public function onShutdown(\swoole_http_server $server)
229
    {
230
231
    }
232
233
    public function onManagerStart(\swoole_http_server $server)
234
    {
235
        $this->setProcessTitle(sprintf('%s laravels: manager process', $this->conf['process_prefix']));
236
    }
237
238
    public function onManagerStop(\swoole_http_server $server)
239
    {
240
241
    }
242
243
    public function onWorkerStart(\swoole_http_server $server, $workerId)
244
    {
245
        if ($workerId >= $server->setting['worker_num']) {
246
            $process = 'task worker';
247
        } else {
248
            $process = 'worker';
249
        }
250
        $this->setProcessTitle(sprintf('%s laravels: %s process %d', $this->conf['process_prefix'], $process, $workerId));
251
252
        if (function_exists('opcache_reset')) {
253
            opcache_reset();
254
        }
255
        if (function_exists('apc_clear_cache')) {
256
            apc_clear_cache();
257
        }
258
259
        clearstatcache();
260
    }
261
262
    public function onWorkerStop(\swoole_http_server $server, $workerId)
263
    {
264
265
    }
266
267
    public function onWorkerError(\swoole_http_server $server, $workerId, $workerPId, $exitCode, $signal)
268
    {
269
        $this->log(sprintf('worker[%d] error: exitCode=%s, signal=%s', $workerId, $exitCode, $signal), 'ERROR');
270
    }
271
272
    public function onPipeMessage(\swoole_http_server $server, $srcWorkerId, $message)
273
    {
274
        if ($message instanceof Task) {
275
            $this->onTask($server, uniqid('', true), $srcWorkerId, $message);
276
        }
277
    }
278
279
    public function onRequest(\swoole_http_request $request, \swoole_http_response $response)
280
    {
281
282
    }
283
284
    public function onTask(\swoole_http_server $server, $taskId, $srcWorkerId, $data)
285
    {
286
        if ($data instanceof Event) {
287
            $this->handleEvent($data);
288
        } elseif ($data instanceof Task) {
289
            if ($this->handleTask($data) && method_exists($data, 'finish')) {
290
                return $data;
291
            }
292
        }
293
    }
294
295
    public function onFinish(\swoole_http_server $server, $taskId, $data)
296
    {
297
        if ($data instanceof Task) {
298
            $data->finish();
0 ignored issues
show
Bug introduced by
The method finish() does not exist on Hhxsv5\LaravelS\Swoole\Task\Task. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

298
            $data->/** @scrutinizer ignore-call */ 
299
                   finish();

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
299
        }
300
    }
301
302
    protected function handleEvent(Event $event)
303
    {
304
        $eventClass = get_class($event);
305
        if (!isset($this->conf['events'][$eventClass])) {
306
            return;
307
        }
308
309
        $listenerClasses = $this->conf['events'][$eventClass];
310
        if (!is_array($listenerClasses)) {
311
            $listenerClasses = (array)$listenerClasses;
312
        }
313
        foreach ($listenerClasses as $listenerClass) {
314
            /**
315
             * @var Listener $listener
316
             */
317
            $listener = new $listenerClass();
318
            if (!($listener instanceof Listener)) {
319
                throw new \Exception(sprintf('%s must extend the abstract class %s', $listenerClass, Listener::class));
320
            }
321
            try {
322
                $listener->handle($event);
323
            } catch (\Exception $e) {
324
                $this->logException($e);
325
            }
326
        }
327
    }
328
329
    protected function handleTask(Task $task)
330
    {
331
        try {
332
            $task->handle();
333
            return true;
334
        } catch (\Exception $e) {
335
            $this->logException($e);
336
            return false;
337
        }
338
    }
339
340
    public function run()
341
    {
342
        $this->swoole->start();
343
    }
344
}
345