Code Duplication    Length = 38-43 lines in 10 locations

tests/test_models.py 5 locations

@@ 834-876 (lines=43) @@
831
        msg = f"{evc} deployed after link down."
832
        log_mocked.debug.assert_called_once_with(msg)
833
834
    @patch('napps.kytos.mef_eline.models.log')
835
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
836
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
837
    def test_handle_link_down_case_3(self, deploy_to_mocked, deploy_mocked,
838
                                     log_mocked):
839
        """Test handle_link_down reports failure when deploy_to fails."""
840
        deploy_mocked.return_value = False
841
        deploy_to_mocked.return_value = False
842
        primary_path = [
843
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
844
                                metadata={"s_vlan": 5},
845
                                status=EntityStatus.DOWN),
846
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
847
                                metadata={"s_vlan": 6},
848
                                status=EntityStatus.UP),
849
        ]
850
        backup_path = [
851
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
852
                                metadata={"s_vlan": 5},
853
                                status=EntityStatus.DOWN),
854
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
855
                                metadata={"s_vlan": 6},
856
                                status=EntityStatus.UP),
857
        ]
858
        attributes = {  # "dynamic_backup_path" is left unset
859
            "name": "circuit_name",
860
            "uni_a": get_uni_mocked(is_valid=True),
861
            "uni_z": get_uni_mocked(is_valid=True),
862
            "primary_path": primary_path,
863
            "backup_path": backup_path,
864
            "enabled": True
865
        }
866
867
        evc = EVC(**attributes)
868
        evc.current_path = evc.backup_path  # start on the backup path
869
        current_handle_link_down = evc.handle_link_down()
870
        self.assertEqual(deploy_mocked.call_count, 0)
871
        self.assertEqual(deploy_to_mocked.call_count, 1)
872
        deploy_to_mocked.assert_called_once_with('primary_path',
873
                                                 evc.primary_path)
874
        self.assertFalse(current_handle_link_down)
875
        msg = f'Failed to re-deploy {evc} after link down.'
876
        log_mocked.debug.assert_called_once_with(msg)
877
878
    @patch('napps.kytos.mef_eline.models.log')
879
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
@@ 791-832 (lines=42) @@
788
        msg = f"{evc} deployed after link down."
789
        log_mocked.debug.assert_called_once_with(msg)
790
791
    @patch('napps.kytos.mef_eline.models.log')
792
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
793
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
794
    def test_handle_link_down_case_2(self, deploy_to_mocked, deploy_mocked,
795
                                     log_mocked):
796
        """Test if deploy_to primary path is called when backup is in use."""
797
        deploy_mocked.return_value = True
798
        deploy_to_mocked.return_value = True
799
        primary_path = [
800
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
801
                                metadata={"s_vlan": 5},
802
                                status=EntityStatus.UP),
803
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
804
                                metadata={"s_vlan": 6},
805
                                status=EntityStatus.UP),
806
        ]
807
        backup_path = [
808
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
809
                                metadata={"s_vlan": 5},
810
                                status=EntityStatus.DOWN),
811
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
812
                                metadata={"s_vlan": 6},
813
                                status=EntityStatus.UP),
814
        ]
815
        attributes = {
816
            "name": "circuit_name",
817
            "uni_a": get_uni_mocked(is_valid=True),
818
            "uni_z": get_uni_mocked(is_valid=True),
819
            "primary_path": primary_path,
820
            "backup_path": backup_path,
821
            "enabled": True
822
        }
823
824
        evc = EVC(**attributes)
825
        evc.current_path = evc.backup_path  # start on the backup path
826
        current_handle_link_down = evc.handle_link_down()
827
        self.assertEqual(deploy_mocked.call_count, 0)
828
        deploy_to_mocked.assert_called_once_with('primary_path',
829
                                                 evc.primary_path)
830
        self.assertTrue(current_handle_link_down)
831
        msg = f"{evc} deployed after link down."
832
        log_mocked.debug.assert_called_once_with(msg)
833
834
    @patch('napps.kytos.mef_eline.models.log')
835
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
@@ 748-789 (lines=42) @@
745
        deployed = evc.deploy_to('primary_path', evc.primary_path)
746
        self.assertFalse(deployed)
747
748
    @patch('napps.kytos.mef_eline.models.log')
749
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
750
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
751
    def test_handle_link_down_case_1(self, deploy_to_mocked, deploy_mocked,
752
                                     log_mocked):
753
        """Test if deploy_to backup path is called when primary is in use."""
754
        deploy_mocked.return_value = True
755
        deploy_to_mocked.return_value = True
756
        primary_path = [
757
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
758
                                metadata={"s_vlan": 5},
759
                                status=EntityStatus.DOWN),
