Passed
Push — master ( 4f36a2...55558a )
by Alexander
02:14
created

TestExecutionAddLink.test_add_link_with_name_longer_than_64_should_fail()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
# pylint: disable=invalid-name, attribute-defined-outside-init, objects-update-used
3
4
import time
5
from xmlrpc.client import Fault as XmlRPCFault
6
from xmlrpc.client import ProtocolError
7
8
from django.forms.models import model_to_dict
9
from django.test import override_settings
10
from django.utils import timezone
11
12
from tcms.core.contrib.linkreference.models import LinkReference
13
from tcms.core.helpers import comments
14
from tcms.rpc.tests.utils import APIPermissionsTestCase, APITestCase
15
from tcms.testruns.models import TestExecutionStatus
16
from tcms.tests.factories import (
17
    BuildFactory,
18
    LinkReferenceFactory,
19
    TestExecutionFactory,
20
    TestRunFactory,
21
    UserFactory,
22
)
23
24
25
class TestExecutionGetComments(APITestCase):
26
    """Test TestExecution.get_comments"""
27
28
    def _fixture_setup(self):
29
        super()._fixture_setup()
30
31
        self.comments = ["Text for first comment", "Text for second comment"]
32
33
        self.execution = TestExecutionFactory()
34
        for comment in self.comments:
35
            comments.add_comment([self.execution], comment, self.api_user)
36
37
    def test_get_comments(self):
38
        execution_comments = self.rpc_client.TestExecution.get_comments(
39
            self.execution.pk
40
        )
41
42
        self.assertEqual(len(self.comments), len(execution_comments))
43
        for comment in execution_comments:
44
            self.assertTrue(comment["comment"] in self.comments)
45
46
    def test_get_comments_non_existing_execution(self):
47
        with self.assertRaisesRegex(
48
            XmlRPCFault, "TestExecution matching query does not exist."
49
        ):
50
            self.rpc_client.TestExecution.get_comments(-1)
51
52
53
class TestExecutionGetCommentsPermissions(APIPermissionsTestCase):
54
    """Test permissions of TestExecution.get_comments"""
55
56
    permission_label = "django_comments.view_comment"
57
58
    def _fixture_setup(self):
59
        super()._fixture_setup()
60
61
        self.execution = TestExecutionFactory()
62
        self.comments = ["Text for first comment", "Text for second comment"]
63
64
        for comment in self.comments:
65
            comments.add_comment([self.execution], comment, self.tester)
66
67
    def verify_api_with_permission(self):
68
        execution_comments = self.rpc_client.TestExecution.get_comments(
69
            self.execution.pk
70
        )
71
72
        self.assertEqual(len(self.comments), len(execution_comments))
73
        for comment in execution_comments:
74
            self.assertTrue(comment["comment"] in self.comments)
75
76
    def verify_api_without_permission(self):
77
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
78
            self.rpc_client.TestExecution.get_comments(self.execution.pk)
79
80
81
class TestExecutionAddComment(APITestCase):
82
    """Test TestExecution.add_comment"""
83
84
    def _fixture_setup(self):
85
        super()._fixture_setup()
86
87
        self.execution_1 = TestExecutionFactory()
88
        self.execution_2 = TestExecutionFactory()
89
90
    def test_add_comment_with_pk_as_int(self):
91
        created_comment = self.rpc_client.TestExecution.add_comment(
92
            self.execution_2.pk, "Hello World!"
93
        )
94
        execution_comments = comments.get_comments(self.execution_2)
95
        self.assertEqual(1, execution_comments.count())
96
97
        first_comment = execution_comments.first()
98
        self.assertEqual("Hello World!", first_comment.comment)
99
        self.assertEqual(created_comment["comment"], first_comment.comment)
100
101
102
class TestExecutionRemoveComment(APITestCase):
103
    """Test TestExecution.remove_comment"""
104
105
    def _fixture_setup(self):
106
        super()._fixture_setup()
107
108
        self.user = UserFactory()
109
        self.execution = TestExecutionFactory()
110
111
    def test_delete_all_comments(self):
112
        comments.add_comment([self.execution], "Hello World!", self.user)
113
        comments.add_comment([self.execution], "More comments", self.user)
114
        self.rpc_client.TestExecution.remove_comment(self.execution.pk)
115
116
        result = comments.get_comments(self.execution)
