| Total Complexity | 6 |
| Total Lines | 44 |
| Duplicated Lines | 0 % |
| Coverage | 94.44% |
| Changes | 0 | ||
| 1 | """scheduler used by the sndtrace_cp |
||
| 2 | |||
| 3 | This module is intended to replace the usage of the napp |
||
| 4 | called scheduler soon to be deprecated. |
||
| 5 | """ |
||
| 6 | 1 | import pytz |
|
| 7 | 1 | from apscheduler.jobstores.base import JobLookupError |
|
| 8 | 1 | from apscheduler.schedulers.background import BackgroundScheduler |
|
| 9 | |||
| 10 | |||
| 11 | 1 | class Scheduler: |
|
| 12 | """Class to set scheduler.""" |
||
| 13 | |||
| 14 | 1 | def __init__(self) -> None: |
|
| 15 | """Create an instance of Scheduler and starts it.""" |
||
| 16 | 1 | self.scheduler = BackgroundScheduler(timezone=pytz.utc) |
|
| 17 | 1 | self.scheduler.start() |
|
| 18 | |||
| 19 | # pylint: disable=too-many-arguments |
||
| 20 | 1 | def add_callable(self, id_, func, trigger, args=None, kwargs=None, |
|
| 21 | **trigger_args): |
||
| 22 | """Add job to scheduler""" |
||
| 23 | 1 | return self.scheduler.add_job(func, trigger=trigger, args=args, |
|
| 24 | kwargs=kwargs, |
||
| 25 | id=id_, **trigger_args) |
||
| 26 | |||
| 27 | 1 | def remove_job(self, id_): |
|
| 28 | """Remove job from scheduler""" |
||
| 29 | 1 | try: |
|
| 30 | 1 | self.scheduler.remove_job(id_) |
|
| 31 | 1 | except JobLookupError: |
|
| 32 | 1 | pass |
|
| 33 | |||
| 34 | 1 | def shutdown(self, wait): |
|
| 35 | """Shut down scheduler. |
||
| 36 | Delete the jobs inside the scheduler before calling |
||
| 37 | this function. |
||
| 38 | """ |
||
| 39 | self.scheduler.shutdown(wait) |
||
| 40 | |||
| 41 | 1 | def get_job(self, id_): |
|
| 42 | """Return an scheduled job with id_ as id""" |
||
| 43 | return self.scheduler.get_job(id_) |
||
| 44 |