Passed
Push — main ( d0b3ac...149404 )
by
unknown
01:45
created

pincer.core.gateway.Dispatcher.intents()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# MIT License
3
#
4
# Copyright (c) 2021 Pincer
5
#
6
# Permission is hereby granted, free of charge, to any person obtaining
7
# a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
21
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
22
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
23
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24
25
from __future__ import annotations
26
27
import logging
28
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
29
from platform import system
30
from typing import Dict, Callable, Awaitable, Optional
31
32
from websockets import connect
33
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
34
from websockets.legacy.client import WebSocketClientProtocol
35
36
from . import __package__
37
from .._config import GatewayConfig
38
from ..core.dispatch import GatewayDispatch
39
from ..core.heartbeat import Heartbeat
40
from ..exceptions import (
41
    PincerError, InvalidTokenError, UnhandledException,
42
    _InternalPerformReconnectError, DisallowedIntentsError
43
)
44
from ..objects import Intents
45
46
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]
47
_log = logging.getLogger(__package__)
48
49
50
class Dispatcher:
51
    """
52
    The Dispatcher handles all interactions with discord websocket API.
53
    This also contains the main event loop, and handles the heartbeat.
54
55
    Running the dispatcher will create a connection with the
56
    Discord WebSocket API on behalf of the provided token.
57
58
    This token must be a bot token.
59
    (Which can be found on
60
    `<https://discord.com/developers/applications/<bot_id>/bot>`_)
61
    """
62
63
    # TODO: Implement compression
0 ignored issues
show
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
Loading history...
64
    def __init__(self, token: str, *,
65
                 handlers: Dict[int, Handler],
66
                 intents: Intents) -> None:
67
        """
68
        :param token:
69
            Bot token for discord's API.
70
71
        :param intents:
72
            Represents the discord bot intents.
73
74
        :param handlers:
75
            A hashmap of coroutines with as key the gateway opcode.
76
77
        :raises InvalidTokenError:
78
            Discord Token length is not 59 characters.
79
        """
80
81
        if len(token) != 59:
82
            raise InvalidTokenError(
83
                "Discord Token must have exactly 59 characters."
84
            )
85
86
        self.__token = token
87
        self.__keep_alive = True
88
        self.__socket: Optional[WebSocketClientProtocol] = None
89
        self.__intents = intents
90
91
        async def identify_and_handle_hello(
92
                socket: WebSocketClientProtocol,
93
                payload: GatewayDispatch
94
        ):
95
            """
96
            Identifies the client to the Discord Websocket API, this
97
            gets done when the client receives the ``hello`` (opcode 10)
98
            message from discord. Right after we send our identification
99
            the heartbeat starts.
100
101
            :param socket:
102
                The current socket, which can be used to interact
103
                with the Discord API.
104
105
            :param payload:
106
                The received payload from Discord.
107
            """
108
            _log.debug("Sending authentication/identification message.")
109
110
            await socket.send(self.__hello_socket)
111
            await Heartbeat.handle_hello(socket, payload)
112
113
        async def handle_reconnect(_, payload: GatewayDispatch):
114
            """
115
            Closes the client and then reconnects it.
116
            """
117
            _log.debug("Reconnecting client...")
118
            await self.close()
119
120
            Heartbeat.update_sequence(payload.seq)
121
            self.start_loop()
122
123
        self.__dispatch_handlers: Dict[int, Handler] = {
124
            **handlers,
125
            7: handle_reconnect,
126
            9: handle_reconnect,
127
            10: identify_and_handle_hello,
128
            11: Heartbeat.handle_heartbeat
129
        }
130
131
        self.__dispatch_errors: Dict[int, PincerError] = {
132
            4000: _InternalPerformReconnectError(),
133
            4004: InvalidTokenError(),
134
            4007: _InternalPerformReconnectError(),
135
            4009: _InternalPerformReconnectError(),
136
            4014: DisallowedIntentsError()
137
        }
138
139
    @property
140
    def intents(self):
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
141
        return self.__intents
142
143
    @property
144
    def __hello_socket(self) -> str:
