Test Failed
Push — master ( 98eee7...9ba056 )
by Antonio
03:04 queued 11s
created

build.scheduler   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 95
Duplicated Lines 0 %

Test Coverage

Coverage 87.93%

Importance

Changes 0
Metric Value
eloc 62
dl 0
loc 95
ccs 51
cts 58
cp 0.8793
rs 10
c 0
b 0
f 0
wmc 20

9 Methods

Rating   Name   Duplication   Size   Complexity  
A CircuitSchedule.id() 0 4 1
A CircuitSchedule.from_dict() 0 4 1
A Scheduler.cancel_job() 0 3 1
A CircuitSchedule.as_dict() 0 12 4
A CircuitSchedule.__init__() 0 7 1
B Scheduler.add() 0 27 8
A Scheduler.shutdown() 0 3 1
A Scheduler.__init__() 0 4 1
A Scheduler.remove() 0 4 2
1
"""Module responsible to handle schedulers."""
2 2
from uuid import uuid4
3
4 2
from apscheduler.jobstores.base import ConflictingIdError
5 2
from apscheduler.schedulers.background import BackgroundScheduler
6 2
from apscheduler.triggers.cron import CronTrigger
7 2
from pytz import utc
8
9 2
from kytos.core import log
10
11
12 2
class CircuitSchedule:
13
    """Schedule events."""
14
15 2
    def __init__(self, **kwargs):
16
        """Create a CircuitSchedule object."""
17 2
        self._id = kwargs.get('id', uuid4().hex)
18 2
        self.date = kwargs.get('date', None)
19 2
        self.interval = kwargs.get('interval', None)
20 2
        self.frequency = kwargs.get('frequency', None)
21 2
        self.action = kwargs.get('action', 'create')
22
23 2
    @property
24
    def id(self):  # pylint: disable=invalid-name
25
        """Return this EVC's ID."""
26 2
        return self._id
27
28 2
    def as_dict(self):
29
        """Return a dictionary representing an circuit schedule object."""
30 2
        circuit_scheduler_dict = {'id': self.id, 'action': self.action}
31
32 2
        if self.date:
33
            circuit_scheduler_dict['date'] = self.date
34 2
        if self.frequency:
35 2
            circuit_scheduler_dict['frequency'] = self.frequency
36 2
        if self.interval:
37 2
            circuit_scheduler_dict['interval'] = self.interval
38
39 2
        return circuit_scheduler_dict
40
41 2
    @classmethod
42
    def from_dict(cls, data):
43
        """Return a CircuitSchedule object from dict."""
44 2
        return cls(**data)
45
46
47 2
class Scheduler:
48
    """Class to schedule the circuits rules."""
49
50 2
    def __init__(self):
51
        """Create a new schedule structure."""
52 2
        self.scheduler = BackgroundScheduler(timezone=utc)
53 2
        self.scheduler.start()
54
55 2
    def shutdown(self):
56
        """Shutdown the scheduler."""
57 2
        self.scheduler.shutdown(wait=False)
58
59 2
    def add(self, circuit):
60
        """Add all circuit_scheduler from specific circuit."""
61 2
        for circuit_scheduler in circuit.circuit_scheduler:
62 2
            data = {'id': circuit_scheduler.id}
63 2
            action = None
64
65 2
            if circuit_scheduler.action == 'create':
66 2
                action = circuit.deploy
67 2
            elif circuit_scheduler.action == 'remove':
68 2
                action = circuit.remove
69
70 2
            if circuit_scheduler.date:
71 2
                data.update({'run_date': circuit_scheduler.date})
72 2
                trigger = 'date'
73 2
            elif circuit_scheduler.interval:
74 2
                data.update(circuit_scheduler.interval)
75 2
                trigger = 'interval'
76 2
            elif circuit_scheduler.frequency:
77 2
                trigger = CronTrigger.from_crontab(circuit_scheduler.frequency,
78
                                                   timezone=utc)
79
            else:
80
                continue
81
82 2
            try:
83 2
                self.scheduler.add_job(action, trigger, **data)
84
            except ConflictingIdError:
85
                log.info(f'Job with id {circuit_scheduler.id} already added.')
86
87 2
    def remove(self, circuit):
88
        """Remove all scheduler from a circuit."""
89
        for job in circuit.circuit_scheduler:
90
            self.cancel_job(job.id)
91
92 2
    def cancel_job(self, circuit_scheduler_id):
93
        """Cancel a specific job from scheduler."""
94
        self.scheduler.remove_job(circuit_scheduler_id)
95