117
        self.assertEqual(result.count(), 0)
118
119
    def test_delete_one_comment(self):
120
        comments.add_comment([self.execution], "Hello World!", self.user)
121
        comment_2 = comments.add_comment([self.execution], "More comments", self.user)
122
        comment_2 = model_to_dict(comment_2[0])
123
124
        self.rpc_client.TestExecution.remove_comment(self.execution.pk, comment_2["id"])
125
        result = comments.get_comments(self.execution)
126
        first_comment = result.first()
127
128
        self.assertEqual(result.count(), 1)
129
        self.assertEqual("Hello World!", first_comment.comment)
130
131
132
class TestExecutionRemoveCommentPermissions(APIPermissionsTestCase):
133
    """Test TestExecution.remove_comment permissions"""
134
135
    permission_label = "django_comments.delete_comment"
136
137
    def _fixture_setup(self):
138
        super()._fixture_setup()
139
140
        self.user = UserFactory()
141
        self.execution = TestExecutionFactory()
142
143
    def verify_api_with_permission(self):
144
        comments.add_comment([self.execution], "Hello World!", self.user)
145
        self.rpc_client.TestExecution.remove_comment(self.execution.pk)
146
147
        result = comments.get_comments(self.execution)
148
        self.assertEqual(result.count(), 0)
149
150
    def verify_api_without_permission(self):
151
        comments.add_comment([self.execution], "Hello World!", self.user)
152
153
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
154
            self.rpc_client.TestExecution.remove_comment(self.execution.pk)
155
156
157
@override_settings(LANGUAGE_CODE="en")
158
class TestExecutionAddLink(APITestCase):
159
    """Test TestExecution.add_link"""
160
161
    def _fixture_setup(self):
162
        super()._fixture_setup()
163
164
        self.execution = TestExecutionFactory()
165
166
    def test_attach_log_with_non_existing_id(self):
167
        with self.assertRaisesRegex(
168
            XmlRPCFault, ".*execution.*Select a valid choice.*"
169
        ):
170
            self.rpc_client.TestExecution.add_link(
171
                {"execution_id": -5, "name": "A test log", "url": "http://example.com"}
172
            )
173
174
    def test_attach_log(self):
175
        url = "http://127.0.0.1/test/test-log.log"
176
        result = self.rpc_client.TestExecution.add_link(
177
            {"execution_id": self.execution.pk, "name": "UT test logs", "url": url}
178
        )
179
        self.assertGreater(result["id"], 0)
180
        self.assertEqual(result["url"], url)
181
182
    def test_add_link_with_name_longer_than_64_should_fail(self):
183
        with self.assertRaisesRegex(XmlRPCFault, "has at most 64 characters"):
184
            name = "a" * 65
185
            url = "http://127.0.0.1/test/test-log.log"
186
            self.rpc_client.TestExecution.add_link(
187
                {"execution_id": self.execution.pk, "name": name, "url": url}
188
            )
189
190
191
class TestExecutionAddLinkPermissions(APIPermissionsTestCase):
192
    """Test permissions of TestExecution.add_link"""
193
194
    permission_label = "linkreference.add_linkreference"
195
196
    def _fixture_setup(self):
197
        super()._fixture_setup()
198
199
        self.execution = TestExecutionFactory()
200
201
    def verify_api_with_permission(self):
202
        links = self.execution.links()
203
        self.assertFalse(links.exists())
204
205
        url = "http://example.com"
206
        result = self.rpc_client.TestExecution.add_link(
207
            {"execution_id": self.execution.pk, "url": url}
208
        )
209
210
        links = self.execution.links()
211
        self.assertEqual(self.execution.pk, result["execution"])
212
        self.assertEqual(url, result["url"])
213
        self.assertEqual(1, links.count())
214
        self.assertEqual(url, links.first().url)
215
216
    def verify_api_without_permission(self):
217
        url = "http://127.0.0.1/test/test-log.log"
218
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
219
            self.rpc_client.TestExecution.add_link(
220
                {"execution_id": self.execution.pk, "name": "UT test logs", "url": url}
221
            )
222
223
224
class TestExecutionRemoveLink(APITestCase):
225
    def _fixture_setup(self):
226
        super()._fixture_setup()
227
228
        self.status_idle = TestExecutionStatus.objects.filter(weight=0).first()
