Issues (1445)

pincer/utils/tasks.py (13 issues)

1
# Copyright Pincer 2021-Present
0 ignored issues
show
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
from __future__ import annotations
5
6
import asyncio
7
import logging
8
from asyncio import TimerHandle, iscoroutinefunction
9
from datetime import timedelta
10
from typing import TYPE_CHECKING, Optional
11
12
from . import __package__
13
from .insertion import should_pass_cls
14
from ..exceptions import (
15
    TaskAlreadyRunning,
16
    TaskCancelError,
17
    TaskInvalidDelay,
18
    TaskIsNotCoroutine,
19
)
20
21
if TYPE_CHECKING:
22
    from typing import Callable, Set
23
    from .types import Coro
24
25
26
_log = logging.getLogger(__package__)
27
28
29
class TaskScheduler:
0 ignored issues
show
Missing class docstring
Loading history...
30
    def __init__(self, client):
31
        """
32
        Used to create tasks
33
        """
34
        self.client = client
35
        self.tasks: Set[Task] = set()
36
        self._loop = asyncio.get_event_loop()
37
38
    def loop(
0 ignored issues
show
Too many arguments (8/5)
Loading history...
39
        self,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
40
        days=0,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
41
        weeks=0,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
42
        hours=0,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
43
        minutes=0,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
44
        seconds=0,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
45
        milliseconds=0,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
46
        microseconds=0,
0 ignored issues
show
Wrong hanging indentation before block (add 4 spaces).
Loading history...
47
    ) -> Callable[[Coro], Task]:
48
        """A decorator to create a task that repeat the given amount of t
49
        :Example usage:
50
51
        .. code-block:: python
52
53
            from pincer import Client
54
            from pincer.utils import TaskScheduler
55
56
            client = Client("token")
57
            task = TaskScheduler(client)
58
59
            @task.loop(minutes=3)
60
            async def my_task(self):
61
                ...
62
63
            my_task.start()
64
            client.run()
65
66
        Parameters
67
        ----------
68
        days : :class:`int`
69
            Days to wait between iterations.
70
            |default| ``0``
71
        weeks : :class:`int`
72
            Days to wait between iterations.
73
            |default| ``0``
74
        hours : :class:`int`
75
            Days to wait between iterations.
76
            |default| ``0``
77
        minutes : :class:`int`
78
            Days to wait between iterations.
79
            |default| ``0``
80
        seconds : :class:`int`
81
            Days to wait between iterations.
82
            |default| ``0``
83
        milliseconds : :class:`int`
84
            Days to wait between iterations.
85
            |default| ``0``
86
        microseconds : :class:`int`
87
            Days to wait between iterations.
88
            |default| ``0``
89
        Raises
90
        ------
91
        TaskIsNotCoroutine:
92
            The task is not a coroutine.
93
        TaskInvalidDelay:
94
            The delay is 0 or negative.
95
        """
96
97
        def decorator(func: Coro) -> Task:
98
            if not iscoroutinefunction(func):
99
                raise TaskIsNotCoroutine(
100
                    f"Task `{func.__name__}` is not a coroutine, "
101
                    "which is required for tasks."
102
                )
103
104
            delay = timedelta(
105
                days=days,
106
                weeks=weeks,
107
                hours=hours,
108
                minutes=minutes,
109
                seconds=seconds,
110
                microseconds=microseconds,
111
                milliseconds=milliseconds,
112
            ).total_seconds()
113
114
            if delay <= 0:
115
                raise TaskInvalidDelay(
116
                    f"Task `{func.__name__}` has a delay of {delay} seconds, "
117
                    "which is invalid. Delay must be greater than zero."
118
                )
119
120
            return Task(self, func, delay)
121
122
        return decorator
123
124
    def register(self, task: Task):
125
        """Register a task.
126
        Parameters
127
        ----------
128
        task : :class:`~pincer.utils.tasks.Task`
129
            The task to register.
130
        """
131
        self.tasks.add(task)
132
        self.__execute(task)
133
134
    def __execute(self, task: Task):
135
        """Execute a task."""
136
        coro = task.coro(self.client) if task.client_required else task.coro()
137
        # Execute the coroutine
138
        asyncio.ensure_future(coro)
139
140
        # Schedule the coroutine's next execution
141
        task._handle = self._loop.call_later(task.delay, self.__execute, task)
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _handle was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
142
143
    def close(self):
144
        """Gracefully stops any running task."""
145
        for task in self.tasks.copy():
146
            if task.running:
147
                task.cancel()
148
149
150
class Task:
151
    """A Task is a coroutine that is scheduled to repeat every x seconds.
152
    Use a TaskScheduler in order to create a task.
153
    Parameters
154
    ----------
155
    scheduler: :class:`~pincer.utils.tasks.TaskScheduler`
156
        The scheduler to use.
157
    coro: :class:`~pincer.utils.types.Coro`
158
        The coroutine to register as a task.
159
    delay: :class:`float`
160
        Delay between each iteration of the task.
161
    """
162
163
    def __init__(self, scheduler: TaskScheduler, coro: Coro, delay: float):
164
        self._scheduler = scheduler
165
        self.coro = coro
166
        self.delay = delay
167
        self._handle: Optional[TimerHandle] = None
168
        self._client_required = should_pass_cls(coro)
169
170
    def __del__(self):
171
        if self.running:
172
            self.cancel()
173
        else:
174
            # Did the user forgot to call task.start() ?
175
            _log.warning(
176
                "Task `%s` was not scheduled. Did you forget to start it ?",
177
                self.coro.__name__,
178
            )
179
180
    @property
181
    def cancelled(self):
182
        """:class:`bool`: Check if the task has been cancelled or not."""
183
        return self.running and self._handle.cancelled()
184
185
    @property
186
    def running(self):
187
        """:class:`bool`: Check if the task is running."""
188
        return self._handle is not None
189
190
    def start(self):
191
        """Register the task in the TaskScheduler and start
192
        the execution of the task.
193
        """
194
        if self.running:
195
            raise TaskAlreadyRunning(
196
                f"Task `{self.coro.__name__}` is already running.", self
197
            )
198
199
        self._scheduler.register(self)
200
201
    def cancel(self):
202
        """Cancel the task."""
203
        if not self.running:
204
            raise TaskCancelError(
205
                f"Task `{self.coro.__name__}` is not running.", self
206
            )
207
208
        self._handle.cancel()
209
        if self in self._scheduler.tasks:
210
            self._scheduler.tasks.remove(self)
211
212
    @property
213
    def client_required(self):
214
        # TODO: fix docs
0 ignored issues
show
TODO and FIXME comments should generally be avoided.
Loading history...
215
        """
216
217
        Returns
218
        -------
219
220
        """
221
        return self._client_required
222