Passed
Pull Request — master (#36)
by Vinicius
03:14
created

build.scheduler   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 44
Duplicated Lines 0 %

Test Coverage

Coverage 94.44%

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 44
rs 10
c 0
b 0
f 0
ccs 17
cts 18
cp 0.9444
wmc 6

5 Methods

Rating   Name   Duplication   Size   Complexity  
A Scheduler.get_job() 0 3 1
A Scheduler.shutdown() 0 6 1
A Scheduler.remove_job() 0 6 2
A Scheduler.__init__() 0 4 1
A Scheduler.add_callable() 0 6 1
1
"""scheduler used by the sndtrace_cp
2
3
This module is intended to replace the usage of the napp
4
called scheduler soon to be deprecated.
5
"""
6 1
import pytz
7 1
from apscheduler.jobstores.base import JobLookupError
8 1
from apscheduler.schedulers.background import BackgroundScheduler
9
10
11 1
class Scheduler:
12
    """Class to set scheduler."""
13
14 1
    def __init__(self) -> None:
15
        """Create an instance of Scheduler and starts it."""
16 1
        self.scheduler = BackgroundScheduler(timezone=pytz.utc)
17 1
        self.scheduler.start()
18
19
    # pylint: disable=too-many-arguments
20 1
    def add_callable(self, id_, func, trigger, args=None, kwargs=None,
21
                     **trigger_args):
22
        """Add job to scheduler"""
23 1
        return self.scheduler.add_job(func, trigger=trigger, args=args,
24
                                      kwargs=kwargs,
25
                                      id=id_, **trigger_args)
26
27 1
    def remove_job(self, id_):
28
        """Remove job from scheduler"""
29 1
        try:
30 1
            self.scheduler.remove_job(id_)
31 1
        except JobLookupError:
32 1
            pass
33
34 1
    def shutdown(self, wait):
35
        """Shut down scheduler.
36
        Delete the jobs inside the scheduler before calling
37
        this function.
38
        """
39
        self.scheduler.shutdown(wait)
40
41 1
    def get_job(self, id_):
42
        """Return an scheduled job with id_ as id"""
43
        return self.scheduler.get_job(id_)
44