Passed
Pull Request — main (#167)
by
unknown
01:35
created

pincer.core.gateway.Dispatcher.__hello_socket()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 13
rs 9.8
c 0
b 0
f 0
cc 1
nop 1
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
5
from __future__ import annotations
6
7
import logging
8
import zlib
9
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
10
from platform import system
11
from typing import Dict, Callable, Awaitable, Optional
12
13
from websockets import connect
14
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
15
from websockets.legacy.client import WebSocketClientProtocol
16
17
from . import __package__
18
from .._config import GatewayConfig
19
from ..core.dispatch import GatewayDispatch
20
from ..core.heartbeat import Heartbeat
21
from ..exceptions import (
22
    PincerError, InvalidTokenError, UnhandledException,
23
    _InternalPerformReconnectError, DisallowedIntentsError
24
)
25
from ..objects import Intents
26
27
ZLIB_SUFFIX = b'\x00\x00\xff\xff'
28
29
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]
30
_log = logging.getLogger(__package__)
31
32
33
class Dispatcher:
34
    """
35
    The Dispatcher handles all interactions with discord websocket API.
36
    This also contains the main event loop, and handles the heartbeat.
37
38
    Running the dispatcher will create a connection with the
39
    Discord WebSocket API on behalf of the provided token.
40
41
    This token must be a bot token.
42
    (Which can be found on
43
    `<https://discord.com/developers/applications/<bot_id>/bot>`_)
44
    """
45
46
    # TODO: Implement compression
0 ignored issues
show
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
Loading history...
47
    def __init__(
48
            self, token: str, *,
49
            handlers: Dict[int, Handler],
50
            intents: Intents,
51
            reconnect: bool
52
    ) -> None:
53
        """
54
        :param token:
55
            Bot token for discord's API.
56
57
        :param intents:
58
            Represents the discord bot intents.
59
60
        :param handlers:
61
            A hashmap of coroutines with as key the gateway opcode.
62
63
        :raises InvalidTokenError:
64
            Discord Token length is not 59 characters.
65
66
        auto_reconnect :class:`bool`
67
            Whether the dispatcher should automatically reconnect.
68
        """
69
70
        if len(token) != 59:
71
            raise InvalidTokenError(
72
                "Discord Token must have exactly 59 characters."
73
            )
74
75
        self.__token = token
76
        self.__keep_alive = True
77
        self.__socket: Optional[WebSocketClientProtocol] = None
78
        self.__intents = intents
79
        self.__reconnect = reconnect
80
81
        async def identify_and_handle_hello(
82
                socket: WebSocketClientProtocol,
83
                payload: GatewayDispatch
84
        ):
85
            """
86
            Identifies the client to the Discord Websocket API, this
87
            gets done when the client receives the ``hello`` (opcode 10)
88
            message from discord. Right after we send our identification
89
            the heartbeat starts.
90
91
            :param socket:
92
                The current socket, which can be used to interact
93
                with the Discord API.
94
95
            :param payload:
96
                The received payload from Discord.
97
            """
98
            _log.debug("Sending authentication/identification message.")
99
100
            await socket.send(self.__hello_socket)
101
            await Heartbeat.handle_hello(socket, payload)
102
103
        async def handle_reconnect(_, payload: GatewayDispatch):
104
            """
105
            Closes the client and then reconnects it.
106
            """
107
            _log.debug("Reconnecting client...")
108
            await self.close()
109
110
            Heartbeat.update_sequence(payload.seq)
111
            self.start_loop()
112
113
        self.__dispatch_handlers: Dict[int, Handler] = {
114
            **handlers,
115
            7: handle_reconnect,
116
            9: handle_reconnect,
117
            10: identify_and_handle_hello,
118
            11: Heartbeat.handle_heartbeat
119
        }
120
121
        self.__dispatch_errors: Dict[int, PincerError] = {
122
            1006: _InternalPerformReconnectError(),
123
            4000: _InternalPerformReconnectError(),
124
            4004: InvalidTokenError(),
125
            4007: _InternalPerformReconnectError(),
126
            4009: _InternalPerformReconnectError(),
127
            4014: DisallowedIntentsError()
128
        }
129
130
    @property
131
    def intents(self):
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
132
        return self.__intents
133
134
    @property
135
    def __hello_socket(self) -> str:
136
        return str(
137
            GatewayDispatch(
138
                2, {
139
                    "token": self.__token,
140
                    "intents": self.__intents,
141
                    "properties": {
142
                        "$os": system(),
143
                        "$browser": __package__,
144
                        "$device": __package__
145
                    },
146
                    "compress": GatewayConfig.compressed()
147
                }
148
            )
149
        )
150
151
    async def __handler_manager(
152
            self,
153
            socket: WebSocketClientProtocol,
154
            payload: GatewayDispatch,
155
            loop: AbstractEventLoop
156
    ):
