Websocket::getPipeline()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace SwooleTW\Http\Websocket;
4
5
use Illuminate\Contracts\Container\Container;
6
use Illuminate\Contracts\Pipeline\Pipeline as PipelineContract;
7
use Illuminate\Support\Facades\App;
8
use Illuminate\Support\Facades\Config;
9
use InvalidArgumentException;
10
use SwooleTW\Http\Server\Facades\Server;
11
use SwooleTW\Http\Server\Manager;
12
use SwooleTW\Http\Websocket\Rooms\RoomContract;
13
14
/**
15
 * Class Websocket
16
 */
17
class Websocket
18
{
19
    use Authenticatable;
20
21
    const PUSH_ACTION = 'push';
22
    const EVENT_CONNECT = 'connect';
23
    const USER_PREFIX = 'uid_';
24
25
    /**
26
     * Determine if to broadcast.
27
     *
28
     * @var boolean
29
     */
30
    protected $isBroadcast = false;
31
32
    /**
33
     * Scoket sender's fd.
34
     *
35
     * @var integer
36
     */
37
    protected $sender;
38
39
    /**
40
     * Recepient's fd or room name.
41
     *
42
     * @var array
43
     */
44
    protected $to = [];
45
46
    /**
47
     * Websocket event callbacks.
48
     *
49
     * @var array
50
     */
51
    protected $callbacks = [];
52
53
    /**
54
     * Middleware for on connect request.
55
     *
56
     * @var array
57
     */
58
    protected $middleware = [];
59
60
    /**
61
     * Pipeline instance.
62
     *
63
     * @var \Illuminate\Contracts\Pipeline\Pipeline
64
     */
65
    protected $pipeline;
66
67
    /**
68
     * Room adapter.
69
     *
70
     * @var \SwooleTW\Http\Websocket\Rooms\RoomContract
71
     */
72
    protected $room;
73
74
    /**
75
     * DI Container.
76
     *
77
     * @var \Illuminate\Contracts\Container\Container
78
     */
79
    protected $container;
80
81
    /**
82
     * Websocket constructor.
83
     *
84
     * @param \SwooleTW\Http\Websocket\Rooms\RoomContract $room
85
     * @param \Illuminate\Contracts\Pipeline\Pipeline $pipeline
86
     */
87
    public function __construct(RoomContract $room, PipelineContract $pipeline)
88
    {
89
        $this->room = $room;
90
        $this->setPipeline($pipeline);
91
        $this->setDefaultMiddleware();
92
    }
93
94
    /**
95
     * Set broadcast to true.
96
     */
97
    public function broadcast(): self
98
    {
99
        $this->isBroadcast = true;
100
101
        return $this;
102
    }
103
104
    /**
105
     * Set multiple recipients fd or room names.
106
     *
107
     * @param integer, string, array
108
     *
109
     * @return $this
110
     */
111
    public function to($values): self
112
    {
113
        $values = is_string($values) || is_integer($values) ? func_get_args() : $values;
114
115
        foreach ($values as $value) {
116
            if (! in_array($value, $this->to)) {
117
                $this->to[] = $value;
118
            }
119
        }
120
121
        return $this;
122
    }
123
124
    /**
125
     * Join sender to multiple rooms.
126
     *
127
     * @param string, array $rooms
128
     *
129
     * @return $this
130
     */
131
    public function join($rooms): self
132
    {
133
        $rooms = is_string($rooms) || is_integer($rooms) ? func_get_args() : $rooms;
134
135
        $this->room->add($this->sender, $rooms);
136
137
        return $this;
138
    }
139
140
    /**
141
     * Make sender leave multiple rooms.
142
     *
143
     * @param array $rooms
144
     *
145
     * @return $this
146
     */
147
    public function leave($rooms = []): self
148
    {
149
        $rooms = is_string($rooms) || is_integer($rooms) ? func_get_args() : $rooms;
150
151
        $this->room->delete($this->sender, $rooms);
152
153
        return $this;
154
    }
155
156
    /**
157
     * Emit data and reset some status.
158
     *
159
     * @param string
160
     * @param mixed
161
     *
162
     * @return boolean
163
     */
164
    public function emit(string $event, $data): bool
165
    {
166
        $fds = $this->getFds();
167
        $assigned = ! empty($this->to);
168
169
        // if no fds are found, but rooms are assigned
170
        // that means trying to emit to a non-existing room
171
        // skip it directly instead of pushing to a task queue
172
        if (empty($fds) && $assigned) {
173
            $this->reset();
174
            return false;
175
        }
176
177
        $payload = [
178
            'sender'    => $this->sender,
179
            'fds'       => $fds,
180
            'broadcast' => $this->isBroadcast,
181
            'assigned'  => $assigned,
182
            'event'     => $event,
183
            'message'   => $data,
184
        ];
185
186
        $result = true;
187
        $server = App::make(Server::class);
188
        if ($server->taskworker) {
189
            App::make(Manager::class)->pushMessage($server, $payload);
190
        } else {
191
            $result = $server->task([
192
                'action' => static::PUSH_ACTION,
193
                'data' => $payload
194
            ]);
195
        }
196
197
        $this->reset();
198
199
        return $result !== false;
200
    }
201
202
    /**
203
     * An alias of `join` function.
204
     *
205
     * @param string
206
     *
207
     * @return $this
208
     */
209
    public function in($room)
210
    {
211
        $this->join($room);
212
213
        return $this;
214
    }
215
216
    /**
217
     * Register an event name with a closure binding.
218
     *
219
     * @param string
220
     * @param callback
221
     *
222
     * @return $this
223
     */
224
    public function on(string $event, $callback)
225
    {
226
        if (! is_string($callback) && ! is_callable($callback)) {
227
            throw new InvalidArgumentException(
228
                'Invalid websocket callback. Must be a string or callable.'
229
            );
230
        }
231
232
        $this->callbacks[$event] = $callback;
233
234
        return $this;
235
    }
236
237
    /**
238
     * Check if this event name exists.
239
     *
240
     * @param string
241
     *
242
     * @return boolean
243
     */
244
    public function eventExists(string $event)
245
    {
246
        return array_key_exists($event, $this->callbacks);
247
    }
248
249
    /**
250
     * Execute callback function by its event name.
251
     *
252
     * @param string
253
     * @param mixed
254
     *
255
     * @return mixed
256
     */
257
    public function call(string $event, $data = null)
258
    {
259
        if (! $this->eventExists($event)) {
260
            return null;
261
        }
262
263
        // inject request param on connect event
264
        $isConnect = $event === static::EVENT_CONNECT;
265
        $dataKey = $isConnect ? 'request' : 'data';
266
267
        // dispatch request to pipeline if middleware are set
268
        if ($isConnect && count($this->middleware)) {
269
            $data = $this->setRequestThroughMiddleware($data);
270
        }
271
272
        return App::call($this->callbacks[$event], [
273
            'websocket' => $this,
274
            $dataKey => $data,
275
        ]);
276
    }
277
278
    /**
279
     * Close current connection.
280
     *
281
     * @param integer
282
     *
283
     * @return boolean
284
     */
285
    public function close(int $fd = null)
286
    {
287
        return App::make(Server::class)->close($fd ?: $this->sender);
288
    }
289
290
    /**
291
     * Set sender fd.
292
     *
293
     * @param integer
294
     *
295
     * @return $this
296
     */
297
    public function setSender(int $fd)
298
    {
299
        $this->sender = $fd;
300
301
        return $this;
302
    }
303
304
    /**
305
     * Get current sender fd.
306
     */
307
    public function getSender()
308
    {
309
        return $this->sender;
310
    }
311
312
    /**
313
     * Get broadcast status value.
314
     */
315
    public function getIsBroadcast()
316
    {
317
        return $this->isBroadcast;
318
    }
319
320
    /**
321
     * Get push destinations (fd or room name).
322
     */
323
    public function getTo()
324
    {
325
        return $this->to;
326
    }
327
328
    /**
329
     * Get all fds we're going to push data to.
330
     */
331
    protected function getFds()
332
    {
333
        $fds = array_filter($this->to, function ($value) {
334
            return is_integer($value);
335
        });
336
        $rooms = array_diff($this->to, $fds);
337
338
        foreach ($rooms as $room) {
339
            $clients = $this->room->getClients($room);
340
            // fallback fd with wrong type back to fds array
341
            if (empty($clients) && is_numeric($room)) {
342
                $fds[] = $room;
343
            } else {
344
                $fds = array_merge($fds, $clients);
345
            }
346
        }
347
348
        return array_values(array_unique($fds));
349
    }
350
351
    /**
352
     * Reset some data status.
353
     *
354
     * @param bool $force
355
     *
356
     * @return $this
357
     */
358
    public function reset($force = false)
359
    {
360
        $this->isBroadcast = false;
361
        $this->to = [];
362
363
        if ($force) {
364
            $this->sender = null;
365
            $this->userId = null;
366
        }
367
368
        return $this;
369
    }
370
371
    /**
372
     * Get or set middleware.
373
     *
374
     * @param array|string|null $middleware
375
     *
376
     * @return array|\SwooleTW\Http\Websocket\Websocket
377
     */
378
    public function middleware($middleware = null)
379
    {
380
        if (is_null($middleware)) {
381
            return $this->middleware;
382
        }
383
384
        if (is_string($middleware)) {
385
            $middleware = func_get_args();
386
        }
387
388
        $this->middleware = array_unique(array_merge($this->middleware, $middleware));
389
390
        return $this;
391
    }
392
393
    /**
394
     * Set default middleware.
395
     */
396
    protected function setDefaultMiddleware()
397
    {
398
        $this->middleware = Config::get('swoole_websocket.middleware', []);
399
    }
400
401
    /**
402
     * Set container to pipeline.
403
     *
404
     * @param \Illuminate\Contracts\Container\Container $container
405
     *
406
     * @return $this
407
     */
408
    public function setContainer(Container $container)
409
    {
410
        $pipeline = $this->pipeline;
411
412
        $closure = function () use ($container) {
413
            $this->container = $container;
414
        };
415
416
        $resetPipeline = $closure->bindTo($pipeline, $pipeline);
417
        $resetPipeline();
418
419
        return $this;
420
    }
421
422
    /**
423
     * Set pipeline.
424
     *
425
     * @param \Illuminate\Contracts\Pipeline\Pipeline $pipeline
426
     *
427
     * @return $this
428
     */
429
    public function setPipeline(PipelineContract $pipeline)
430
    {
431
        $this->pipeline = $pipeline;
432
433
        return $this;
434
    }
435
436
    /**
437
     * Get pipeline.
438
     */
439
    public function getPipeline()
440
    {
441
        return $this->pipeline;
442
    }
443
444
    /**
445
     * Set the given request through the middleware.
446
     *
447
     * @param  \Illuminate\Http\Request $request
448
     *
449
     * @return \Illuminate\Http\Request
450
     */
451
    protected function setRequestThroughMiddleware($request)
452
    {
453
        return $this->pipeline
454
            ->send($request)
455
            ->through($this->middleware)
456
            ->then(function ($request) {
457
                return $request;
458
            });
459
    }
460
}
461