build.scheduler.CircuitSchedule.id()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Module responsible for handling schedulers."""
2 1
from uuid import uuid4
3
4 1
from apscheduler.jobstores.base import JobLookupError
5 1
from apscheduler.schedulers.background import BackgroundScheduler
6 1
from apscheduler.triggers.cron import CronTrigger
7 1
from pytz import utc
8
9 1
from kytos.core import log
10
11
12 1
class CircuitSchedule:
    """Describe when, and with which action, a circuit job should run.

    A schedule carries an identifier, an action name and up to three
    optional timing fields (``date``, ``interval``, ``frequency``).
    """

    def __init__(self, **kwargs):
        """Create a CircuitSchedule object.

        Recognized keyword arguments (any other key is ignored):
            id: schedule identifier; defaults to a fresh ``uuid4`` hex string.
            date: run date for a one-off job; defaults to None.
            interval: mapping of interval arguments; defaults to None.
            frequency: cron expression string; defaults to None.
            action: action name; defaults to "create".
        """
        self._id = kwargs.get("id", uuid4().hex)
        self.date = kwargs.get("date", None)
        # Mapping of interval trigger arguments, merged as-is into the job
        # data by the scheduler. Ex: {"hours": 2, "minutes": 3}
        self.interval = kwargs.get("interval", None)
        # Frequency uses the crontab format. Ex: "* * * * *"
        self.frequency = kwargs.get("frequency", None)
        self.action = kwargs.get("action", "create")

    @property
    def id(self):  # pylint: disable=invalid-name
        """Return this circuit schedule's ID."""
        return self._id

    @id.setter
    def id(self, value):  # pylint: disable=invalid-name
        """Set this circuit schedule's ID."""
        self._id = value

    def as_dict(self):
        """Return a dictionary representing a circuit schedule object.

        ``id`` and ``action`` are always present; ``date``, ``frequency``
        and ``interval`` are included only when they hold a truthy value.
        """
        circuit_scheduler_dict = {"id": self.id, "action": self.action}

        if self.date:
            circuit_scheduler_dict["date"] = self.date
        if self.frequency:
            circuit_scheduler_dict["frequency"] = self.frequency
        if self.interval:
            circuit_scheduler_dict["interval"] = self.interval

        return circuit_scheduler_dict

    @classmethod
    def from_dict(cls, data):
        """Return a CircuitSchedule object built from the keys of ``data``."""
        return cls(**data)
52
53
54 1
class Scheduler:
    """Class to schedule the circuits rules.

    It is responsible to create/remove schedule jobs based on
    Circuit Schedules.
    """

    def __init__(self):
        """Create a new schedule structure and start it.

        The underlying scheduler is a BackgroundScheduler configured
        with the UTC timezone.
        """
        self.scheduler = BackgroundScheduler(timezone=utc)
        self.scheduler.start()

    def shutdown(self):
        """Shutdown the scheduler (called with ``wait=False``)."""
        self.scheduler.shutdown(wait=False)

    def add(self, circuit):
        """
        Add all circuit_scheduler from specific circuit.

        Args:
            circuit (napps.kytos.mef_eline.models.EVCBase): EVC circuit

        """
        for circuit_scheduler in circuit.circuit_scheduler:
            self.add_circuit_job(circuit, circuit_scheduler)

    def remove(self, circuit):
        """Cancel the job of every schedule attached to a circuit."""
        for job in circuit.circuit_scheduler:
            self.cancel_job(job.id)

    def add_circuit_job(self, circuit, circuit_scheduler):
        """
        Prepare the Circuit data to be added to the Scheduler.

        :param circuit(napps.kytos.mef_eline.models.EVCBase): EVC circuit
        :param circuit_scheduler (CircuitSchedule): Circuit schedule data
        :raises ValueError: if the schedule action is neither 'create'
            nor 'remove'
        :return: None
        """
        # Map the schedule action onto the circuit method the job will call.
        if circuit_scheduler.action == "create":
            job_call = circuit.deploy
        elif circuit_scheduler.action == "remove":
            job_call = circuit.remove
        else:
            raise ValueError("Scheduler action must be 'create' or 'remove'")

        # The schedule id doubles as the job id, so cancel_job can find it.
        data = {"id": circuit_scheduler.id}
        if circuit_scheduler.date:
            data.update({"run_date": circuit_scheduler.date})
        else:
            # Without a fixed run date, bound the job by the circuit dates.
            data.update(
                {
                    "start_date": circuit.start_date,
                    "end_date": circuit.end_date,
                }
            )

        if circuit_scheduler.interval:
            # interval is a mapping; its keys become job keyword arguments.
            data.update(circuit_scheduler.interval)

        self.add_job(circuit_scheduler, job_call, data)

    def add_job(self, circuit_scheduler, job_call, data):
        """
        Add a specific job to the scheduler.

        The trigger is chosen from the first truthy field of the schedule,
        checked in this order: date, interval, frequency. If none of them
        is set, no job is added.

        Args:
            circuit_scheduler: CircuitSchedule object
            job_call: function to be called by the job
            data: Dict forwarded to the scheduler's add_job as keyword
                arguments.
                For a date schedule, the template is like:
                    {'id': <ID>, 'run_date': date }
                For an interval schedule, the template is like:
                    {   'id': <ID>,
                        'start_date': date,
                        'end_date': date,
                        'hours': 2,
                        'minutes': 3
                    }
                For a frequency schedule, the trigger is built from the
                crontab expression in circuit_scheduler.frequency.

        """
        if circuit_scheduler.date:
            self.scheduler.add_job(job_call, "date", **data)

        elif circuit_scheduler.interval:
            self.scheduler.add_job(job_call, "interval", **data)

        elif circuit_scheduler.frequency:
            cron = CronTrigger.from_crontab(
                circuit_scheduler.frequency, timezone=utc
            )
            self.scheduler.add_job(job_call, cron, **data)

    def cancel_job(self, circuit_scheduler_id):
        """Cancel a specific job from scheduler.

        A missing job is logged as an error rather than raised.
        """
        try:
            self.scheduler.remove_job(circuit_scheduler_id)
        except JobLookupError as job_error:
            # Job was not found... Maybe someone already removed it.
            # Interpolate the exception directly: the previous
            # f"...{0}".format(...) always logged a literal "0".
            log.error(f"Scheduler error cancelling job. {job_error}")
155