Passed
Pull Request — main (#176)
by
unknown
01:35
created

pincer.utils.event_mgr   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 283
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 28
eloc 107
dl 0
loc 283
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A _LoopMgr.get_next() 0 14 3
A _LoopMgr.process() 0 7 3
A _Processable.process() 0 3 1
A _Event.__init__() 0 10 1
B EventMgr.loop_for() 0 62 6
A _Event.wait() 0 5 1
A _Event.process() 0 4 2
A EventMgr.__init__() 0 2 1
A EventMgr.process_events() 0 11 2
A EventMgr.wait_for() 0 34 2
A _Processable.matches_event() 0 16 3
A _LoopMgr.__init__() 0 7 1

1 Function

Rating   Name   Duplication   Size   Complexity  
A _lowest_value() 0 11 2
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
from abc import ABC, abstractmethod
5
from asyncio import Event, wait_for as _wait_for, get_running_loop, TimeoutError
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in TimeoutError.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
6
from collections import deque
7
from typing import Any, List, Union
8
9
from .types import CheckFunction
10
from ..exceptions import TimeoutError as PincerTimeoutError
11
12
13
class _Processable(ABC):
14
15
    @abstractmethod
16
    def process(self, event_name: str, *args):
17
        """
18
        Method that is ran when an event is recieved from discord.
19
20
        Parameters
21
        ----------
22
        event_name : str
23
            The name of the event.
24
        *args : Any
25
            Arguments to evaluate check with.
26
27
        Returns
28
        -------
29
        bool
30
            Whether the event can be set
31
        """
32
33
    def matches_event(self, event_name: str, *args):
34
        """
35
        Parameters
36
        ----------
37
        event_name : str
38
            Name of event.
39
        *args : Any
40
            Arguments to evalue check with.
41
        """
42
        if self.event_name != event_name:
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named event_name.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
43
            return False
44
45
        if self.check:
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named check.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
46
            return self.check(*args)
0 ignored issues
show
Bug introduced by
The Instance of _Processable does not seem to have a member named check.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
47
48
        return True
49
50
51
def _lowest_value(*args):
52
    """
53
    Returns lowest value from list of numbers. ``None`` is not counted as a
54
    value. ``None`` is returned if all arguments are ``None``.
55
    """
56
    args_without_none = [n for n in args if n is not None]
57
58
    if len(args_without_none) == 0:
59
        return None
60
61
    return min(args_without_none)
62
63
64
class _Event(_Processable):
    """
    One-shot waiter: it is set by the first processed event that matches
    ``event_name`` and passes ``check``.

    Parameters
    ----------
    event_name : str
        The name of the event.
    check : Optional[Callable[[Any], bool]]
        ``matches_event`` only returns true if this function returns true.
        Will be ignored if set to None.

    Attributes
    ----------
    event : :class:`asyncio.Event`
        Event that is used to wait until the next valid discord event.
    return_value : Optional[tuple]
        The positional arguments of the matching event, stored by
        ``process`` so they can be returned later. ``None`` until a
        matching event has been processed.
    """

    def __init__(
        self,
        event_name: str,
        check: CheckFunction
    ):
        self.event_name = event_name
        self.check = check
        self.event = Event()
        self.return_value = None
        super().__init__()

    async def wait(self):
        """
        Waits until ``self.event`` is set.
        """
        await self.event.wait()

    def process(self, event_name: str, *args) -> bool:
        """
        Store ``args`` and set ``self.event`` if the event matches.

        Parameters
        ----------
        event_name : str
            The name of the event.
        *args : Any
            Arguments to evaluate check with.

        Returns
        -------
        bool
            ``True`` if the event matched and ``self.event`` was set,
            ``False`` otherwise.
        """
        if not self.matches_event(event_name, *args):
            return False

        self.return_value = args
        self.event.set()
        return True
104
105
106
class _LoopEmptyError(Exception):
107
    "Raised when the _LoopMgr is empty and cannot accept new item"
108
109
110
class _LoopMgr(_Processable):
    """
    Queue of matching events that is consumed through ``get_next()``.

    Parameters
    ----------
    event_name : str
        The name of the event.
    check : Optional[Callable[[Any], bool]]
        ``matches_event`` only returns true if this function returns true.
        Will be ignored if set to None.

    Attributes
    ----------
    can_expand : bool
        Whether the queue is allowed to grow. Turned to false once the
        EventMgr's timer runs out.
    events : :class:`collections.deque`
        Queue of events to be processed.
    wait : :class:`asyncio.Event`
        Used to make ``get_next()`` wait for the next event.
    """

    def __init__(self, event_name: str, check: CheckFunction) -> None:
        self.event_name = event_name
        self.check = check

        self.can_expand = True
        self.events = deque()
        self.wait = Event()
        super().__init__()

    def process(self, event_name: str, *args) -> bool:
        """
        Queue ``args`` if the event matches and the queue may still grow.

        Parameters
        ----------
        event_name : str
            The name of the event.
        *args : Any
            Arguments to evaluate check with.

        Returns
        -------
        bool
            ``True`` if the event was appended to the queue, ``False``
            otherwise.
        """
        if not self.can_expand:
            return False

        if not self.matches_event(event_name, *args):
            return False

        self.events.append(args)
        # Wake up a ``get_next()`` call that is waiting on an empty queue.
        self.wait.set()
        return True

    async def get_next(self):
        """
        Returns the next item in the queue. If there are no items in the
        queue, it waits for and returns the next event that happens.

        Raises
        ------
        _LoopEmptyError
            If the queue is empty and ``can_expand`` is false.
        """
        if not self.events:
            if not self.can_expand:
                raise _LoopEmptyError

            # Clear first so a flag left set by an earlier ``process()``
            # call does not end the wait immediately.
            self.wait.clear()
            await self.wait.wait()

        return self.events.popleft()
161
162
163
class EventMgr:
    """
    Hands incoming events to the waiters registered through ``wait_for``
    and ``loop_for``.

    Attributes
    ----------
    event_list : List[_Processable]
        The List of events that need to be processed.
    """

    def __init__(self):
        self.event_list: List[_Processable] = []

    def process_events(self, event_name, *args):
        """
        Offer an event to every registered waiter.

        Parameters
        ----------
        event_name : str
            The name of the event to be processed.
        *args : Any
            The arguments returned from the middleware for this event.
        """
        for event in self.event_list:
            event.process(event_name, *args)

    async def wait_for(
        self,
        event_name: str,
        check: CheckFunction,
        timeout: Union[float, None]
    ) -> Any:
        """
        Wait for a single matching event.

        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Union[Callable[[Any], bool], None]
            This function only returns a value if this returns true.
        timeout: Union[float, None]
            Amount of seconds before timeout. Use None for no timeout.

        Returns
        -------
        Any
            What the Discord API returns for this event.

        Raises
        ------
        PincerTimeoutError
            If no matching event arrives before ``timeout`` runs out.
        """
        event = _Event(event_name, check)
        self.event_list.append(event)

        try:
            await _wait_for(event.wait(), timeout=timeout)
        except TimeoutError as exc:
            raise PincerTimeoutError(
                "wait_for() timed out while waiting for an event."
            ) from exc
        finally:
            # Always unregister the waiter, including on timeout or
            # cancellation, so it does not stay in ``event_list``.
            self.event_list.remove(event)

        return event.return_value

    async def loop_for(
        self,
        event_name: str,
        check: CheckFunction,
        iteration_timeout: Union[float, None],
        loop_timeout: Union[float, None],
    ) -> Any:
        """
        Asynchronous generator that yields every matching event.

        Parameters
        ----------
        event_name : str
            The type of event. It should start with `on_`. This is the same
            name that is used for @Client.event.
        check : Callable[[Any], bool]
            This function only returns a value if this returns true.
        iteration_timeout: Union[float, None]
            Amount of seconds before timeout. Timeouts are for each loop.
            Use None for no per-iteration timeout.
        loop_timeout: Union[float, None]
            Amount of seconds before the entire loop times out. The generator
            will only raise a timeout error while it is waiting for an event.
            Use None for no overall timeout.

        Yields
        ------
        Any
            What the Discord API returns for this event.

        Raises
        ------
        PincerTimeoutError
            If a timeout runs out while waiting for an event, or if
            ``loop_timeout`` has run out once control returns to the
            generator.
        """
        loop_mgr = _LoopMgr(event_name, check)
        self.event_list.append(loop_mgr)

        loop = get_running_loop()

        try:
            while True:
                start_time = loop.time()

                try:
                    next_event = await _wait_for(
                        loop_mgr.get_next(),
                        timeout=_lowest_value(
                            loop_timeout, iteration_timeout
                        )
                    )
                except TimeoutError:
                    # Loop timed out. Loop through the remaining events
                    # received before the timeout.
                    loop_mgr.can_expand = False
                    try:
                        while True:
                            yield await loop_mgr.get_next()
                    except _LoopEmptyError:
                        # The internal "queue drained" signal is not useful
                        # to callers, so it is not chained.
                        raise PincerTimeoutError(
                            "loop_for() timed out while waiting for an event"
                        ) from None

                yield next_event

                # With no overall timeout there is nothing to count down.
                if loop_timeout is not None:
                    # Measured after the ``yield`` resumes, so the time spent
                    # in the caller's loop body is counted as well.
                    loop_timeout -= loop.time() - start_time

                    # loop_timeout can be below 0 if the user's code in the
                    # for loop takes longer than the time left in
                    # loop_timeout
                    if loop_timeout <= 0:
                        raise PincerTimeoutError(
                            "loop_for() timed out while waiting for an event"
                        )
        finally:
            # Unregister the queue when the generator ends for any reason
            # (timeout, error, or the caller closing it).
            self.event_list.remove(loop_mgr)
284