1 | # Copyright Pincer 2021-Present |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
2 | # Full MIT License can be found in `LICENSE` at the project root. |
||
3 | |||
4 | from __future__ import annotations |
||
5 | |||
6 | import asyncio |
||
7 | import logging |
||
8 | from asyncio import TimerHandle, iscoroutinefunction |
||
9 | from datetime import timedelta |
||
10 | from typing import TYPE_CHECKING, Optional |
||
11 | |||
12 | from . import __package__ |
||
13 | from .insertion import should_pass_cls |
||
14 | from ..exceptions import ( |
||
15 | TaskAlreadyRunning, |
||
16 | TaskCancelError, |
||
17 | TaskInvalidDelay, |
||
18 | TaskIsNotCoroutine, |
||
19 | ) |
||
20 | |||
21 | if TYPE_CHECKING: |
||
22 | from typing import Callable, Set |
||
23 | from .types import Coro |
||
24 | |||
25 | |||
# Module-level logger; its name is the package name imported above.
_log = logging.getLogger(__package__)
||
27 | |||
28 | |||
class TaskScheduler:
    """Create, start and stop tasks that repeat at a fixed interval.

    Parameters
    ----------
    client
        The client instance. It is passed as the only argument to every
        task coroutine whose ``client_required`` flag is set.
    """

    def __init__(self, client):
        self.client = client
        self.tasks: Set[Task] = set()
        self._loop = asyncio.get_event_loop()
        # Strong references to executions that are still in flight. The
        # event loop only keeps weak references to tasks, so without this
        # an execution could be garbage-collected before it finishes.
        self._pending: Set[asyncio.Future] = set()

    def loop(
        self,
        days=0,
        weeks=0,
        hours=0,
        minutes=0,
        seconds=0,
        milliseconds=0,
        microseconds=0,
    ) -> Callable[[Coro], Task]:
        """A decorator to create a task that repeats at the given interval.

        The interval is the sum of all the given units.

        :Example usage:

        .. code-block:: python

            from pincer import Client
            from pincer.utils import TaskScheduler

            client = Client("token")
            task = TaskScheduler(client)

            @task.loop(minutes=3)
            async def my_task(self):
                ...

            my_task.start()
            client.run()

        Parameters
        ----------
        days : :class:`int`
            Days to wait between iterations.
            |default| ``0``
        weeks : :class:`int`
            Weeks to wait between iterations.
            |default| ``0``
        hours : :class:`int`
            Hours to wait between iterations.
            |default| ``0``
        minutes : :class:`int`
            Minutes to wait between iterations.
            |default| ``0``
        seconds : :class:`int`
            Seconds to wait between iterations.
            |default| ``0``
        milliseconds : :class:`int`
            Milliseconds to wait between iterations.
            |default| ``0``
        microseconds : :class:`int`
            Microseconds to wait between iterations.
            |default| ``0``

        Returns
        -------
        Callable[[Coro], :class:`~pincer.utils.tasks.Task`]
            A decorator that wraps the coroutine function in a task.
            The task is not started until ``start()`` is called on it.

        Raises
        ------
        TaskIsNotCoroutine:
            The task is not a coroutine.
        TaskInvalidDelay:
            The delay is 0 or negative.
        """

        def decorator(func: Coro) -> Task:
            if not iscoroutinefunction(func):
                raise TaskIsNotCoroutine(
                    f"Task `{func.__name__}` is not a coroutine, "
                    "which is required for tasks."
                )

            # Collapse every unit into one interval expressed in seconds.
            delay = timedelta(
                days=days,
                weeks=weeks,
                hours=hours,
                minutes=minutes,
                seconds=seconds,
                microseconds=microseconds,
                milliseconds=milliseconds,
            ).total_seconds()

            if delay <= 0:
                raise TaskInvalidDelay(
                    f"Task `{func.__name__}` has a delay of {delay} seconds, "
                    "which is invalid. Delay must be greater than zero."
                )

            return Task(self, func, delay)

        return decorator

    def register(self, task: Task):
        """Register a task and run its first iteration.

        Parameters
        ----------
        task : :class:`~pincer.utils.tasks.Task`
            The task to register.
        """
        self.tasks.add(task)
        self.__execute(task)

    def __execute(self, task: Task):
        """Run one iteration of a task and schedule the next one.

        Parameters
        ----------
        task : :class:`~pincer.utils.tasks.Task`
            The task to execute.
        """
        coro = task.coro(self.client) if task.client_required else task.coro()

        # Run the coroutine on the scheduler's own loop (the same loop the
        # timer below uses) instead of whichever loop happens to be
        # current, and hold a reference until the execution is done.
        execution = asyncio.ensure_future(coro, loop=self._loop)
        self._pending.add(execution)
        execution.add_done_callback(self._pending.discard)

        # Schedule the next execution. The timer starts now, so it does not
        # wait for the execution started above to finish.
        # The scheduler deliberately writes the task's private handle:
        # Task reads it to report its state and to cancel the timer.
        task._handle = self._loop.call_later(task.delay, self.__execute, task)

    def close(self):
        """Gracefully stops any running task."""
        # Iterate over a copy: cancelling a task removes it from the set.
        for task in self.tasks.copy():
            if task.running:
                task.cancel()
