Passed
Push — main ( 3c4919...341de5 )
by Yohann
01:27
created

pincer.core.gateway.Dispatcher.__hello_socket()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 11
rs 9.85
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# MIT License
3
#
4
# Copyright (c) 2021 Pincer
5
#
6
# Permission is hereby granted, free of charge, to any person obtaining
7
# a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
21
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
22
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
23
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24
25
from __future__ import annotations
26
27
import logging
28
from asyncio import get_event_loop, AbstractEventLoop, ensure_future
29
from platform import system
30
from typing import Dict, Callable, Awaitable, Optional
31
32
from websockets import connect
33
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
34
from websockets.legacy.client import WebSocketClientProtocol
35
36
from . import __package__
37
from .._config import GatewayConfig
38
from ..core.dispatch import GatewayDispatch
39
from ..core.heartbeat import Heartbeat
40
from ..exceptions import (
41
    PincerError, InvalidTokenError, UnhandledException,
42
    _InternalPerformReconnectError, DisallowedIntentsError
43
)
44
45
# Signature shared by every opcode handler: an awaitable callable that
# receives the active websocket connection and the dispatch payload that
# triggered it, and produces no result.
Handler = Callable[[WebSocketClientProtocol, GatewayDispatch], Awaitable[None]]
46
# Module-wide logger, named after the package via the ``__package__`` value
# imported from the package itself.
_log = logging.getLogger(__package__)
47
48
49
class Dispatcher:
    r"""
    The Dispatcher handles all interactions with discord websocket API.
    This also contains the main event loop, and handles the heartbeat.

    Running the dispatcher will create a connection with the
    Discord WebSocket API on behalf of the provided token.

    This token must be a bot token.
    (Which can be found on `<https://discord.com/developers/applications/\<bot_id\>/bot>`_)
    """

    # TODO: Add intents argument
    # TODO: Implement compression
    def __init__(self, token: str, *, handlers: Dict[int, Handler]) -> None:
        """
        :param token:
            Bot token for discord's API.

        :param handlers:
            Mapping of gateway opcode to the coroutine that handles it.
            The dispatcher always installs its own handlers for the
            opcodes 7, 9, 10 and 11; those override any entry with the
            same opcode in this mapping.

        :raises InvalidTokenError:
            Discord Token length is not 59 characters.
        """

        # NOTE(review): the fixed length of 59 is kept unchanged; confirm
        # that it still matches the tokens Discord currently issues.
        if len(token) != 59:
            raise InvalidTokenError(
                "Discord Token must have exactly 59 characters."
            )

        self.__token = token
        # Cleared by ``close``; the receive loop stops once it is False.
        self.__keep_alive = True
        # Set when the current connection must be replaced by a new one.
        # The dispatcher loop consumes this flag after the socket closed.
        self.__reconnect_requested = False
        self.__socket: Optional[WebSocketClientProtocol] = None

        async def identify_and_handle_hello(
                socket: WebSocketClientProtocol,
                payload: GatewayDispatch
        ):
            """
            Identifies the client to the Discord Websocket API, this
            gets done when the client receives the ``hello`` (opcode 10)
            message from discord. Right after we send our identification
            the heartbeat starts.

            :param socket:
                The current socket, which can be used to interact
                with the Discord API.

            :param payload:
                The received payload from Discord.
            """
            _log.debug("Sending authentication/identification message.")

            await socket.send(self.__hello_socket)
            await Heartbeat.handle_hello(socket, payload)

        async def handle_reconnect(_, payload: GatewayDispatch):
            """
            Closes the current connection and asks the dispatcher loop
            to open a new one.

            The handler runs on the event loop which is already being
            driven by ``run``, so it must not call ``run`` again (that
            raises ``RuntimeError``). It only flags the reconnect; the
            dispatcher loop performs it once the socket is closed.

            :param payload:
                The received payload from Discord, its sequence number
                is handed to the heartbeat.
            """
            _log.debug("Reconnecting client...")

            # The flag has to be set before the socket gets closed,
            # closing wakes up the dispatcher loop which then reads it.
            Heartbeat.update_sequence(payload.seq)
            self.__reconnect_requested = True
            await self.close()

        # User handlers come first so the built-in ones always win.
        self.__dispatch_handlers: Dict[int, Handler] = {
            **handlers,
            7: handle_reconnect,            # Reconnect
            9: handle_reconnect,            # Invalid session
            10: identify_and_handle_hello,  # Hello
            11: Heartbeat.handle_heartbeat  # Heartbeat ACK
        }

        # Websocket close codes which have a dedicated meaning. Every
        # other close code results in an ``UnhandledException``.
        self.__dispatch_errors: Dict[int, PincerError] = {
            4000: _InternalPerformReconnectError(),
            4004: InvalidTokenError(),
            4007: _InternalPerformReconnectError(),
            4009: _InternalPerformReconnectError(),
            4014: DisallowedIntentsError()
        }

    @property
    def __hello_socket(self) -> str:
        """
        The identification message (opcode 2) as a string, containing
        the token, the intents and the connection properties.
        """
        return str(
            GatewayDispatch(
                2, {
                    "token": self.__token,
                    "intents": 0,
                    "properties": {
                        "$os": system(),
                        "$browser": __package__,
                        "$device": __package__
                    }
                }
            )
        )

    async def __handler_manager(
            self,
            socket: WebSocketClientProtocol,
            payload: GatewayDispatch,
            loop: AbstractEventLoop
    ):
        """
        This manages all handles for given OP codes.
        This method gets invoked for every message that is received from
        Discord.

        :meta public:

        :param socket:
            The current socket, which can be used to interact with
            the Discord API.

        :param payload:
            The received payload from Discord.

        :param loop:
            The current async loop on which the future is bound.

        :raises UnhandledException:
            No handler is registered for the opcode of the payload.
        """
        _log.debug(
            "New event received, checking if handler exists for opcode: %i",
            payload.op
        )

        handler: Optional[Handler] = self.__dispatch_handlers.get(payload.op)

        if handler is None:
            _log.error(
                "No handler was found for opcode %i, please report this to the "
                "pincer dev team!", payload.op
            )

            raise UnhandledException(f"Unhandled payload: {payload}")

        _log.debug(
            "Event handler found, ensuring async future in current loop."
        )

        # The handler is scheduled instead of awaited, so the dispatcher
        # can continue receiving while the handler is running.
        ensure_future(handler(socket, payload), loop=loop)

    async def __dispatcher(self, loop: AbstractEventLoop):
        """
        The main event loop.
        This handles all interactions with the websocket API.

        Every iteration of the outer loop is one websocket connection.
        A new iteration only starts when a reconnect has been requested,
        either by the reconnect handler or by a close code which maps
        to ``_InternalPerformReconnectError``.

        :meta public:

        :param loop:
            The loop in which the dispatcher is running.

        :raises PincerError:
            The connection got closed with an error code which is not
            recoverable. Unknown codes raise ``UnhandledException``.
        """
        while True:
            uri = GatewayConfig.uri()
            _log.debug("Establishing websocket connection with `%s`", uri)

            async with connect(uri) as socket:
                self.__socket = socket
                _log.debug(
                    "Successfully established websocket connection with `%s`",
                    uri
                )

                while self.__keep_alive:
                    try:
                        _log.debug("Waiting for new event.")
                        await self.__handler_manager(
                            socket,
                            GatewayDispatch.from_string(await socket.recv()),
                            loop
                        )

                    except ConnectionClosedError as exc:
                        _log.debug(
                            "The connection with `%s` has been broken "
                            "unexpectedly. (%i, %s)",
                            uri, exc.code, exc.reason
                        )

                        # We closed the socket ourselves in order to
                        # reconnect, so this is not a failure.
                        if self.__reconnect_requested:
                            break

                        await self.close()
                        exception = self.__dispatch_errors.get(exc.code)

                        if isinstance(
                                exception, _InternalPerformReconnectError
                        ):
                            Heartbeat.update_sequence(0)
                            self.__reconnect_requested = True
                            break

                        raise (exception or UnhandledException(
                            f"Dispatch error ({exc.code}): {exc.reason}"
                        )) from exc

                    except ConnectionClosedOK:
                        _log.debug("Connection closed successfully.")
                        # The socket cannot deliver anything anymore,
                        # without this break the loop would spin forever
                        # when the close was not initiated by ``close``.
                        break

            if not self.__reconnect_requested:
                return

            _log.debug("Opening a new connection to replace the closed one.")

            # ``close`` cleared keep_alive, re-arm the state for the
            # next connection.
            self.__reconnect_requested = False
            self.__keep_alive = True

    def run(self, *, loop: Optional[AbstractEventLoop] = None):
        """
        Instantiate the dispatcher, this will create a connection to the
        Discord websocket API on behalf of the client whose token has
        been passed. This call blocks until the dispatcher has stopped,
        after which the loop gets closed.

        Keyword Arguments:

        :param loop:
            The loop in which the Dispatcher will run. If no loop is
            provided the result of ``asyncio.get_event_loop`` is used.
        """
        _log.debug("Starting GatewayDispatcher")
        loop = loop or get_event_loop()
        loop.run_until_complete(self.__dispatcher(loop))
        loop.close()

    async def close(self):
        """
        Stop the dispatcher from listening and responding to gateway
        events. This should let the client close on itself.

        :raises RuntimeError:
            No connection has been opened yet.
        """
        if not self.__socket:
            _log.error("Cannot close non existing socket socket connection.")
            raise RuntimeError("Please open the connection before closing.")

        _log.debug(
            "Setting keep_alive to False, this will terminate the heartbeat."
        )

        self.__keep_alive = False
        await self.__socket.close()
270