Passed
Pull Request — master (#36)
by
unknown
02:50
created

build.scheduler   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 40
Duplicated Lines 0 %

Test Coverage

Coverage 92.86%

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 40
rs 10
c 0
b 0
f 0
ccs 13
cts 14
cp 0.9286
wmc 5

5 Methods

Rating   Name   Duplication   Size   Complexity  
A Scheduler.__init__() 0 4 1
A Scheduler.get_job() 0 3 1
A Scheduler.shutdown() 0 6 1
A Scheduler.remove_job() 0 3 1
A Scheduler.add_callable() 0 6 1
1
"""scheduler used by the sndtrace_cp
2
3
This module is intended to replace the usage of the napp
4
called scheduler soon to be deprecated.
5
"""
6 1
import pytz
7 1
from apscheduler.schedulers.background import BackgroundScheduler
8
9
10 1
class Scheduler:
11
    """Class to set scheduler."""
12
13 1
    def __init__(self) -> None:
14
        """Create an instance of Scheduler and starts it."""
15 1
        self.scheduler = BackgroundScheduler(timezone=pytz.utc)
16 1
        self.scheduler.start()
17
18
    # pylint: disable=too-many-arguments
19 1
    def add_callable(self, id_, func, trigger, args=None, kwargs=None,
20
                     **trigger_args):
21
        """Add job to scheduler"""
22 1
        return self.scheduler.add_job(func, trigger=trigger, args=args,
23
                                      kwargs=kwargs,
24
                                      id=id_, **trigger_args)
25
26 1
    def remove_job(self, id_):
27
        """Remove job from scheduler"""
28 1
        self.scheduler.remove_job(id_)
29
30 1
    def shutdown(self, wait):
31
        """Shut down scheduler.
32
        Delete the jobs inside the scheduler before calling
33
        this function.
34
        """
35
        self.scheduler.shutdown(wait)
36
37 1
    def get_job(self, id_):
38
        """Return an scheduled job with id_ as id"""
39
        return self.scheduler.get_job(id_)
40