Passed
Pull Request — main (#176)
by
unknown
01:57
created

pincer.utils.event_mgr.EventMgr.wait_for()   A

Complexity

Conditions 2

Size

Total Lines 34
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 34
rs 9.7
c 0
b 0
f 0
cc 2
nop 4
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
from abc import ABC, abstractmethod
5
from asyncio import Event, wait_for as _wait_for, get_running_loop, TimeoutError
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in TimeoutError.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
6
from collections import deque
7
from typing import TYPE_CHECKING
8
9
from ..exceptions import TimeoutError as PincerTimeoutError
10
11
12
if TYPE_CHECKING:
13
    from typing import Any, List, Union
14
    from .types import CheckFunction
15
16
17
class _Processable(ABC):
18
19
    @abstractmethod
20
    def process(self, event_name: str, *args):
21
        """
22
        Method that is ran when an event is recieved from discord.
23
24
        Parameters
25
        ----------
26
        event_name : str
27
            The name of the event.
28
        *args : Any
29
            Arguments to evaluate check with.
30
31
        Returns
32
        -------
33
        bool
34
            Whether the event can be set
35
        """
36
37
    def matches_event(self, event_name: str, *args):
38
        """
39
        Parameters
40
        ----------
41
        event_name : str
42
            Name of event.
43
        *args : Any
44
            Arguments to evalue check with.
45
        """
46
        if self.event_name != event_name:
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named event_name.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
47
            return False
48
49
        if self.check:
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named check.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
50
            return self.check(*args)
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named check.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
51
52
        return True
53
54
55
def _lowest_value(*args):
56
    """
57
    Returns lowest value from list of numbers. ``None`` is not counted as a
58
    value. ``None`` is returned if all arguments are ``None``.
59
    """
60
    args_without_none = [n for n in args if n is not None]
61
62
    if len(args_without_none) == 0:
63
        return None
64
65
    return min(args_without_none)
66
67
68
class _Event(_Processable):
69
    """
70
    Parameters
71
    ----------
72
    event_name : str
73
        The name of the event.
74
    check : Optional[Callable[[Any], bool]]
75
        ``can_be_set`` only returns true if this function returns true.
76
        Will be ignored if set to None.
77
78
    Attributes
79
    ----------
80
    event : :class:`asyncio.Event`
81
        Even that is used to wait until the next valid discord event.
82
    return_value : Optional[str]
83
        Used to store the arguments from ``can_be_set`` so they can be
84
        returned later.
85
    """
86
87
    def __init__(
88
        self,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
89
        event_name: str,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
90
        check: CheckFunction
0 ignored issues
show
Bug introduced by
The variable CheckFunction was used before it was assigned.
Loading history...
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
91
    ):
92
        self.event_name = event_name
93
        self.check = check
94
        self.event = Event()
95
        self.return_value = None
96
        super().__init__()
97
98
    async def wait(self):
99
        """
100
        Waits until ``self.event`` is set.
101
        """
102
        await self.event.wait()
103
104
    def process(self, event_name: str, *args) -> bool:
105
        if self.matches_event(event_name, *args):
106
            self.return_value = args
107
            self.event.set()
108
109
110
class _LoopEmptyError(Exception):
111
    "Raised when the _LoopMgr is empty and cannot accept new item"
112
113
114
class _LoopMgr(_Processable):
115
    """
116
    Parameters
117
    ----------
118
    event_name : str
119
        The name of the event.
120
    check : Optional[Callable[[Any], bool]]
121
        ``can_be_set`` only returns true if this function returns true.
122
        Will be ignored if set to None.
123
124
    Attributes
125
    ----------
126
    can_expand : bool
127
        Whether the queue is allowed to grow. Turned to false once the
128
        EventMgr's timer runs out.
129
    events : :class:`collections.deque`
130
        Qeue of events to be processed.
131
    wait : :class:`asyncio.Event`
132
        Used to make ``get_next()` wait for the next event.
133
    """
134
135
    def __init__(self, event_name: str, check: CheckFunction) -> None:
136
        self.event_name = event_name
137
        self.check = check
138
139
        self.can_expand = True
140
        self.events = deque()
141
        self.wait = Event()
142
143
    def process(self, event_name: str, *args):
144
        if not self.can_expand:
145
            return
146
147
        if self.matches_event(event_name, *args):
148
            self.events.append(args)
149
            self.wait.set()
150
151
    async def get_next(self):
152
        """
153
        Returns the next item if the queue. If there are no items in the queue,
154
        it will return the next event that happens.
155
        """
156
        if not self.events:
0 ignored issues
show
unused-code introduced by
Unnecessary "else" after "return"
Loading history...
157
            if not self.can_expand:
158
                raise _LoopEmptyError
159
160
            self.wait.clear()
161
            await self.wait.wait()
162
            return self.events.popleft()
163
        else:
164
            return self.events.popleft()
165
166
167
class EventMgr:
168
    """
169
    Attributes
170
    ----------
171
    event_list : List[_DiscordEvent]
172
        The List of events that need to be processed.
173
    """
174
175
    def __init__(self):
176
        self.event_list: List[_Processable] = []
0 ignored issues
show
Bug introduced by
The variable List was used before it was assigned.
Loading history...
177
178
    def process_events(self, event_name, *args):
179
        """
180
        Parameters
181
        ----------
182
        event_name : str
183
            The name of the event to be processed.
184
        *args : Any
185
            The arguments returned from the middleware for this event.
186
        """
187
        for event in self.event_list:
188
            event.process(event_name, *args)
189
190
    async def wait_for(
191
        self,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
192
        event_name: str,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
193
        check: CheckFunction,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
194
        timeout: Union[float, None]
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
Bug introduced by
The variable Union was used before it was assigned.
Loading history...
195
    ) -> Any:
0 ignored issues
show
Bug introduced by
The variable Any was used before it was assigned.
Loading history...
196
        """
197
        Parameters
198
        ----------
199
        event_name : str
200
            The type of event. It should start with `on_`. This is the same
201
            name that is used for @Client.event.
202
        check : Union[Callable[[Any], bool], None]
203
            This function only returns a value if this return true.
204
        timeout: Union[float, None]
205
            Amount of seconds before timeout. Use None for no timeout.
206
207
        Returns
208
        ------
209
        Any
210
            What the Discord API returns for this event.
211
        """
212
213
        event = _Event(event_name, check)
214
        self.event_list.append(event)
215
216
        try:
217
            await _wait_for(event.wait(), timeout=timeout)
218
        except TimeoutError:
219
            raise PincerTimeoutError(
220
                "wait_for() timed out while waiting for an event."
221
            )
222
        self.event_list.remove(event)
223
        return event.return_value
224
225
    async def loop_for(
226
        self,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
227
        event_name: str,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
228
        check: CheckFunction,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
229
        iteration_timeout: Union[float, None],
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
230
        loop_timeout: Union[float, None],
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
231
    ) -> Any:
232
        """
233
        Parameters
234
        ----------
235
        event_name : str
236
            The type of event. It should start with `on_`. This is the same
237
            name that is used for @Client.event.
238
        check : Callable[[Any], bool]
239
            This function only returns a value if this return true.
240
        iteration_timeout: Union[float, None]
241
            Amount of seconds before timeout. Timeouts are for each loop.
242
        loop_timeout: Union[float, None]
243
            Amount of seconds before the entire loop times out. The generator
244
            will only raise a timeout error while it is waiting for an event.
245
246
        Yields
247
        ------
248
        Any
249
            What the Discord API returns for this event.
250
        """
251
252
        loop_mgr = _LoopMgr(event_name, check)
253
        self.event_list.append(loop_mgr)
254
255
        loop = get_running_loop()
256
257
        while True:
258
            start_time = loop.time()
259
260
            try:
261
                yield await _wait_for(
262
                    loop_mgr.get_next(),
263
                    timeout=_lowest_value(
264
                        loop_timeout, iteration_timeout
265
                    )
266
                )
267
268
            except TimeoutError:
269
                # Loop timed out. Loop through the remaining events recieved
270
                # before the timeout.
271
                loop_mgr.can_expand = False
272
                try:
273
                    while True:
274
                        yield await loop_mgr.get_next()
275
                except _LoopEmptyError:
276
                    raise PincerTimeoutError(
277
                        "loop_for() timed out while waiting for an event"
278
                    )
279
280
            loop_timeout -= loop.time() - start_time
281
282
            # loop_timeout can be below 0 if the user's code in the for loop
283
            # takes longer than the time left in loop_timeout
284
            if loop_timeout <= 0:
285
                raise PincerTimeoutError(
286
                    "loop_for() timed out while waiting for an event"
287
                )
288