157
        """
158
        This manages all handles for given OP codes.
159
        This method gets invoked for every message that is received from
160
        Discord.
161
162
        :meta public:
163
164
        :param socket:
165
            The current socket, which can be used to interact with
166
            the Discord API.
167
168
        :param payload:
169
            The received payload from Discord.
170
171
        :param loop:
172
            The current async loop on which the future is bound.
173
        """
174
        _log.debug(
175
            "New event received, checking if handler exists for opcode: %i",
176
            payload.op
177
        )
178
179
        handler: Handler = self.__dispatch_handlers.get(payload.op)
180
        all_handler: Handler = self.__dispatch_handlers.get(-1)
181
182
        if not handler:
183
            _log.error(
184
                "No handler was found for opcode %i, please report this to the "
185
                "pincer dev team!", payload.op
186
            )
187
188
            if not all_handler:
189
                raise UnhandledException(f"Unhandled payload: {payload}")
190
191
        _log.debug(
192
            "Event handler found, ensuring async future in current loop."
193
        )
194
195
        def execute_handler(event_handler: Handler):
196
            if event_handler:
197
                ensure_future(event_handler(socket, payload), loop=loop)
198
199
        execute_handler(handler)
200
        execute_handler(all_handler)
201
202
    async def __dispatcher(self, loop: AbstractEventLoop):
203
        """
204
        The main event loop.
205
        This handles all interactions with the websocket API.
206
207
        :meta public:
208
209
        :param loop:
210
            The loop in which the dispatcher is running.
211
        """
212
        _log.debug(
213
            "Establishing websocket connection with `%s`", GatewayConfig.uri()
214
        )
215
216
        async with connect(GatewayConfig.uri()) as socket:
217
            self.__socket = socket
218
219
            # Removing the limit of the received socket.
220
            # Having the default limit can cause an issue
221
            # with first payload of bigger bots.
222
            socket.max_size = None
223
224
            _log.debug(
225
                "Successfully established websocket connection with `%s`",
226
                GatewayConfig.uri()
227
            )
228
229
            if GatewayConfig.compression == "zlib-stream":
230
                # Create an inflator for compressed data as defined in
231
                # https://discord.com/developers/docs/topics/gateway
232
                inflator = zlib.decompressobj()
233
234
            while self.__keep_alive:
235
                try:
236
                    _log.debug("Waiting for new event.")
237
                    msg = await socket.recv()
238
239
                    if isinstance(msg, bytes):
240
                        if GatewayConfig.compression == "zlib-payload":
241
                            msg = zlib.decompress(msg)
242
                        else:
243
                            buffer = bytearray(msg)
244
245
                            while not buffer.endswith(ZLIB_SUFFIX):
246
                                buffer.extend(await socket.recv())
247
248
                            msg = inflator.decompress(buffer).decode('utf-8')
0 ignored issues
show
introduced by
The variable inflator does not seem to be defined in case GatewayConfig.compression == "zlib-stream" on line 229 is False. Are you sure this can never be the case?
Loading history...
249
250
                    await self.__handler_manager(
251
                        socket,
252
                        GatewayDispatch.from_string(msg),
253
                        loop
254
                    )
255
256
                except ConnectionClosedError as exc:
257
                    _log.debug(
258
                        "The connection with `%s` has been broken unexpectedly."
259
                        " (%i, %s)", GatewayConfig.uri(), exc.code, exc.reason
260
                    )
261
262
                    await self.close()
263
                    exception = self.__dispatch_errors.get(exc.code)
264
265
                    if isinstance(exception, _InternalPerformReconnectError):
266
                        if self.__reconnect:
267
                            _log.debug("Connection closed, reconnecting...")
268
                            Heartbeat.update_sequence(0)
269
                            return self.start_loop(loop=loop)
270
271
                    raise exception or UnhandledException(
272
                        f"Dispatch error ({exc.code}): {exc.reason}"
273
                    )
274
                except ConnectionClosedOK:
275
                    _log.debug("Connection closed successfully.")
276
277
    def start_loop(self, *, loop: AbstractEventLoop = None):
278
        """
279
        Instantiate the dispatcher, this will create a connection to the
280
        Discord websocket API on behalf of the client who's token has
281
        been passed.
282
283
        Keyword Arguments:
284
285
        :param loop:
286
            The loop in which the Dispatcher will run. If no loop is
287
            provided it will get a new one.
288
        """
289
        _log.debug("Starting GatewayDispatcher")
290
        loop = loop or get_event_loop()
291
        loop.run_until_complete(self.__dispatcher(loop))
292
        loop.close()
293
294
    async def close(self):
295
        """
296
        Stop the dispatcher from listening and responding to gateway
297
        events. This should let the client close on itself.
298
        """
299
        if not self.__socket:
300
            _log.error("Cannot close non existing socket socket connection.")
301
            raise RuntimeError("Please open the connection before closing.")
302
303
        _log.debug(
304
            "Setting keep_alive to False, this will terminate the heartbeat."
305
        )
306
307
        self.__keep_alive = False
308
        await self.__socket.close()
309