229
        self.tester = UserFactory()
230
        self.execution = TestExecutionFactory(
231
            assignee=self.tester, tested_by=None, sortkey=10, status=self.status_idle
232
        )
233
234
    def setUp(self):
235
        super().setUp()
236
237
        self.rpc_client.TestExecution.add_link(
238
            {
239
                "execution_id": self.execution.pk,
240
                "name": "Related issue",
241
                "url": "https://localhost/issue/1",
242
            }
243
        )
244
        self.link = self.execution.links()[0]
245
246
    def test_doesnt_raise_with_non_existing_id(self):
247
        self.rpc_client.TestExecution.remove_link({"execution_id": -9})
248
        links = self.execution.links()
249
        self.assertEqual(1, links.count())
250
        self.assertEqual(self.link.pk, links[0].pk)
251
252
    def test_detach_log_with_non_exist_log(self):
253
        self.rpc_client.TestExecution.remove_link({"pk": 999999999})
254
        links = self.execution.links()
255
        self.assertEqual(1, links.count())
256
        self.assertEqual(self.link.pk, links[0].pk)
257
258
    def test_detach_log(self):
259
        self.rpc_client.TestExecution.remove_link(
260
            {"execution_id": self.execution.pk, "pk": self.link.pk}
261
        )
262
        self.assertEqual([], list(self.execution.links()))
263
264
265
class TestExecutionRemoveLinkPermissions(APIPermissionsTestCase):
266
    """Test permissions of TestExecution.remove_link"""
267
268
    permission_label = "linkreference.delete_linkreference"
269
270
    def _fixture_setup(self):
271
        super()._fixture_setup()
272
273
        self.execution = TestExecutionFactory()
274
        self.link = LinkReferenceFactory(execution=self.execution)
275
        self.another_link = LinkReferenceFactory(execution=self.execution)
276
277
    def verify_api_with_permission(self):
278
        links = self.execution.links()
279
        self.assertEqual(2, links.count())
280
        self.assertIn(self.link, links)
281
        self.assertIn(self.another_link, links)
282
283
        self.rpc_client.TestExecution.remove_link({"pk": self.link.pk})
284
285
        links = self.execution.links()
286
        self.assertEqual(1, links.count())
287
        self.assertIn(self.another_link, links)
288
        self.assertNotIn(self.link, links)
289
290
    def verify_api_without_permission(self):
291
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
292
            self.rpc_client.TestExecution.remove_link({"pk": self.another_link.pk})
293
294
295
class TestExecutionFilter(APITestCase):
296
    def _fixture_setup(self):
297
        super()._fixture_setup()
298
299
        self.status_idle = TestExecutionStatus.objects.filter(weight=0).first()
300
        self.tester = UserFactory()
301
        self.execution = TestExecutionFactory(
302
            assignee=self.tester, tested_by=None, sortkey=10, status=self.status_idle
303
        )
304
305
    def test_with_non_exist_id(self):
306
        found = self.rpc_client.TestExecution.filter({"pk": -1})
307
        self.assertEqual(0, len(found))
308
309
    def test_filter_by_id(self):
310
        execution = self.rpc_client.TestExecution.filter({"pk": self.execution.pk})[0]
311
312
        self.assertIsNotNone(execution)
313
        self.assertEqual(execution["id"], self.execution.pk)
314
        self.assertEqual(execution["assignee"], self.tester.pk)
315
        self.assertEqual(execution["tested_by"], None)
316
        self.assertEqual(
317
            execution["case_text_version"], self.execution.case_text_version
318
        )
319
        self.assertEqual(execution["start_date"], self.execution.start_date)
320
        self.assertEqual(execution["stop_date"], self.execution.stop_date)
321
        self.assertEqual(execution["sortkey"], self.execution.sortkey)
322
        self.assertEqual(execution["run"], self.execution.run.pk)
323
        self.assertEqual(execution["case"], self.execution.case.pk)
324
        self.assertEqual(execution["build"], self.execution.build.pk)
325
        self.assertEqual(execution["status"], self.status_idle.pk)
326
327
328
class TestExecutionGetLinks(APITestCase):
329
    def _fixture_setup(self):
330
        super()._fixture_setup()
331
332
        self.execution_1 = TestExecutionFactory()
333
        self.execution_2 = TestExecutionFactory()
