Client::sendMessage()   A
last analyzed

Complexity

Conditions 3
Paths 1

Size

Total Lines 29
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 15
c 1
b 0
f 0
dl 0
loc 29
rs 9.7666
cc 3
nc 1
nop 3
1
<?php
2
3
namespace PortlandLabs\Slackbot\Slack\Rtm;
4
5
use PortlandLabs\Slackbot\Slack\Api\Payload\RtmConnectPayload;
6
use PortlandLabs\Slackbot\Slack\Api\Payload\RtmConnectPayloadResponse;
7
use PortlandLabs\Slackbot\Slack\Api\Client as ApiClient;
8
use PortlandLabs\Slackbot\Slack\Rtm\Event\Factory;
9
use PortlandLabs\Slackbot\Slack\Rtm\Event\Handler;
10
use PortlandLabs\Slackbot\Slack\Rtm\Event\Middleware;
11
use PortlandLabs\Slackbot\Slack\Rtm\Event\Pong;
12
use Psr\Container\ContainerInterface;
13
use Psr\Log\LoggerAwareTrait;
14
use Psr\Log\LoggerInterface;
15
use Ratchet\Client\WebSocket;
16
use Ratchet\RFC6455\Messaging\Message;
17
use React\EventLoop\LoopInterface;
18
use React\Promise\Deferred;
19
use React\Promise\Promise;
20
use React\Promise\PromiseInterface;
21
22
use function Ratchet\Client\connect as websocketConnect;
23
24
/**
 * Websocket client for the Slack RTM (real time messaging) stream.
 *
 * The client connects to the URL supplied by an rtm.connect payload, pushes every
 * incoming event through a middleware stack, and offers helpers for sending typing
 * indicators, messages and pings over the same socket.
 */
class Client
{

    use LoggerAwareTrait;

    /** @var ContainerInterface Used to resolve middleware that is given as a service id */
    protected $container;

    /** @var ApiClient */
    protected $client;

    /** @var Factory Builds event objects from decoded RTM payloads */
    protected $eventFactory;

    /** @var WebSocket|null The active websocket, null while disconnected */
    protected $socket;

    /** @var callable|null The "message" listener registered on the socket by listen() */
    protected $listener;

    /** @var string|null */
    protected $userName;

    /** @var string|null */
    protected $userId;

    /** @var string|null The websocket URL of the active connection */
    protected $url;

    /** @var Deferred|null The promise associated with this client */
    protected $deferred;

    /** @var int Tracks the current message identifier for the RTM api */
    protected $messageId = 1;

    /** @var bool */
    protected $connected = false;

    /**
     * Outstanding pings, one entry per ping: [Deferred, message id, expiry timestamp]
     *
     * @var array[]
     */
    protected $pings = [];

    public function __construct(ContainerInterface $container, Factory $eventFactory, LoggerInterface $logger)
    {
        $this->container = $container;
        $this->eventFactory = $eventFactory;
        $this->setLogger($logger);
    }

    /**
     * Handle shutting down
     */
    public function __destruct()
    {
        $this->disconnect();
    }

    /**
     * Disconnect from RTM stream
     *
     * @return bool True if a socket was open and has been closed, false if there was nothing to close
     */
    public function disconnect(): bool
    {
        if (!$this->socket) {
            return false;
        }

        $this->logger->info('[<bold>RTM.DSC</bold>] Disconnecting.');

        // Stop listening
        if ($this->listener) {
            $this->socket->removeListener('message', $this->listener);
        }

        // Close the websocket
        $this->socket->close();
        $this->socket = null;

        // Unset stuff
        $this->userId = null;
        $this->userName = null;
        $this->url = null;
        $this->listener = null;
        $this->connected = false;

        // The connection promise has already settled; drop it so a later listen()
        // waits for its own connection instead of firing against a closed socket.
        $this->deferred = null;

        return true;
    }

