Completed
Push — main ( 3a3ea0...93fa25 )
by Yohann
23s queued 12s
created

pincer.objects.app.throttling   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 86
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 10
eloc 51
dl 0
loc 86
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A DefaultThrottleHandler.handle() 0 18 5
A ThrottleInterface.handle() 0 13 1
A DefaultThrottleHandler.init_throttler() 0 4 1
A DefaultThrottleHandler.get_key_from_scope() 0 18 3
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
from __future__ import annotations
4
5
from abc import ABC, abstractmethod
6
from typing import TYPE_CHECKING
7
8
from .throttle_scope import ThrottleScope
9
from ...exceptions import CommandCooldownError
10
from ...utils.slidingwindow import SlidingWindow
11
12
if TYPE_CHECKING:
13
    from typing import Dict, Optional
14
    from ..message.context import MessageContext
15
    from ...utils import Coro
16
17
18
class ThrottleInterface(ABC):
    """
    Interface for command throttle (cooldown) handlers.

    Implementations provide a static ``handle`` which inspects the
    context of a command and raises when the command may not run.
    """
    # Registry of throttle state, keyed first by the command callback and
    # then by the throttle key (``None`` or an id taken from the context).
    # It is defined on this base class, so handlers that do not rebind it
    # all read and write the same dict.
    throttle: Dict[Coro, Dict[Optional[int], SlidingWindow]] = {}

    @staticmethod
    @abstractmethod
    def handle(ctx: MessageContext, **kwargs):
        """
        Handles a context. This method is executed before the command is.

        :param ctx:
            The context of the command.

        :param kwargs:
            The extra kwargs passed for the cooldown.

        :raises NotImplementedError:
            Always, in this base implementation; handlers must override it.
        """
        raise NotImplementedError
34
35
36
class DefaultThrottleHandler(ThrottleInterface, ABC):
    """
    Default cooldown handler.

    Keeps one ``SlidingWindow`` per command callback and throttle key in
    the ``throttle`` mapping and rejects a command whenever its window
    does not allow another call.
    """
    # Maps every throttle scope to the dotted attribute path that is
    # followed on the context to obtain the throttle key. ``None`` means
    # there is no path, so all calls share the ``None`` key.
    __throttle_scopes = {
        ThrottleScope.GLOBAL: None,
        ThrottleScope.GUILD: "guild_id",
        ThrottleScope.CHANNEL: "channel_id",
        ThrottleScope.USER: "author.user.id",
    }

    @staticmethod
    def get_key_from_scope(ctx: MessageContext) -> Optional[int]:
        """
        Retrieve the appropriate key from the context through the
        throttle scope.

        :param ctx:
            The context of the command.

        :return:
            The value found by following the scope's attribute path on
            ``ctx``, or ``None`` when the scope has no attribute path.

        :raises KeyError:
            When ``ctx.command.cooldown_scope`` is not a known scope.
        """
        scope = DefaultThrottleHandler.__throttle_scopes[
            ctx.command.cooldown_scope]

        if not scope:
            return None

        last_obj = ctx

        # Walk the dotted path one attribute at a time,
        # e.g. ``ctx.author.user.id`` for "author.user.id".
        for attr in scope.split("."):
            last_obj = getattr(last_obj, attr)

        return last_obj

    @staticmethod
    def init_throttler(ctx: MessageContext, throttle_key: Optional[int]):
        """
        Create (or replace) the sliding window of a command for a key.

        :param ctx:
            The context of the command.

        :param throttle_key:
            The key, as returned by ``get_key_from_scope``, under which
            the window is stored.
        """
        # ``setdefault`` creates the per-command dict on first use; plain
        # indexing raised ``KeyError`` for a command that had no entry yet.
        DefaultThrottleHandler.throttle.setdefault(ctx.command.call, {})[
            throttle_key
        ] = SlidingWindow(ctx.command.cooldown, ctx.command.cooldown_scale)

    @staticmethod
    def handle(ctx: MessageContext, **kwargs):
        """
        Handles a context. This method is executed before the command is.

        :param ctx:
            The context of the command.

        :param kwargs:
            The extra kwargs passed for the cooldown. Not used by this
            handler.

        :raises CommandCooldownError:
            When the sliding window of the command does not allow the
            call.
        """
        # A cooldown of zero or less means the command is not throttled.
        if ctx.command.cooldown <= 0:
            return

        throttle_key = DefaultThrottleHandler.get_key_from_scope(ctx)
        group = DefaultThrottleHandler.throttle.get(ctx.command.call)
        window_slider = group.get(throttle_key) if group is not None else None

        # Explicit ``None`` check: an existing window must never be taken
        # for a missing one because of its truthiness.
        if window_slider is None:
            DefaultThrottleHandler.init_throttler(ctx, throttle_key)
            window_slider = DefaultThrottleHandler.throttle[
                ctx.command.call][throttle_key]

        # The first call goes through the freshly created window as well.
        if not window_slider.allow():
            raise CommandCooldownError(
                f"Cooldown for command {ctx.command.app.name} not met!",
                ctx
            )
86