Passed
Pull Request — main (#63)
by P
01:20
created

pincer.core.gateway   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 271
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 14
eloc 119
dl 0
loc 271
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A Dispatcher.__hello_socket() 0 11 1
A Dispatcher.__handler_manager() 0 43 2
B Dispatcher.__dispatcher() 0 49 6
A Dispatcher.close() 0 15 2
A Dispatcher.__init__() 0 63 2
A Dispatcher.start_loop() 0 16 1
1
# -*- coding: utf-8 -*-
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# MIT License
3
#
4
# Copyright (c) 2021 Pincer
5
#
6
# Permission is hereby granted, free of charge, to any person obtaining
7
# a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
21
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
22
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
23
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24
25
from __future__ import annotations
26
27
import logging
28
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
29
from platform import system
30
from typing import Dict, Callable, Awaitable, Optional
31
32
from websockets import connect
33
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
34
from websockets.legacy.client import WebSocketClientProtocol
35
36
from . import __package__
37
from .._config import GatewayConfig
38
from ..core.dispatch import GatewayDispatch
39
from ..core.heartbeat import Heartbeat
40
from ..exceptions import (
41
    PincerError, InvalidTokenError, UnhandledException,
42
    _InternalPerformReconnectError, DisallowedIntentsError
43
)
44
45
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]
46
_log = logging.getLogger(__package__)
47
48
49
class Dispatcher:
50
    """
51
    The Dispatcher handles all interactions with discord websocket API.
52
    This also contains the main event loop, and handles the heartbeat.
53
54
    Running the dispatcher will create a connection with the
55
    Discord WebSocket API on behalf of the provided token.
56
57
    This token must be a bot token.
58
    (Which can be found on
59
    `<https://discord.com/developers/applications/<bot_id>/bot>`_)
60
    """
61
62
    # TODO: Add intents argument
0 ignored issues
show
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
Loading history...
63
    # TODO: Implement compression
0 ignored issues
show
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
Loading history...
64
    def __init__(self, token: str, *, handlers: Dict[int, Handler]) -> None:
65
        """
66
        :param token:
67
            Bot token for discord's API.
68
        :raises InvalidTokenError:
69
            Discord Token length is not 59 characters.
70
        """
71
72
        if len(token) != 59:
73
            raise InvalidTokenError(
74
                "Discord Token must have exactly 59 characters."
75
            )
76
77
        self.__token = token
78
        self.__keep_alive = True
79
        self.__socket: Optional[WebSocketClientProtocol] = None
80
81
        async def identify_and_handle_hello(
82
                socket: WebSocketClientProtocol,
83
                payload: GatewayDispatch
84
        ):
85
            """
86
            Identifies the client to the Discord Websocket API, this
87
            gets done when the client receives the ``hello`` (opcode 10)
88
            message from discord. Right after we send our identification
89
            the heartbeat starts.
90
91
            :param socket:
92
                The current socket, which can be used to interact
93
                with the Discord API.
94
95
            :param payload:
96
                The received payload from Discord.
97
            """
98
            _log.debug("Sending authentication/identification message.")
99
100
            await socket.send(self.__hello_socket)
101
            await Heartbeat.handle_hello(socket, payload)
102
103
        async def handle_reconnect(_, payload: GatewayDispatch):
104
            """
105
            Closes the client and then reconnects it.
106
            """
107
            _log.debug("Reconnecting client...")
108
            await self.close()
109
110
            Heartbeat.update_sequence(payload.seq)
111
            self.start_loop()
112
113
        self.__dispatch_handlers: Dict[int, Handler] = {
114
            **handlers,
115
            7: handle_reconnect,
116
            9: handle_reconnect,
117
            10: identify_and_handle_hello,
118
            11: Heartbeat.handle_heartbeat
119
        }
120
121
        self.__dispatch_errors: Dict[int, PincerError] = {
122
            4000: _InternalPerformReconnectError(),
123
            4004: InvalidTokenError(),
124
            4007: _InternalPerformReconnectError(),
125
            4009: _InternalPerformReconnectError(),
126
            4014: DisallowedIntentsError()
127
        }
128
129
    @property
130
    def __hello_socket(self) -> str:
