Passed
Pull Request — main (#160)
by
unknown
01:21
created

pincer.core.gateway.Dispatcher.start_loop()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 16
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
# Copyright Pincer 2021-Present
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
5
from __future__ import annotations
6
7
import logging
8
import zlib
9
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
10
from platform import system
11
from typing import Dict, Callable, Awaitable, Optional
12
13
from websockets import connect
14
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
15
from websockets.legacy.client import WebSocketClientProtocol
16
17
from . import __package__
18
from .._config import GatewayConfig
19
from ..core.dispatch import GatewayDispatch
20
from ..core.heartbeat import Heartbeat
21
from ..exceptions import (
22
    PincerError, InvalidTokenError, UnhandledException,
23
    _InternalPerformReconnectError, DisallowedIntentsError
24
)
25
from ..objects import Intents
26
27
# Signature shared by every gateway opcode handler: it receives the
# active websocket and the dispatched payload, and is awaited for its
# side effects only.
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]

# Module-wide logger, named after the package.
_log = logging.getLogger(__package__)
29
30
31
class Dispatcher:
    """
    The Dispatcher handles all interactions with discord websocket API.
    This also contains the main event loop, and handles the heartbeat.

    Running the dispatcher will create a connection with the
    Discord WebSocket API on behalf of the provided token.

    This token must be a bot token.
    (Which can be found on
    `<https://discord.com/developers/applications/<bot_id>/bot>`_)
    """

    # TODO: Implement compression
    def __init__(
            self, token: str, *,
            handlers: Dict[int, Handler],
            intents: Intents
    ) -> None:
        """
        :param token:
            Bot token for discord's API.

        :param intents:
            Represents the discord bot intents.

        :param handlers:
            A hashmap of coroutines with as key the gateway opcode.
            Entries for the opcodes 7, 9, 10 and 11 are overridden by
            the built-in handlers. The handler stored under ``-1``, if
            any, is invoked for every received payload.

        :raises InvalidTokenError:
            Discord Token length is not 59 characters.
        """

        if len(token) != 59:
            raise InvalidTokenError(
                "Discord Token must have exactly 59 characters."
            )

        self.__token = token
        self.__keep_alive = True
        # Raised when the current connection has to be replaced by a
        # new one; the dispatcher reads it once the connection is down.
        self.__reconnect = False
        self.__socket: Optional[WebSocketClientProtocol] = None
        self.__intents = intents

        async def identify_and_handle_hello(
                socket: WebSocketClientProtocol,
                payload: GatewayDispatch
        ):
            """
            Identifies the client to the Discord Websocket API, this
            gets done when the client receives the ``hello`` (opcode 10)
            message from discord. Right after we send our identification
            the heartbeat starts.

            :param socket:
                The current socket, which can be used to interact
                with the Discord API.

            :param payload:
                The received payload from Discord.
            """
            _log.debug("Sending authentication/identification message.")

            await socket.send(self.__hello_socket)
            await Heartbeat.handle_hello(socket, payload)

        async def handle_reconnect(_, payload: GatewayDispatch):
            """
            Closes the current connection and asks the dispatcher to
            open a new one.

            :param payload:
                The received payload from Discord, its sequence number
                is handed to the heartbeat.
            """
            _log.debug("Reconnecting client...")

            # The flag must be raised before closing: closing wakes up
            # the receive loop, which then has to know that it should
            # reconnect instead of shutting down. Calling
            # ``start_loop`` here is not possible, the event loop is
            # already running.
            self.__reconnect = True
            await self.close()

            Heartbeat.update_sequence(payload.seq)

        # The built-in handlers come last so that they take precedence
        # over user supplied handlers for the same opcode.
        self.__dispatch_handlers: Dict[int, Handler] = {
            **handlers,
            7: handle_reconnect,
            9: handle_reconnect,
            10: identify_and_handle_hello,
            11: Heartbeat.handle_heartbeat
        }

        # Websocket close codes mapped to the exception they stand for.
        # ``_InternalPerformReconnectError`` is never raised, it only
        # marks the close codes after which a reconnect is attempted.
        self.__dispatch_errors: Dict[int, PincerError] = {
            4000: _InternalPerformReconnectError(),
            4004: InvalidTokenError(),
            4007: _InternalPerformReconnectError(),
            4009: _InternalPerformReconnectError(),
            4014: DisallowedIntentsError()
        }

    @property
    def intents(self) -> Intents:
        """The intents which were passed to the constructor."""
        return self.__intents

    @property
    def __hello_socket(self) -> str:
        """
        The serialized identify payload (opcode 2) which is sent to
        Discord in response to the ``hello`` message.
        """
        return str(
            GatewayDispatch(
                2, {
                    "token": self.__token,
                    "intents": self.__intents,
                    "properties": {
                        "$os": system(),
                        "$browser": __package__,
                        "$device": __package__
                    },
                    "compress": GatewayConfig.compressed()
                }
            )
        )

    async def __handler_manager(
            self,
            socket: WebSocketClientProtocol,
            payload: GatewayDispatch,
            loop: AbstractEventLoop
    ):
        """
        This manages all handles for given OP codes.
        This method gets invoked for every message that is received from
        Discord.

        :meta public:

        :param socket:
            The current socket, which can be used to interact with
            the Discord API.

        :param payload:
            The received payload from Discord.

        :param loop:
            The current async loop on which the future is bound.

        :raises UnhandledException:
            Neither a handler for the opcode of the payload nor a
            catch-all handler (key ``-1``) has been registered.
        """
        _log.debug(
            "New event received, checking if handler exists for opcode: %i",
            payload.op
        )

        handler: Optional[Handler] = self.__dispatch_handlers.get(payload.op)
        all_handler: Optional[Handler] = self.__dispatch_handlers.get(-1)

        if not handler:
            _log.error(
                "No handler was found for opcode %i, please report this to the "
                "pincer dev team!", payload.op
            )

            if not all_handler:
                raise UnhandledException(f"Unhandled payload: {payload}")

        _log.debug(
            "Event handler found, ensuring async future in current loop."
        )

        # The handlers are only scheduled, not awaited, so a slow
        # handler can not hold up the receiving of new events.
        for event_handler in (handler, all_handler):
            if event_handler:
                ensure_future(event_handler(socket, payload), loop=loop)

    async def __receive_events(
            self,
            socket: WebSocketClientProtocol,
            loop: AbstractEventLoop
    ) -> None:
        """
        Receives and dispatches the events of one single connection
        until the connection is closed or the dispatcher gets stopped.

        :param socket:
            The open socket from which the events are read.

        :param loop:
            The loop in which the event handlers are scheduled.

        :raises PincerError:
            The connection broke with a close code for which no
            reconnect is performed. For unknown close codes this is an
            ``UnhandledException``.
        """
        zlib_suffix = b'\x00\x00\xff\xff'
        compressed = GatewayConfig.compressed()

        # Create an inflator for compressed data as defined in
        # https://discord.com/developers/docs/topics/gateway
        # One inflator is used for all messages of a connection.
        inflator = zlib.decompressobj() if compressed else None

        while self.__keep_alive:
            try:
                _log.debug("Waiting for new event.")
                msg = await socket.recv()

                if compressed and isinstance(msg, bytes):
                    buffer = bytearray(msg)

                    # A compressed message may be spread over several
                    # frames, it is complete once it ends on the suffix.
                    while not buffer.endswith(zlib_suffix):
                        buffer.extend(await socket.recv())

                    msg = inflator.decompress(buffer).decode('utf-8')

                await self.__handler_manager(
                    socket,
                    GatewayDispatch.from_string(msg),
                    loop
                )

            except ConnectionClosedError as exc:
                _log.debug(
                    "The connection with `%s` has been broken unexpectedly."
                    " (%i, %s)", GatewayConfig.uri(), exc.code, exc.reason
                )

                await self.close()
                exception = self.__dispatch_errors.get(exc.code)

                if isinstance(exception, _InternalPerformReconnectError):
                    Heartbeat.update_sequence(0)
                    # The dispatcher opens the new connection, the event
                    # loop is already running so it can not be restarted
                    # from within this coroutine.
                    self.__reconnect = True
                    return

                raise exception or UnhandledException(
                    f"Dispatch error ({exc.code}): {exc.reason}"
                )

            except ConnectionClosedOK:
                _log.debug("Connection closed successfully.")
                # Every further ``recv`` would raise again right away,
                # so staying in the loop would spin forever.
                return

    async def __dispatcher(self, loop: AbstractEventLoop):
        """
        The main event loop.
        This handles all interactions with the websocket API and opens
        a new connection whenever a reconnect has been requested.

        :meta public:

        :param loop:
            The loop in which the dispatcher is running.
        """
        while True:
            _log.debug(
                "Establishing websocket connection with `%s`",
                GatewayConfig.uri()
            )

            async with connect(GatewayConfig.uri()) as socket:
                self.__socket = socket

                # Removing the limit of the received socket.
                # Having the default limit can cause an issue
                # with first payload of bigger bots.
                socket.max_size = None

                _log.debug(
                    "Successfully established websocket connection with `%s`",
                    GatewayConfig.uri()
                )

                await self.__receive_events(socket, loop)

            if not self.__reconnect:
                return

            # ``close`` has switched keep_alive off, it has to be
            # switched on again for the next connection.
            self.__reconnect = False
            self.__keep_alive = True

    def start_loop(self, *, loop: Optional[AbstractEventLoop] = None) -> None:
        """
        Instantiate the dispatcher, this will create a connection to the
        Discord websocket API on behalf of the client whose token has
        been passed.

        This call blocks until the dispatcher has stopped, after which
        the loop gets closed.

        Keyword Arguments:

        :param loop:
            The loop in which the Dispatcher will run. If no loop is
            provided the one returned by ``get_event_loop`` is used.
        """
        _log.debug("Starting GatewayDispatcher")
        loop = loop or get_event_loop()
        loop.run_until_complete(self.__dispatcher(loop))
        loop.close()

    async def close(self) -> None:
        """
        Stop the dispatcher from listening and responding to gateway
        events. This should let the client close on itself.

        :raises RuntimeError:
            No connection has been opened yet.
        """
        if not self.__socket:
            _log.error("Cannot close non existing socket socket connection.")
            raise RuntimeError("Please open the connection before closing.")

        _log.debug(
            "Setting keep_alive to False, this will terminate the heartbeat."
        )

        self.__keep_alive = False
        await self.__socket.close()
296