    /**
     * Begin listening to events
     *
     * @param LoopInterface $loop
     * @param RtmConnectPayloadResponse $rtmPayload
     * @param array $middleware Middleware instances or container ids, outermost first
     * @param callable $callback Invoked at the center of the middleware stack
     *
     * @return Promise Resolves once the websocket connection is established
     *
     * @throws \RuntimeException If the client already has an open socket
     */
    public function listen(LoopInterface $loop, RtmConnectPayloadResponse $rtmPayload, array $middleware, callable $callback): Promise
    {
        if ($this->socket) {
            throw new \RuntimeException('This client is already listening.');
        }

        // Build a stack of middleware with the passed callback at the center
        /** @var Handler $stack */
        $stack = array_reduce(
            array_reverse($middleware),
            [$this, 'buildDelegateHandler'],
            new Handler\Dispatcher($callback)
        );

        // Get our promise and add a listener when it resolves
        $promise = $this->getPromise();
        $promise->done(function () use ($stack) {
            $this->logger->info('-- Starting to listen... --');

            $this->listener = function (Message $message) use ($stack) {
                if (!$this->connected) {
                    // After we disconnect stop listening.
                    return false;
                }

                $this->logger->debug('[<bold>RTM <- </bold>] ' . $message);

                // Decode the payload; anything that does not decode to an array cannot be an event
                $data = json_decode($message->getPayload(), true);
                if (!is_array($data)) {
                    $this->logger->debug('[<bold>RTM.ERR</bold>] Ignoring payload that could not be decoded.');

                    return false;
                }

                // If we can build an event from it, send it through the middleware
                $event = $this->eventFactory->buildEvent($data);
                if ($event) {
                    if ($event instanceof Pong) {
                        $this->handlePong($event);
                    }

                    $this->logger->debug('[<bold>RTM.EVT</bold>] Sending through middleware...');
                    $stack->handle($event);
                }
            };

            // Listen to messages
            $this->socket->on('message', $this->listener);
        });

        // Try to connect
        $this->connect($loop, $rtmPayload);

        return $promise;
    }

    /**
     * Build a delegate handler given a middleware to delegate to, and a handler to pass in
     *
     * @param Handler $handler
     * @param Middleware|string $middleware A middleware instance, or an id to fetch from the container
     * @return Handler
     */
    protected function buildDelegateHandler(Handler $handler, $middleware): Handler
    {
        if (is_string($middleware)) {
            $middleware = $this->container->get($middleware);
        }

        return new Handler\Delegate($middleware, $handler);
    }

    /**
     * Get the promise associated with this client
     * This promise resolves when the connection is established
     *
     * @return Promise|\React\Promise\PromiseInterface
     */
    public function getPromise(): PromiseInterface
    {
        return $this->getDeferred()->promise();
    }

    /**
     * Get or create the Deferred backing the connection promise
     *
     * @return Deferred
     */
    protected function getDeferred(): Deferred
    {
        if (!$this->deferred) {
            $this->deferred = new Deferred();
        }

        return $this->deferred;
    }

    /**
     * Return the open socket, or fail loudly when the client is not connected
     *
     * @return WebSocket
     *
     * @throws \RuntimeException If there is no open socket
     */
    protected function requireSocket(): WebSocket
    {
        if (!$this->socket) {
            throw new \RuntimeException('The RTM client is not connected.');
        }

        return $this->socket;
    }

    /**
     * Show the bot as typing
     * @param string $channel
     */
    public function typing(string $channel)
    {
        $this->send('typing', ['channel' => $channel]);
    }

    /**
     * Send some data into the socket
     *
     * The payload is stamped with the given type and the next message id before
     * it is JSON encoded and written to the socket.
     *
     * @param string $type
     * @param array $data
     *
     * @throws \RuntimeException If the client is not connected
     */
    public function send(string $type, array $data = [])
    {
        $socket = $this->requireSocket();

        $data['type'] = $type;
        $data['id'] = $this->messageId++;

        $serialized = json_encode($data);
        $this->logger->debug('[<bold>RTM -> </bold>] ' . $serialized);
        $socket->send($serialized);
    }

