Passed
Pull Request — main (#167)
by
unknown
01:53
created

pincer.core.gateway.Dispatcher.intents()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# Copyright Pincer 2021-Present
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
5
from __future__ import annotations
6
7
import logging
8
import zlib
9
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
10
from platform import system
11
from typing import Dict, Callable, Awaitable, Optional
12
13
from websockets import connect
14
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
15
from websockets.legacy.client import WebSocketClientProtocol
16
17
from . import __package__
18
from .._config import GatewayConfig
19
from ..core.dispatch import GatewayDispatch
20
from ..core.heartbeat import Heartbeat
21
from ..exceptions import (
22
    PincerError, InvalidTokenError, UnhandledException,
23
    _InternalPerformReconnectError, DisallowedIntentsError
24
)
25
from ..objects import Intents
26
27
ZLIB_SUFFIX = b'\x00\x00\xff\xff'
28
29
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]
30
_log = logging.getLogger(__package__)
31
32
33
class Dispatcher:
    """
    The Dispatcher handles all interactions with the Discord websocket API.
    This also contains the main event loop, and handles the heartbeat.

    Running the dispatcher will create a connection with the
    Discord WebSocket API on behalf of the provided token.

    This token must be a bot token.
    (Which can be found on
    `<https://discord.com/developers/applications/<bot_id>/bot>`_)
    """

    def __init__(
            self, token: str, *,
            handlers: Dict[int, Handler],
            intents: Intents,
            reconnect: bool
    ) -> None:
        """
        :param token:
            Bot token for discord's API.

        :param handlers:
            A hashmap of coroutines with as key the gateway opcode.
            Entries for opcodes 7, 9, 10 and 11 are always replaced by
            the dispatcher's own handlers.

        :param intents:
            Represents the discord bot intents.

        :param reconnect:
            Whether the dispatcher should automatically reconnect when
            the connection is closed with a recoverable close code.

        :raises InvalidTokenError:
            Discord Token length is not 59 characters.
        """

        if len(token) != 59:
            raise InvalidTokenError(
                "Discord Token must have exactly 59 characters."
            )

        self.__token = token
        self.__keep_alive = True
        self.__has_closed = self.__should_restart = False
        self.__socket: Optional[WebSocketClientProtocol] = None
        self.__intents = intents
        self.__reconnect = reconnect

        async def identify_and_handle_hello(
                socket: WebSocketClientProtocol,
                payload: GatewayDispatch
        ):
            """
            Identifies the client to the Discord Websocket API, this
            gets done when the client receives the ``hello`` (opcode 10)
            message from discord. Right after we send our identification
            the heartbeat starts.

            :param socket:
                The current socket, which can be used to interact
                with the Discord API.

            :param payload:
                The received payload from Discord.
            """
            _log.debug("Sending authentication/identification message.")

            await socket.send(self.__hello_socket)
            await Heartbeat.handle_hello(socket, payload)

        async def handle_reconnect(_, payload: GatewayDispatch):
            """
            Closes the client and then reconnects it.

            :param payload:
                The received payload; its sequence number is handed to
                :meth:`restart`.
            """
            _log.debug("Reconnecting client...")
            await self.restart(payload.seq)

        # The built-in entries come after ``**handlers`` on purpose, so
        # they win over user supplied handlers for the same opcode.
        self.__dispatch_handlers: Dict[int, Handler] = {
            **handlers,
            7: handle_reconnect,
            9: handle_reconnect,
            10: identify_and_handle_hello,
            11: Heartbeat.handle_heartbeat
        }

        # Websocket close codes mapped to the exception that the
        # dispatcher raises (or reconnects on) when they are received.
        self.__dispatch_errors: Dict[int, PincerError] = {
            1006: _InternalPerformReconnectError(),
            4000: _InternalPerformReconnectError(),
            4004: InvalidTokenError(),
            4007: _InternalPerformReconnectError(),
            4009: _InternalPerformReconnectError(),
            4014: DisallowedIntentsError()
        }

    @property
    def intents(self):
        """The intents this dispatcher was created with."""
        return self.__intents

    @property
    def __hello_socket(self) -> str:
        """
        The identification payload (opcode 2) as a string, which is
        sent to the gateway right after the hello message is received.
        """
        return str(
            GatewayDispatch(
                2, {
                    "token": self.__token,
                    "intents": self.__intents,
                    "properties": {
                        "$os": system(),
                        "$browser": __package__,
                        "$device": __package__
                    },
                    "compress": GatewayConfig.compressed()
                }
            )
        )

    async def __handler_manager(
            self,
            socket: WebSocketClientProtocol,
            payload: GatewayDispatch,
            loop: AbstractEventLoop
    ):
        """
        This manages all handles for given OP codes.
        This method gets invoked for every message that is received from
        Discord.

        :meta public:

        :param socket:
            The current socket, which can be used to interact with
            the Discord API.

        :param payload:
            The received payload from Discord.

        :param loop:
            The current async loop on which the future is bound.

        :raises UnhandledException:
            Neither a handler for the payload's opcode nor a catch-all
            handler (key ``-1``) is registered.
        """
        _log.debug(
            "New event received, checking if handler exists for opcode: %i",
            payload.op
        )

        handler: Optional[Handler] = self.__dispatch_handlers.get(payload.op)
        # The handler registered under -1 receives every payload.
        all_handler: Optional[Handler] = self.__dispatch_handlers.get(-1)

        if not handler:
            _log.error(
                "No handler was found for opcode %i, please report this to the "
                "pincer dev team!", payload.op
            )

            if not all_handler:
                raise UnhandledException(f"Unhandled payload: {payload}")

        _log.debug(
            "Event handler found, ensuring async future in current loop."
        )

        # Handlers are scheduled, not awaited, so a slow handler does
        # not block the receiving loop.
        for event_handler in (handler, all_handler):
            if event_handler:
                ensure_future(event_handler(socket, payload), loop=loop)

    async def __dispatcher(self, loop: AbstractEventLoop):
        """
        The main event loop.
        This handles all interactions with the websocket API.

        :meta public:

        :param loop:
            The loop in which the dispatcher is running.

        :raises PincerError:
            The connection was closed with a close code that is mapped
            to an error, and no reconnect was performed.

        :raises UnhandledException:
            The connection was closed with an unmapped close code.
        """
        _log.debug(
            "Establishing websocket connection with `%s`", GatewayConfig.uri()
        )

        async with connect(GatewayConfig.uri()) as socket:
            socket: WebSocketClientProtocol = socket
            self.__socket = socket

            # Removing the limit of the received socket.
            # Having the default limit can cause an issue
            # with first payload of bigger bots.
            socket.max_size = None

            _log.debug(
                "Successfully established websocket connection with `%s`",
                GatewayConfig.uri()
            )

            # NOTE(review): ``inflator`` only exists for "zlib-stream".
            # A binary frame received with any other compression setting
            # than "zlib-payload" would fail below on an unbound local;
            # confirm the set of values GatewayConfig.compression allows.
            if GatewayConfig.compression == "zlib-stream":
                # Create an inflator for compressed data as defined in
                # https://discord.com/developers/docs/topics/gateway
                inflator = zlib.decompressobj()

            while self.__keep_alive:
                try:
                    _log.debug("Waiting for new event.")
                    msg = await socket.recv()

                    # Sentinel pushed into the message queue by close().
                    if msg == "CLOSE":
                        break

                    if isinstance(msg, bytes):
                        if GatewayConfig.compression == "zlib-payload":
                            msg = zlib.decompress(msg)
                        else:
                            buffer = bytearray(msg)

                            # A stream message may be split over several
                            # frames, it is complete once it ends with
                            # the zlib suffix.
                            while not buffer.endswith(ZLIB_SUFFIX):
                                buffer.extend(await socket.recv())

                            msg = inflator.decompress(buffer).decode('utf-8')

                    await self.__handler_manager(
                        socket,
                        GatewayDispatch.from_string(msg),
                        loop
                    )

                except ConnectionClosedError as exc:
                    _log.debug(
                        "The connection with `%s` has been broken unexpectedly."
                        " (%i, %s)", GatewayConfig.uri(), exc.code, exc.reason
                    )

                    await self.close()
                    exception = self.__dispatch_errors.get(exc.code)

                    if isinstance(exception, _InternalPerformReconnectError):
                        if self.__reconnect:
                            _log.debug("Connection closed, reconnecting...")
                            return await self.restart()

                    raise exception or UnhandledException(
                        f"Dispatch error ({exc.code}): {exc.reason}"
                    )
                except ConnectionClosedOK:
                    _log.debug("Connection closed successfully.")
                    # The socket is closed, every further recv() would
                    # raise again. Leave the loop instead of spinning.
                    break
            self.__has_closed = True

    def start_loop(self, *, loop: Optional[AbstractEventLoop] = None):
        """
        Instantiate the dispatcher, this will create a connection to the
        Discord websocket API on behalf of the client whose token has
        been passed.

        The dispatcher is run again for as long as a restart has been
        requested; afterwards the loop gets closed.

        :param loop:
            The loop in which the Dispatcher will run. If no loop is
            provided the one returned by ``get_event_loop()`` is used.
        """
        loop = loop or get_event_loop()

        # Iterating (instead of recursing) keeps the stack flat, no
        # matter how many times the dispatcher reconnects.
        while True:
            self.__keep_alive = True
            self.__has_closed = self.__should_restart = False
            loop.run_until_complete(self.__dispatcher(loop))

            if not self.__should_restart:
                break

        loop.close()

    async def restart(self, /, seq: Optional[int] = None):
        """
        Restart the dispatcher.

        :param seq:
            The sequence number of the last dispatched event.
            If not provided, the dispatcher will restart with no base
            sequence.

        :raises RuntimeError:
            No connection has been opened yet.
        """
        await self.close()
        Heartbeat.update_sequence(seq)
        # Picked up by start_loop once the current dispatcher returns.
        self.__should_restart = True

    async def close(self):
        """
        Stop the dispatcher from listening and responding to gateway
        events. This should let the client close on itself.

        :raises RuntimeError:
            No connection has been opened yet.
        """
        if not self.__socket:
            _log.error("Cannot close non existing socket socket connection.")
            raise RuntimeError("Please open the connection before closing.")

        _log.debug("Closing connection...")
        self.__keep_alive = False

        # Wake up the receiving loop with a sentinel message.
        self.__socket.messages.append("CLOSE")

        # The waiter only exists while a recv() call is pending. It is
        # absent when close() runs after recv() already raised.
        # noinspection PyProtectedMember
        waiter = getattr(self.__socket, "_pop_message_waiter", None)
        if waiter is not None:
            waiter.cancel()

        await self.__socket.close()
        _log.debug("Successfully closed connection!")
330