Passed
Branch main (3a3ea0)
by Yohann
01:30
created

pincer.utils.tasks.Task.start()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# Copyright Pincer 2021-Present
0 ignored issues
show
introduced by
Missing module docstring
Loading history...
2
# Full MIT License can be found in `LICENSE` at the project root.
3
4
import asyncio
5
import logging
6
from asyncio import TimerHandle, iscoroutinefunction
7
from datetime import timedelta
8
from typing import Callable, Set
9
10
11
from ..exceptions import (
12
    TaskAlreadyRunning, TaskCancelError, TaskInvalidDelay,
13
    TaskIsNotCoroutine
14
)
15
from . import __package__
16
from .insertion import should_pass_cls
17
from .types import Coro
18
19
# Module-level logger, named after the `__package__` value imported from
# the enclosing package above.
_log = logging.getLogger(__package__)
20
21
22
class Task:
    """
    A coroutine function that a TaskScheduler runs repeatedly, waiting
    ``delay`` seconds between two runs.

    A task does nothing until :meth:`start` is called.
    """

    def __init__(self, scheduler: 'TaskScheduler', coro: 'Coro', delay: float):
        """
        A Task is a coroutine that is scheduled to repeat every x seconds.
        Use a TaskScheduler in order to create a task.

        :param scheduler: Scheduler the task registers itself with when
            it is started.
        :param coro: Coroutine function called on every run.
        :param delay: Number of seconds between two runs.
        """
        self._scheduler = scheduler
        self.coro = coro
        self.delay = delay
        # Timer handle of the next planned run. It stays ``None`` until
        # the scheduler executes the task and assigns a handle.
        self._handle: TimerHandle = None
        # Whether the scheduler has to pass its client as first argument
        # when calling ``coro``.
        self._client_required = should_pass_cls(coro)

    def __del__(self):
        """
        Cancel the task when it is garbage collected, or log a warning
        if it was never started.
        """
        if self.running:
            self.cancel()
        else:
            # Did the user forget to call task.start()?
            # `Logger.warn` is a deprecated alias; `warning` is the
            # supported spelling.
            _log.warning(
                "Task `%s` was not scheduled. Did you forget to start it ?",
                self.coro.__name__
            )

    @property
    def cancelled(self):
        """Check if the task has been cancelled or not."""
        return self.running and self._handle.cancelled()

    @property
    def running(self):
        """
        Check if the task is running.

        NOTE: this only checks that a timer handle has been assigned.
        The handle is never reset by :meth:`cancel`, so this remains
        ``True`` for a cancelled task.
        """
        return self._handle is not None

    def start(self):
        """
        Register the task in the TaskScheduler and start
        the execution of the task.

        :raises TaskAlreadyRunning: If the task already has a timer
            handle assigned.
        """
        if self.running:
            raise TaskAlreadyRunning(
                f'Task `{self.coro.__name__}` is already running.', self
            )

        self._scheduler.register(self)

    def cancel(self):
        """
        Cancel the task.

        Cancels the pending timer handle and removes the task from the
        scheduler's task set when it is still in it.

        :raises TaskCancelError: If the task was never started.
        """
        if not self.running:
            raise TaskCancelError(
                f'Task `{self.coro.__name__}` is not running.', self
            )

        self._handle.cancel()
        if self in self._scheduler.tasks:
            self._scheduler.tasks.remove(self)
76
77
78
class TaskScheduler:
    """
    Create :class:`Task` objects through the :meth:`loop` decorator and
    run them periodically on an asyncio event loop.
    """

    def __init__(self, client):
        """
        :param client: Object passed as first argument to the task
            coroutines that require it.
        """
        self.client = client
        self.tasks: Set[Task] = set()
        self._loop = asyncio.get_event_loop()
        # Strong references to the executions currently in flight. The
        # event loop only keeps weak references to tasks, so without
        # this an execution could be garbage collected before it ends.
        self._pending = set()

    def loop(
        self,
        days=0,
        weeks=0,
        hours=0,
        minutes=0,
        seconds=0,
        microseconds=0,
        milliseconds=0
    ) -> 'Callable[[Coro], Task]':
        """
        Create a task that repeats at the given interval.

        All the arguments are forwarded to :class:`datetime.timedelta`;
        the resulting duration is the delay between two runs.

        :Example usage:

        .. code-block:: python
            from pincer import Client
            from pincer.utils import TaskScheduler

            client = Client("token")
            task = TaskScheduler(client)

            @task.loop(minutes=3)
            async def my_task(self):
                ...

            my_task.start()
            client.run()

        :return: A decorator that turns a coroutine function into a
            :class:`Task`. The task is not started automatically.
        :raises TaskIsNotCoroutine: (from the decorator) if the
            decorated function is not a coroutine function.
        :raises TaskInvalidDelay: (from the decorator) if the total
            delay is not greater than zero.
        """
        def decorator(func: 'Coro') -> 'Task':
            if not iscoroutinefunction(func):
                raise TaskIsNotCoroutine(
                    f'Task `{func.__name__}` is not a coroutine, '
                    'which is required for tasks.'
                )

            delay = timedelta(
                days=days,
                weeks=weeks,
                hours=hours,
                minutes=minutes,
                seconds=seconds,
                microseconds=microseconds,
                milliseconds=milliseconds
            ).total_seconds()

            if delay <= 0:
                raise TaskInvalidDelay(
                    f'Task `{func.__name__}` has a delay of {delay} seconds, '
                    'which is invalid. Delay must be greater than zero.'
                )

            return Task(self, func, delay)

        return decorator

    def register(self, task: 'Task'):
        """
        Register a task and immediately trigger its first execution.

        :param task: The task to add to :attr:`tasks` and to execute.
        """
        self.tasks.add(task)
        self.__execute(task)

    def __execute(self, task: 'Task'):
        """
        Run the task's coroutine once and plan the next execution
        ``task.delay`` seconds later.
        """
        if task._client_required:
            coro = task.coro(self.client)
        else:
            coro = task.coro()

        # Execute the coroutine on the same loop that drives the timer,
        # and keep a reference to it until it is done so it cannot
        # disappear mid-execution.
        future = asyncio.ensure_future(coro, loop=self._loop)
        self._pending.add(future)
        future.add_done_callback(self._pending.discard)

        # Schedule the coroutine's next execution
        task._handle = self._loop.call_later(task.delay, self.__execute, task)

    def close(self):
        """Gracefully stops any running task."""
        # Iterate over a copy: cancelling a task removes it from the set.
        for task in self.tasks.copy():
            if task.running:
                task.cancel()
162