1 | <?php declare(strict_types=1); |
||
17 | final class AsyncClient |
||
18 | { |
||
/**
 * Value $delay starts at and is reset to each time an incoming message is decoded.
 * NOTE(review): the unit is not visible in this section — presumably milliseconds; confirm.
 */
const DEFAULT_DELAY = 200;

/**
 * Default for $noActivityTimeout. The value is multiplied by 1000 before it is
 * handed to timeout(), so it is presumably expressed in seconds — TODO confirm.
 */
const NO_ACTIVITY_TIMEOUT = 120;

/**
 * How long to wait for any incoming message after a pusher:ping has been sent.
 * Multiplied by 1000 before it is handed to timeout(), so presumably seconds — TODO confirm.
 */
const NO_PING_RESPONSE_TIMEOUT = 30;

/**
 * Quiet period after which the keep-alive logic sends a pusher:ping.
 * Passed to setActivityTimeout() handling when a pusher:connection_established
 * event arrives; presumably that call overrides this default — verify there.
 */
protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;

/**
 * NOTE(review): this property is not assigned anywhere in the visible part of
 * the class; the constructor stores its stream in $messages instead.
 *
 * @var Observable\RefCountObservable
 */
protected $client;

/**
 * Shared stream of decoded events built by the constructor; channel() filters it
 * per channel.
 *
 * @var Observable\AnonymousObservable
 */
protected $messages;

/**
 * Subject of the current connection. Set when the underlying client emits a
 * MessageSubject and reset to null when that stream finishes.
 *
 * @var MessageSubject
 */
protected $sendSubject;

/**
 * Shared channel observables keyed by channel name. Entries are added by
 * channel() and removed again when the channel observable is disposed.
 *
 * @var array
 */
protected $channels = [];

/**
 * Current delay value; reset to DEFAULT_DELAY whenever a message is decoded.
 * NOTE(review): where it is read/increased is outside the visible code.
 *
 * @var int
 */
protected $delay = self::DEFAULT_DELAY;
||
49 | |||
/**
 * Wire the raw connection observable into the shared stream of decoded events.
 *
 * The pipeline, in order: remember the connection subject, forget it again when
 * the stream ends, flatten the subject into its messages, add ping/timeout
 * keep-alive handling, decode JSON, turn messages into Event objects (raising
 * on pusher:error), retry according to handleLowLevelError(), and share the
 * result between all subscribers.
 *
 * @param Observable $client Observable emitting a MessageSubject per connection
 *
 * @internal
 */
public function __construct(Observable $client)
{
    // Keep hold of the subject so it can be used for sending stuff
    $rememberSubject = function (MessageSubject $subject) {
        $this->sendSubject = $subject;

        // Resubscribe to any channels we were subscribed to when disconnected
        foreach (array_keys($this->channels) as $name) {
            $this->subscribeOnChannel($name);
        }
    };

    // On a disconnect (or any other end of the stream) drop the subject again
    $forgetSubject = function () {
        $this->sendSubject = null;
    };

    // The subject itself is the stream of incoming messages
    $toMessages = function (MessageSubject $subject) {
        return $subject;
    };

    // Ping/timeout functionality. Because this is used with flatMapLatest, the
    // inner observable is disposed as soon as the next value arrives, i.e.
    // before any of its timeouts get the chance to fire.
    $keepAlive = function ($message) {
        $pingAndAwaitReply = function () {
            // Ping: causes the incoming stream to receive a message
            $this->send(['event' => 'pusher:ping']);

            // If nothing comes back in time this times out with a
            // TimeoutException, causing everything above it to dispose
            return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
        };

        // Emits the current value immediately, then waits for inactivity
        return Observable::never()
            ->timeout($this->noActivityTimeout * 1000)
            ->catch($pingAndAwaitReply)
            ->startWith($message);
    };

    // Turn a decoded message into an Event, handling protocol level events
    $toEvent = function (array $message) {
        $this->delay = self::DEFAULT_DELAY;

        $event = Event::createFromMessage($message);

        if ($event->getEvent() === 'pusher:error') {
            return Observable::error(
                new PusherErrorException($event->getData()['message'], $event->getData()['code'])
            );
        }

        // The connection_established event carries the activity timeout to use
        if ($event->getEvent() === 'pusher:connection_established') {
            $this->setActivityTimeout($event);
        }

        return Observable::of($event);
    };

    // Connection level and Pusher protocol errors decide if and when we retry
    $retryPolicy = function (Observable $errors) {
        return $errors->flatMap(function (Throwable $throwable) {
            return $this->handleLowLevelError($throwable);
        });
    };

    $this->messages = $client
        ->do($rememberSubject)
        ->finally($forgetSubject)
        ->flatMap($toMessages)
        ->flatMapLatest($keepAlive)
        // Decode JSON
        ->_ApiClients_jsonDecode()
        ->flatMap($toEvent)
        ->retryWhen($retryPolicy)
        // Share client
        ->share();
}
|
126 | |||
/**
 * Create a client for the given application, backed by a websocket connection
 * running on the given event loop.
 *
 * As a side effect this tries to register the RxPHP async and default
 * scheduler factories.
 *
 * @param  LoopInterface $loop     Event loop to run the connection on
 * @param  string        $app      Application ID
 * @param  Resolver      $resolver Optional DNS resolver
 * @return AsyncClient
 */
public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
{
    // Rather not do this, but have to until ReactPHP gets its own global loop
    $asyncFactory = function () use ($loop) {
        return new Scheduler\EventLoopScheduler($loop);
    };

    try {
        Scheduler::setAsyncFactory($asyncFactory);
    } catch (Throwable $ignored) {
        // Deliberately ignored — presumably a factory has already been
        // registered by an earlier call; TODO confirm.
    }

    $defaultFactory = function () {
        return Scheduler::getImmediate();
    };

    try {
        Scheduler::setDefaultFactory($defaultFactory);
    } catch (Throwable $ignored) {
        // Deliberately ignored, same reasoning as above.
    }

    $websocket = new WebsocketClient(
        ApiSettings::createUrl($app),
        false,
        [],
        $loop,
        $resolver
    );

    return new self($websocket);
}
||
160 | |||
/**
 * Listen on a channel.
 *
 * The observable for a channel is created once and then reused for as long as
 * it stays registered. Subscribing to it joins the channel; disposing it sends
 * an unsubscribe event and removes the channel from the registry.
 *
 * @param  string     $channel Channel to listen on
 * @return Observable Shared stream of the events received on that channel
 */
public function channel(string $channel): Observable
{
    // Only join a channel once: build the stream when it is not known yet
    if (!isset($this->channels[$channel])) {
        // Ensure we only get messages for the given channel
        $forThisChannel = $this->messages->filter(function (Event $event) use ($channel) {
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
        });

        // Observable representing channel events
        $channelEvents = Observable::create(
            function (ObserverInterface $observer) use ($channel, $forThisChannel) {
                // Internal subscription confirmations are not passed on
                $withoutInternalEvents = $forThisChannel->filter(function (Event $event) {
                    return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
                });

                // Listen first, then ask to be subscribed on the channel
                $listener = $withoutInternalEvents->subscribe($observer);
                $this->subscribeOnChannel($channel);

                return new CallbackDisposable(function () use ($channel, $listener) {
                    // Send unsubscribe event
                    $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
                    // Dispose our own subscription to messages
                    $listener->dispose();
                    // Forget the channel so we don't resubscribe in case we reconnect
                    unset($this->channels[$channel]);
                });
            }
        );

        // Share the stream among all subscribers to this channel
        $this->channels[$channel] = $channelEvents->share();
    }

    return $this->channels[$channel];
}
||
210 | |||
211 | /** |
||
212 | * Send a message through the client. |
||
213 | * |
||
214 | * @param array $message Message to send, will be json encoded |
||
215 | * |
||
216 | * @return A bool indicating whether or not the connection was active |
||
217 | * and the given message has been passed on to the connection. |
||
218 | */ |
||
219 | 2 | public function send(array $message): bool |
|
230 | |||
231 | /** |
||
232 | * Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes. |
||
233 | */ |
||
234 | 2 | private function handleLowLevelError(Throwable $throwable) |
|
267 | |||
268 | /** |
||
269 | * @param string $channel |
||
270 | */ |
||
271 | 2 | private function subscribeOnChannel(string $channel) |
|
275 | |||
276 | /** |
||
277 | * Get connection activity timeout from connection established event. |
||
278 | * |
||
279 | * @param Event $event |
||
280 | */ |
||
281 | private function setActivityTimeout(Event $event) |
||
297 | } |
||
298 |
It seems like the type of the argument is not accepted by the function/method which you are calling.
In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.
We suggest adding an explicit type cast, as in the following example: