DefaultThrottleHandler.init_throttler()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
from __future__ import annotations
4
5
from abc import ABC, abstractmethod
6
from typing import TYPE_CHECKING, DefaultDict
7
8
from .throttle_scope import ThrottleScope
9
from ..app.command import InteractableStructure
10
from ...exceptions import CommandCooldownError
11
from ...utils.slidingwindow import SlidingWindow
12
13
if TYPE_CHECKING:
14
    from typing import Dict, Optional
15
16
    from ...utils.types import Coro
17
18
19
class ThrottleInterface(ABC):
    """An ABC for throttling.

    Concrete handlers implement :meth:`handle`, which is called with the
    command that is about to run.

    Attributes
    ----------
    throttle : DefaultDict[Coro, Dict[Optional[str], SlidingWindow]]
        Class-level registry of sliding windows. It is defined once on this
        ABC, so subclasses that do not override it share the same mapping.
        The outer key is the command's coroutine, the inner key is the
        throttle key of the scope (``None`` when there is no key).
    """

    # ``Coro``, ``Dict`` and ``Optional`` are only imported under
    # ``TYPE_CHECKING``. That is safe here because the module enables
    # postponed evaluation of annotations (``from __future__ import
    # annotations``), so this annotation is never evaluated at runtime.
    throttle: DefaultDict[
        Coro, Dict[Optional[str], SlidingWindow]
    ] = DefaultDict(dict)

    @staticmethod
    @abstractmethod
    def handle(command: InteractableStructure, **kwargs):
        """Decide whether ``command`` may run under its throttle settings.

        Parameters
        ----------
        command : InteractableStructure
            The command that is about to be executed.
        \\*\\*kwargs
            Extra handler-specific arguments.

        Raises
        ------
        NotImplementedError
            Always, in this abstract base implementation.
        """
        raise NotImplementedError
30
31
32
class DefaultThrottleHandler(ThrottleInterface, ABC):
    """The default throttle-handler based off the
    :class:`~pincer.objects.app.throttling.ThrottleInterface` ABC
    """

    # Maps every throttle scope to the dot-separated attribute path that is
    # resolved on the command to obtain the throttle key. ``None`` means the
    # scope has no key, so a single window is used for the command.
    __throttle_scopes = {
        ThrottleScope.GLOBAL: None,
        ThrottleScope.GUILD: "guild_id",
        ThrottleScope.CHANNEL: "channel_id",
        ThrottleScope.USER: "author.user.id",
    }

    @staticmethod
    def get_key_from_scope(command: InteractableStructure) -> Optional[int]:
        """Retrieve the throttle key that belongs to the cooldown scope of
        the command.

        Parameters
        ----------
        command : :class:`~pincer.objects.app.command.InteractableStructure`
            The command whose ``cooldown_scope`` selects the attribute path
            and on which that path is resolved.

        Returns
        -------
        Optional[:class:`int`]
            The value found at the end of the attribute path, or ``None``
            when the scope has no attribute path (the global scope).

        Raises
        ------
        KeyError
            If ``command.cooldown_scope`` is not a known throttle scope.
        AttributeError
            If an attribute on the path does not exist.
        """
        attr_path = DefaultThrottleHandler.__throttle_scopes[
            command.cooldown_scope
        ]

        if not attr_path:
            return None

        # Walk the dotted path one attribute at a time, starting at the
        # command itself (e.g. ``command.author.user.id``).
        current = command

        for attr in attr_path.split("."):
            current = getattr(current, attr)

        return current

    @staticmethod
    def init_throttler(
        command: InteractableStructure, throttle_key: Optional[int]
    ):
        """Create and register a fresh sliding window for a command.

        Any window already stored for the same command and key is replaced.

        Parameters
        ----------
        command : :class:`~pincer.objects.app.command.InteractableStructure`
            The command to register; its ``call``, ``cooldown`` and
            ``cooldown_scale`` attributes are read.
        throttle_key : Optional[:class:`int`]
            The key of the scope the window belongs to.
        """
        DefaultThrottleHandler.throttle[command.call][
            throttle_key
        ] = SlidingWindow(command.cooldown, command.cooldown_scale)

    @staticmethod
    def handle(command: InteractableStructure, **kwargs):
        """Enforce the cooldown of a command.

        Commands with a non-positive ``cooldown`` are never throttled.
        Otherwise the sliding window for the command and its throttle key is
        looked up (and created on first use) and asked whether the call is
        allowed.

        Parameters
        ----------
        command : :class:`~pincer.objects.app.command.InteractableStructure`
            The command that is about to be executed.
        \\*\\*kwargs
            Accepted for interface compatibility; not used.

        Raises
        ------
        CommandCooldownError
            If the sliding window does not allow the call.
        """
        if command.cooldown <= 0:
            return

        throttle_key = DefaultThrottleHandler.get_key_from_scope(command)

        # ``.get`` is used on purpose: indexing the defaultdict would insert
        # an empty group as a side effect of a mere lookup.
        group = DefaultThrottleHandler.throttle.get(command.call)
        window = group.get(throttle_key) if group is not None else None

        # Compare against ``None`` instead of relying on truthiness, so a
        # window object that evaluates falsy is still used rather than being
        # re-created on every call.
        if window is None:
            DefaultThrottleHandler.init_throttler(command, throttle_key)
            window = DefaultThrottleHandler.throttle[command.call][
                throttle_key
            ]

        # The first call also goes through ``allow`` so that it counts
        # against the freshly created window.
        if not window.allow():
            raise CommandCooldownError(
                f"Cooldown for command {command.app.name} not met!", command
            )
96