Completed
Push — master ( 522e10...10d67d )
by Biao
03:58
created

Server::onTask()   A

Complexity

Conditions 5
Paths 4

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 5
eloc 5
c 2
b 0
f 0
nc 4
nop 4
dl 0
loc 7
rs 9.6111
1
<?php
2
3
namespace Hhxsv5\LaravelS\Swoole;
4
5
use Hhxsv5\LaravelS\Swoole\Socket\PortInterface;
6
use Hhxsv5\LaravelS\Swoole\Task\Event;
7
use Hhxsv5\LaravelS\Swoole\Task\Listener;
8
use Hhxsv5\LaravelS\Swoole\Task\Task;
9
use Hhxsv5\LaravelS\Swoole\Traits\LogTrait;
10
use Hhxsv5\LaravelS\Swoole\Traits\ProcessTitleTrait;
11
use Swoole\Http\Request as SwooleRequest;
12
use Swoole\Http\Response as SwooleResponse;
13
use Swoole\Http\Server as HttpServer;
14
use Swoole\Server\Port;
15
use Swoole\Table;
16
use Swoole\WebSocket\Server as WebSocketServer;
17
18
class Server
19
{
20
    use LogTrait;
21
    use ProcessTitleTrait;
22
23
    /**@var array */
24
    protected $conf;
25
26
    /**@var HttpServer|WebSocketServer */
27
    protected $swoole;
28
29
    /**@var bool */
30
    protected $enableWebSocket = false;
31
32
    /**@var array */
33
    protected $attachedSockets = [];
34
35
    protected function __construct(array $conf)
36
    {
37
        $this->conf = $conf;
38
        $this->enableWebSocket = !empty($this->conf['websocket']['enable']);
39
        $this->attachedSockets = empty($this->conf['sockets']) ? [] : $this->conf['sockets'];
40
41
        $ip = isset($conf['listen_ip']) ? $conf['listen_ip'] : '127.0.0.1';
42
        $port = isset($conf['listen_port']) ? $conf['listen_port'] : 5200;
43
        $socketType = isset($conf['socket_type']) ? (int)$conf['socket_type'] : SWOOLE_SOCK_TCP;
44
45
        if ($socketType === SWOOLE_SOCK_UNIX_STREAM) {
46
            $socketDir = dirname($ip);
47
            if (!file_exists($socketDir)) {
48
                mkdir($socketDir);
49
            }
50
        }
51
52
        $settings = isset($conf['swoole']) ? $conf['swoole'] : [];
53
        $settings['enable_static_handler'] = !empty($conf['handle_static']);
54
55
        $serverClass = $this->enableWebSocket ? WebSocketServer::class : HttpServer::class;
56
        if (isset($settings['ssl_cert_file'], $settings['ssl_key_file'])) {
57
            $this->swoole = new $serverClass($ip, $port, SWOOLE_PROCESS, $socketType | SWOOLE_SSL);
58
        } else {
59
            $this->swoole = new $serverClass($ip, $port, SWOOLE_PROCESS, $socketType);
60
        }
61
62
        $this->swoole->set($settings);
63
64
        $this->bindBaseEvent();
65
        $this->bindHttpEvent();
66
        $this->bindTaskEvent();
67
        $this->bindWebSocketEvent();
68
        $this->bindAttachedSockets();
69
        $this->bindSwooleTables();
70
    }
71
72
    protected function bindBaseEvent()
73
    {
74
        $this->swoole->on('Start', [$this, 'onStart']);
75
        $this->swoole->on('Shutdown', [$this, 'onShutdown']);
76
        $this->swoole->on('ManagerStart', [$this, 'onManagerStart']);
77
        $this->swoole->on('ManagerStop', [$this, 'onManagerStop']);
78
        $this->swoole->on('WorkerStart', [$this, 'onWorkerStart']);
79
        $this->swoole->on('WorkerStop', [$this, 'onWorkerStop']);
80
        $this->swoole->on('WorkerError', [$this, 'onWorkerError']);
81
        $this->swoole->on('PipeMessage', [$this, 'onPipeMessage']);
82
    }
83
84
    protected function bindHttpEvent()
85
    {
86
        $this->swoole->on('Request', [$this, 'onRequest']);
87
    }
88
89
    protected function bindTaskEvent()
90
    {
91
        if (!empty($this->conf['swoole']['task_worker_num'])) {
92
            $this->swoole->on('Task', [$this, 'onTask']);
93
            $this->swoole->on('Finish', [$this, 'onFinish']);
94
        }
95
    }
96
97
    protected function bindWebSocketEvent()
98
    {
99
        if ($this->enableWebSocket) {
100
            $eventHandler = function ($method, array $params) {
101
                $this->callWithCatchException(function () use ($method, $params) {
102
                    call_user_func_array([$this->getWebSocketHandler(), $method], $params);
103
                });
104
            };
105
106
            $this->swoole->on('Open', function () use ($eventHandler) {
107
                $eventHandler('onOpen', func_get_args());
108
            });
109
110
            $this->swoole->on('Message', function () use ($eventHandler) {
111
                $eventHandler('onMessage', func_get_args());
112
            });
113
114
            $this->swoole->on('Close', function (\swoole_websocket_server $server, $fd, $reactorId) use ($eventHandler) {
115
                $clientInfo = $server->getClientInfo($fd);
116
                if (isset($clientInfo['websocket_status']) && $clientInfo['websocket_status'] === \WEBSOCKET_STATUS_FRAME) {
117
                    $eventHandler('onClose', func_get_args());
118
                }
119
                // else ignore the close event for http server
120
            });
121
        }
122
    }
123
124
    protected function bindAttachedSockets()
125
    {
126
        foreach ($this->attachedSockets as $socket) {
127
            $port = $this->swoole->addListener($socket['host'], $socket['port'], $socket['type']);
128
            if (!($port instanceof Port)) {
129
                $errno = method_exists($this->swoole, 'getLastError') ? $this->swoole->getLastError() : 'unknown';
130
                $errstr = sprintf('listen %s:%s failed: errno=%s', $socket['host'], $socket['port'], $errno);
131
                $this->error($errstr);
132
                continue;
133
            }
134
135
            $port->set(empty($socket['settings']) ? [] : $socket['settings']);
136
137
            $handlerClass = $socket['handler'];
138
            $eventHandler = function ($method, array $params) use ($port, $handlerClass) {
139
                $handler = $this->getSocketHandler($port, $handlerClass);
140
                if (method_exists($handler, $method)) {
141
                    $this->callWithCatchException(function () use ($handler, $method, $params) {
142
                        call_user_func_array([$handler, $method], $params);
143
                    });
144
                }
145
            };
146
            static $events = [
147
                'Open',
148
                'Request',
149
                'Message',
150
                'Connect',
151
                'Close',
152
                'Receive',
153
                'Packet',
154
                'BufferFull',
155
                'BufferEmpty',
156
            ];
157
            foreach ($events as $event) {
158
                $port->on($event, function () use ($event, $eventHandler) {
159
                    $eventHandler('on' . $event, func_get_args());
160
                });
161
            }
162
        }
163
    }
164
165
    protected function getWebSocketHandler()
166
    {
167
        static $handler = null;
168
        if ($handler !== null) {
169
            return $handler;
170
        }
171
172
        $handlerClass = $this->conf['websocket']['handler'];
173
        $t = new $handlerClass();
174
        if (!($t instanceof WebSocketHandlerInterface)) {
175
            throw new \InvalidArgumentException(sprintf('%s must implement the interface %s', get_class($t), WebSocketHandlerInterface::class));
176
        }
177
        $handler = $t;
178
        return $handler;
179
    }
180
181
    protected function getSocketHandler(Port $port, $handlerClass)
182
    {
183
        static $handlers = [];
184
        $portHash = spl_object_hash($port);
185
        if (isset($handlers[$portHash])) {
186
            return $handlers[$portHash];
187
        }
188
        $t = new $handlerClass($port);
189
        if (!($t instanceof PortInterface)) {
190
            throw new \InvalidArgumentException(sprintf('%s must extend the abstract class TcpSocket/UdpSocket', get_class($t)));
191
        }
192
        $handlers[$portHash] = $t;
193
        return $handlers[$portHash];
194
    }
195
196
    protected function bindSwooleTables()
197
    {
198
        $tables = isset($this->conf['swoole_tables']) ? (array)$this->conf['swoole_tables'] : [];
199
        foreach ($tables as $name => $table) {
200
            $t = new Table($table['size']);
201
            foreach ($table['column'] as $column) {
202
                if (isset($column['size'])) {
203
                    $t->column($column['name'], $column['type'], $column['size']);
204
                } else {
205
                    $t->column($column['name'], $column['type']);
206
                }
207
            }
208
            $t->create();
209
            $name .= 'Table'; // Avoid naming conflicts
210
            $this->swoole->{$name} = $t;
211
        }
212
    }
213
214
    public function onStart(HttpServer $server)
215
    {
216
        foreach (spl_autoload_functions() as $function) {
217
            spl_autoload_unregister($function);
218
        }
219
220
        $this->setProcessTitle(sprintf('%s laravels: master process', $this->conf['process_prefix']));
221
222
        if (version_compare(swoole_version(), '1.9.5', '<')) {
223
            file_put_contents($this->conf['swoole']['pid_file'], $server->master_pid);
224
        }
225
    }
226
227
    public function onShutdown(HttpServer $server)
228
    {
229
    }
230
231
    public function onManagerStart(HttpServer $server)
232
    {
233
        $this->setProcessTitle(sprintf('%s laravels: manager process', $this->conf['process_prefix']));
234
    }
235
236
    public function onManagerStop(HttpServer $server)
237
    {
238
    }
239
240
    public function onWorkerStart(HttpServer $server, $workerId)
241
    {
242
        if ($workerId >= $server->setting['worker_num']) {
243
            $process = 'task worker';
244
        } else {
245
            $process = 'worker';
246
            if (!empty($this->conf['enable_coroutine_runtime'])) {
247
                \Swoole\Runtime::enableCoroutine();
248
            }
249
        }
250
        $this->setProcessTitle(sprintf('%s laravels: %s process %d', $this->conf['process_prefix'], $process, $workerId));
251
252
        if (function_exists('opcache_reset')) {
253
            opcache_reset();
254
        }
255
        if (function_exists('apc_clear_cache')) {
256
            apc_clear_cache();
257
        }
258
259
        clearstatcache();
260
    }
261
262
    public function onWorkerStop(HttpServer $server, $workerId)
263
    {
264
    }
265
266
    public function onWorkerError(HttpServer $server, $workerId, $workerPId, $exitCode, $signal)
267
    {
268
        $this->error(sprintf('worker[%d] error: exitCode=%s, signal=%s', $workerId, $exitCode, $signal));
269
    }
270
271
    public function onPipeMessage(HttpServer $server, $srcWorkerId, $message)
272
    {
273
        if ($message instanceof Task) {
274
            $this->onTask($server, uniqid('', true), $srcWorkerId, $message);
275
        }
276
    }
277
278
    public function onRequest(SwooleRequest $swooleRequest, SwooleResponse $swooleResponse)
279
    {
280
    }
281
282
    public function onTask(HttpServer $server, $taskId, $srcWorkerId, $data)
283
    {
284
        if ($data instanceof Event) {
285
            $this->handleEvent($data);
286
        } elseif ($data instanceof Task) {
287
            if ($this->handleTask($data) && method_exists($data, 'finish')) {
288
                return $data;
289
            }
290
        }
291
    }
292
293
    public function onFinish(HttpServer $server, $taskId, $data)
294
    {
295
        if ($data instanceof Task) {
296
            $data->finish();
0 ignored issues
show
Bug introduced by
The method finish() does not exist on Hhxsv5\LaravelS\Swoole\Task\Task. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

296
            $data->/** @scrutinizer ignore-call */ 
297
                   finish();

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
297
        }
298
    }
299
300
    protected function handleEvent(Event $event)
301
    {
302
        $eventClass = get_class($event);
303
        if (!isset($this->conf['events'][$eventClass])) {
304
            return;
305
        }
306
307
        $listenerClasses = $this->conf['events'][$eventClass];
308
        if (!is_array($listenerClasses)) {
309
            $listenerClasses = (array)$listenerClasses;
310
        }
311
        foreach ($listenerClasses as $listenerClass) {
312
            /**
313
             * @var Listener $listener
314
             */
315
            $listener = new $listenerClass();
316
            if (!($listener instanceof Listener)) {
317
                throw new \InvalidArgumentException(sprintf('%s must extend the abstract class %s', $listenerClass, Listener::class));
318
            }
319
            $this->callWithCatchException(function () use ($listener, $event) {
320
                $listener->handle($event);
321
            });
322
        }
323
    }
324
325
    protected function handleTask(Task $task)
326
    {
327
        return $this->callWithCatchException(function () use ($task) {
328
            $task->handle();
329
            return true;
330
        });
331
    }
332
333
    protected function fireEvent($event, $interface, array $arguments)
334
    {
335
        if (isset($this->conf['event_handlers'][$event])) {
336
            $eventHandler = $this->conf['event_handlers'][$event];
337
            if (!isset(class_implements($eventHandler)[$interface])) {
338
                throw new \InvalidArgumentException(sprintf(
339
                        '%s must implement the interface %s',
340
                        $eventHandler,
341
                        $interface
342
                    )
343
                );
344
            }
345
            $this->callWithCatchException(function () use ($eventHandler, $arguments) {
346
                call_user_func_array([(new $eventHandler), 'handle'], $arguments);
347
            });
348
        }
349
    }
350
351
    public function run()
352
    {
353
        $this->swoole->start();
354
    }
355
}
356