1
|
|
|
<?php declare(strict_types=1); |
2
|
|
|
|
3
|
|
|
namespace ApiClients\Client\Pusher; |
4
|
|
|
|
5
|
|
|
use PackageVersions\Versions; |
6
|
|
|
use React\Dns\Resolver\Resolver; |
7
|
|
|
use React\EventLoop\LoopInterface; |
8
|
|
|
use React\Socket\Connector; |
9
|
|
|
use Rx\DisposableInterface; |
10
|
|
|
use Rx\Observable; |
11
|
|
|
use Rx\ObserverInterface; |
12
|
|
|
use Rx\Subject\ReplaySubject; |
13
|
|
|
use Rx\Subject\Subject; |
14
|
|
|
use Rx\Websocket\Client; |
15
|
|
|
|
16
|
|
|
/** |
17
|
|
|
* Class WebSocket - WebSocket wrapper that queues messages while the connection is being established. |
18
|
|
|
*/ |
19
|
|
|
final class WebSocket extends Subject |
20
|
|
|
{ |
21
|
|
|
private $ws; |
22
|
|
|
private $sendSubject; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* @internal |
26
|
|
|
* @param Observable $ws |
27
|
|
|
* @throws \InvalidArgumentException |
28
|
|
|
*/ |
29
|
13 |
|
public function __construct(Observable $ws) |
30
|
|
|
{ |
31
|
13 |
|
$this->sendSubject = new ReplaySubject(); |
32
|
13 |
|
$this->ws = $ws; |
33
|
13 |
|
} |
34
|
|
|
|
35
|
4 |
|
public static function createFactory(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, Resolver $resolver = null): Subject |
36
|
|
|
{ |
37
|
4 |
|
if (Versions::getVersion('rx/websocket')[0] === '1' || $resolver === null) { |
38
|
3 |
|
return new self( |
39
|
3 |
|
new Client($url, $useMessageObject, $subProtocols, $loop, $resolver) |
|
|
|
|
40
|
|
|
); |
41
|
|
|
} |
42
|
|
|
|
43
|
1 |
|
return new self( |
44
|
1 |
|
new Client($url, $useMessageObject, $subProtocols, $loop, new Connector( |
45
|
1 |
|
$loop, |
|
|
|
|
46
|
|
|
[ |
47
|
1 |
|
'dns' => $resolver, |
48
|
|
|
] |
49
|
|
|
)) |
50
|
|
|
); |
51
|
|
|
} |
52
|
|
|
|
53
|
2 |
|
public function onNext($value): void |
54
|
|
|
{ |
55
|
2 |
|
$this->sendSubject->onNext($value); |
56
|
2 |
|
} |
57
|
|
|
|
58
|
10 |
|
protected function _subscribe(ObserverInterface $observer): DisposableInterface |
59
|
|
|
{ |
60
|
10 |
|
return $this->ws |
61
|
10 |
|
->do(function (Subject $ms): void { |
62
|
|
|
// Replay buffered messages onto the MessageSubject |
63
|
5 |
|
$this->sendSubject->subscribe($ms); |
64
|
|
|
|
65
|
|
|
// Now that the connection has been established, use the message subject directly. |
66
|
5 |
|
$this->sendSubject = $ms; |
67
|
10 |
|
}) |
68
|
10 |
|
->finally(function (): void { |
69
|
|
|
// The connection has closed, so start buffering messages util it reconnects. |
70
|
10 |
|
$this->sendSubject = new ReplaySubject(); |
71
|
10 |
|
}) |
72
|
10 |
|
->switch() |
73
|
10 |
|
->repeatDelay() |
74
|
10 |
|
->subscribe($observer); |
75
|
|
|
} |
76
|
|
|
} |
77
|
|
|
|
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.