Test Failed
Push — master ( 0d7af6...292dca )
by Beraldo
04:07 queued 01:46
created

build.tests.test_scheduler   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 179
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 131
dl 0
loc 179
rs 10
c 0
b 0
f 0
wmc 11

11 Methods

Rating   Name   Duplication   Size   Complexity  
A TestScheduler.test_new_circuit_with_frequency() 0 33 1
A TestCircuitSchedule.test_as_dict() 0 9 1
A TestCircuitSchedule.test_with_interval() 0 11 1
A TestScheduler.test_new_circuit_with_run_time() 0 24 1
A TestScheduler.tearDown() 0 3 1
A TestCircuitSchedule.test_id() 0 3 1
A TestScheduler.test_new_circuit_with_interval() 0 30 1
A TestCircuitSchedule.test_with_frequency() 0 9 1
A TestScheduler.setUp() 0 3 1
A TestCircuitSchedule.test_with_date() 0 10 1
A TestCircuitSchedule.test_from_dict() 0 13 1
1
"""Module to test the scheduler.py file."""
2
from datetime import datetime
3
from unittest import TestCase
4
from unittest.mock import patch
5
6
from apscheduler.triggers.cron import CronTrigger
7
from pytz import utc
8
9
from napps.kytos.mef_eline.models import EVC
10
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
11
from tests.helpers import get_controller_mock
12
13
14
class TestCircuitSchedule(TestCase):
    """Unit tests for the CircuitSchedule class."""

    def test_id(self):
        """Two default-constructed schedules must report different ids."""
        # Read each id straight off a temporary instance, one after the other.
        first_id = CircuitSchedule().id
        second_id = CircuitSchedule().id
        self.assertNotEqual(first_id, second_id)

    def test_with_date(self):
        """A schedule built with a date exposes that action and date."""
        date_str = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        schedule = CircuitSchedule(action="create", date=date_str)
        self.assertEqual("create", schedule.action)
        self.assertEqual(date_str, schedule.date)

    def test_with_interval(self):
        """A schedule built with an interval exposes that action/interval."""
        interval = {"hours": 2}
        schedule = CircuitSchedule(action="create", interval=interval)
        self.assertEqual("create", schedule.action)
        self.assertEqual(interval, schedule.interval)

    def test_with_frequency(self):
        """A schedule built with a frequency exposes that action/frequency."""
        cron_expression = "1 * * * *"
        schedule = CircuitSchedule(action="create", frequency=cron_expression)
        self.assertEqual("create", schedule.action)
        self.assertEqual(cron_expression, schedule.frequency)

    def test_from_dict(self):
        """CircuitSchedule.from_dict copies id, action and frequency."""
        payload = {
            "id": 52342432,
            "action": "create",
            "frequency": "1 * * * *",
        }
        schedule = CircuitSchedule.from_dict(payload)
        # Same three checks, in the same order: id, action, frequency.
        for field in ("id", "action", "frequency"):
            self.assertEqual(getattr(schedule, field), payload[field])

    def test_as_dict(self):
        """as_dict returns exactly the options the schedule was built with."""
        expected = {
            "id": 234243242,
            "action": "create",
            "frequency": "1 * * * *",
        }
        produced = CircuitSchedule(**expected).as_dict()
        self.assertEqual(expected, produced)
77
78
79
class TestScheduler(TestCase):
    """Tests for Scheduler.add with date, interval and frequency schedules."""

    def setUp(self):
        """Create a fresh Scheduler before each test."""
        self.scheduler = Scheduler()

    def tearDown(self):
        """Shut the Scheduler down after each test."""
        self.scheduler.shutdown()

    @staticmethod
    def _build_evc(circuit_scheduler):
        """Return an EVC whose only circuit schedule is the given one."""
        return EVC(
            controller=get_controller_mock(),
            name='my evc1',
            uni_a='uni_a',
            uni_z='uni_z',
            circuit_scheduler=[circuit_scheduler],
        )

    # Patch decorators are applied bottom-up, so the mock for EVC._validate
    # is the first argument and the add_job mock is the second.
    @patch('apscheduler.schedulers.background.BackgroundScheduler.add_job')
    @patch('napps.kytos.mef_eline.models.EVC._validate')
    def test_new_circuit_with_run_time(self, validate_mock,
                                       scheduler_add_job_mock):
        """Adding an EVC with a dated schedule registers a 'date' job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True

        run_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        circuit_scheduler = CircuitSchedule(action="remove", date=run_date)
        evc = self._build_evc(circuit_scheduler)

        self.scheduler.add(evc)

        scheduler_add_job_mock.assert_called_once_with(
            evc.remove,
            'date',
            id=circuit_scheduler.id,
            run_date=circuit_scheduler.date,
        )

    @patch('apscheduler.schedulers.background.BackgroundScheduler.add_job')
    @patch('napps.kytos.mef_eline.models.EVC._validate')
    def test_new_circuit_with_interval(self, validate_mock,
                                       scheduler_add_job_mock):
        """Adding an EVC with an interval registers an 'interval' job."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True

        circuit_scheduler = CircuitSchedule(
            action="create",
            interval={'hours': 2, 'minutes': 3},
        )
        evc = self._build_evc(circuit_scheduler)

        self.scheduler.add(evc)

        scheduler_add_job_mock.assert_called_once_with(
            evc.deploy,
            'interval',
            end_date=None,
            id=circuit_scheduler.id,
            hours=2,
            minutes=3,
            start_date=evc.start_date,
        )

    @patch('apscheduler.triggers.cron.CronTrigger.from_crontab')
    @patch('apscheduler.schedulers.background.BackgroundScheduler.add_job')
    @patch('napps.kytos.mef_eline.models.EVC._validate')
    def test_new_circuit_with_frequency(self, validate_mock,
                                        scheduler_add_job_mock,
                                        trigger_mock):
        """Adding an EVC with a frequency registers a job using the trigger."""
        scheduler_add_job_mock.return_value = True
        validate_mock.return_value = True

        circuit_scheduler = CircuitSchedule(action="create",
                                            frequency="* * * * *")

        # NOTE(review): from_crontab is patched while this runs, so this call
        # appears to go through trigger_mock rather than build a real
        # CronTrigger - confirm that is intended.
        trigger = CronTrigger.from_crontab(circuit_scheduler.frequency,
                                           timezone=utc)
        trigger_mock.return_value = trigger

        evc = self._build_evc(circuit_scheduler)

        self.scheduler.add(evc)

        scheduler_add_job_mock.assert_called_once_with(
            evc.deploy,
            trigger,
            end_date=None,
            id=circuit_scheduler.id,
            start_date=evc.start_date,
        )
179