145
        return str(
146
            GatewayDispatch(
147
                2, {
148
                    "token": self.__token,
149
                    "intents": self.__intents,
150
                    "properties": {
151
                        "$os": system(),
152
                        "$browser": __package__,
153
                        "$device": __package__
154
                    }
155
                }
156
            )
157
        )
158
159
    async def __handler_manager(
160
            self,
161
            socket: WebSocketClientProtocol,
162
            payload: GatewayDispatch,
163
            loop: AbstractEventLoop
164
    ):
165
        """
166
        This manages all handles for given OP codes.
167
        This method gets invoked for every message that is received from
168
        Discord.
169
170
        :meta public:
171
172
        :param socket:
173
            The current socket, which can be used to interact with
174
            the Discord API.
175
176
        :param payload:
177
            The received payload from Discord.
178
179
        :param loop:
180
            The current async loop on which the future is bound.
181
        """
182
        _log.debug(
183
            "New event received, checking if handler exists for opcode: %i",
184
            payload.op
185
        )
186
187
        handler: Handler = self.__dispatch_handlers.get(payload.op)
188
189
        if not handler:
190
            _log.error(
191
                "No handler was found for opcode %i, please report this to the "
192
                "pincer dev team!", payload.op
193
            )
194
195
            raise UnhandledException(f"Unhandled payload: {payload}")
196
197
        _log.debug(
198
            "Event handler found, ensuring async future in current loop."
199
        )
200
201
        ensure_future(handler(socket, payload), loop=loop)
202
203
    async def __dispatcher(self, loop: AbstractEventLoop):
204
        """
205
        The main event loop.
206
        This handles all interactions with the websocket API.
207
208
        :meta public:
209
210
        :param loop:
211
            The loop in which the dispatcher is running.
212
        """
213
        _log.debug(
214
            "Establishing websocket connection with `%s`", GatewayConfig.uri()
215
        )
216
217
        async with connect(GatewayConfig.uri()) as socket:
218
            self.__socket = socket
219
            _log.debug(
220
                "Successfully established websocket connection with `%s`",
221
                GatewayConfig.uri()
222
            )
223
224
            while self.__keep_alive:
225
                try:
226
                    _log.debug("Waiting for new event.")
227
                    await self.__handler_manager(
228
                        socket,
229
                        GatewayDispatch.from_string(await socket.recv()),
230
                        loop
231
                    )
232
233
                except ConnectionClosedError as exc:
234
                    _log.debug(
235
                        "The connection with `%s` has been broken unexpectedly."
236
                        " (%i, %s)", GatewayConfig.uri(), exc.code, exc.reason
237
                    )
238
239
                    await self.close()
240
                    exception = self.__dispatch_errors.get(exc.code)
241
242
                    if isinstance(exception, _InternalPerformReconnectError):
243
                        Heartbeat.update_sequence(0)
244
                        return self.start_loop()
245
246
                    raise exception or UnhandledException(
247
                        f"Dispatch error ({exc.code}): {exc.reason}"
248
                    )
249
                except ConnectionClosedOK:
250
                    _log.debug("Connection closed successfully.")
251
252
    def start_loop(self, *, loop: AbstractEventLoop = None):
253
        """
254
        Instantiate the dispatcher, this will create a connection to the
255
        Discord websocket API on behalf of the client who's token has
256
        been passed.
257
258
        Keyword Arguments:
259
260
        :param loop:
261
            The loop in which the Dispatcher will run. If no loop is
262
            provided it will get a new one.
263
        """
264
        _log.debug("Starting GatewayDispatcher")
265
        loop = loop or get_event_loop()
266
        loop.run_until_complete(self.__dispatcher(loop))
267
        loop.close()
268
269
    async def close(self):
270
        """
271
        Stop the dispatcher from listening and responding to gateway
272
        events. This should let the client close on itself.
273
        """
274
        if not self.__socket:
275
            _log.error("Cannot close non existing socket socket connection.")
276
            raise RuntimeError("Please open the connection before closing.")
277
278
        _log.debug(
279
            "Setting keep_alive to False, this will terminate the heartbeat."
280
        )
281
282
        self.__keep_alive = False
283
        await self.__socket.close()
284