Passed
Pull Request — main (#160)
by
unknown
01:25
created

pincer.core.gateway.Dispatcher.__dispatcher()   C

Complexity

Conditions 10

Size

Total Lines 72
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 39
dl 0
loc 72
rs 5.9999
c 0
b 0
f 0
cc 10
nop 2

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.
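Extract Method, the refactoring described above, can be sketched on this listing: the close-code handling inside the `except ConnectionClosedError` branch of `__dispatcher()` is a self-contained step that could live in its own helper. This is only an illustration. The exception classes below are simplified stand-ins for the ones imported from `..exceptions`, and `ReconnectRequested`, `CLOSE_CODE_ERRORS` and `resolve_close_error` are hypothetical names, not part of Pincer.

```python
from typing import Dict, Optional


class PincerError(Exception):
    """Stand-in for the library's base exception."""


class InvalidTokenError(PincerError):
    """Stand-in: the token was rejected."""


class DisallowedIntentsError(PincerError):
    """Stand-in: the requested intents are not allowed."""


class UnhandledException(PincerError):
    """Stand-in: no specific error is known for the close code."""


class ReconnectRequested(PincerError):
    """Stand-in for _InternalPerformReconnectError."""


# Close codes copied from the listing's __dispatch_errors mapping.
CLOSE_CODE_ERRORS: Dict[int, PincerError] = {
    4000: ReconnectRequested(),
    4004: InvalidTokenError(),
    4007: ReconnectRequested(),
    4009: ReconnectRequested(),
    4014: DisallowedIntentsError(),
}


def resolve_close_error(code: int, reason: str) -> Optional[PincerError]:
    """Return None when the caller should reconnect, else the error to raise."""
    error = CLOSE_CODE_ERRORS.get(code)
    if isinstance(error, ReconnectRequested):
        return None
    # Unknown codes fall back to a generic error that carries the details.
    return error or UnhandledException(f"Dispatch error ({code}): {reason}")
```

With a helper like this, the `except` branch shrinks to a call plus one `if`, which removes several conditions from the long method and gives the logic a name that can be tested on its own.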

Complexity

Complex units like pincer.core.gateway.Dispatcher.__dispatcher() often do many different things. To break the surrounding class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often quicker to apply.
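As a rough sketch of Extract Class for this listing, the zlib-stream state (the inflator and the chunk buffer) forms one cohesive component that could move out of `__dispatcher()`. The class name `StreamInflator` and its `feed` method are hypothetical and not part of Pincer. The suffix check mirrors the `b'\x00\x00\xff\xff'` test in the listing.

```python
import zlib
from typing import Optional

# Marker that ends every complete zlib-stream message (Z_SYNC_FLUSH suffix).
ZLIB_SUFFIX = b"\x00\x00\xff\xff"


class StreamInflator:
    """Hypothetical helper that buffers chunks until a message is complete."""

    def __init__(self) -> None:
        # One decompressor is shared by all messages of a connection.
        self._inflator = zlib.decompressobj()
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> Optional[str]:
        """Add a chunk; return decoded text once the suffix is seen, else None."""
        self._buffer.extend(chunk)
        if not self._buffer.endswith(ZLIB_SUFFIX):
            return None
        text = self._inflator.decompress(bytes(self._buffer)).decode("utf-8")
        self._buffer.clear()
        return text
```

The receive loop would then call `feed()` for each binary frame and dispatch only when it returns text, so the buffering loop and the inflator no longer add branches to the main method.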

1
# Copyright Pincer 2021-Present
Missing module docstring
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
5
from __future__ import annotations
6
7
import logging
8
import zlib
9
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
10
from platform import system
11
from typing import Dict, Callable, Awaitable, Optional
12
13
from websockets import connect
14
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
15
from websockets.legacy.client import WebSocketClientProtocol
16
17
from . import __package__
18
from .._config import GatewayConfig
19
from ..core.dispatch import GatewayDispatch
20
from ..core.heartbeat import Heartbeat
21
from ..exceptions import (
22
    PincerError, InvalidTokenError, UnhandledException,
23
    _InternalPerformReconnectError, DisallowedIntentsError
24
)
25
from ..objects import Intents
26
27
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]
28
_log = logging.getLogger(__package__)
29
30
31
class Dispatcher:
32
    """
33
    The Dispatcher handles all interactions with discord websocket API.
34
    This also contains the main event loop, and handles the heartbeat.
35
36
    Running the dispatcher will create a connection with the
37
    Discord WebSocket API on behalf of the provided token.
38
39
    This token must be a bot token.
40
    (Which can be found on
41
    `<https://discord.com/developers/applications/<bot_id>/bot>`_)
42
    """
43
44
    # TODO: Implement compression
Coding Style: TODO and FIXME comments should generally be avoided.
45
    def __init__(
46
            self, token: str, *,
47
            handlers: Dict[int, Handler],
48
            intents: Intents
49
    ) -> None:
50
        """
51
        :param token:
52
            Bot token for discord's API.
53
54
        :param intents:
55
            Represents the discord bot intents.
56
57
        :param handlers:
58
            A mapping of gateway opcodes to handler coroutines.
59
60
        :raises InvalidTokenError:
61
            Discord Token length is not 59 characters.
62
        """
63
64
        if len(token) != 59:
65
            raise InvalidTokenError(
66
                "Discord Token must have exactly 59 characters."
67
            )
68
69
        self.__token = token
70
        self.__keep_alive = True
71
        self.__socket: Optional[WebSocketClientProtocol] = None
72
        self.__intents = intents
73
74
        async def identify_and_handle_hello(
75
                socket: WebSocketClientProtocol,
76
                payload: GatewayDispatch
77
        ):
78
            """
79
            Identifies the client to the Discord Websocket API, this
80
            gets done when the client receives the ``hello`` (opcode 10)
81
            message from discord. Right after we send our identification
82
            the heartbeat starts.
83
84
            :param socket:
85
                The current socket, which can be used to interact
86
                with the Discord API.
87
88
            :param payload:
89
                The received payload from Discord.
90
            """
91
            _log.debug("Sending authentication/identification message.")
92
93
            await socket.send(self.__hello_socket)
94
            await Heartbeat.handle_hello(socket, payload)
95
96
        async def handle_reconnect(_, payload: GatewayDispatch):
97
            """
98
            Closes the client and then reconnects it.
99
            """
100
            _log.debug("Reconnecting client...")
101
            await self.close()
102
103
            Heartbeat.update_sequence(payload.seq)
104
            self.start_loop()
105
106
        self.__dispatch_handlers: Dict[int, Handler] = {
107
            **handlers,
108
            7: handle_reconnect,
109
            9: handle_reconnect,
110
            10: identify_and_handle_hello,
111
            11: Heartbeat.handle_heartbeat
112
        }
113
114
        self.__dispatch_errors: Dict[int, PincerError] = {
115
            4000: _InternalPerformReconnectError(),
116
            4004: InvalidTokenError(),
117
            4007: _InternalPerformReconnectError(),
118
            4009: _InternalPerformReconnectError(),
119
            4014: DisallowedIntentsError()
120
        }
121
122
    @property
123
    def intents(self):
Missing function or method docstring
124
        return self.__intents
125
126
    @property
127
    def __hello_socket(self) -> str:
128
        return str(
129
            GatewayDispatch(
130
                2, {
131
                    "token": self.__token,
132
                    "intents": self.__intents,
133
                    "properties": {
134
                        "$os": system(),
135
                        "$browser": __package__,
136
                        "$device": __package__
137
                    },
138
                    "compress": GatewayConfig.compressed()
139
                }
140
            )
141
        )
142
143
    async def __handler_manager(
144
            self,
145
            socket: WebSocketClientProtocol,
146
            payload: GatewayDispatch,
147
            loop: AbstractEventLoop
148
    ):
149
        """
150
        This manages all handlers for the given opcodes.
151
        This method gets invoked for every message that is received from
152
        Discord.
153
154
        :meta public:
155
156
        :param socket:
157
            The current socket, which can be used to interact with
158
            the Discord API.
159
160
        :param payload:
161
            The received payload from Discord.
162
163
        :param loop:
164
            The current async loop on which the future is bound.
165
        """
166
        _log.debug(
167
            "New event received, checking if handler exists for opcode: %i",
168
            payload.op
169
        )
170
171
        handler: Handler = self.__dispatch_handlers.get(payload.op)
172
        all_handler: Handler = self.__dispatch_handlers.get(-1)
173
174
        if not handler:
175
            _log.error(
176
                "No handler was found for opcode %i, please report this to the "
177
                "pincer dev team!", payload.op
178
            )
179
180
            if not all_handler:
181
                raise UnhandledException(f"Unhandled payload: {payload}")
182
183
        _log.debug(
184
            "Event handler found, ensuring async future in current loop."
185
        )
186
187
        def execute_handler(event_handler: Handler):
188
            if event_handler:
189
                ensure_future(event_handler(socket, payload), loop=loop)
190
191
        execute_handler(handler)
192
        execute_handler(all_handler)
193
194
    async def __dispatcher(self, loop: AbstractEventLoop):
195
        """
196
        The main event loop.
197
        This handles all interactions with the websocket API.
198
199
        :meta public:
200
201
        :param loop:
202
            The loop in which the dispatcher is running.
203
        """
204
        _log.debug(
205
            "Establishing websocket connection with `%s`", GatewayConfig.uri()
206
        )
207
208
        async with connect(GatewayConfig.uri()) as socket:
209
            self.__socket = socket
210
211
            # Removing the limit of the received socket.
212
            # Having the default limit can cause an issue
213
            # with first payload of bigger bots.
214
            socket.max_size = None
215
216
            _log.debug(
217
                "Successfully established websocket connection with `%s`",
218
                GatewayConfig.uri()
219
            )
220
221
            if GatewayConfig.compression == "zlib-stream":
222
                # Create an inflator for compressed data as defined in
223
                # https://discord.com/developers/docs/topics/gateway
224
                inflator = zlib.decompressobj()
225
226
            while self.__keep_alive:
227
                try:
228
                    _log.debug("Waiting for new event.")
229
                    msg = await socket.recv()
230
231
                    if isinstance(msg, bytes):
232
                        if GatewayConfig.compression == "zlib-payload":
233
                            msg = zlib.decompress(msg)
234
                        else:
235
                            buffer = bytearray(msg)
236
237
                            while not buffer.endswith(b'\x00\x00\xff\xff'):
238
                                buffer.extend(await socket.recv())
239
240
                            msg = inflator.decompress(buffer).decode('utf-8')
The variable inflator does not seem to be defined in case GatewayConfig.compression == "zlib-stream" on line 221 is False. Are you sure this can never be the case?
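One possible way to address this warning is to select the decoding path explicitly, so the stream inflator is never touched on a path where it was not created. The sketch below assumes the compression setting is either `"zlib-stream"`, `"zlib-payload"` or unset. `make_inflator` and `decode_frame` are hypothetical names used for illustration only, and `decode_frame` expects an already complete frame.

```python
import zlib
from typing import Optional, Union


def make_inflator(compression: Optional[str]):
    """Create a decompressor only for the zlib-stream mode, else None."""
    return zlib.decompressobj() if compression == "zlib-stream" else None


def decode_frame(
    msg: Union[str, bytes],
    compression: Optional[str],
    inflator=None,
) -> str:
    """Decode one complete frame to text, failing loudly on a mismatch."""
    if isinstance(msg, str):
        # Text frames are not compressed.
        return msg
    if compression == "zlib-payload":
        return zlib.decompress(msg).decode("utf-8")
    if compression == "zlib-stream" and inflator is not None:
        return inflator.decompress(msg).decode("utf-8")
    # A binary frame arrived but no matching decoder is configured.
    raise ValueError(f"Binary frame received with compression={compression!r}")
```

Raising a clear error in the fallback case replaces the `NameError` that the original `else` branch would produce when a binary frame arrives without stream compression configured.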
241
242
                    await self.__handler_manager(
243
                        socket,
244
                        GatewayDispatch.from_string(msg),
245
                        loop
246
                    )
247
248
                except ConnectionClosedError as exc:
249
                    _log.debug(
250
                        "The connection with `%s` has been broken unexpectedly."
251
                        " (%i, %s)", GatewayConfig.uri(), exc.code, exc.reason
252
                    )
253
254
                    await self.close()
255
                    exception = self.__dispatch_errors.get(exc.code)
256
257
                    if isinstance(exception, _InternalPerformReconnectError):
258
                        Heartbeat.update_sequence(0)
259
                        return self.start_loop()
260
261
                    raise exception or UnhandledException(
262
                        f"Dispatch error ({exc.code}): {exc.reason}"
263
                    )
264
                except ConnectionClosedOK:
265
                    _log.debug("Connection closed successfully.")
266
267
    def start_loop(self, *, loop: AbstractEventLoop = None):
268
        """
269
        Instantiate the dispatcher, this will create a connection to the
270
        Discord websocket API on behalf of the client whose token has
271
        been passed.
272
273
        Keyword Arguments:
274
275
        :param loop:
276
            The loop in which the Dispatcher will run. If no loop is
277
            provided it will get a new one.
278
        """
279
        _log.debug("Starting GatewayDispatcher")
280
        loop = loop or get_event_loop()
281
        loop.run_until_complete(self.__dispatcher(loop))
282
        loop.close()
283
284
    async def close(self):
285
        """
286
        Stop the dispatcher from listening and responding to gateway
287
        events. This should let the client close on itself.
288
        """
289
        if not self.__socket:
290
            _log.error("Cannot close a non-existent socket connection.")
291
            raise RuntimeError("Please open the connection before closing.")
292
293
        _log.debug(
294
            "Setting keep_alive to False, this will terminate the heartbeat."
295
        )
296
297
        self.__keep_alive = False
298
        await self.__socket.close()
299