Passed
Pull Request — master (#36)
by Vinicius
03:14
created

build.scheduler.Scheduler.add_callable()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 7
dl 0
loc 6
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 1
1
"""scheduler used by the sndtrace_cp
2
3
This module is intended to replace the usage of the napp
4
called scheduler, which is soon to be deprecated.
5
"""
6 1
import pytz
7 1
from apscheduler.jobstores.base import JobLookupError
8 1
from apscheduler.schedulers.background import BackgroundScheduler
9
10
11 1
class Scheduler:
    """Thin wrapper around an APScheduler ``BackgroundScheduler``.

    The wrapped scheduler is created with the UTC timezone and started as
    soon as the wrapper is instantiated. It is exposed as ``self.scheduler``.
    """

    def __init__(self) -> None:
        """Create the background scheduler (UTC) and start it."""
        self.scheduler = BackgroundScheduler(timezone=pytz.utc)
        self.scheduler.start()

    # pylint: disable=too-many-arguments
    def add_callable(self, id_, func, trigger, args=None, kwargs=None,
                     **trigger_args):
        """Add a job to the scheduler.

        Args:
            id_: Identifier given to the job (forwarded as ``id``).
            func: Callable the job will run.
            trigger: Trigger specification forwarded to ``add_job``.
            args: Positional arguments forwarded for ``func`` (or None).
            kwargs: Keyword arguments forwarded for ``func`` (or None).
            **trigger_args: Extra keyword arguments forwarded unchanged to
                ``add_job`` (e.g. options for the chosen trigger).

        Returns:
            Whatever the underlying scheduler's ``add_job`` returns.
        """
        return self.scheduler.add_job(
            func,
            trigger=trigger,
            args=args,
            kwargs=kwargs,
            id=id_,
            **trigger_args,
        )

    def remove_job(self, id_) -> None:
        """Remove the job identified by ``id_`` from the scheduler.

        Removal is best-effort: if the scheduler reports that no such job
        exists (``JobLookupError``), the error is deliberately ignored.
        """
        try:
            self.scheduler.remove_job(id_)
        except JobLookupError:
            # The job is already gone; nothing left to do.
            pass

    def shutdown(self, wait: bool = True) -> None:
        """Shut down the scheduler.

        Delete the jobs inside the scheduler before calling
        this function.

        Args:
            wait: Forwarded to the underlying scheduler's ``shutdown``.
                Defaults to True, mirroring APScheduler's own default, so
                callers may omit it.
        """
        self.scheduler.shutdown(wait)

    def get_job(self, id_):
        """Return the scheduled job whose id is ``id_``.

        The result is whatever the underlying scheduler's ``get_job``
        returns for that id.
        """
        return self.scheduler.get_job(id_)
44