Passed
Pull Request — master (#36)
by
unknown
04:19 queued 21s
created

build.scheduler.Scheduler.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 3
cts 3
cp 1
crap 1
1
"""scheduler used by the sndtrace_cp
2
3
This module is intended to replace the usage of the napp
4
called scheduler soon to be deprecated.
5
"""
6 1
import pytz
7 1
from apscheduler.schedulers.background import BackgroundScheduler
8
9
10 1
class Scheduler:
    """Thin wrapper around an APScheduler ``BackgroundScheduler``.

    The wrapped scheduler is created with the UTC timezone and is started
    as soon as the wrapper is instantiated. Every method delegates to the
    object stored in ``self.scheduler``.
    """

    def __init__(self) -> None:
        """Create the background scheduler (UTC timezone) and start it."""
        self.scheduler = BackgroundScheduler(timezone=pytz.utc)
        self.scheduler.start()

    # pylint: disable=too-many-arguments
    def add_callable(self, id_, func, trigger, args=None, kwargs=None,
                     **trigger_args):
        """Add a job to the scheduler.

        Args:
            id_: Identifier given to the job; the same value is later used
                by ``remove_job`` and ``get_job``.
            func: Callable handed to the scheduler's ``add_job``.
            trigger: Trigger forwarded to ``add_job`` as ``trigger``.
            args: Positional arguments for ``func``, forwarded as-is.
            kwargs: Keyword arguments for ``func``, forwarded as-is.
            **trigger_args: Extra keyword arguments forwarded unchanged to
                ``add_job`` (e.g. the trigger's own settings).
        """
        self.scheduler.add_job(func, trigger=trigger, args=args, kwargs=kwargs,
                               id=id_, **trigger_args)

    def remove_job(self, id_):
        """Remove the job registered under ``id_`` from the scheduler."""
        self.scheduler.remove_job(id_)

    def shutdown(self, wait=True):
        """Shut down the scheduler.

        Delete the jobs inside the scheduler before calling
        this function.

        Args:
            wait: Forwarded to the wrapped scheduler's ``shutdown``.
                Defaults to ``True``, mirroring APScheduler's own default,
                so callers may omit it.
        """
        # Passed positionally, exactly as before, so existing callers and
        # call-recording test doubles observe the same invocation.
        self.scheduler.shutdown(wait)

    def get_job(self, id_):
        """Return the scheduled job whose id is ``id_``.

        The value is whatever the wrapped scheduler's ``get_job`` returns.
        """
        return self.scheduler.get_job(id_)
39