    /**
     * Send a message
     *
     * @param string $message
     * @param string $channel
     * @param array $data Extra fields to include in the outgoing payload
     * @return Promise Resolves with the decoded reply payload whose reply_to matches this message
     *
     * @throws \RuntimeException If the client is not connected
     */
    public function sendMessage(string $message, string $channel, array $data = []): Promise
    {
        $socket = $this->requireSocket();
        $deferred = new Deferred();

        $data['text'] = $message;
        $data['channel'] = $channel;

        // send() stamps the outgoing payload with the current message id
        $currentId = $this->messageId;
        $listener = null;

        // Listen for a reply
        $listener = function (Message $frame) use ($deferred, $currentId, $socket, &$listener) {
            $payload = json_decode($frame->getPayload(), true);
            $replyTo = $payload['reply_to'] ?? 0;

            // Unrelated traffic: keep waiting for our reply
            if ($replyTo !== $currentId) {
                return;
            }

            // This payload is in reply to our message, so we are done listening
            $socket->removeListener('message', $listener);
            $deferred->resolve($payload);
        };

        $socket->on('message', $listener);

        // Send our data
        $this->send('message', $data);

        return $deferred->promise();
    }

    /**
     * Send a ping and return a promise that resolves when the pong is received
     *
     * @param array $data
     * @param int $expires Seconds until the ping expires. Old expired pings are marked rejected once a new ping is sent
     * @return Promise
     *
     * @throws \RuntimeException If the client is not connected
     */
    public function sendPing(array $data = [], int $expires = 0): Promise
    {
        $this->prunePings();
        $this->requireSocket();

        $deferred = new Deferred();

        // Appended rather than keyed by expiry, so pings sharing an expiry second do not overwrite each other
        $this->pings[] = [$deferred, $this->messageId, time() + max($expires, 0)];
        $this->send('ping', $data);

        return $deferred->promise();
    }

    /**
     * Deal with an incoming Pong message
     *
     * Resolves and forgets every outstanding ping the pong replies to.
     *
     * @param Pong $pong
     */
    public function handlePong(Pong $pong)
    {
        $replyTo = $pong->getReplyTo();

        foreach ($this->pings as $key => $entry) {
            [$deferred, $messageId] = $entry;

            if ($messageId === $replyTo) {
                $deferred->resolve($pong);
                unset($this->pings[$key]);
            }
        }
    }

    /**
     * Reject and remove old expired pings
     */
    protected function prunePings(): void
    {
        $now = time();

        foreach ($this->pings as $key => $entry) {
            $deferred = $entry[0];
            $expiresAt = $entry[2];

            if ($now > $expiresAt) {
                $deferred->reject('expired');
                unset($this->pings[$key]);
            }
        }
    }

    /**
     * Connect to our RTM websocket
     *
     * Settles the client's connection promise: resolved with [socket, payload] on
     * success, rejected with the connection error on failure.
     *
     * @param LoopInterface $loop
     * @param RtmConnectPayloadResponse $rtmPayload
     */
    protected function connect(LoopInterface $loop, RtmConnectPayloadResponse $rtmPayload)
    {
        // Go through the accessor so a Deferred always exists, even if nobody asked for the promise yet
        $deferred = $this->getDeferred();

        // Manage connecting
        $connection = websocketConnect($rtmPayload->getUrl(), [], [], $loop);
        $connection->then(
            function (WebSocket $connection) use ($deferred, $rtmPayload) {
                $this->connected($connection, $rtmPayload);
                $deferred->resolve([$connection, $rtmPayload]);

                return $connection;
            },

            // Failure
            function ($e) use ($deferred) {
                $this->logger->error('[<bold>RTM.ERR</bold>] Websocket rejected: ' . $e);
                $deferred->reject($e);
            }
        );
    }

    /**
     * Manage what happens post connect
     *
     * @param WebSocket $socket
     * @param RtmConnectPayloadResponse $rtmPayload
     */
    protected function connected(WebSocket $socket, RtmConnectPayloadResponse $rtmPayload)
    {
        $this->logger->info('<bold>-- Connected! --</bold>');
        $this->socket = $socket;
        $this->userName = $rtmPayload->getUserName();
        $this->userId = $rtmPayload->getUserId();
        $this->url = $rtmPayload->getUrl();
        $this->connected = true;
    }

    /**
     * Get the active user name
     *
     * @return string|null
     */
    public function getUserName(): ?string
    {
        return $this->userName;
    }

    /**
     * Get the active user id
     *
     * @return string|null
     */
    public function getUserId(): ?string
    {
        return $this->userId;
    }

}