Test Failed
Pull Request — master (#359)
by
unknown
03:41
created

TestScheduler.setup_method()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
ccs 1
cts 1
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Module to test the schedule.py file."""
2 1
import datetime
3 1
from unittest.mock import patch
4 1
5
from apscheduler.triggers.cron import CronTrigger
6 1
from pytz import utc
7 1
8
from napps.kytos.mef_eline.models import EVC
9 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
10 1
from napps.kytos.mef_eline.tests.helpers import get_controller_mock
11 1
12
13
class TestCircuitSchedule:
    """Tests for building and serializing CircuitSchedule objects."""

    def test_id(self):
        """Two schedules created without an explicit id get different ids."""
        assert CircuitSchedule().id != CircuitSchedule().id

    def test_with_date(self):
        """A schedule created with a date keeps the action and the date."""
        time_fmt = "%Y-%m-%dT%H:%M:%S"
        options = {
            "action": "create",
            "date": datetime.datetime.now().strftime(time_fmt),
        }
        circuit_schedule = CircuitSchedule(**options)
        assert circuit_schedule.action == "create"
        assert circuit_schedule.date == options["date"]

    def test_with_interval(self):
        """A schedule created with an interval keeps the action and interval."""
        options = {"action": "create", "interval": {"hours": 2}}
        circuit_schedule = CircuitSchedule(**options)
        assert circuit_schedule.action == "create"
        assert circuit_schedule.interval == options["interval"]

    def test_with_frequency(self):
        """A schedule created with a frequency keeps the action and frequency."""
        options = {"action": "create", "frequency": "1 * * * *"}
        circuit_schedule = CircuitSchedule(**options)
        assert circuit_schedule.action == "create"
        assert circuit_schedule.frequency == options["frequency"]

    def test_from_dict(self):
        """from_dict copies id, action and frequency from the given dict."""
        circuit_schedule_dict = {
            "id": 52342432,
            "action": "create",
            "frequency": "1 * * * *",
        }
        circuit_schedule = CircuitSchedule.from_dict(circuit_schedule_dict)
        assert circuit_schedule.id == circuit_schedule_dict["id"]
        assert circuit_schedule.action == circuit_schedule_dict["action"]
        assert circuit_schedule.frequency == circuit_schedule_dict["frequency"]

    def test_as_dict(self):
        """as_dict returns exactly the options the schedule was built with."""
        options = {
            "id": 234243242,
            "action": "create",
            "frequency": "1 * * * *",
        }
        circuit_schedule_dict = CircuitSchedule(**options).as_dict()
        assert circuit_schedule_dict == options
66
67
68
class TestScheduler:
    """Tests for Scheduler.add with date, interval and frequency schedules.

    In every test ``BackgroundScheduler.add_job`` and ``EVC._validate`` are
    patched, and the test only checks the arguments ``add_job`` was called
    with.

    Note on mock arguments: stacked ``@patch`` decorators are applied from
    the bottom up, so the mock of the decorator closest to the ``def`` is
    the first argument after ``self``.
    """

    def setup_method(self):
        """Create a fresh Scheduler before each test."""
        self.scheduler = Scheduler()

    def teardown_method(self):
        """Shut the scheduler down after each test."""
        self.scheduler.shutdown()

    @patch("apscheduler.schedulers.background.BackgroundScheduler.add_job")
    @patch("napps.kytos.mef_eline.models.EVC._validate")
    def test_new_circuit_with_run_time(
        self, validate_mock, scheduler_add_job_mock
    ):
        """Adding an EVC with a dated schedule registers a "date" job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True
        time_fmt = "%Y-%m-%dT%H:%M:%S"
        date = datetime.datetime.now().strftime(time_fmt)
        circuit_scheduler = CircuitSchedule(action="remove", date=date)
        options = {
            "controller": get_controller_mock(),
            "name": "my evc1",
            "uni_a": "uni_a",
            "uni_z": "uni_z",
            "circuit_scheduler": [circuit_scheduler],
        }
        evc = EVC(**options)
        self.scheduler.add(evc)

        # The "remove" action is expected to be scheduled as evc.remove.
        expected_parameters = {
            "id": circuit_scheduler.id,
            "run_date": circuit_scheduler.date,
        }
        scheduler_add_job_mock.assert_called_once_with(
            evc.remove, "date", **expected_parameters
        )

    @patch("apscheduler.schedulers.background.BackgroundScheduler.add_job")
    @patch("napps.kytos.mef_eline.models.EVC._validate")
    def test_new_circuit_with_interval(
        self, validate_mock, scheduler_add_job_mock
    ):
        """Adding an EVC with an interval schedule registers an "interval" job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True
        interval = {"hours": 2, "minutes": 3}
        circuit_scheduler = CircuitSchedule(action="create", interval=interval)
        options = {
            "controller": get_controller_mock(),
            "name": "my evc1",
            "uni_a": "uni_a",
            "uni_z": "uni_z",
            "start_date": "2019-08-09T19:25:06",
            "circuit_scheduler": [circuit_scheduler],
        }
        evc = EVC(**options)
        self.scheduler.add(evc)

        # The interval entries are expected as individual keyword arguments,
        # and the EVC start_date string as a UTC-aware datetime.
        expected_parameters = {
            "id": circuit_scheduler.id,
            "hours": 2,
            "minutes": 3,
            "end_date": None,
            "start_date": datetime.datetime(
                2019, 8, 9, 19, 25, 6, 0, tzinfo=datetime.timezone.utc
            ),
        }
        scheduler_add_job_mock.assert_called_once_with(
            evc.deploy, "interval", **expected_parameters
        )

    @patch("apscheduler.triggers.cron.CronTrigger.from_crontab")
    @patch("apscheduler.schedulers.background.BackgroundScheduler.add_job")
    @patch("napps.kytos.mef_eline.models.EVC._validate")
    def test_new_circuit_with_frequency(
        self, validate_mock, scheduler_add_job_mock, trigger_mock
    ):
        """Adding an EVC with a crontab frequency registers a trigger job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True

        frequency = "* * * * *"
        circuit_scheduler = CircuitSchedule(
            action="create", frequency=frequency
        )

        # CronTrigger.from_crontab is already patched here, so this call goes
        # through trigger_mock and ``trigger`` is the mock's return value, not
        # a real CronTrigger. Pinning it as return_value keeps one known
        # object to match in the add_job assertion below.
        # NOTE(review): presumably Scheduler.add obtains its trigger through
        # CronTrigger.from_crontab -- confirm against scheduler.py.
        trigger = CronTrigger.from_crontab(
            circuit_scheduler.frequency, timezone=utc
        )
        trigger_mock.return_value = trigger

        options = {
            "controller": get_controller_mock(),
            "name": "my evc1",
            "uni_a": "uni_a",
            "uni_z": "uni_z",
            "start_date": "2019-08-09T19:25:06",
            "circuit_scheduler": [circuit_scheduler],
        }
        evc = EVC(**options)
        self.scheduler.add(evc)

        expected_parameters = {
            "id": circuit_scheduler.id,
            "end_date": None,
            "start_date": datetime.datetime(
                2019, 8, 9, 19, 25, 6, 0, tzinfo=datetime.timezone.utc
            ),
        }
        scheduler_add_job_mock.assert_called_once_with(
            evc.deploy, trigger, **expected_parameters
        )
181