760
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
761
                                metadata={"s_vlan": 6},
762
                                status=EntityStatus.UP),
763
        ]
764
        backup_path = [
765
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
766
                                metadata={"s_vlan": 5},
767
                                status=EntityStatus.UP),
768
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
769
                                metadata={"s_vlan": 6},
770
                                status=EntityStatus.UP),
771
        ]
772
        attributes = {
773
            "name": "circuit_name",
774
            "uni_a": get_uni_mocked(is_valid=True),
775
            "uni_z": get_uni_mocked(is_valid=True),
776
            "primary_path": primary_path,
777
            "backup_path": backup_path,
778
            "enabled": True
779
        }
780
        evc = EVC(**attributes)
781
782
        evc.current_path = evc.primary_path  # start on the primary path
783
        current_handle_link_down = evc.handle_link_down()
784
        self.assertEqual(deploy_mocked.call_count, 0)
785
        deploy_to_mocked.assert_called_once_with('backup_path',
786
                                                 evc.backup_path)
787
        self.assertTrue(current_handle_link_down)
788
        msg = f"{evc} deployed after link down."
789
        log_mocked.debug.assert_called_once_with(msg)
790
791
    @patch('napps.kytos.mef_eline.models.log')
792
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
@@ 962-1001 (lines=40) @@
959
        self.assertEqual(deploy_to_mocked.call_count, 0)
960
        self.assertTrue(current_handle_link_up)
961
962
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
963
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
964
    def test_handle_link_up_case_2(self, deploy_to_mocked, deploy_mocked):
965
        """Test if handle_link_up changes from backup_path to primary_path."""
966
        deploy_mocked.return_value = True
967
        deploy_to_mocked.return_value = True
968
        primary_path = [
969
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
970
                                metadata={"s_vlan": 5},
971
                                status=EntityStatus.UP),
972
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
973
                                metadata={"s_vlan": 6},
974
                                status=EntityStatus.UP),
975
        ]
976
        backup_path = [
977
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
978
                                metadata={"s_vlan": 5},
979
                                status=EntityStatus.UP),
980
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
981
                                metadata={"s_vlan": 6},
982
                                status=EntityStatus.UP),
983
        ]
984
        attributes = {
985
            "name": "circuit_name",
986
            "uni_a": get_uni_mocked(is_valid=True),
987
            "uni_z": get_uni_mocked(is_valid=True),
988
            "primary_path": primary_path,
989
            "backup_path": backup_path,
990
            "enabled": True,
991
            "dynamic_backup_path": True
992
        }
993
994
        evc = EVC(**attributes)
995
        evc.current_path = evc.backup_path  # start on the backup path
996
        current_handle_link_up = evc.handle_link_up(primary_path[0])
997
        self.assertEqual(deploy_mocked.call_count, 0)
998
        self.assertEqual(deploy_to_mocked.call_count, 1)
999
        deploy_to_mocked.assert_called_once_with('primary_path',
1000
                                                 evc.primary_path)
1001
        self.assertTrue(current_handle_link_up)
1002
1003
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
1004
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
@@ 923-960 (lines=38) @@
920
        msg = f"{evc} deployed after link down."
921
        log_mocked.debug.assert_called_once_with(msg)
922
923
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
924
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
925
    def test_handle_link_up_case_1(self, deploy_to_mocked, deploy_mocked):
926
        """Test if handle_link_up does nothing when using the primary path."""
927
        deploy_mocked.return_value = True
928
        deploy_to_mocked.return_value = True
929
        primary_path = [
930
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
931
                                metadata={"s_vlan": 5},
932
                                status=EntityStatus.UP),
933
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
934
                                metadata={"s_vlan": 6},
935
                                status=EntityStatus.UP),
936
        ]
937
        backup_path = [
938
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
939
                                metadata={"s_vlan": 5},
940
                                status=EntityStatus.UP),
941
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
942
                                metadata={"s_vlan": 6},
943
                                status=EntityStatus.UP),
944
        ]
945
        attributes = {
946
            "name": "circuit_name",
947
            "uni_a": get_uni_mocked(is_valid=True),
948
            "uni_z": get_uni_mocked(is_valid=True),
949
            "primary_path": primary_path,
950
            "backup_path": backup_path,
951
            "enabled": True,
952
            "dynamic_backup_path": True
953
        }
954
955
        evc = EVC(**attributes)
956
        evc.current_path = evc.primary_path  # start on the primary path
957
        current_handle_link_up = evc.handle_link_up(backup_path[0])
958
        self.assertEqual(deploy_mocked.call_count, 0)
959
        self.assertEqual(deploy_to_mocked.call_count, 0)
960
        self.assertTrue(current_handle_link_up)
