Code Duplication    Length = 54-54 lines in 2 locations

django-data/image/validation/tests/test_tasks.py 1 location

@@ 218-271 (lines=54) @@
215
             'notification_message': 'Errors in EBI API endpoints. Please try '
216
                                     'again later'}, 1)
217
218
    @patch('validation.tasks.send_message_to_websocket')
219
    @patch('asyncio.get_event_loop')
220
    @patch("validation.helpers.validation.read_in_ruleset",
221
           side_effect=OntologyCacheError("test exception"))
222
    @patch("validation.tasks.ValidateTask.retry")
223
    def test_issues_with_ontologychache(
224
            self, my_retry, my_validate, asyncio_mock,
225
            send_message_to_websocket_mock):
226
        """Test errors with validation API when loading OntologyCache
227
        objects"""
228
229
        # replace the event loop's run_until_complete with a Mock so calls
230
        tmp = asyncio_mock.return_value
231
        tmp.run_until_complete = Mock()
232
233
        # call task. No retries with issues at EBI
234
        res = self.my_task.run(submission_id=self.submission_id)
235
236
        # assert a success with validation task
237
        self.assertEqual(res, "success")
238
239
        # check submission status and message
240
        self.submission.refresh_from_db()
241
242
        # this is the expected error message
243
        message = "Errors in EBI API endpoints. Please try again later"
244
245
        # check submission.status is LOADED and the message was recorded
246
        self.assertEqual(self.submission.status, LOADED)
247
        self.assertIn(
248
            message,
249
            self.submission.message)
250
251
        # test that exactly one email was sent
252
        self.assertEqual(len(mail.outbox), 1)
253
254
        # read email
255
        email = mail.outbox[0]
256
257
        self.assertEqual(
258
            "Error in IMAGE Validation: %s" % message,
259
            email.subject)
260
261
        self.assertTrue(my_validate.called)  # the patched read_in_ruleset
262
        self.assertFalse(my_retry.called)  # the patched ValidateTask.retry
263
264
        self.assertEqual(asyncio_mock.call_count, 1)
265
        self.assertEqual(tmp.run_until_complete.call_count, 1)
266
        self.assertEqual(send_message_to_websocket_mock.call_count, 1)
267
        send_message_to_websocket_mock.assert_called_with(
268
            {'message': 'Loaded',
269
             'notification_message': ('Errors in EBI API endpoints. Please '
270
                                      'try again later')},
271
            1
272
        )
273
274
    @patch('validation.tasks.send_message_to_websocket')

django-data/image/biosample/tests/test_tasks.py 1 location

@@ 137-190 (lines=54) @@
134
        self.assertTrue(my_submit.called)
135
        self.assertTrue(my_retry.called)
136
137
    @patch('biosample.tasks.send_message_to_websocket')
138
    @patch('asyncio.get_event_loop')
139
    @patch("biosample.tasks.SubmitTask.retry")
140
    @patch("biosample.tasks.SubmitTask.submit_biosample")
141
    def test_issues_with_api(self, my_submit, my_retry, asyncio_mock,
142
                             send_message_to_websocket_mock):
143
        """Test errors with submission API"""
144
        tmp = asyncio_mock.return_value
145
        tmp.run_until_complete = Mock()
146
147
        # Set a side effect on the patched methods
148
        # so that they raise the errors we want.
149
        my_retry.side_effect = Retry()
150
        my_submit.side_effect = ConnectionError()
151
152
        # call task. No retries with issues at EBI
153
        res = self.my_task.run(submission_id=1)
154
155
        # assert the task run still returns success
156
        self.assertEqual(res, "success")
157
158
        # check submission status and message
159
        self.submission_obj.refresh_from_db()
160
161
        # this is the expected error message
162
        message = "Errors in EBI API endpoints. Please try again later"
163
164
        # check submission.status NOT changed
165
        self.assertEqual(self.submission_obj.status, READY)
166
        self.assertIn(
167
            message,
168
            self.submission_obj.message)
169
170
        # test that exactly one email was sent
171
        self.assertEqual(len(mail.outbox), 1)
172
173
        # read email
174
        email = mail.outbox[0]
175
176
        self.assertEqual(
177
            "Error in biosample submission 1",
178
            email.subject)
179
180
        self.assertTrue(my_submit.called)  # patched submit_biosample
181
        self.assertFalse(my_retry.called)  # patched SubmitTask.retry
182
183
        self.assertEqual(asyncio_mock.call_count, 1)
184
        self.assertEqual(tmp.run_until_complete.call_count, 1)
185
        self.assertEqual(send_message_to_websocket_mock.call_count, 1)
186
        send_message_to_websocket_mock.assert_called_with(
187
            {
188
                'message': 'Ready',
189
                'notification_message': 'Errors in EBI API endpoints. '
190
                                        'Please try again later'}, 1)
191
192
    @patch('biosample.tasks.send_message_to_websocket')
193
    @patch('asyncio.get_event_loop')