TestScheduler.teardown_method()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Module to test the schedule.py file."""
2 1
import datetime
3 1
from unittest.mock import patch
4
5 1
from apscheduler.triggers.cron import CronTrigger
6 1
from pytz import utc
7
8 1
from napps.kytos.mef_eline.models import EVC
9 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
10 1
from napps.kytos.mef_eline.tests.helpers import get_controller_mock
11
12
13 1
class TestCircuitSchedule():
    """Unit tests covering the CircuitSchedule class."""

    def test_id(self):
        """Two default-constructed schedules must not share the same id."""
        assert CircuitSchedule().id != CircuitSchedule().id

    def test_with_date(self):
        """A schedule built with a date exposes that action and date."""
        date_text = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        schedule = CircuitSchedule(action="create", date=date_text)
        assert schedule.action == "create"
        assert schedule.date == date_text

    def test_with_interval(self):
        """A schedule built with an interval exposes that interval."""
        interval = {"hours": 2}
        schedule = CircuitSchedule(action="create", interval=interval)
        assert schedule.action == "create"
        assert schedule.interval == interval

    def test_with_frequency(self):
        """A schedule built with a frequency exposes that frequency."""
        crontab = "1 * * * *"
        schedule = CircuitSchedule(action="create", frequency=crontab)
        assert schedule.action == "create"
        assert schedule.frequency == crontab

    def test_from_dict(self):
        """from_dict must copy id, action and frequency from the dict."""
        payload = {
            "id": 52342432,
            "action": "create",
            "frequency": "1 * * * *",
        }
        schedule = CircuitSchedule.from_dict(payload)
        # Checked in the same order as the keys appear in the payload.
        for field in ("id", "action", "frequency"):
            assert getattr(schedule, field) == payload[field]

    def test_as_dict(self):
        """as_dict must return exactly the values given to the constructor."""
        expected = {
            "id": 234243242,
            "action": "create",
            "frequency": "1 * * * *",
        }
        assert expected == CircuitSchedule(**expected).as_dict()
66
67
68 1
class TestScheduler():
    """Tests for how Scheduler registers the jobs of an EVC."""

    def setup_method(self):
        """Create a fresh Scheduler before each test."""
        self.scheduler = Scheduler()

    def teardown_method(self):
        """Shut the scheduler down after each test."""
        self.scheduler.shutdown()

    @patch("apscheduler.schedulers.background.BackgroundScheduler.add_job")
    @patch("napps.kytos.mef_eline.models.EVC._validate")
    def test_new_circuit_with_run_time(
        self, validate_mock, scheduler_add_job_mock
    ):
        """Adding an EVC scheduled by date registers a "date" job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True

        run_at = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        schedule = CircuitSchedule(action="remove", date=run_at)
        evc = EVC(
            controller=get_controller_mock(),
            name="my evc1",
            uni_a="uni_a",
            uni_z="uni_z",
            circuit_scheduler=[schedule],
        )

        self.scheduler.add(evc)

        # The "remove" action is expected to be scheduled as evc.remove.
        scheduler_add_job_mock.assert_called_once_with(
            evc.remove, "date", id=schedule.id, run_date=schedule.date
        )

    @patch("apscheduler.schedulers.background.BackgroundScheduler.add_job")
    @patch("napps.kytos.mef_eline.models.EVC._validate")
    def test_new_circuit_with_interval(
        self, validate_mock, scheduler_add_job_mock
    ):
        """Adding an EVC scheduled by interval registers an "interval" job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True

        schedule = CircuitSchedule(
            action="create", interval={"hours": 2, "minutes": 3}
        )
        evc = EVC(
            controller=get_controller_mock(),
            name="my evc1",
            uni_a="uni_a",
            uni_z="uni_z",
            start_date="2019-08-09T19:25:06",
            circuit_scheduler=[schedule],
        )

        self.scheduler.add(evc)

        # The EVC start_date string is expected as a UTC-aware datetime.
        expected_start = datetime.datetime(
            2019, 8, 9, 19, 25, 6, tzinfo=datetime.timezone.utc
        )
        scheduler_add_job_mock.assert_called_once_with(
            evc.deploy,
            "interval",
            id=schedule.id,
            hours=2,
            minutes=3,
            end_date=None,
            start_date=expected_start,
        )

    @patch("apscheduler.triggers.cron.CronTrigger.from_crontab")
    @patch("apscheduler.schedulers.background.BackgroundScheduler.add_job")
    @patch("napps.kytos.mef_eline.models.EVC._validate")
    def test_new_circuit_with_frequency(
        self, validate_mock, scheduler_add_job_mock, trigger_mock
    ):
        """Adding an EVC scheduled by frequency registers a cron trigger job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True

        schedule = CircuitSchedule(action="create", frequency="* * * * *")

        # CronTrigger.from_crontab is patched by the decorator above, so this
        # call goes through trigger_mock; its result is then pinned as the
        # value every later from_crontab call returns.
        trigger = CronTrigger.from_crontab(schedule.frequency, timezone=utc)
        trigger_mock.return_value = trigger

        evc = EVC(
            controller=get_controller_mock(),
            name="my evc1",
            uni_a="uni_a",
            uni_z="uni_z",
            start_date="2019-08-09T19:25:06",
            circuit_scheduler=[schedule],
        )

        self.scheduler.add(evc)

        # The EVC start_date string is expected as a UTC-aware datetime.
        expected_start = datetime.datetime(
            2019, 8, 9, 19, 25, 6, tzinfo=datetime.timezone.utc
        )
        scheduler_add_job_mock.assert_called_once_with(
            evc.deploy,
            trigger,
            id=schedule.id,
            end_date=None,
            start_date=expected_start,
        )
181