Total Complexity | 5 |
Total Lines | 40 |
Duplicated Lines | 0 % |
Coverage | 92.86% |
Changes | 0 |
1 | """scheduler used by the sndtrace_cp |
||
2 | |||
3 | This module is intended to replace the usage of the napp |
||
4 | called scheduler soon to be deprecated. |
||
5 | """ |
||
6 | 1 | import pytz |
|
7 | 1 | from apscheduler.schedulers.background import BackgroundScheduler |
|
8 | |||
9 | |||
10 | 1 | class Scheduler: |
|
11 | """Class to set scheduler.""" |
||
12 | |||
13 | 1 | def __init__(self) -> None: |
|
14 | """Create an instance of Scheduler and starts it.""" |
||
15 | 1 | self.scheduler = BackgroundScheduler(timezone=pytz.utc) |
|
16 | 1 | self.scheduler.start() |
|
17 | |||
18 | # pylint: disable=too-many-arguments |
||
19 | 1 | def add_callable(self, id_, func, trigger, args=None, kwargs=None, |
|
20 | **trigger_args): |
||
21 | """Add job to scheduler""" |
||
22 | 1 | return self.scheduler.add_job(func, trigger=trigger, args=args, |
|
23 | kwargs=kwargs, |
||
24 | id=id_, **trigger_args) |
||
25 | |||
26 | 1 | def remove_job(self, id_): |
|
27 | """Remove job from scheduler""" |
||
28 | 1 | self.scheduler.remove_job(id_) |
|
29 | |||
30 | 1 | def shutdown(self, wait): |
|
31 | """Shut down scheduler. |
||
32 | Delete the jobs inside the scheduler before calling |
||
33 | this function. |
||
34 | """ |
||
35 | self.scheduler.shutdown(wait) |
||
36 | |||
37 | 1 | def get_job(self, id_): |
|
38 | """Return an scheduled job with id_ as id""" |
||
39 | return self.scheduler.get_job(id_) |
||
40 |