Passed
Pull Request — main (#160)
by
unknown
01:23
created

pincer.core.gateway.Dispatcher.__dispatcher()   C

Complexity

Conditions 10

Size

Total Lines 72
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 39
dl 0
loc 72
rs 5.9999
c 0
b 0
f 0
cc 10
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like pincer.core.gateway.Dispatcher.__dispatcher() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
5
from __future__ import annotations
6
7
import logging
8
import zlib
9
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
10
from platform import system
11
from typing import Dict, Callable, Awaitable, Optional
12
13
from websockets import connect
14
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
15
from websockets.legacy.client import WebSocketClientProtocol
16
17
from . import __package__
18
from .._config import GatewayConfig
19
from ..core.dispatch import GatewayDispatch
20
from ..core.heartbeat import Heartbeat
21
from ..exceptions import (
22
    PincerError, InvalidTokenError, UnhandledException,
23
    _InternalPerformReconnectError, DisallowedIntentsError
24
)
25
from ..objects import Intents
26
27
ZLIB_SUFFIX = b'\x00\x00\xff\xff'
28
29
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]
30
_log = logging.getLogger(__package__)
31
32
33
class Dispatcher:
34
    """
35
    The Dispatcher handles all interactions with discord websocket API.
36
    This also contains the main event loop, and handles the heartbeat.
37
38
    Running the dispatcher will create a connection with the
39
    Discord WebSocket API on behalf of the provided token.
40
41
    This token must be a bot token.
42
    (Which can be found on
43
    `<https://discord.com/developers/applications/<bot_id>/bot>`_)
44
    """
45
46
    # TODO: Implement compression
0 ignored issues
show
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
Loading history...
47
    def __init__(
48
            self, token: str, *,
49
            handlers: Dict[int, Handler],
50
            intents: Intents
51
    ) -> None:
52
        """
53
        :param token:
54
            Bot token for discord's API.
55
56
        :param intents:
57
            Represents the discord bot intents.
58
59
        :param handlers:
60
            A hashmap of coroutines with as key the gateway opcode.
61
62
        :raises InvalidTokenError:
63
            Discord Token length is not 59 characters.
64
        """
65
66
        if len(token) != 59:
67
            raise InvalidTokenError(
68
                "Discord Token must have exactly 59 characters."
69
            )
70
71
        self.__token = token
72
        self.__keep_alive = True
73
        self.__socket: Optional[WebSocketClientProtocol] = None
74
        self.__intents = intents
75
76
        async def identify_and_handle_hello(
77
                socket: WebSocketClientProtocol,
78
                payload: GatewayDispatch
79
        ):
80
            """
81
            Identifies the client to the Discord Websocket API, this
82
            gets done when the client receives the ``hello`` (opcode 10)
83
            message from discord. Right after we send our identification
84
            the heartbeat starts.
85
86
            :param socket:
87
                The current socket, which can be used to interact
88
                with the Discord API.
89
90
            :param payload:
91
                The received payload from Discord.
92
            """
93
            _log.debug("Sending authentication/identification message.")
94
95
            await socket.send(self.__hello_socket)
96
            await Heartbeat.handle_hello(socket, payload)
97
98
        async def handle_reconnect(_, payload: GatewayDispatch):
99
            """
100
            Closes the client and then reconnects it.
101
            """
102
            _log.debug("Reconnecting client...")
103
            await self.close()
104
105
            Heartbeat.update_sequence(payload.seq)
106
            self.start_loop()
107
108
        self.__dispatch_handlers: Dict[int, Handler] = {
109
            **handlers,
110
            7: handle_reconnect,
111
            9: handle_reconnect,
112
            10: identify_and_handle_hello,
113
            11: Heartbeat.handle_heartbeat
114
        }
115
116
        self.__dispatch_errors: Dict[int, PincerError] = {
117
            4000: _InternalPerformReconnectError(),
118
            4004: InvalidTokenError(),
119
            4007: _InternalPerformReconnectError(),
120
            4009: _InternalPerformReconnectError(),
121
            4014: DisallowedIntentsError()
122
        }
123
124
    @property
125
    def intents(self):
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
126
        return self.__intents
127
128
    @property
129
    def __hello_socket(self) -> str:
130
        return str(
131
            GatewayDispatch(
132
                2, {
133
                    "token": self.__token,
134
                    "intents": self.__intents,
135
                    "properties": {
136
                        "$os": system(),
137
                        "$browser": __package__,
138
                        "$device": __package__
139
                    },
140
                    "compress": GatewayConfig.compressed()
141
                }
142
            )
143
        )
144
145
    async def __handler_manager(
146
            self,
147
            socket: WebSocketClientProtocol,
148
            payload: GatewayDispatch,
149
            loop: AbstractEventLoop
150
    ):