334
335
        self.rpc_client.TestExecution.add_link(
336
            {
337
                "execution_id": self.execution_1.pk,
338
                "name": "Test logs",
339
                "url": "http://kiwitcms.org",
340
            }
341
        )
342
343
    def test_get_links_with_non_exist_id(self):
344
        result = self.rpc_client.TestExecution.get_links({"execution": -9})
345
        self.assertEqual([], result)
346
347
    def test_get_empty_logs(self):
348
        logs = self.rpc_client.TestExecution.get_links(
349
            {"execution": self.execution_2.pk}
350
        )
351
        self.assertIsInstance(logs, list)
352
        self.assertEqual(len(logs), 0)
353
354
    def test_get_links(self):
355
        execution_log = LinkReference.objects.get(execution=self.execution_1.pk)
356
        logs = self.rpc_client.TestExecution.get_links(
357
            {"execution": self.execution_1.pk}
358
        )
359
        self.assertIsInstance(logs, list)
360
        self.assertEqual(len(logs), 1)
361
362
        self.assertEqual(logs[0]["id"], execution_log.pk)
363
        self.assertEqual(logs[0]["name"], "Test logs")
364
        self.assertEqual(logs[0]["url"], "http://kiwitcms.org")
365
        self.assertEqual(logs[0]["execution"], self.execution_1.pk)
366
        self.assertIn("created_on", logs[0])
367
        self.assertFalse(logs[0]["is_defect"])
368
369
370
class TestExecutionHistory(APITestCase):
371
    def _fixture_setup(self):
372
        super()._fixture_setup()
373
374
        self.execution = TestExecutionFactory()
375
376
    def test_history_for_non_existing_execution(self):
377
        with self.assertRaisesRegex(
378
            XmlRPCFault, "TestExecution matching query does not exist."
379
        ):
380
            self.rpc_client.TestExecution.history(-5)
381
382
    def test_history_new_execution(self):
383
        execution = TestExecutionFactory()
384
385
        history = self.rpc_client.TestExecution.history(execution.pk)
386
387
        self.assertEqual(1, len(history))
388
389
    def test_history(self):
390
        """
391
        Test that for an execution that has been updated 3 times,
392
        there are 4 history entries (first one is the creation of the object).
393
394
        Note: the `time.sleep` call after every update is necessary,
395
        because otherwise the changes happen too fast,
396
        and the XML-RPC protocol follows ISO 8601 which doesn't have sub-seconds precision.
397
        Hence the measurable delta is 1 second.
398
        """
399
        time.sleep(1)
400
401
        self.execution.build = BuildFactory()
402
        self.execution.save()
403
        time.sleep(1)
404
405
        user = UserFactory()
406
        self.execution.assignee = user
407
        self.execution.save()
408
        time.sleep(1)
409
410
        self.execution.tested_by = user
411
        self.execution.save()
412
        time.sleep(1)
413
414
        history = self.rpc_client.TestExecution.history(self.execution.pk)
415
        self.assertEqual(4, len(history))
416
417
        # assert entries are in the right order
418
        previous = timezone.now()
419
        for history_entry in history:
420
            self.assertTrue(history_entry["history_date"] < previous)
421
            previous = history_entry["history_date"]
422
423
424
class TestExecutionHistoryPermissions(APIPermissionsTestCase):
425
    """Test permissions of TestExecution.history"""
426
427
    permission_label = "testruns.view_testexecution"
428
429
    def _fixture_setup(self):
430
        super()._fixture_setup()
431
432
        self.execution = TestExecutionFactory()
433
434
    def verify_api_with_permission(self):
435
        history = self.rpc_client.TestExecution.history(self.execution.pk)
436
        self.assertEqual(1, len(history))
437
438
    def verify_api_without_permission(self):
439
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
440
            self.rpc_client.TestExecution.history(self.execution.pk)
441
442
443
@override_settings(LANGUAGE_CODE="en")
444
class TestExecutionUpdate(APITestCase):
445
    def _fixture_setup(self):
446
        super()._fixture_setup()
447
448
        self.user = UserFactory()
449
        self.build = BuildFactory()
450
        self.execution_1 = TestExecutionFactory()
451
        self.execution_2 = TestExecutionFactory()
452
        self.status_positive = TestExecutionStatus.objects.filter(weight__gt=0).last()
453
454
    def test_update_with_single_test_execution(self):