961
962
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
963
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')

tests/models/test_link_protection.py 5 locations

@@ 208-250 (lines=43) @@
205
        msg = f"{evc} deployed after link down."
206
        log_mocked.debug.assert_called_once_with(msg)
207
208
    @patch('napps.kytos.mef_eline.models.log')
209
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
210
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
211
    def test_handle_link_down_case_3(self, deploy_to_mocked, deploy_mocked,
212
                                     log_mocked):
213
        """Test handle_link_down reports failure when deploy_to fails."""
214
        deploy_mocked.return_value = False
215
        deploy_to_mocked.return_value = False
216
        primary_path = [
217
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
218
                                metadata={"s_vlan": 5},
219
                                status=EntityStatus.DOWN),
220
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
221
                                metadata={"s_vlan": 6},
222
                                status=EntityStatus.UP),
223
        ]
224
        backup_path = [
225
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
226
                                metadata={"s_vlan": 5},
227
                                status=EntityStatus.DOWN),
228
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
229
                                metadata={"s_vlan": 6},
230
                                status=EntityStatus.UP),
231
        ]
232
        attributes = {  # "dynamic_backup_path" is left unset
233
            "name": "circuit_name",
234
            "uni_a": get_uni_mocked(is_valid=True),
235
            "uni_z": get_uni_mocked(is_valid=True),
236
            "primary_path": primary_path,
237
            "backup_path": backup_path,
238
            "enabled": True
239
        }
240
241
        evc = EVC(**attributes)
242
        evc.current_path = evc.backup_path  # start on the backup path
243
        current_handle_link_down = evc.handle_link_down()
244
        self.assertEqual(deploy_mocked.call_count, 0)
245
        self.assertEqual(deploy_to_mocked.call_count, 1)
246
        deploy_to_mocked.assert_called_once_with('primary_path',
247
                                                 evc.primary_path)
248
        self.assertFalse(current_handle_link_down)
249
        msg = f'Failed to re-deploy {evc} after link down.'
250
        log_mocked.debug.assert_called_once_with(msg)
251
252
    @patch('napps.kytos.mef_eline.models.log')
253
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
@@ 165-206 (lines=42) @@
162
        msg = f"{evc} deployed after link down."
163
        log_mocked.debug.assert_called_once_with(msg)
164
165
    @patch('napps.kytos.mef_eline.models.log')
166
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
167
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
168
    def test_handle_link_down_case_2(self, deploy_to_mocked, deploy_mocked,
169
                                     log_mocked):
170
        """Test if deploy_to primary path is called when backup is in use."""
171
        deploy_mocked.return_value = True
172
        deploy_to_mocked.return_value = True
173
        primary_path = [
174
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
175
                                metadata={"s_vlan": 5},
176
                                status=EntityStatus.UP),
177
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
178
                                metadata={"s_vlan": 6},
179
                                status=EntityStatus.UP),
180
        ]
181
        backup_path = [
182
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
183
                                metadata={"s_vlan": 5},
184
                                status=EntityStatus.DOWN),
185
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
186
                                metadata={"s_vlan": 6},
187
                                status=EntityStatus.UP),
188
        ]
189
        attributes = {
190
            "name": "circuit_name",
191
            "uni_a": get_uni_mocked(is_valid=True),
192
            "uni_z": get_uni_mocked(is_valid=True),
193
            "primary_path": primary_path,
194
            "backup_path": backup_path,
195
            "enabled": True
196
        }
197
198
        evc = EVC(**attributes)
199
        evc.current_path = evc.backup_path  # start on the backup path
200
        current_handle_link_down = evc.handle_link_down()
201
        self.assertEqual(deploy_mocked.call_count, 0)
202
        deploy_to_mocked.assert_called_once_with('primary_path',
203
                                                 evc.primary_path)
204
        self.assertTrue(current_handle_link_down)
205
        msg = f"{evc} deployed after link down."
206
        log_mocked.debug.assert_called_once_with(msg)
207
208
    @patch('napps.kytos.mef_eline.models.log')
209
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
@@ 122-163 (lines=42) @@
119
        deployed = evc.deploy_to('primary_path', evc.primary_path)
120
        self.assertFalse(deployed)
121
122
    @patch('napps.kytos.mef_eline.models.log')
123
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
124
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
125
    def test_handle_link_down_case_1(self, deploy_to_mocked, deploy_mocked,
126
                                     log_mocked):
127
        """Test if deploy_to backup path is called when primary is in use."""
128
        deploy_mocked.return_value = True
129
        deploy_to_mocked.return_value = True
130
        primary_path = [
131
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
132
                                metadata={"s_vlan": 5},
133
                                status=EntityStatus.DOWN),
