Passed
Pull Request — main (#176)
by
unknown
01:43
created

pincer.utils.event_mgr._lowest_value()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
from abc import ABC, abstractmethod
5
from asyncio import Event, wait_for as _wait_for, get_running_loop, TimeoutError
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in TimeoutError.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
6
from collections import deque
7
from typing import Any, Callable, Union
8
9
from .types import CheckFunction
10
11
12
class _Processable(ABC):
    """
    Base class for objects that react to events dispatched by name.

    ``matches_event`` reads ``event_name`` and ``check`` from the instance,
    so every subclass has to assign both before it is used.

    Attributes
    ----------
    event_name : str
        Name of the event this object reacts to.
    check : Union[Callable[..., bool], None]
        Predicate called with the event arguments. A falsy value (such as
        None) means that no check is performed.
    """

    # Declared here (without values) so that the attributes used by
    # ``matches_event`` are part of the visible contract of the base class.
    event_name: str
    check: Union[Callable[..., bool], None]

    @abstractmethod
    def process(self, event_name: str, *args):
        """Method that is run when an event is received from discord"""

    def matches_event(self, event_name: str, *args):
        """
        Parameters
        ----------
        event_name : str
            Name of event.
        *args : Any
            Arguments to evaluate check with.

        Returns
        -------
        Any
            False when ``event_name`` differs from ``self.event_name``.
            Otherwise the result of ``self.check(*args)`` when a check is
            set, and True when there is no check.
        """
        if self.event_name != event_name:
            return False

        if self.check:
            return self.check(*args)

        return True
34
35
36
def _lowest_value(*args):
    """
    Returns lowest value from list of numbers. None is not counted as a value.

    Parameters
    ----------
    *args : Union[float, None]
        The candidate values. Any of them may be None.

    Returns
    -------
    Union[float, None]
        The smallest argument that is not None, or None when no such
        argument exists (no arguments, or all of them are None).
    """
    # Compare against None explicitly: a plain truthiness test would also
    # throw away 0, which is a real value. ``default`` avoids the ValueError
    # that ``min`` raises for an empty sequence.
    return min(
        (n for n in args if n is not None),
        default=None
    )
45
46
47
class _Event(_Processable):
    """
    Waiter that is released by the first processed event that matches.

    Attributes
    ----------
    event_name : str
        The name of the event that is waited for.
    event : asyncio.Event
        Set by ``process`` once a matching event has been processed.
    check : Optional[Callable[[Any], bool]]
        Predicate used by ``matches_event``. Ignored when falsy.
    return_value : Optional[Tuple[Any, ...]]
        The arguments of the most recent matching event, stored by
        ``process`` so they can be returned later. None until an event
        matched.
    """

    def __init__(
        self,
        event_name: str,
        check: CheckFunction
    ):
        """
        Parameters
        ----------
        event_name : str
            The name of the event.
        check : Optional[Callable[[Any], bool]]
            ``process`` only sets the event if this function returns true.
            Will be ignored if set to None.
        """
        self.event_name = event_name
        self.event = Event()
        self.check = check
        self.return_value = None
        super().__init__()

    async def wait(self):
        """Wait until ``process`` has set the internal event."""
        await self.event.wait()

    def process(self, event_name: str, *args) -> bool:
        """
        Store ``args`` and set the internal event when the event matches.

        Parameters
        ----------
        event_name : str
            The name of the event.
        *args : Any
            Arguments to evaluate check with.

        Returns
        -------
        bool
            Whether the event matched and has been set.
        """
        matched = bool(self.matches_event(event_name, *args))

        if matched:
            self.return_value = args
            self.event.set()

        # The signature promises a bool, so report the outcome explicitly
        # instead of falling off the end of the function.
        return matched
96
97
98
class _LoopEmptyError(Exception):
    """Raised when the _LoopMgr is empty and cannot accept new items."""


# Keep the original module-level name available. The class itself uses a
# single leading underscore because a name with two leading underscores is
# mangled when it is referenced inside a class body (``__LoopEmptyError``
# written inside ``_LoopMgr`` is looked up as ``_LoopMgr__LoopEmptyError``),
# which made the original ``raise`` fail with a NameError.
__LoopEmptyError = _LoopEmptyError


class _LoopMgr(_Processable):
    """
    Queue of the arguments of every matching event that was processed.

    Attributes
    ----------
    event_name : str
        The name of the event that is collected.
    check : Optional[Callable[[Any], bool]]
        Predicate used by ``matches_event``. Ignored when falsy.
    can_expand : bool
        While True, ``process`` may append new events. Once it is False,
        ``process`` does nothing and ``get_next`` raises when the queue is
        empty.
    events : collections.deque
        Arguments of the matching events, oldest first.
    wait : asyncio.Event
        Set by ``process`` whenever an event has been appended.
    """

    def __init__(self, event_name: str, check: CheckFunction) -> None:
        """
        Parameters
        ----------
        event_name : str
            The name of the event.
        check : Optional[Callable[[Any], bool]]
            Only events for which this function returns true are queued.
            Will be ignored if set to None.
        """
        self.event_name = event_name
        self.check = check

        self.can_expand = True
        self.events = deque()
        self.wait = Event()
        super().__init__()

    def process(self, event_name: str, *args):
        """
        Queue ``args`` when the event matches and the queue may still grow.

        Parameters
        ----------
        event_name : str
            The name of the event.
        *args : Any
            Arguments to evaluate check with.
        """
        if not self.can_expand:
            return

        if self.matches_event(event_name, *args):
            self.events.append(args)
            self.wait.set()

    async def get_next(self):
        """
        Return the oldest queued event, waiting for one if necessary.

        Returns
        -------
        Tuple[Any, ...]
            The arguments of the oldest queued event.

        Raises
        ------
        _LoopEmptyError
            If the queue is empty and ``can_expand`` is False.
        """
        if not self.events:
            if not self.can_expand:
                raise _LoopEmptyError

            # Reset the flag first so that the wait below only returns
            # after ``process`` has appended a new event.
            self.wait.clear()
            await self.wait.wait()

        return self.events.popleft()
130
131
132
class EventMgr:
    """
    Keeps track of the waiters created by ``wait_for`` and ``loop_for`` and
    passes every incoming event to them.

    Attributes
    ----------
    event_list : List[_Processable]
        The List of events that need to be processed.
    """

    def __init__(self):
        self.event_list: "list[_Processable]" = []

    def process_events(self, event_name, *args):
        """
        Parameters
        ----------
        event_name : str
            The name of the event to be processed.
        *args : Any
            The arguments returned from the middleware for this event.
        """
        for event in self.event_list:
            event.process(event_name, *args)

    async def wait_for(
        self,
        event_name: str,
        check: CheckFunction,
        timeout: Union[float, None]
    ) -> Any:
        """
        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Union[Callable[[Any], bool], None]
            This function only returns a value if this return true.
        timeout: Union[float, None]
            Amount of seconds before timeout. Use None for no timeout.

        Returns
        ------
        Any
            What the Discord API returns for this event.

        Raises
        ------
        TimeoutError
            If no matching event was processed within ``timeout`` seconds.
        """

        event = _Event(
            event_name=event_name,
            check=check
        )
        self.event_list.append(event)

        try:
            await _wait_for(event.wait(), timeout=timeout)
        except TimeoutError as exc:
            raise TimeoutError(
                "wait_for() timed out while waiting for an event."
            ) from exc
        finally:
            # Unregister on every exit path. Without this a timed out (or
            # cancelled) waiter would stay in event_list forever.
            self.event_list.remove(event)

        return event.return_value

    async def loop_for(
        self,
        event_name: str,
        check: Union[Callable[[Any], bool], None],
        iteration_timeout: Union[float, None],
        loop_timeout: Union[float, None],
    ) -> Any:
        """
        Asynchronous generator that yields every matching event.

        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Callable[[Any], bool]
            This function only returns a value if this return true.
        iteration_timeout: Union[float, None]
            Amount of seconds before timeout. Timeouts are for each loop.
            Use None for no timeout.
        loop_timeout: Union[float, None]
            Amount of seconds before the entire loop times out. The generator
            will only raise a timeout error while it is waiting for an event.
            Use None for no timeout.

        Yields
        ------
        Any
            What the Discord API returns for this event.

        Raises
        ------
        TimeoutError
            If one of the timeouts expired. Events that were queued before
            a wait timed out are yielded first.
        """

        loop_mgr = _LoopMgr(event_name, check)
        self.event_list.append(loop_mgr)

        loop = get_running_loop()

        try:
            while True:
                start_time = loop.time()

                # Wait for the shorter of the two timeouts. None means
                # "no limit", so when both are None no timeout is used.
                timeouts = [
                    t for t in (loop_timeout, iteration_timeout)
                    if t is not None
                ]

                try:
                    yield await _wait_for(
                        loop_mgr.get_next(),
                        timeout=min(timeouts) if timeouts else None
                    )

                except TimeoutError as exc:
                    # Loop timed out. Loop through the remaining events
                    # received before the timeout.
                    loop_mgr.can_expand = False
                    while loop_mgr.events:
                        yield loop_mgr.events.popleft()

                    raise TimeoutError(
                        "loop_for() timed out while waiting for an event"
                    ) from exc

                # Without a loop timeout there is no budget to update.
                if loop_timeout is None:
                    continue

                loop_timeout -= loop.time() - start_time

                # loop_timeout can be below 0 if the user's code in the for
                # loop takes longer than the time left in loop_timeout
                if loop_timeout <= 0:
                    raise TimeoutError(
                        "loop_for() timed out while waiting for an event"
                    )
        finally:
            # Unregister the manager when the generator finishes, fails or
            # is closed, so that it no longer receives events.
            self.event_list.remove(loop_mgr)
256