Passed
Pull Request — main (#63)
by P
01:20
created

pincer.client.Client.run()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# MIT License
3
#
4
# Copyright (c) 2021 Pincer
5
#
6
# Permission is hereby granted, free of charge, to any person obtaining
7
# a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
21
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
22
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
23
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24
25
from __future__ import annotations
26
27
import logging
28
from asyncio import iscoroutinefunction, run
29
from typing import Optional, Any, Union, Dict, Tuple, List
30
31
from . import __package__
32
from ._config import events
33
from .commands import ChatCommandHandler
0 ignored issues
show
introduced by
Cannot import 'commands' due to syntax error 'invalid syntax (<unknown>, line 89)'
Loading history...
34
from .core.dispatch import GatewayDispatch
35
from .core.gateway import Dispatcher
36
from .core.http import HTTPClient
37
from .exceptions import InvalidEventName
38
from .objects import User, Message, Embed
39
from .objects.interactions import Interaction, InteractionFlags
40
from .utils import get_index, should_pass_cls, Coro, MISSING
41
from .utils.extraction import get_params
42
43
_log = logging.getLogger(__package__)
44
45
MiddlewareType = Optional[Union[Coro, Tuple[str, List[Any], Dict[str, Any]]]]
46
47
_events: Dict[str, Optional[Union[str, Coro]]] = {}
48
49
for event in events:
50
    event_final_executor = f"on_{event}"
51
52
    # Event middleware for the library.
53
    # Function argument is a payload (GatewayDispatch).
54
55
    # The function must return a string which
56
    # contains the main event key.
57
58
    # As second value a list with arguments,
59
    # and the third value value must be a dictionary.
60
    # The last two are passed on as *args and **kwargs.
61
62
    # NOTE: These return values must be passed as a tuple!
63
    _events[event] = event_final_executor
64
65
    # The registered event by the client. Do not manually overwrite.
66
    _events[event_final_executor] = None
67
68
69
def middleware(call: str, *, override: bool = False):
70
    """
71
    Middleware are methods which can be registered with this decorator.
72
    These methods are invoked before any ``on_`` event.
73
    As the ``on_`` event is the final call.
74
75
    A default call exists for all events, but some might already be in
76
    use by the library.
77
78
    If you know what you are doing, you can override these default
79
    middleware methods by passing the override parameter.
80
81
    The method to which this decorator is registered must be a coroutine,
82
    and it must return a tuple with the following format:
83
84
    .. code-block:: python
85
86
        tuple(
87
            key for next middleware or final event [str],
88
            args for next middleware/event which will be passed as *args
89
                [list(Any)],
90
            kwargs for next middleware/event which will be passed as
91
                **kwargs [dict(Any)]
92
        )
93
94
    Two parameters are passed to the middleware. The first parameter is
95
    the current socket connection with and the second one is the payload
96
    parameter which is of type :class:`~.core.dispatch.GatewayDispatch`.
97
    This contains the response from the discord API.
98
99
    :Implementation example:
100
101
    .. code-block:: pycon
102
103
        >>> @middleware("ready", override=True)
104
        >>> async def custom_ready(_, payload: GatewayDispatch):
105
        >>>     return "on_ready", [User.from_dict(payload.data.get("user"))]
106
107
        >>> @Client.event
108
        >>> async def on_ready(bot: User):
109
        >>>     print(f"Signed in as {bot}")
110
111
112
    :param call:
113
        The call where the method should be registered.
114
115
    Keyword Arguments:
116
117
    :param override:
118
        Setting this to True will allow you to override existing
119
        middleware. Usage of this is discouraged, but can help you out
120
        of some situations.
121
    """
122
123
    def decorator(func: Coro):
124
        if override:
125
            _log.warning(
126
                "Middleware overriding has been enabled for `%s`."
127
                " This might cause unexpected behavior.", call
128
            )
129
130
        if not override and callable(_events.get(call)):
131
            raise RuntimeError(
132
                f"Middleware event with call `{call}` has "
133
                "already been registered"
134
            )
135
136
        async def wrapper(cls, payload: GatewayDispatch):
137
            _log.debug("`%s` middleware has been invoked", call)
138
139
            return await (
140
                func(cls, payload)
141
                if should_pass_cls(func)
142
                else await func(payload)
143
            )
144
145
        _events[call] = wrapper
146
        return wrapper
147
148
    return decorator
149
150
151
class Client(Dispatcher):
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
152
    def __init__(self, token: str, *, received: str = None):
153
        """
154
        The client is the main instance which is between the programmer
155
            and the discord API.
156
157
        This client represents your bot.
158
159
        :param token:
160
            The secret bot token which can be found in
161
            `<https://discord.com/developers/applications/<bot_id>/bot>`_
162
163
        :param received:
164
            The default message which will be sent when no response is
165
            given.
166
        """
167
        # TODO: Implement intents
0 ignored issues
show
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
Loading history...
168
        super().__init__(
169
            token,
170
            handlers={
171
                # Use this event handler for opcode 0.
172
                0: self.event_handler
173
            }
174
        )
175
176
        self.bot: Optional[User] = None
177
        self.__received = received or "Command arrived successfully!"
178
        self.http = HTTPClient(token)
179
180
    @property
181
    def chat_commands(self):
182
        """
183
        Get a list of chat command calls which have been registered in
184
        the ChatCommandHandler.
185
        """
186
        return [cmd.app.name for cmd in ChatCommandHandler.register.values()]
187
188
    @staticmethod
189
    def event(coroutine: Coro):
190
        """
191
        Register a Discord gateway event listener. This event will get
192
        called when the client receives a new event update from Discord
193
        which matches the event name.
194
195
        The event name gets pulled from your method name, and this must
196
        start with ``on_``. This forces you to write clean and consistent
197
        code.
198
199
        This decorator can be used in and out of a class, and all
200
        event methods must be coroutines. *(async)*
201
202
        :Example usage:
203
204
        .. code-block:: pycon
205
206
            >>> # Function based
207
            >>> from pincer import Client
208
            >>>
209
            >>> client = Client("token")
210
            >>>
211
            >>> @client.event
212
            >>> async def on_ready():
213
            ...     print(f"Signed in as {client.bot}")
214
            >>>
215
            >>> if __name__ == "__main__":
216
            ...     client.run()
217
218
        .. code-block :: pycon
219
220
            >>> # Class based
221
            >>> from pincer import Client
222
            >>>
223
            >>> class BotClient(Client):
224
            ...     @Client.event
225
            ...     async def on_ready(self):
226
            ...         print(f"Signed in as {self.bot}")
227
            >>>
228
            >>> if __name__ == "__main__":
229
            ...     BotClient("token").run()
230
231
232
        :param coroutine: # TODO: add info
233
234
        :raises TypeError:
235
            If the method is not a coroutine.
236
237
        :raises InvalidEventName:
238
            If the event name does not start with ``on_``, has already
239
            been registered or is not a valid event name.
240
        """
241
242
        if not iscoroutinefunction(coroutine):
243
            raise TypeError(
244
                "Any event which is registered must be a coroutine function"
245
            )
246
247
        name: str = coroutine.__name__.lower()
248
249
        if not name.startswith("on_"):
250
            raise InvalidEventName(
251
                f"The event named `{name}` must start with `on_`"
252
            )
253
254
        if _events.get(name) is not None:
255
            raise InvalidEventName(
256
                f"The event `{name}` has already been registered or is not "
257
                f"a valid event name."
258
            )
259
260
        _events[name] = coroutine
261
        return coroutine
262
263
    def run(self):
264
        """Start the event listener"""
265
        self.start_loop()
266
        run(self.http.close())
267
268
    async def handle_middleware(
269
            self,
270
            payload: GatewayDispatch,
271
            key: str,
272
            *args,
273
            **kwargs
274
    ) -> Tuple[Optional[Coro], List[Any], Dict[str, Any]]:
275
        """
276
        Handles all middleware recursively. Stops when it has found an
277
        event name which starts with ``on_``.
278
279
        :param payload:
280
            The original payload for the event.
281
282
        :param key:
283
            The index of the middleware in ``_events``.
284
285
        :param \\*args:
286
            The arguments which will be passed to the middleware.
287
288
        :param \\*\\*kwargs:
289
            The named arguments which will be passed to the middleware.
290
291
        :return:
292
            A tuple where the first element is the final executor
293
            (so the event) its index in ``_events``. The second and third
294
            element are the ``*args`` and ``**kwargs`` for the event.
295
        """
296
        ware: MiddlewareType = _events.get(key)
297
        next_call, arguments, params = ware, [], {}
298
299
        if iscoroutinefunction(ware):
300
            extractable = await ware(self, payload, *args, **kwargs)
301
302
            if not isinstance(extractable, tuple):
303
                raise RuntimeError(
304
                    f"Return type from `{key}` middleware must be tuple. "
305
                )
306
307
            next_call = get_index(extractable, 0, "")
308
            arguments = get_index(extractable, 1, [])
309
            params = get_index(extractable, 2, {})
310
311
        if next_call is None:
312
            raise RuntimeError(f"Middleware `{key}` has not been registered.")
313
314
        return (
315
            (next_call, arguments, params)
316
            if next_call.startswith("on_")
317
            else await self.handle_middleware(
318
                payload, next_call, *arguments, **params
319
            )
320
        )
321
322
    async def event_handler(self, _, payload: GatewayDispatch):
323
        """
324
        Handles all payload events with opcode 0.
325
326
        :param _:
327
            Socket param, but this isn't required for this handler. So
328
            its just a filler parameter, doesn't matter what is passed.
329
330
        :param payload:
331
            The payload sent from the Discord gateway, this contains the
332
            required data for the client to know what event it is and
333
            what specifically happened.
334
        """
335
        event_name = payload.event_name.lower()
336
337
        key, args, kwargs = await self.handle_middleware(payload, event_name)
338
339
        call = _events.get(key)
340
341
        if iscoroutinefunction(call):
342
            if should_pass_cls(call):
343
                await call(self, *args, **kwargs)
344
            else:
345
                await call(*args, **kwargs)
346
347
    @middleware("ready")
348
    async def on_ready_middleware(self, payload: GatewayDispatch):
349
        """
350
        Middleware for ``on_ready`` event.
351
352
        :param payload:
353
            The data received from the ready event.
354
        """
355
        self.bot = User.from_dict(payload.data.get("user"))
356
        await ChatCommandHandler(self).initialize()
357
        return "on_ready",
358
359
    @middleware("interaction_create")
360
    async def on_interaction_middleware(self, payload: GatewayDispatch):
361
        """
362
        Middleware for ``on_interaction``, which handles command
363
        execution.
364
365
        :param payload:
366
            The data received from the interaction event.
367
        """
368
        interaction: Interaction = Interaction.from_dict(payload.data)
369
        command = ChatCommandHandler.register.get(interaction.data.name)
370
371
        if command:
372
            defaults = {param: None for param in get_params(command.call)}
373
            params = {}
374
375
            if interaction.data.options is not MISSING:
376
                params = {
377
                    opt.name: opt.value for opt in interaction.data.options
378
                }
379
380
            kwargs = {**defaults, **params}
381
382
            if should_pass_cls(command.call):
383
                kwargs["self"] = self
384
385
            message = await command.call(**kwargs)
386
387
            if isinstance(message, Embed):
388
                message = Message(embeds=[message])
389
390
            elif not isinstance(message, Message):
391
                message = Message(message) if message else Message(
392
                    self.__received,
393
                    flags=InteractionFlags.EPHEMERAL
394
                )
395
396
                await self.http.post(
397
                    f"interactions/{interaction.id}/{interaction.token}/callback",
398
                    message.to_dict()
399
                )
400
401
        return "on_interaction_create", [interaction]
402
403
404
Bot = Client
405