||
148 | |||
149 | |||
class Task:
    """A Task is a coroutine that is scheduled to repeat every x seconds.

    Use a TaskScheduler in order to create a task.

    Parameters
    ----------
    scheduler: :class:`~pincer.utils.tasks.TaskScheduler`
        The scheduler to use.
    coro: :class:`~pincer.utils.types.Coro`
        The coroutine to register as a task.
    delay: :class:`float`
        Delay between each iteration of the task.
    """

    def __init__(self, scheduler: TaskScheduler, coro: Coro, delay: float):
        self._scheduler = scheduler
        self.coro = coro
        self.delay = delay
        # Timer for the next iteration; assigned by the scheduler once the
        # task is registered, ``None`` until then.
        self._handle: Optional[TimerHandle] = None
        self._client_required = should_pass_cls(coro)

    def __del__(self):
        if self.running:
            self.cancel()
        elif self._handle is None:
            # Did the user forget to call task.start() ?
            # A task that was started and later cancelled has a handle,
            # so it does not trigger this warning.
            _log.warning(
                "Task `%s` was not scheduled. Did you forget to start it ?",
                self.coro.__name__,
            )

    @property
    def cancelled(self) -> bool:
        """:class:`bool`: Check if the task has been cancelled or not."""
        return self._handle is not None and self._handle.cancelled()

    @property
    def running(self) -> bool:
        """:class:`bool`: Check if the task is running.

        A task is running once it has been started and until it is
        cancelled. A cancelled task is not running and can be started
        again.
        """
        return self._handle is not None and not self._handle.cancelled()

    def start(self) -> None:
        """Register the task in the TaskScheduler and start
        the execution of the task.

        Raises
        ------
        TaskAlreadyRunning:
            The task is already running.
        """
        if self.running:
            raise TaskAlreadyRunning(
                f"Task `{self.coro.__name__}` is already running.", self
            )

        self._scheduler.register(self)

    def cancel(self) -> None:
        """Cancel the task.

        Cancels the timer for the next iteration and removes the task
        from its scheduler.

        Raises
        ------
        TaskCancelError:
            The task is not running.
        """
        if not self.running:
            raise TaskCancelError(
                f"Task `{self.coro.__name__}` is not running.", self
            )

        self._handle.cancel()
        self._scheduler.tasks.discard(self)

    @property
    def client_required(self) -> bool:
        """:class:`bool`: Whether the client is passed to the coroutine.

        The value is computed once, at construction, by calling
        ``should_pass_cls`` on the coroutine. When it is true, the
        scheduler calls the coroutine with its client as the only
        argument; otherwise it calls it without arguments.
        """
        return self._client_required
||
222 |