455
        execution = TestExecutionFactory(tested_by=None)
456
457
        result = self.rpc_client.TestExecution.update(
458
            execution.pk,
459
            {
460
                "build": self.build.pk,
461
                "assignee": self.user.pk,
462
                "sortkey": 90,
463
                "start_date": "2021-02-25",
464
                "stop_date": "2021-02-28 12:12:12",
465
            },
466
        )
467
468
        execution.refresh_from_db()
469
470
        self.assertEqual(result["id"], execution.pk)
471
        self.assertEqual(result["assignee"], self.user.pk)
472
        self.assertEqual(result["tested_by"], None)
473
        self.assertIn("case_text_version", result)
474
        self.assertEqual(result["start_date"], execution.start_date)
475
        self.assertEqual(result["stop_date"], execution.stop_date)
476
        self.assertEqual(result["sortkey"], 90)
477
        self.assertIn("run", result)
478
        self.assertIn("case", result)
479
        self.assertEqual(result["build"], self.build.pk)
480
        self.assertIn("status", result)
481
482
    def test_update_with_assignee_id(self):
483
        self.assertNotEqual(self.execution_1.assignee, self.user)
484
        execution = self.rpc_client.TestExecution.update(
485
            self.execution_1.pk, {"assignee": self.user.pk}
486
        )
487
        self.execution_1.refresh_from_db()
488
489
        self.assertEqual(execution["assignee"], self.user.pk)
490
        self.assertEqual(self.execution_1.assignee, self.user)
491
492
    def test_update_with_assignee_email(self):
493
        self.assertNotEqual(self.execution_1.assignee, self.user)
494
        execution = self.rpc_client.TestExecution.update(
495
            self.execution_1.pk, {"assignee": self.user.email}
496
        )
497
        self.execution_1.refresh_from_db()
498
499
        self.assertEqual(execution["assignee"], self.user.pk)
500
        self.assertEqual(self.execution_1.assignee, self.user)
501
502
    def test_update_with_assignee_username(self):
503
        self.assertNotEqual(self.execution_1.assignee, self.user)
504
        execution = self.rpc_client.TestExecution.update(
505
            self.execution_1.pk, {"assignee": self.user.username}
506
        )
507
        self.execution_1.refresh_from_db()
508
509
        self.assertEqual(execution["assignee"], self.user.pk)
510
        self.assertEqual(self.execution_1.assignee, self.user)
511
512
    def test_update_with_tested_by_id(self):
513
        self.assertNotEqual(self.execution_2.tested_by, self.user)
514
        execution = self.rpc_client.TestExecution.update(
515
            self.execution_2.pk, {"tested_by": self.user.pk}
516
        )
517
        self.execution_2.refresh_from_db()
518
519
        self.assertEqual(execution["tested_by"], self.user.pk)
520
        self.assertEqual(self.execution_2.tested_by, self.user)
521
522
    def test_update_with_tested_by_email(self):
523
        self.assertNotEqual(self.execution_2.tested_by, self.user)
524
        execution = self.rpc_client.TestExecution.update(
525
            self.execution_2.pk, {"tested_by": self.user.email}
526
        )
527
        self.execution_2.refresh_from_db()
528
529
        self.assertEqual(execution["tested_by"], self.user.pk)
530
        self.assertEqual(self.execution_2.tested_by, self.user)
531
532
    def test_update_with_tested_by_username(self):
533
        self.assertNotEqual(self.execution_2.tested_by, self.user)
534
        execution = self.rpc_client.TestExecution.update(
535
            self.execution_2.pk, {"tested_by": self.user.username}
536
        )
537
        self.execution_2.refresh_from_db()
538
539
        self.assertEqual(execution["tested_by"], self.user.pk)
540
        self.assertEqual(self.execution_2.tested_by, self.user)
541
542
    def test_update_with_non_existing_build(self):
543
        with self.assertRaisesRegex(XmlRPCFault, "Select a valid choice"):
544
            self.rpc_client.TestExecution.update(
545
                self.execution_1.pk, {"build": 1111111}
546
            )
547
548
    def test_update_with_non_existing_assignee_id(self):
549
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user_id"):
550
            self.rpc_client.TestExecution.update(
551
                self.execution_1.pk, {"assignee": 1111111}
552
            )
553
554
    def test_update_with_non_existing_assignee_email(self):
