1 | <?php declare(strict_types=1); |
||
15 | final class AsyncClient |
||
16 | { |
||
/**
 * Default value for $noActivityTimeout.
 * Presumably seconds: the value is multiplied by 1000 before it is handed to timeout().
 */
const NO_ACTIVITY_TIMEOUT = 120;

/**
 * How long to wait for any incoming message after a ping has been sent.
 * Presumably seconds: the value is multiplied by 1000 before it is handed to timeout().
 */
const NO_PING_RESPONSE_TIMEOUT = 30;

/**
 * Idle period after which a `pusher:ping` is sent; starts at NO_ACTIVITY_TIMEOUT.
 *
 * @var int
 */
protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;

/**
 * NOTE(review): not assigned by the constructor - confirm whether it is still needed.
 *
 * @var Observable\RefCountObservable
 */
protected $client;

/**
 * Shared stream of decoded events that the constructor builds.
 *
 * @var Observable\AnonymousObservable
 */
protected $messages;

/**
 * Subject used by send() to push outgoing messages; null while there is no connection.
 *
 * @var MessageSubject|null
 */
protected $sendSubject;

/**
 * Shared channel observables handed out by channel(), keyed by channel name.
 *
 * @var array
 */
protected $channels = [];

/**
 * Current reconnect delay; doubled on each low level error and passed to Observable::timer().
 *
 * @var int
 */
protected $delay = 200;
||
46 | |||
/**
 * Wires the given connection observable into the shared stream of decoded events.
 *
 * The resulting pipeline stores the connection's MessageSubject for send(),
 * re-subscribes known channels on every (re)connect, pings after a period
 * without incoming messages, retries after low level errors with a growing
 * delay, JSON-decodes incoming messages and maps them to Event objects.
 *
 * @internal Instances are built by AsyncClient::create().
 *
 * @param Observable $client Observable emitting a MessageSubject per connection
 */
public function __construct(Observable $client)
{
    // Remember the starting back-off so it can be restored once a connection
    // is established again; otherwise the delay only ever grows.
    $initialDelay = $this->delay;

    $this->messages = $client
        // Save this subject for sending stuff
        ->do(function (MessageSubject $ms) {
            $this->sendSubject = $ms;

            // Resubscribe to any channels we were subscribed to when disconnected
            foreach ($this->channels as $channel => $_) {
                $this->subscribeOnChannel($channel);
            }
        })

        // Make sure if there is a disconnect or something
        // that we unset the sendSubject
        ->finally(function () {
            $this->sendSubject = null;
        })

        // Flatten the subject into the messages it emits
        ->flatMap(function (MessageSubject $ms) {
            return $ms;
        })

        // This is the ping/timeout functionality
        ->flatMapLatest(function ($x) {
            // This Observable emits the current value immediately.
            // If another value comes along, all of this gets disposed
            // (because we are using flatMapLatest) before the timeouts trigger.
            return Observable::never()
                ->timeout($this->noActivityTimeout * 1000)
                ->catch(function () {
                    // ping (do something that causes incoming stream to get a message)
                    $this->send(['event' => 'pusher:ping']);

                    // this timeout will actually time out with a TimeoutException - causing
                    // everything above this to dispose
                    return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
                })
                ->startWith($x);
        })

        // Handle connection level errors
        ->retryWhen(function (Observable $errors) {
            return $errors->flatMap(function (Throwable $throwable) {
                return $this->handleLowLevelError($throwable);
            });
        })

        // Decode JSON
        ->_ApiClients_jsonDecode()

        // Deal with connection established messages
        ->map(function (array $message) use ($initialDelay) {
            $event = Event::createFromMessage($message);

            if ($event->getEvent() === 'pusher:connection_established') {
                $this->setActivityTimeout($event);

                // The connection is healthy again, so the next outage starts
                // backing off from the initial delay instead of the last one.
                $this->delay = $initialDelay;
            }

            return $event;
        })
        ->share();
}
|
112 | |||
/**
 * Build a client for the given application on top of the given event loop.
 *
 * Registers an event-loop backed scheduler factory first, then opens the
 * websocket client against the URL produced by ApiSettings::createUrl().
 *
 * @param LoopInterface $loop     Event loop used by the scheduler and the websocket client
 * @param string        $app      Application ID
 * @param Resolver      $resolver Optional DNS resolver
 * @return AsyncClient
 */
public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
{
    $schedulerFactory = function () use ($loop) {
        return new Scheduler\EventLoopScheduler($loop);
    };

    try {
        Scheduler::setAsyncFactory($schedulerFactory);
    } catch (Throwable $t) {
        // Deliberately ignored: a failure to register the factory must not
        // prevent the client from being created.
        // NOTE(review): presumably thrown when a factory is already set - confirm.
    }

    $url = ApiSettings::createUrl($app);
    $websocket = new WebsocketClient($url, false, [], $loop, $resolver);

    return new self($websocket);
}
||
138 | |||
/**
 * Listen on a channel.
 *
 * The observable for a channel is created once and shared; asking for the
 * same channel again returns the stored instance. Subscribing sends a
 * `pusher:subscribe` message, disposing sends `pusher:unsubscribe` and
 * forgets the channel.
 *
 * @param string $channel Channel to listen on
 * @return Observable
 */
public function channel(string $channel): Observable
{
    if (!isset($this->channels[$channel])) {
        // Only let through events that carry exactly this channel name
        $forChannel = $this->messages->filter(function (Event $event) use ($channel) {
            if ($event->getChannel() === '') {
                return false;
            }

            return $event->getChannel() === $channel;
        });

        $stream = Observable::create(function (ObserverInterface $observer) use ($channel, $forChannel) {
            // Hide the subscription acknowledgement from consumers
            $withoutAck = $forChannel->filter(function (Event $event) {
                return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
            });
            $subscription = $withoutAck->subscribe($observer);

            $this->subscribeOnChannel($channel);

            $teardown = function () use ($channel, $subscription) {
                // Tell the server we are no longer interested
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
                // Stop listening to the message stream ourselves
                $subscription->dispose();
                // Forget the channel so a reconnect does not subscribe to it again
                unset($this->channels[$channel]);
            };

            return new CallbackDisposable($teardown);
        });

        // Share the stream among all subscribers of this channel
        $this->channels[$channel] = $stream->share();
    }

    return $this->channels[$channel];
}
||
185 | |||
/**
 * Send a message through the client.
 *
 * @param array $message Message to send, will be JSON encoded
 *
 * @return bool True when the connection was active and the encoded message
 *              has been passed onto it; false when there is no active
 *              connection or the message could not be JSON encoded.
 */
public function send(array $message): bool
{
    // Don't send messages when we aren't connected
    if ($this->sendSubject === null) {
        return false;
    }

    $payload = json_encode($message);

    // json_encode() returns false on failure (e.g. malformed UTF-8);
    // never push that onto the connection as if it were a message.
    if ($payload === false) {
        return false;
    }

    $this->sendSubject->onNext($payload);

    return true;
}
||
205 | |||
/**
 * Double the reconnect delay and return a timer for the new delay.
 *
 * The returned observable is used by the retryWhen() handler in the constructor.
 *
 * @param Throwable $throwable Error that interrupted the stream (not inspected)
 * @return Observable Result of Observable::timer() for the doubled delay
 */
private function handleLowLevelError(Throwable $throwable)
{
    $nextDelay = $this->delay * 2;
    $this->delay = $nextDelay;

    return Observable::timer($nextDelay);
}
||
212 | |||
/**
 * Send a `pusher:subscribe` message for the given channel.
 *
 * The result of send() is ignored, so nothing happens while disconnected.
 *
 * @param string $channel Name of the channel to subscribe to
 */
private function subscribeOnChannel(string $channel)
{
    $subscribeMessage = [
        'event' => 'pusher:subscribe',
        'data' => ['channel' => $channel],
    ];

    $this->send($subscribeMessage);
}
||
220 | |||
221 | /** |
||
222 | * Get connection activity timeout from connection established event. |
||
223 | * |
||
224 | * @param Event $event |
||
225 | */ |
||
226 | private function setActivityTimeout(Event $event) |
||
242 | } |
||
243 |
It seems like the type of the argument is not accepted by the function/method you are calling.
In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.
We suggest adding an explicit type cast, as in the following example: