Passed
Pull Request — main (#176)
by Yohann
01:42
created

pincer.utils.event_mgr._Processable.process()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
from abc import ABC, abstractmethod
5
from asyncio import Event, wait_for as _wait_for, get_running_loop, TimeoutError
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in TimeoutError.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
6
from collections import deque
7
from typing import Any, Callable, Union
8
9
from .types import CheckFunction
10
11
12
class _Processable(ABC):
13
14
    @abstractmethod
15
    def process(self, event_name: str, *args):
16
        """
17
        Method that is ran when an event is recieved from discord.
18
19
        Parameters
20
        ----------
21
        event_name : str
22
            The name of the event.
23
        *args : Any
24
            Arguments to evaluate check with.
25
26
        Returns
27
        -------
28
        bool
29
            Whether the event can be set
30
        """
31
32
    def matches_event(self, event_name: str, *args):
33
        """
34
        Parameters
35
        ----------
36
        event_name : str
37
            Name of event.
38
        *args : Any
39
            Arguments to evalue check with.
40
        """
41
        if self.event_name != event_name:
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named event_name.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
42
            return False
43
44
        if self.check:
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named check.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
45
            return self.check(*args)
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named check.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
46
47
        return True
48
49
50
def _lowest_value(*args):
51
    """
52
    Returns lowest value from list of numbers. ``None`` is not counted as a
53
    value. ``None`` is returned if all arguments are ``None``.
54
    """
55
    args_without_none = [n for n in args if n is not None]
56
57
    if len(args_without_none) == 0:
58
        return None
59
60
    return min(
61
        args_without_none
62
    )
63
64
65
class _Event(_Processable):
    """
    Waiter that is satisfied by a single matching event.

    Parameters
    ----------
    event_name : str
        The name of the event.
    check : Optional[Callable[[Any], bool]]
        ``process`` only accepts an event if this function returns true.
        Will be ignored if set to None.

    Attributes
    ----------
    event : :class:`asyncio.Event`
        Event that is used to wait until the next valid discord event.
    return_value : Optional[Tuple[Any, ...]]
        The arguments of the accepted event, stored by ``process`` so they
        can be returned later. ``None`` until an event has been accepted.
    """

    def __init__(
        self,
        event_name: str,
        check: CheckFunction
    ):
        self.event_name = event_name
        self.check = check
        self.event = Event()
        self.return_value = None
        super().__init__()

    async def wait(self):
        """
        Waits until ``self.event`` is set.
        """
        await self.event.wait()

    def process(self, event_name: str, *args) -> bool:
        """
        Accept the event if it matches and nothing was accepted before.

        Parameters
        ----------
        event_name : str
            The name of the event.
        *args : Any
            Arguments to evaluate check with.

        Returns
        -------
        bool
            ``True`` if this call stored ``args`` and set ``self.event``,
            ``False`` otherwise.
        """
        # Several events can be processed before the task blocked in
        # ``wait()`` gets to run again. Keep the first match instead of
        # letting a later one overwrite ``return_value``.
        if self.event.is_set():
            return False

        if not self.matches_event(event_name, *args):
            return False

        self.return_value = args
        self.event.set()
        return True
105
106
107
class _LoopEmptyError(Exception):
108
    "Raised when the _LoopMgr is empty and cannot accept new item"
109
110
111
class _LoopMgr(_Processable):
    """
    Queue of matching events that is consumed through ``get_next``.

    Parameters
    ----------
    event_name : str
        The name of the event.
    check : Optional[Callable[[Any], bool]]
        ``process`` only queues an event if this function returns true.
        Will be ignored if set to None.

    Attributes
    ----------
    can_expand : bool
        Whether the queue is allowed to grow. Turned to false once the
        EventMgr's timer runs out.
    events : :class:`collections.deque`
        Queue of events to be processed.
    wait : :class:`asyncio.Event`
        Used to make ``get_next()`` wait for the next event.
    """

    def __init__(self, event_name: str, check: CheckFunction) -> None:
        self.event_name = event_name
        self.check = check

        self.can_expand = True
        self.events = deque()
        self.wait = Event()
        super().__init__()

    def process(self, event_name: str, *args) -> bool:
        """
        Queue the event's arguments if the queue may grow and it matches.

        Parameters
        ----------
        event_name : str
            The name of the event.
        *args : Any
            Arguments to evaluate check with.

        Returns
        -------
        bool
            ``True`` if ``args`` was appended to ``self.events``,
            ``False`` otherwise.
        """
        if not self.can_expand:
            return False

        if not self.matches_event(event_name, *args):
            return False

        self.events.append(args)
        # Wake up a ``get_next()`` call that is waiting on an empty queue.
        self.wait.set()
        return True

    async def get_next(self):
        """
        Returns the next item in the queue. If there are no items in the
        queue, it will return the next event that happens.

        Returns
        -------
        Tuple[Any, ...]
            The arguments of the oldest queued event.

        Raises
        ------
        _LoopEmptyError
            If the queue is empty and ``can_expand`` is false, so no new
            item can ever arrive.
        """
        if not self.events:
            if not self.can_expand:
                raise _LoopEmptyError

            # Clear first so a stale ``set()`` from an earlier event does
            # not make the wait below return immediately.
            self.wait.clear()
            await self.wait.wait()

        return self.events.popleft()
162
163
164
class EventMgr:
    """
    Hands incoming events to the waiters created by ``wait_for`` and
    ``loop_for``.

    Attributes
    ----------
    event_list : List[_Processable]
        The list of waiters that are offered every processed event.
    """

    def __init__(self):
        # Holds ``_Processable`` instances (``_Event`` / ``_LoopMgr``).
        self.event_list: list = []

    def process_events(self, event_name, *args):
        """
        Offer an event to every registered waiter.

        Parameters
        ----------
        event_name : str
            The name of the event to be processed.
        *args : Any
            The arguments returned from the middleware for this event.
        """
        for event in self.event_list:
            event.process(event_name, *args)

    async def wait_for(
        self,
        event_name: str,
        check: CheckFunction,
        timeout: Union[float, None]
    ) -> Any:
        """
        Wait for the next event that matches ``event_name`` and ``check``.

        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Union[Callable[[Any], bool], None]
            This function only returns a value if this returns true.
        timeout: Union[float, None]
            Amount of seconds before timeout. Use None for no timeout.

        Returns
        -------
        Any
            What the Discord API returns for this event.

        Raises
        ------
        TimeoutError
            If no matching event arrives within ``timeout`` seconds. This
            is the ``TimeoutError`` name imported from ``asyncio`` at the
            top of this module.
        """
        event = _Event(
            event_name=event_name,
            check=check
        )
        self.event_list.append(event)

        try:
            await _wait_for(event.wait(), timeout=timeout)
        except TimeoutError as exc:
            raise TimeoutError(
                "wait_for() timed out while waiting for an event."
            ) from exc
        finally:
            # Always unregister, also on timeout or cancellation; otherwise
            # the waiter stays in ``event_list`` forever and is offered
            # every future event.
            self.event_list.remove(event)

        return event.return_value

    async def loop_for(
        self,
        event_name: str,
        check: Union[Callable[[Any], bool], None],
        iteration_timeout: Union[float, None],
        loop_timeout: Union[float, None],
    ) -> Any:
        """
        Asynchronously yield every event that matches ``event_name`` and
        ``check`` until a timeout is hit.

        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Union[Callable[[Any], bool], None]
            This function only yields a value if this returns true.
        iteration_timeout: Union[float, None]
            Amount of seconds before timeout. Timeouts are for each loop.
            Use None for no per-iteration timeout.
        loop_timeout: Union[float, None]
            Amount of seconds before the entire loop times out. The generator
            will only raise a timeout error while it is waiting for an event.
            Use None for no overall timeout.

        Yields
        ------
        Any
            What the Discord API returns for this event.

        Raises
        ------
        TimeoutError
            When a timeout expires and all events queued before it have
            been yielded. This is the ``TimeoutError`` name imported from
            ``asyncio`` at the top of this module.
        """
        loop_mgr = _LoopMgr(event_name, check)
        self.event_list.append(loop_mgr)

        loop = get_running_loop()

        try:
            while True:
                start_time = loop.time()

                try:
                    yield await _wait_for(
                        loop_mgr.get_next(),
                        timeout=_lowest_value(
                            loop_timeout, iteration_timeout
                        )
                    )

                except TimeoutError:
                    # Loop timed out. Loop through the remaining events
                    # received before the timeout.
                    loop_mgr.can_expand = False
                    try:
                        while True:
                            yield await loop_mgr.get_next()
                    except _LoopEmptyError:
                        # The empty-queue signal is an internal detail, so
                        # it is not chained onto the timeout callers see.
                        raise TimeoutError(
                            "loop_for() timed out while waiting for an event"
                        ) from None

                # ``None`` means "no overall timeout": there is no budget
                # to decrement, and ``None -= float`` would raise TypeError.
                if loop_timeout is None:
                    continue

                loop_timeout -= loop.time() - start_time

                # loop_timeout can be below 0 if the user's code in the for
                # loop takes longer than the time left in loop_timeout
                if loop_timeout <= 0:
                    raise TimeoutError(
                        "loop_for() timed out while waiting for an event"
                    )
        finally:
            # Runs on timeout, on errors and when the generator is closed,
            # so the manager never lingers in ``event_list``.
            self.event_list.remove(loop_mgr)
288