555
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user"):
556
            self.rpc_client.TestExecution.update(
557
                self.execution_1.pk, {"assignee": "[email protected]"}
558
            )
559
560
    def test_update_with_non_existing_assignee_username(self):
561
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user"):
562
            self.rpc_client.TestExecution.update(
563
                self.execution_1.pk, {"assignee": "nonExistentUsername"}
564
            )
565
566
    def test_update_with_non_existing_tested_by_id(self):
567
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user_id"):
568
            self.rpc_client.TestExecution.update(
569
                self.execution_2.pk, {"tested_by": 1111111}
570
            )
571
572
    def test_update_with_non_existing_tested_by_email(self):
573
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user"):
574
            self.rpc_client.TestExecution.update(
575
                self.execution_2.pk, {"tested_by": "[email protected]"}
576
            )
577
578
    def test_update_with_non_existing_tested_by_username(self):
579
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user:"):
580
            self.rpc_client.TestExecution.update(
581
                self.execution_2.pk, {"tested_by": "nonExistentUsername"}
582
            )
583
584
    def test_update_when_case_text_version_is_integer(self):
585
        initial_case_text_version = self.execution_1.case_text_version
586
        self.update_test_case_text()
587
588
        execution = self.rpc_client.TestExecution.update(
589
            self.execution_1.pk,
590
            {
591
                "case_text_version": str(
592
                    self.execution_1.case.history.latest().history_id
593
                )
594
            },
595
        )
596
        self.execution_1.refresh_from_db()
597
598
        latest_case_text_version = self.execution_1.case_text_version
599
        self.assertNotEqual(initial_case_text_version, latest_case_text_version)
600
        self.assertEqual(execution["case_text_version"], latest_case_text_version)
601
        self.assertEqual(
602
            self.execution_1.case.history.latest().history_id, latest_case_text_version
603
        )
604
605
    def test_update_when_case_text_version_is_string_latest(self):
606
        initial_case_text_version = self.execution_1.case_text_version
607
        self.update_test_case_text()
608
609
        execution = self.rpc_client.TestExecution.update(
610
            self.execution_1.pk, {"case_text_version": "latest"}
611
        )
612
        self.execution_1.refresh_from_db()
613
614
        latest_case_text_version = self.execution_1.case_text_version
615
        self.assertNotEqual(initial_case_text_version, latest_case_text_version)
616
        self.assertEqual(execution["case_text_version"], latest_case_text_version)
617
        self.assertEqual(
618
            self.execution_1.case.history.latest().history_id, latest_case_text_version
619
        )
620
621
    def update_test_case_text(self):
622
        self.execution_1.case.summary = "Summary Updated"
623
        self.execution_1.case.text = "Text Updated"
624
        self.execution_1.case.save()
625
626
    def test_update_with_no_perm(self):
627
        self.rpc_client.Auth.logout()
628
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
629
            self.rpc_client.TestExecution.update(
630
                self.execution_1.pk, {"stop_date": timezone.now()}
631
            )
632
633
634
class TestExecutionUpdateStatus(APITestCase):
635
    def _fixture_setup(self):
636
        super()._fixture_setup()
637
638
        self.user = UserFactory()
639
        self.execution_1 = TestExecutionFactory()
640
        self.status_positive = TestExecutionStatus.objects.filter(weight__gt=0).last()
641
        self.status_negative = TestExecutionStatus.objects.filter(weight__lt=0).last()
642
        self.status_in_progress = TestExecutionStatus.objects.filter(weight=0).last()
643
644
    def test_changes_tested_by(self):
645
        execution = TestExecutionFactory(tested_by=None)
646
647
        self.rpc_client.TestExecution.update(
648
            execution.pk, {"status": self.status_positive.pk}
649
        )
650
        execution.refresh_from_db()
651
652
        self.assertEqual(execution.tested_by, self.api_user)
653
        self.assertEqual(execution.status, self.status_positive)
654
655
    def test_when_tested_by_specified_does_not_change_tested_by(self):
656
        execution = TestExecutionFactory(tested_by=None)
657
658
        self.rpc_client.TestExecution.update(
659
            execution.pk,
660
            {
661
                "status": self.status_positive.pk,
662
                "tested_by": self.user.pk,
663
            },
664
        )
665
        execution.refresh_from_db()
