Passed
Pull Request — master (#36)
by
unknown
02:50
created

build.scheduler.Scheduler.remove_job()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 1
1
"""scheduler used by the sndtrace_cp
2
3
This module is intended to replace the usage of the napp
called scheduler, which will soon be deprecated.
5
"""
6 1
import pytz
7 1
from apscheduler.schedulers.background import BackgroundScheduler
8
9
10 1
class Scheduler:
    """Thin wrapper around an APScheduler ``BackgroundScheduler``.

    The wrapped scheduler is created with the UTC timezone and is started
    as soon as the wrapper is instantiated. It stays reachable through the
    ``scheduler`` attribute.
    """

    def __init__(self) -> None:
        """Create the UTC background scheduler and start it."""
        self.scheduler = BackgroundScheduler(timezone=pytz.utc)
        self.scheduler.start()

    # pylint: disable=too-many-arguments
    def add_callable(self, id_, func, trigger, args=None, kwargs=None,
                     **trigger_args):
        """Add a job to the scheduler.

        Args:
            id_: Identifier given to the job (forwarded as ``id``).
            func: Callable the job runs.
            trigger: Trigger forwarded to ``add_job`` unchanged.
            args: Positional arguments forwarded to ``add_job``.
            kwargs: Keyword arguments forwarded to ``add_job``.
            **trigger_args: Extra keyword arguments forwarded to
                ``add_job`` unchanged.

        Returns:
            Whatever the wrapped scheduler's ``add_job`` returns.
        """
        return self.scheduler.add_job(func, trigger=trigger, args=args,
                                      kwargs=kwargs,
                                      id=id_, **trigger_args)

    def remove_job(self, id_):
        """Remove the job identified by ``id_`` from the scheduler."""
        self.scheduler.remove_job(id_)

    def shutdown(self, wait=True):
        """Shut down the scheduler.

        Delete the jobs inside the scheduler before calling
        this function.

        Args:
            wait: Forwarded to the wrapped scheduler's ``shutdown``.
                Defaults to ``True``, the same default the wrapped
                method uses, so callers may omit it.
        """
        self.scheduler.shutdown(wait)

    def get_job(self, id_):
        """Return the scheduled job whose id is ``id_``.

        The result is whatever the wrapped scheduler's ``get_job``
        returns for that id.
        """
        return self.scheduler.get_job(id_)
40