151
        """
152
        This manages all handles for given OP codes.
153
        This method gets invoked for every message that is received from
154
        Discord.
155
156
        :meta public:
157
158
        :param socket:
159
            The current socket, which can be used to interact with
160
            the Discord API.
161
162
        :param payload:
163
            The received payload from Discord.
164
165
        :param loop:
166
            The current async loop on which the future is bound.
167
        """
168
        _log.debug(
169
            "New event received, checking if handler exists for opcode: %i",
170
            payload.op
171
        )
172
173
        handler: Handler = self.__dispatch_handlers.get(payload.op)
174
        all_handler: Handler = self.__dispatch_handlers.get(-1)
175
176
        if not handler:
177
            _log.error(
178
                "No handler was found for opcode %i, please report this to the "
179
                "pincer dev team!", payload.op
180
            )
181
182
            if not all_handler:
183
                raise UnhandledException(f"Unhandled payload: {payload}")
184
185
        _log.debug(
186
            "Event handler found, ensuring async future in current loop."
187
        )
188
189
        def execute_handler(event_handler: Handler):
190
            if event_handler:
191
                ensure_future(event_handler(socket, payload), loop=loop)
192
193
        execute_handler(handler)
194
        execute_handler(all_handler)
195
196
    async def __dispatcher(self, loop: AbstractEventLoop):
197
        """
198
        The main event loop.
199
        This handles all interactions with the websocket API.
200
201
        :meta public:
202
203
        :param loop:
204
            The loop in which the dispatcher is running.
205
        """
206
        _log.debug(
207
            "Establishing websocket connection with `%s`", GatewayConfig.uri()
208
        )
209
210
        async with connect(GatewayConfig.uri()) as socket:
211
            self.__socket = socket
212
213
            # Removing the limit of the received socket.
214
            # Having the default limit can cause an issue
215
            # with first payload of bigger bots.
216
            socket.max_size = None
217
218
            _log.debug(
219
                "Successfully established websocket connection with `%s`",
220
                GatewayConfig.uri()
221
            )
222
223
            if GatewayConfig.compression == "zlib-stream":
224
                # Create an inflator for compressed data as defined in
225
                # https://discord.com/developers/docs/topics/gateway
226
                inflator = zlib.decompressobj()
227
228
            while self.__keep_alive:
229
                try:
230
                    _log.debug("Waiting for new event.")
231
                    msg = await socket.recv()
232
233
                    if isinstance(msg, bytes):
234
                        if GatewayConfig.compression == "zlib-payload":
235
                            msg = zlib.decompress(msg)
236
                        else:
237
                            buffer = bytearray(msg)
238
239
                            while not buffer.endswith(ZLIB_SUFFIX):
240
                                buffer.extend(await socket.recv())
241
242
                            msg = inflator.decompress(buffer).decode('utf-8')
0 ignored issues
show
introduced by
The variable inflator does not seem to be defined in case GatewayConfig.compression == "zlib-stream" on line 223 is False. Are you sure this can never be the case?
Loading history...
243
244
                    await self.__handler_manager(
245
                        socket,
246
                        GatewayDispatch.from_string(msg),
247
                        loop
248
                    )
249
250
                except ConnectionClosedError as exc:
251
                    _log.debug(
252
                        "The connection with `%s` has been broken unexpectedly."
253
                        " (%i, %s)", GatewayConfig.uri(), exc.code, exc.reason
254
                    )
255
256
                    await self.close()
257
                    exception = self.__dispatch_errors.get(exc.code)
258
259
                    if isinstance(exception, _InternalPerformReconnectError):
260
                        Heartbeat.update_sequence(0)
261
                        return self.start_loop()
262
263
                    raise exception or UnhandledException(
264
                        f"Dispatch error ({exc.code}): {exc.reason}"
265
                    )
266
                except ConnectionClosedOK:
267
                    _log.debug("Connection closed successfully.")
268
269
    def start_loop(self, *, loop: AbstractEventLoop = None):
270
        """
271
        Instantiate the dispatcher, this will create a connection to the
272
        Discord websocket API on behalf of the client who's token has
273
        been passed.
274
275
        Keyword Arguments:
276
277
        :param loop:
278
            The loop in which the Dispatcher will run. If no loop is
279
            provided it will get a new one.
280
        """
281
        _log.debug("Starting GatewayDispatcher")
282
        loop = loop or get_event_loop()
283
        loop.run_until_complete(self.__dispatcher(loop))
284
        loop.close()
285
286
    async def close(self):
287
        """
288
        Stop the dispatcher from listening and responding to gateway
289
        events. This should let the client close on itself.
290
        """
291
        if not self.__socket:
292
            _log.error("Cannot close non existing socket socket connection.")
293
            raise RuntimeError("Please open the connection before closing.")
294
295
        _log.debug(
296
            "Setting keep_alive to False, this will terminate the heartbeat."
297
        )
298
299
        self.__keep_alive = False
300
        await self.__socket.close()
301