666
667
        self.assertEqual(execution.tested_by, self.user)
668
        self.assertEqual(execution.status, self.status_positive)
669
670
    def test_changes_build(self):
671
        # simulate what happens in reality where TestExeuctions are created
672
        # taking their initial .build values from the parent TestRun
673
        self.execution_1.build = self.execution_1.run.build
674
        self.execution_1.save()
675
676
        # now simulate a re-test scenario where TR.build has already changed
677
        # e.g. longer test cycle covering multiple builds
678
        self.execution_1.run.build = BuildFactory(name="b02")
679
        self.execution_1.run.save()
680
681
        self.rpc_client.TestExecution.update(
682
            self.execution_1.pk, {"status": self.status_positive.pk}
683
        )
684
685
        self.execution_1.refresh_from_db()
686
        self.assertEqual(self.execution_1.status, self.status_positive)
687
        self.assertEqual(self.execution_1.build.name, "b02")
688
689
    def test_when_build_specified_does_not_change_build(self):
690
        # simulate what happens in reality where TestExeuctions are created
691
        # taking their initial .build values from the parent TestRun
692
        self.execution_1.build = self.execution_1.run.build
693
        self.execution_1.save()
694
695
        build03 = BuildFactory(name="b03")
696
697
        self.rpc_client.TestExecution.update(
698
            self.execution_1.pk,
699
            {
700
                "status": self.status_positive.pk,
701
                "build": build03.pk,
702
            },
703
        )
704
705
        self.execution_1.refresh_from_db()
706
        self.assertEqual(self.execution_1.status, self.status_positive)
707
        self.assertEqual(self.execution_1.build.name, "b03")
708
        # these are different b/c the API call (e.g. from a plugin) has
709
        # passed an explicit build value
710
        self.assertNotEqual(self.execution_1.run.build, build03)
711
712
    def test_non_zero_status_changes_stop_date(self):
713
        """
714
        Non-zero weight statuses will set stop_date
715
        """
716
        few_secs_ago = timezone.now()
717
        self.execution_1.stop_date = None
718
        self.execution_1.save()
719
720
        self.rpc_client.TestExecution.update(
721
            self.execution_1.pk,
722
            {
723
                "status": self.status_positive.pk,
724
            },
725
        )
726
727
        self.execution_1.refresh_from_db()
728
        self.assertGreater(self.execution_1.stop_date, few_secs_ago)
729
730
    def test_zero_status_changes_stop_date(self):
731
        """
732
        Zero weight statuses will set stop_date to None,
733
        e.g. re-test the TE!
734
        """
735
        self.execution_1.stop_date = timezone.now()
736
        self.execution_1.save()
737
738
        self.rpc_client.TestExecution.update(
739
            self.execution_1.pk,
740
            {
741
                "status": TestExecutionStatus.objects.filter(weight=0).first().pk,
742
            },
743
        )
744
745
        self.execution_1.refresh_from_db()
746
        self.assertIsNone(self.execution_1.stop_date)
747
748
    def test_update_status_changes_run_status_completed(self):
749
        """
750
        When updating the status of a TestExecution to a completed one (status.weigth != 0),
751
        if all executions for a run are completed, set the run as completed
752
        """
753
        run = TestRunFactory(stop_date=None)
754
755
        TestExecutionFactory(status=self.status_positive, run=run)
756
        TestExecutionFactory(status=self.status_negative, run=run)
757
        execution = TestExecutionFactory(status=self.status_in_progress, run=run)
758
759
        self.rpc_client.TestExecution.update(
760
            execution.pk, {"status": self.status_positive.pk}
761
        )
762
763
        run.refresh_from_db()
764
        self.assertIsNotNone(run.stop_date)
765
766
    def test_update_status_changes_run_neutral(self):
767
        """
768
        When updating the status of a TestExecution to a not completed one (status.weigth == 0),
769
        set the run as not completed
770
        """
771
        run = TestRunFactory(stop_date=timezone.now())
772
773
        execution = TestExecutionFactory(status=self.status_positive, run=run)
774
        _ = TestExecutionFactory(status=self.status_negative, run=run)
775
776
        self.rpc_client.TestExecution.update(
777
            execution.pk, {"status": self.status_in_progress.pk}
778
        )
779
780
        run.refresh_from_db()
781
        self.assertIsNone(run.stop_date)
782