pincer.utils.event_mgr.EventMgr.loop_for()   B

Complexity

Conditions 7

Size

Total Lines 61
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 27
dl 0
loc 61
rs 7.8319
c 0
b 0
f 0
cc 7
nop 5

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
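Applied to the flagged method, the two commented blocks in loop_for() (draining the events received before the timeout, and shrinking the overall timeout budget) are natural candidates for extraction. The following is a minimal sketch under stated assumptions, not Pincer's code: `LoopTimedOut`, `drain_buffered_events` and `remaining_loop_budget` are hypothetical names, and a plain `deque` stands in for the internal event queue.

```python
from collections import deque
from typing import Any, Deque, Iterator, Optional


class LoopTimedOut(Exception):
    """Hypothetical stand-in for the library's timeout exception."""


def drain_buffered_events(events: Deque[Any]) -> Iterator[Any]:
    """Yield the events that were queued before the timeout fired."""
    while events:
        yield events.popleft()


def remaining_loop_budget(
    loop_timeout: Optional[float], elapsed: float
) -> Optional[float]:
    """Subtract one iteration's elapsed time from the overall budget.

    ``None`` means "no overall timeout" and is passed through unchanged.
    """
    if loop_timeout is None:
        return None

    remaining = loop_timeout - elapsed
    # The budget can drop to zero or below if the caller's loop body
    # took longer than the time that was left.
    if remaining <= 0:
        raise LoopTimedOut("loop_for() timed out while waiting for an event")
    return remaining
```

Each helper takes its name from the comment it replaces, so the body of the long method would shrink to a few calls whose intent is visible without the comments.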

Commonly applied refactorings include:

# Copyright Pincer 2021-Present
# Issue: Missing module docstring
# Full MIT License can be found in `LICENSE` at the project root.

from __future__ import annotations

from abc import ABC, abstractmethod
# Issue (Bug, Best Practice): This seems to re-define the built-in
# TimeoutError. It is generally discouraged to redefine built-ins as this
# makes code very hard to read.
from asyncio import Event, wait_for as _wait_for, TimeoutError
from collections import deque
from typing import TYPE_CHECKING

from ..exceptions import TimeoutError as PincerTimeoutError

if TYPE_CHECKING:
    # Issue: Imports from package asyncio are not grouped
    from asyncio import AbstractEventLoop
    # Issue: Imports from package typing are not grouped
    from typing import Any, List, Union, Optional
    from .types import CheckFunction


class _Processable(ABC):
    @abstractmethod
    def process(self, event_name: str, event_value: Any):
        """
        Method that is run when an event is received from Discord.

        Parameters
        ----------
        event_name : str
            The name of the event.
        event_value : Any
            Object to evaluate the check with.

        Returns
        -------
        bool
            Whether the event can be set
        """

    def matches_event(self, event_name: str, event_value: Any):
        """
        Parameters
        ----------
        event_name : str
            Name of the event.
        event_value : Any
            Object to evaluate the check with.
        """
        # Issue (Bug): The instance of _Processable does not seem to have a
        # member named event_name. This check looks for calls to members that
        # are non-existent. These calls will fail. The member could have been
        # renamed or removed.
        if self.event_name != event_name:
            return False

        # Issue (Bug): The instance of _Processable does not seem to have a
        # member named check (also reported for both calls to self.check
        # below).
        if self.check:
            # Issue (unused-code): Unnecessary "else" after "return"
            if event_value is not None:
                return self.check(event_value)
            else:
                # Certain middleware do not have an event_value
                return self.check()

        return True


def _lowest_value(*args):
    """
    Returns the lowest value from a list of numbers. ``None`` is not counted
    as a value. ``None`` is returned if all arguments are ``None``.
    """
    args_without_none = [n for n in args if n is not None]

    if not args_without_none:
        return None

    return min(args_without_none)


class _Event(_Processable):
    """
    Parameters
    ----------
    event_name : str
        The name of the event.
    check : Optional[Callable[[Any], bool]]
        ``matches_event`` only returns true if this function returns true.
        Will be ignored if set to None.

    Attributes
    ----------
    event : :class:`asyncio.Event`
        Event that is used to wait until the next valid Discord event.
    return_value : Optional[Any]
        Used to store the event value passed to ``process`` so it can be
        returned later.
    """

    def __init__(self, event_name: str, check: CheckFunction):
        self.event_name = event_name
        self.check = check
        self.event = Event()
        self.return_value = None
        super().__init__()

    async def wait(self):
        """Waits until ``self.event`` is set."""
        await self.event.wait()

    def process(self, event_name: str, event_value: Any):
        # Issue (Coding Style): TODO and FIXME comments should generally be
        # avoided.
        # TODO: fix docs
        """

        Parameters
        ----------
        event_name

        Returns
        -------

        """
        if self.matches_event(event_name, event_value):
            self.return_value = event_value
            self.event.set()


class _LoopEmptyError(Exception):
    """Raised when the _LoopMgr is empty and cannot accept new items."""


class _LoopMgr(_Processable):
    """
    Parameters
    ----------
    event_name : str
        The name of the event.
    check : Optional[Callable[[Any], bool]]
        ``matches_event`` only returns true if this function returns true.
        Will be ignored if set to None.

    Attributes
    ----------
    can_expand : bool
        Whether the queue is allowed to grow. Turned to false once the
        EventMgr's timer runs out.
    events : :class:`collections.deque`
        Queue of events to be processed.
    wait : :class:`asyncio.Event`
        Used to make ``get_next()`` wait for the next event.
    """

    def __init__(self, event_name: str, check: CheckFunction) -> None:
        self.event_name = event_name
        self.check = check

        self.can_expand = True
        self.events = deque()
        self.wait = Event()

    def process(self, event_name: str, event_value: Any):
        # Issue (Coding Style): TODO and FIXME comments should generally be
        # avoided.
        # TODO: fix docs
        """

        Parameters
        ----------
        event_name
        Returns
        -------

        """
        if not self.can_expand:
            return

        if self.matches_event(event_name, event_value):
            self.events.append(event_value)
            self.wait.set()

    async def get_next(self):
        """
        Returns the next item in the queue. If there are no items in the
        queue, it will return the next event that happens.
        """
        if not self.events:
            if not self.can_expand:
                raise _LoopEmptyError

            self.wait.clear()
            await self.wait.wait()
        return self.events.popleft()


class EventMgr:
    """
    Attributes
    ----------
    event_list : List[_Processable]
        The list of events that need to be processed.
    """

    def __init__(self, loop: AbstractEventLoop):
        self.event_list: List[_Processable] = []
        self.loop = loop

    def process_events(self, event_name, event_value):
        """
        Parameters
        ----------
        event_name : str
            The name of the event to be processed.
        event_value : Any
            The object returned from the middleware for this event.
        """
        for event in self.event_list:
            event.process(event_name, event_value)

    async def wait_for(
        # Issue (Coding Style): Wrong hanging indentation before block
        # (add 4 spaces).
        self, event_name: str, check: CheckFunction, timeout: Optional[float]
    ) -> Any:
        """
        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Union[Callable[[Any], bool], None]
            ``wait_for`` only returns a value if this function returns true.
        timeout: Union[float, None]
            Number of seconds before timeout. Use None for no timeout.

        Returns
        -------
        Any
            What the Discord API returns for this event.
        """

        event = _Event(event_name, check)
        self.event_list.append(event)

        try:
            await _wait_for(event.wait(), timeout=timeout)
        except TimeoutError:
            raise PincerTimeoutError(
                "wait_for() timed out while waiting for an event."
            )
        finally:
            # Always unregister the waiter, including on timeout, so that
            # timed-out events do not accumulate in event_list.
            self.event_list.remove(event)
        return event.return_value

    # Issue (Coding Style): Wrong hanging indentation before block
    # (add 4 spaces). Reported for every parameter line of this signature.
    async def loop_for(
        self,
        event_name: str,
        check: CheckFunction,
        iteration_timeout: Optional[float],
        loop_timeout: Optional[float],
    ) -> Any:
        """
        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Callable[[Any], bool]
            ``loop_for`` only yields a value if this function returns true.
        iteration_timeout: Union[float, None]
            Number of seconds before timeout. This timeout applies to each
            iteration of the loop.
        loop_timeout: Union[float, None]
            Number of seconds before the entire loop times out. The generator
            will only raise a timeout error while it is waiting for an event.

        Yields
        ------
        Any
            What the Discord API returns for this event.
        """

        loop_mgr = _LoopMgr(event_name, check)
        self.event_list.append(loop_mgr)

        while True:
            start_time = self.loop.time()

            try:
                yield await _wait_for(
                    loop_mgr.get_next(),
                    timeout=_lowest_value(loop_timeout, iteration_timeout),
                )

            except TimeoutError:
                # Loop timed out. Loop through the remaining events received
                # before the timeout.
                loop_mgr.can_expand = False
                try:
                    while True:
                        yield await loop_mgr.get_next()
                except _LoopEmptyError:
                    raise PincerTimeoutError(
                        "loop_for() timed out while waiting for an event"
                    )

            # `not` can't be used here because there is a check for
            # `loop_timeout == 0`
            if loop_timeout is not None:
                loop_timeout -= self.loop.time() - start_time

                # loop_timeout can be below 0 if the user's code in the for loop
                # takes longer than the time left in loop_timeout
                if loop_timeout <= 0:
                    raise PincerTimeoutError(
                        "loop_for() timed out while waiting for an event"
                    )
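
The timeout bookkeeping in loop_for() can be tried in isolation. The sketch below is not Pincer code and makes several simplifying assumptions: `loop_over_queue`, `lowest_value` and `demo` are hypothetical names, an `asyncio.Queue` stands in for `_LoopMgr`, and the generator stops quietly on timeout instead of raising the library's timeout error.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional


def lowest_value(*args: Optional[float]) -> Optional[float]:
    """Smallest non-None argument, or None if every argument is None."""
    present = [n for n in args if n is not None]
    return min(present) if present else None


async def loop_over_queue(
    queue: "asyncio.Queue[Any]",
    iteration_timeout: Optional[float],
    loop_timeout: Optional[float],
) -> AsyncIterator[Any]:
    """Yield queue items until either timeout budget is exhausted."""
    loop = asyncio.get_running_loop()
    while True:
        start = loop.time()
        try:
            item = await asyncio.wait_for(
                queue.get(),
                timeout=lowest_value(loop_timeout, iteration_timeout),
            )
        except asyncio.TimeoutError:
            return  # simplification: stop instead of raising
        yield item

        # Time spent in the consumer's loop body also counts against the
        # overall budget, because the generator is suspended at `yield`.
        if loop_timeout is not None:
            loop_timeout -= loop.time() - start
            if loop_timeout <= 0:
                return


async def demo() -> List[Any]:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    for value in ("a", "b", "c"):
        queue.put_nowait(value)
    # Three items are already queued; the fourth wait hits the
    # 0.05 second per-iteration timeout and ends the loop.
    return [item async for item in loop_over_queue(queue, 0.05, 1.0)]


received = asyncio.run(demo())
```

Passing the smaller of the two timeouts to each wait is what lets a single `wait_for` call enforce both the per-iteration limit and whatever remains of the overall limit.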