131
        return str(
132
            GatewayDispatch(
133
                2, {
134
                    "token": self.__token,
135
                    "intents": 0,
136
                    "properties": {
137
                        "$os": system(),
138
                        "$browser": __package__,
139
                        "$device": __package__
140
                    }
141
                }
142
            )
143
        )
144
145
    async def __handler_manager(
146
            self,
147
            socket: WebSocketClientProtocol,
148
            payload: GatewayDispatch,
149
            loop: AbstractEventLoop
150
    ):
151
        """
152
        This manages all handles for given OP codes.
153
        This method gets invoked for every message that is received from
154
        Discord.
155
156
        :meta public:
157
158
        :param socket:
159
            The current socket, which can be used to interact with
160
            the Discord API.
161
162
        :param payload:
163
            The received payload from Discord.
164
165
        :param loop:
166
            The current async loop on which the future is bound.
167
        """
168
        _log.debug(
169
            "New event received, checking if handler exists for opcode: %i",
170
            payload.op
171
        )
172
173
        handler: Handler = self.__dispatch_handlers.get(payload.op)
174
175
        if not handler:
176
            _log.error(
177
                "No handler was found for opcode %i, please report this to the "
178
                "pincer dev team!", payload.op
179
            )
180
181
            raise UnhandledException(f"Unhandled payload: {payload}")
182
183
        _log.debug(
184
            "Event handler found, ensuring async future in current loop."
185
        )
186
187
        ensure_future(handler(socket, payload), loop=loop)
188
189
    async def __dispatcher(self, loop: AbstractEventLoop):
190
        """
191
        The main event loop.
192
        This handles all interactions with the websocket API.
193
194
        :meta public:
195
196
        :param loop:
197
            The loop in which the dispatcher is running.
198
        """
199
        _log.debug(
200
            "Establishing websocket connection with `%s`", GatewayConfig.uri()
201
        )
202
203
        async with connect(GatewayConfig.uri()) as socket:
204
            self.__socket = socket
205
            _log.debug(
206
                "Successfully established websocket connection with `%s`",
207
                GatewayConfig.uri()
208
            )
209
210
            while self.__keep_alive:
211
                try:
212
                    _log.debug("Waiting for new event.")
213
                    await self.__handler_manager(
214
                        socket,
215
                        GatewayDispatch.from_string(await socket.recv()),
216
                        loop
217
                    )
218
219
                except ConnectionClosedError as exc:
220
                    _log.debug(
221
                        "The connection with `%s` has been broken unexpectedly."
222
                        " (%i, %s)", GatewayConfig.uri(), exc.code, exc.reason
223
                    )
224
225
                    await self.close()
226
                    exception = self.__dispatch_errors.get(exc.code)
227
228
                    if isinstance(exception, _InternalPerformReconnectError):
229
                        Heartbeat.update_sequence(0)
230
                        await self.close()
231
                        return self.start_loop()
232
233
                    raise exception or UnhandledException(
234
                        f"Dispatch error ({exc.code}): {exc.reason}"
235
                    )
236
                except ConnectionClosedOK:
237
                    _log.debug("Connection closed successfully.")
238
239
    def start_loop(self, *, loop: AbstractEventLoop = None):
240
        """
241
        Instantiate the dispatcher, this will create a connection to the
242
        Discord websocket API on behalf of the client who's token has
243
        been passed.
244
245
        Keyword Arguments:
246
247
        :param loop:
248
            The loop in which the Dispatcher will run. If no loop is
249
            provided it will get a new one.
250
        """
251
        _log.debug("Starting GatewayDispatcher")
252
        loop = loop or get_event_loop()
253
        loop.run_until_complete(self.__dispatcher(loop))
254
        loop.close()
255
256
    async def close(self):
257
        """
258
        Stop the dispatcher from listening and responding to gateway
259
        events. This should let the client close on itself.
260
        """
261
        if not self.__socket:
262
            _log.error("Cannot close non existing socket socket connection.")
263
            raise RuntimeError("Please open the connection before closing.")
264
265
        _log.debug(
266
            "Setting keep_alive to False, this will terminate the heartbeat."
267
        )
268
269
        self.__keep_alive = False
270
        await self.__socket.close()
271