134
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
135
                                metadata={"s_vlan": 6},
136
                                status=EntityStatus.UP),
137
        ]
138
        backup_path = [
139
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
140
                                metadata={"s_vlan": 5},
141
                                status=EntityStatus.UP),
142
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
143
                                metadata={"s_vlan": 6},
144
                                status=EntityStatus.UP),
145
        ]
146
        attributes = {
147
            "name": "circuit_name",
148
            "uni_a": get_uni_mocked(is_valid=True),
149
            "uni_z": get_uni_mocked(is_valid=True),
150
            "primary_path": primary_path,
151
            "backup_path": backup_path,
152
            "enabled": True
153
        }
154
        evc = EVC(**attributes)
155
156
        evc.current_path = evc.primary_path  # start on the primary path
157
        current_handle_link_down = evc.handle_link_down()
158
        self.assertEqual(deploy_mocked.call_count, 0)
159
        deploy_to_mocked.assert_called_once_with('backup_path',
160
                                                 evc.backup_path)
161
        self.assertTrue(current_handle_link_down)
162
        msg = f"{evc} deployed after link down."
163
        log_mocked.debug.assert_called_once_with(msg)
164
165
    @patch('napps.kytos.mef_eline.models.log')
166
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
@@ 336-375 (lines=40) @@
333
        self.assertEqual(deploy_to_mocked.call_count, 0)
334
        self.assertTrue(current_handle_link_up)
335
336
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
337
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
338
    def test_handle_link_up_case_2(self, deploy_to_mocked, deploy_mocked):
339
        """Test if handle_link_up changes from backup_path to primary_path."""
340
        deploy_mocked.return_value = True
341
        deploy_to_mocked.return_value = True
342
        primary_path = [
343
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
344
                                metadata={"s_vlan": 5},
345
                                status=EntityStatus.UP),
346
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
347
                                metadata={"s_vlan": 6},
348
                                status=EntityStatus.UP),
349
        ]
350
        backup_path = [
351
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
352
                                metadata={"s_vlan": 5},
353
                                status=EntityStatus.UP),
354
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
355
                                metadata={"s_vlan": 6},
356
                                status=EntityStatus.UP),
357
        ]
358
        attributes = {
359
            "name": "circuit_name",
360
            "uni_a": get_uni_mocked(is_valid=True),
361
            "uni_z": get_uni_mocked(is_valid=True),
362
            "primary_path": primary_path,
363
            "backup_path": backup_path,
364
            "enabled": True,
365
            "dynamic_backup_path": True
366
        }
367
368
        evc = EVC(**attributes)
369
        evc.current_path = evc.backup_path  # start on the backup path
370
        current_handle_link_up = evc.handle_link_up(primary_path[0])
371
        self.assertEqual(deploy_mocked.call_count, 0)
372
        self.assertEqual(deploy_to_mocked.call_count, 1)
373
        deploy_to_mocked.assert_called_once_with('primary_path',
374
                                                 evc.primary_path)
375
        self.assertTrue(current_handle_link_up)
376
377
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
378
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
@@ 297-334 (lines=38) @@
294
        msg = f"{evc} deployed after link down."
295
        log_mocked.debug.assert_called_once_with(msg)
296
297
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
298
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
299
    def test_handle_link_up_case_1(self, deploy_to_mocked, deploy_mocked):
300
        """Test if handle_link_up does nothing when using the primary path."""
301
        deploy_mocked.return_value = True
302
        deploy_to_mocked.return_value = True
303
        primary_path = [
304
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
305
                                metadata={"s_vlan": 5},
306
                                status=EntityStatus.UP),
307
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
308
                                metadata={"s_vlan": 6},
309
                                status=EntityStatus.UP),
310
        ]
311
        backup_path = [
312
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
313
                                metadata={"s_vlan": 5},
314
                                status=EntityStatus.UP),
315
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
316
                                metadata={"s_vlan": 6},
317
                                status=EntityStatus.UP),
318
        ]
319
        attributes = {
320
            "name": "circuit_name",
321
            "uni_a": get_uni_mocked(is_valid=True),
322
            "uni_z": get_uni_mocked(is_valid=True),
323
            "primary_path": primary_path,
324
            "backup_path": backup_path,
325
            "enabled": True,
326
            "dynamic_backup_path": True
327
        }
328
329
        evc = EVC(**attributes)
330
        evc.current_path = evc.primary_path  # start on the primary path
331
        current_handle_link_up = evc.handle_link_up(backup_path[0])
332
        self.assertEqual(deploy_mocked.call_count, 0)
333
        self.assertEqual(deploy_to_mocked.call_count, 0)
334
        self.assertTrue(current_handle_link_up)
335
336
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
337
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')