Passed
Pull Request — main (#176)
by
unknown
01:36
created

pincer.utils.event_mgr.EventMgr.loop_for()   B

Complexity

Conditions 6

Size

Total Lines 57
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 57
rs 8.3466
c 0
b 0
f 0
cc 6
nop 5

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
from asyncio import (
    Event,
    TimeoutError as _AsyncTimeoutError,
    get_running_loop,
    wait_for as _wait_for,
)
from typing import Any, AsyncIterator, Callable, Union
6
7
8
class _DiscordEvent(Event):
    """
    An ``asyncio.Event`` bound to an event name and an optional check.

    Attributes
    ----------
    event_name : str
        The name of the event this object is waiting for.
    check : Optional[Callable[[Any], bool]]
        Predicate evaluated by ``can_be_set``. Ignored when ``None``.
    return_value : Optional[Tuple[Any, ...]]
        The arguments of the last call to ``can_be_set`` that was accepted,
        stored so they can be returned later. ``None`` until a call has
        been accepted.
    """

    def __init__(
        self,
        event_name: str,
        check: Union[Callable[[Any], bool], None]
    ):
        """
        Parameters
        ----------
        event_name : str
            The name of the event.
        check : Optional[Callable[[Any], bool]]
            ``can_be_set`` only returns true if this function returns true.
            Will be ignored if set to None.
        """
        self.event_name = event_name
        self.check = check
        self.return_value = None
        super().__init__()

    def can_be_set(self, event_name: str, *args) -> bool:
        """
        Decide whether this event matches, recording ``args`` if it does.

        Parameters
        ----------
        event_name : str
            The name of the event.
        *args : Any
            Arguments to evaluate check with.

        Returns
        -------
        bool
            Whether the event can be set
        """
        if self.event_name != event_name:
            return False

        if self.check is not None and not self.check(*args):
            return False

        # Only remember the arguments once they have been accepted, so a
        # rejected event can never overwrite a previously accepted payload.
        self.return_value = args
        return True
59
60
61
class EventMgr:
    """
    Registry of pending waiters used by ``wait_for`` and ``loop_for``.

    Attributes
    ----------
    stack : List[_DiscordEvent]
        The List of events that need to be processed.
    """

    def __init__(self):
        self.stack = []

    def add_event(self, event_name: str, check: Union[Callable, None]):
        """
        Create a new waiter and register it on the stack.

        Parameters
        ----------
        event_name : str
            The type of event to listen for. Uses the same naming scheme as
            @Client.event.
        check : Optional[Callable[[Any], bool]]
            Expression to evaluate when checking if an event is valid. The
            event will be set if this returns true. Will be ignored if set
            to None.

        Returns
        -------
        _DiscordEvent
            Event that was added to the stack.
        """
        event = _DiscordEvent(
            event_name=event_name,
            check=check
        )
        self.stack.append(event)
        return event

    def pop_event(self, event) -> Any:
        """
        Remove a waiter from the stack and hand back what it captured.

        Parameters
        ----------
        event : _DiscordEvent
            Event to remove from the stack.

        Returns
        -------
        Any
            ``event.return_value``

        Raises
        ------
        ValueError
            If ``event`` is not on the stack.
        """
        self.stack.remove(event)
        return event.return_value

    def process_events(self, event_name, *args):
        """
        Set every pending waiter that accepts this event.

        Parameters
        ----------
        event_name : str
            The name of the event to be processed.
        *args : Any
            The arguments returned from the middleware for this event.
        """
        # Iterate over a snapshot so that the stack may change while the
        # (user supplied) checks are being evaluated.
        for event in tuple(self.stack):
            # A waiter that is already set has captured its arguments but
            # has not resumed yet; evaluating it again could overwrite them.
            if event.is_set():
                continue

            if event.can_be_set(event_name, *args):
                event.set()

    async def wait_for(
        self,
        event_name: str,
        check: Union[Callable[[Any], bool], None],
        timeout: Union[float, None]
    ) -> Any:
        """
        Wait for the next event that matches ``event_name`` and ``check``.

        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Union[Callable[[Any], bool], None]
            This function only returns a value if this returns true.
        timeout: Union[float, None]
            Amount of seconds before timeout. Use None for no timeout.

        Returns
        -------
        Any
            The arguments that were passed to ``process_events`` for the
            matching event.

        Raises
        ------
        TimeoutError
            If no matching event was processed within ``timeout`` seconds.
        """
        event = self.add_event(event_name, check)
        try:
            await _wait_for(event.wait(), timeout=timeout)
        # Before Python 3.11 asyncio raises its own TimeoutError class, which
        # the builtin does not cover, so both are caught here.
        except (TimeoutError, _AsyncTimeoutError) as exc:
            raise TimeoutError(
                "wait_for() timed out while waiting for an event."
            ) from exc
        finally:
            # Always unregister the waiter, also on timeout or cancellation,
            # otherwise it would stay on the stack forever.
            self.pop_event(event)
        return event.return_value

    async def loop_for(
        self,
        event_name: str,
        check: Union[Callable[[Any], bool], None],
        iteration_timeout: Union[float, None],
        loop_timeout: Union[float, None],
    ) -> AsyncIterator[Any]:
        """
        Asynchronously yield every event that matches until a timeout hits.

        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Callable[[Any], bool]
            This function only yields a value if this returns true.
        iteration_timeout: Union[float, None]
            Amount of seconds before timeout. Timeouts are for each loop.
        loop_timeout: Union[float, None]
            Amount of seconds before the entire loop times out. The generator
            will only raise a timeout error while it is waiting for an event.
            A falsy value (``None`` or ``0``) disables the overall timeout.

        Yields
        ------
        Any
            The arguments that were passed to ``process_events`` for each
            matching event.

        Raises
        ------
        TimeoutError
            If a timeout expires while waiting for an event.
        """
        if not loop_timeout:
            while True:
                yield await self.wait_for(event_name, check, iteration_timeout)

        loop = get_running_loop()

        while True:
            start_time = loop.time()

            try:
                result = await _wait_for(
                    self.wait_for(
                        event_name,
                        check,
                        iteration_timeout
                    ),
                    timeout=loop_timeout
                )
            except (TimeoutError, _AsyncTimeoutError) as exc:
                raise TimeoutError(
                    "loop_for() timed out while waiting for an event"
                ) from exc

            yield result

            # The elapsed time deliberately includes the time the consumer
            # spent handling the yielded value.
            loop_timeout -= loop.time() - start_time

            # loop_timeout can be below 0 if the user's code in the for loop
            # takes longer than the time left in loop_timeout
            if loop_timeout <= 0:
                break
212