Passed
Push — master ( afd6fa...9657ba )
by Alexander
02:39
created

TestExecutionUpdate.test_update_with_single_test_execution()   A

Complexity

Conditions 1

Size

Total Lines 27
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 27
rs 9.352
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
# pylint: disable=invalid-name, attribute-defined-outside-init, objects-update-used
3
4
import time
5
from xmlrpc.client import Fault as XmlRPCFault
6
from xmlrpc.client import ProtocolError
7
8
from django.forms.models import model_to_dict
9
from django.test import override_settings
10
from django.utils import timezone
11
12
from tcms.core.contrib.linkreference.models import LinkReference
13
from tcms.core.helpers import comments
14
from tcms.rpc.tests.utils import APIPermissionsTestCase, APITestCase
15
from tcms.testruns.models import TestExecutionStatus
16
from tcms.tests.factories import (
17
    BuildFactory,
18
    LinkReferenceFactory,
19
    TestExecutionFactory,
20
    TestRunFactory,
21
    UserFactory,
22
)
23
24
25
class TestExecutionGetComments(APITestCase):
26
    """ Test TestExecution.get_comments """
27
28
    def _fixture_setup(self):
29
        super()._fixture_setup()
30
31
        self.comments = ["Text for first comment", "Text for second comment"]
32
33
        self.execution = TestExecutionFactory()
34
        for comment in self.comments:
35
            comments.add_comment([self.execution], comment, self.api_user)
36
37
    def test_get_comments(self):
38
        execution_comments = self.rpc_client.TestExecution.get_comments(
39
            self.execution.pk
40
        )
41
42
        self.assertEqual(len(self.comments), len(execution_comments))
43
        for comment in execution_comments:
44
            self.assertTrue(comment["comment"] in self.comments)
45
46
    def test_get_comments_non_existing_execution(self):
47
        with self.assertRaisesRegex(
48
            XmlRPCFault, "TestExecution matching query does not exist."
49
        ):
50
            self.rpc_client.TestExecution.get_comments(-1)
51
52
53
class TestExecutionGetCommentsPermissions(APIPermissionsTestCase):
54
    """Test permissions of TestExecution.get_comments"""
55
56
    permission_label = "django_comments.view_comment"
57
58
    def _fixture_setup(self):
59
        super()._fixture_setup()
60
61
        self.execution = TestExecutionFactory()
62
        self.comments = ["Text for first comment", "Text for second comment"]
63
64
        for comment in self.comments:
65
            comments.add_comment([self.execution], comment, self.tester)
66
67
    def verify_api_with_permission(self):
68
        execution_comments = self.rpc_client.TestExecution.get_comments(
69
            self.execution.pk
70
        )
71
72
        self.assertEqual(len(self.comments), len(execution_comments))
73
        for comment in execution_comments:
74
            self.assertTrue(comment["comment"] in self.comments)
75
76
    def verify_api_without_permission(self):
77
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
78
            self.rpc_client.TestExecution.get_comments(self.execution.pk)
79
80
81
class TestExecutionAddComment(APITestCase):
82
    """Test TestExecution.add_comment"""
83
84
    def _fixture_setup(self):
85
        super()._fixture_setup()
86
87
        self.execution_1 = TestExecutionFactory()
88
        self.execution_2 = TestExecutionFactory()
89
90
    def test_add_comment_with_pk_as_int(self):
91
        created_comment = self.rpc_client.TestExecution.add_comment(
92
            self.execution_2.pk, "Hello World!"
93
        )
94
        execution_comments = comments.get_comments(self.execution_2)
95
        self.assertEqual(1, execution_comments.count())
96
97
        first_comment = execution_comments.first()
98
        self.assertEqual("Hello World!", first_comment.comment)
99
        self.assertEqual(created_comment["comment"], first_comment.comment)
100
101
102
class TestExecutionRemoveComment(APITestCase):
103
    """Test TestExecution.remove_comment"""
104
105
    def _fixture_setup(self):
106
        super()._fixture_setup()
107
108
        self.user = UserFactory()
109
        self.execution = TestExecutionFactory()
110
111
    def test_delete_all_comments(self):
112
        comments.add_comment([self.execution], "Hello World!", self.user)
113
        comments.add_comment([self.execution], "More comments", self.user)
114
        self.rpc_client.TestExecution.remove_comment(self.execution.pk)
115
116
        result = comments.get_comments(self.execution)
117
        self.assertEqual(result.count(), 0)
118
119
    def test_delete_one_comment(self):
120
        comments.add_comment([self.execution], "Hello World!", self.user)
121
        comment_2 = comments.add_comment([self.execution], "More comments", self.user)
122
        comment_2 = model_to_dict(comment_2[0])
123
124
        self.rpc_client.TestExecution.remove_comment(self.execution.pk, comment_2["id"])
125
        result = comments.get_comments(self.execution)
126
        first_comment = result.first()
127
128
        self.assertEqual(result.count(), 1)
129
        self.assertEqual("Hello World!", first_comment.comment)
130
131
132
class TestExecutionRemoveCommentPermissions(APIPermissionsTestCase):
133
    """Test TestExecution.remove_comment permissions"""
134
135
    permission_label = "django_comments.delete_comment"
136
137
    def _fixture_setup(self):
138
        super()._fixture_setup()
139
140
        self.user = UserFactory()
141
        self.execution = TestExecutionFactory()
142
143
    def verify_api_with_permission(self):
144
        comments.add_comment([self.execution], "Hello World!", self.user)
145
        self.rpc_client.TestExecution.remove_comment(self.execution.pk)
146
147
        result = comments.get_comments(self.execution)
148
        self.assertEqual(result.count(), 0)
149
150
    def verify_api_without_permission(self):
151
        comments.add_comment([self.execution], "Hello World!", self.user)
152
153
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
154
            self.rpc_client.TestExecution.remove_comment(self.execution.pk)
155
156
157
@override_settings(LANGUAGE_CODE="en")
158
class TestExecutionAddLink(APITestCase):
159
    """Test TestExecution.add_link"""
160
161
    def _fixture_setup(self):
162
        super()._fixture_setup()
163
164
        self.execution = TestExecutionFactory()
165
166
    def test_attach_log_with_non_existing_id(self):
167
        with self.assertRaisesRegex(
168
            XmlRPCFault, "constraint fail|violates foreign key"
169
        ):
170
            self.rpc_client.TestExecution.add_link(
171
                {"execution_id": -5, "name": "A test log", "url": "http://example.com"}
172
            )
173
174
    def test_attach_log(self):
175
        url = "http://127.0.0.1/test/test-log.log"
176
        result = self.rpc_client.TestExecution.add_link(
177
            {"execution_id": self.execution.pk, "name": "UT test logs", "url": url}
178
        )
179
        self.assertGreater(result["id"], 0)
180
        self.assertEqual(result["url"], url)
181
182
183
class TestExecutionAddLinkPermissions(APIPermissionsTestCase):
184
    """Test permissions of TestExecution.add_link"""
185
186
    permission_label = "linkreference.add_linkreference"
187
188
    def _fixture_setup(self):
189
        super()._fixture_setup()
190
191
        self.execution = TestExecutionFactory()
192
193
    def verify_api_with_permission(self):
194
        links = self.execution.links()
195
        self.assertFalse(links.exists())
196
197
        url = "http://example.com"
198
        result = self.rpc_client.TestExecution.add_link(
199
            {"execution_id": self.execution.pk, "url": url}
200
        )
201
202
        links = self.execution.links()
203
        self.assertEqual(self.execution.pk, result["execution"])
204
        self.assertEqual(url, result["url"])
205
        self.assertEqual(1, links.count())
206
        self.assertEqual(url, links.first().url)
207
208
    def verify_api_without_permission(self):
209
        url = "http://127.0.0.1/test/test-log.log"
210
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
211
            self.rpc_client.TestExecution.add_link(
212
                {"execution_id": self.execution.pk, "name": "UT test logs", "url": url}
213
            )
214
215
216
class TestExecutionRemoveLink(APITestCase):
217
    def _fixture_setup(self):
218
        super()._fixture_setup()
219
220
        self.status_idle = TestExecutionStatus.objects.filter(weight=0).first()
221
        self.tester = UserFactory()
222
        self.execution = TestExecutionFactory(
223
            assignee=self.tester, tested_by=None, sortkey=10, status=self.status_idle
224
        )
225
226
    def setUp(self):
227
        super().setUp()
228
229
        self.rpc_client.TestExecution.add_link(
230
            {
231
                "execution_id": self.execution.pk,
232
                "name": "Related issue",
233
                "url": "https://localhost/issue/1",
234
            }
235
        )
236
        self.link = self.execution.links()[0]
237
238
    def test_doesnt_raise_with_non_existing_id(self):
239
        self.rpc_client.TestExecution.remove_link({"execution_id": -9})
240
        links = self.execution.links()
241
        self.assertEqual(1, links.count())
242
        self.assertEqual(self.link.pk, links[0].pk)
243
244
    def test_detach_log_with_non_exist_log(self):
245
        self.rpc_client.TestExecution.remove_link({"pk": 999999999})
246
        links = self.execution.links()
247
        self.assertEqual(1, links.count())
248
        self.assertEqual(self.link.pk, links[0].pk)
249
250
    def test_detach_log(self):
251
        self.rpc_client.TestExecution.remove_link(
252
            {"execution_id": self.execution.pk, "pk": self.link.pk}
253
        )
254
        self.assertEqual([], list(self.execution.links()))
255
256
257
class TestExecutionRemoveLinkPermissions(APIPermissionsTestCase):
258
    """Test permissions of TestExecution.remove_link"""
259
260
    permission_label = "linkreference.delete_linkreference"
261
262
    def _fixture_setup(self):
263
        super()._fixture_setup()
264
265
        self.execution = TestExecutionFactory()
266
        self.link = LinkReferenceFactory(execution=self.execution)
267
        self.another_link = LinkReferenceFactory(execution=self.execution)
268
269
    def verify_api_with_permission(self):
270
        links = self.execution.links()
271
        self.assertEqual(2, links.count())
272
        self.assertIn(self.link, links)
273
        self.assertIn(self.another_link, links)
274
275
        self.rpc_client.TestExecution.remove_link({"pk": self.link.pk})
276
277
        links = self.execution.links()
278
        self.assertEqual(1, links.count())
279
        self.assertIn(self.another_link, links)
280
        self.assertNotIn(self.link, links)
281
282
    def verify_api_without_permission(self):
283
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
284
            self.rpc_client.TestExecution.remove_link({"pk": self.another_link.pk})
285
286
287
class TestExecutionFilter(APITestCase):
288
    def _fixture_setup(self):
289
        super()._fixture_setup()
290
291
        self.status_idle = TestExecutionStatus.objects.filter(weight=0).first()
292
        self.tester = UserFactory()
293
        self.execution = TestExecutionFactory(
294
            assignee=self.tester, tested_by=None, sortkey=10, status=self.status_idle
295
        )
296
297
    def test_with_non_exist_id(self):
298
        found = self.rpc_client.TestExecution.filter({"pk": -1})
299
        self.assertEqual(0, len(found))
300
301
    def test_filter_by_id(self):
302
        execution = self.rpc_client.TestExecution.filter({"pk": self.execution.pk})[0]
303
304
        self.assertIsNotNone(execution)
305
        self.assertEqual(execution["id"], self.execution.pk)
306
        self.assertEqual(execution["assignee"], self.tester.pk)
307
        self.assertEqual(execution["tested_by"], None)
308
        self.assertEqual(
309
            execution["case_text_version"], self.execution.case_text_version
310
        )
311
        self.assertEqual(execution["start_date"], self.execution.start_date)
312
        self.assertEqual(execution["stop_date"], self.execution.stop_date)
313
        self.assertEqual(execution["sortkey"], self.execution.sortkey)
314
        self.assertEqual(execution["run"], self.execution.run.pk)
315
        self.assertEqual(execution["case"], self.execution.case.pk)
316
        self.assertEqual(execution["build"], self.execution.build.pk)
317
        self.assertEqual(execution["status"], self.status_idle.pk)
318
319
320
class TestExecutionGetLinks(APITestCase):
321
    def _fixture_setup(self):
322
        super()._fixture_setup()
323
324
        self.execution_1 = TestExecutionFactory()
325
        self.execution_2 = TestExecutionFactory()
326
327
        self.rpc_client.TestExecution.add_link(
328
            {
329
                "execution_id": self.execution_1.pk,
330
                "name": "Test logs",
331
                "url": "http://kiwitcms.org",
332
            }
333
        )
334
335
    def test_get_links_with_non_exist_id(self):
336
        result = self.rpc_client.TestExecution.get_links({"execution": -9})
337
        self.assertEqual([], result)
338
339
    def test_get_empty_logs(self):
340
        logs = self.rpc_client.TestExecution.get_links(
341
            {"execution": self.execution_2.pk}
342
        )
343
        self.assertIsInstance(logs, list)
344
        self.assertEqual(len(logs), 0)
345
346
    def test_get_links(self):
347
        execution_log = LinkReference.objects.get(execution=self.execution_1.pk)
348
        logs = self.rpc_client.TestExecution.get_links(
349
            {"execution": self.execution_1.pk}
350
        )
351
        self.assertIsInstance(logs, list)
352
        self.assertEqual(len(logs), 1)
353
354
        self.assertEqual(logs[0]["id"], execution_log.pk)
355
        self.assertEqual(logs[0]["name"], "Test logs")
356
        self.assertEqual(logs[0]["url"], "http://kiwitcms.org")
357
        self.assertEqual(logs[0]["execution"], self.execution_1.pk)
358
        self.assertIn("created_on", logs[0])
359
        self.assertFalse(logs[0]["is_defect"])
360
361
362
class TestExecutionHistory(APITestCase):
363
    def _fixture_setup(self):
364
        super()._fixture_setup()
365
366
        self.execution = TestExecutionFactory()
367
368
    def test_history_for_non_existing_execution(self):
369
        with self.assertRaisesRegex(
370
            XmlRPCFault, "TestExecution matching query does not exist."
371
        ):
372
            self.rpc_client.TestExecution.history(-5)
373
374
    def test_history_new_execution(self):
375
        execution = TestExecutionFactory()
376
377
        history = self.rpc_client.TestExecution.history(execution.pk)
378
379
        self.assertEqual(1, len(history))
380
381
    def test_history(self):
382
        """
383
        Test that for an execution that has been updated 3 times,
384
        there are 4 history entries (first one is the creation of the object).
385
386
        Note: the `time.sleep` call after every update is necessary,
387
        because otherwise the changes happen too fast,
388
        and the XML-RPC protocol follows ISO 8601 which doesn't have sub-seconds precision.
389
        Hence the measurable delta is 1 second.
390
        """
391
        time.sleep(1)
392
393
        self.execution.build = BuildFactory()
394
        self.execution.save()
395
        time.sleep(1)
396
397
        user = UserFactory()
398
        self.execution.assignee = user
399
        self.execution.save()
400
        time.sleep(1)
401
402
        self.execution.tested_by = user
403
        self.execution.save()
404
        time.sleep(1)
405
406
        history = self.rpc_client.TestExecution.history(self.execution.pk)
407
        self.assertEqual(4, len(history))
408
409
        # assert entries are in the right order
410
        previous = timezone.now()
411
        for history_entry in history:
412
            self.assertTrue(history_entry["history_date"] < previous)
413
            previous = history_entry["history_date"]
414
415
416
class TestExecutionHistoryPermissions(APIPermissionsTestCase):
417
    """Test permissions of TestExecution.history"""
418
419
    permission_label = "testruns.view_testexecution"
420
421
    def _fixture_setup(self):
422
        super()._fixture_setup()
423
424
        self.execution = TestExecutionFactory()
425
426
    def verify_api_with_permission(self):
427
        history = self.rpc_client.TestExecution.history(self.execution.pk)
428
        self.assertEqual(1, len(history))
429
430
    def verify_api_without_permission(self):
431
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
432
            self.rpc_client.TestExecution.history(self.execution.pk)
433
434
435
@override_settings(LANGUAGE_CODE="en")
436
class TestExecutionUpdate(APITestCase):
437
    def _fixture_setup(self):
438
        super()._fixture_setup()
439
440
        self.user = UserFactory()
441
        self.build = BuildFactory()
442
        self.execution_1 = TestExecutionFactory()
443
        self.execution_2 = TestExecutionFactory()
444
        self.status_positive = TestExecutionStatus.objects.filter(weight__gt=0).last()
445
446
    def test_update_with_single_test_execution(self):
447
        execution = TestExecutionFactory(tested_by=None)
448
449
        result = self.rpc_client.TestExecution.update(
450
            execution.pk,
451
            {
452
                "build": self.build.pk,
453
                "assignee": self.user.pk,
454
                "sortkey": 90,
455
                "start_date": "2021-02-25",
456
                "stop_date": "2021-02-28 12:12:12",
457
            },
458
        )
459
460
        execution.refresh_from_db()
461
462
        self.assertEqual(result["id"], execution.pk)
463
        self.assertEqual(result["assignee"], self.user.pk)
464
        self.assertEqual(result["tested_by"], None)
465
        self.assertIn("case_text_version", result)
466
        self.assertEqual(result["start_date"], execution.start_date)
467
        self.assertEqual(result["stop_date"], execution.stop_date)
468
        self.assertEqual(result["sortkey"], 90)
469
        self.assertIn("run", result)
470
        self.assertIn("case", result)
471
        self.assertEqual(result["build"], self.build.pk)
472
        self.assertIn("status", result)
473
474
    def test_update_with_assignee_id(self):
475
        self.assertNotEqual(self.execution_1.assignee, self.user)
476
        execution = self.rpc_client.TestExecution.update(
477
            self.execution_1.pk, {"assignee": self.user.pk}
478
        )
479
        self.execution_1.refresh_from_db()
480
481
        self.assertEqual(execution["assignee"], self.user.pk)
482
        self.assertEqual(self.execution_1.assignee, self.user)
483
484
    def test_update_with_assignee_email(self):
485
        self.assertNotEqual(self.execution_1.assignee, self.user)
486
        execution = self.rpc_client.TestExecution.update(
487
            self.execution_1.pk, {"assignee": self.user.email}
488
        )
489
        self.execution_1.refresh_from_db()
490
491
        self.assertEqual(execution["assignee"], self.user.pk)
492
        self.assertEqual(self.execution_1.assignee, self.user)
493
494
    def test_update_with_assignee_username(self):
495
        self.assertNotEqual(self.execution_1.assignee, self.user)
496
        execution = self.rpc_client.TestExecution.update(
497
            self.execution_1.pk, {"assignee": self.user.username}
498
        )
499
        self.execution_1.refresh_from_db()
500
501
        self.assertEqual(execution["assignee"], self.user.pk)
502
        self.assertEqual(self.execution_1.assignee, self.user)
503
504
    def test_update_with_tested_by_id(self):
505
        self.assertNotEqual(self.execution_2.tested_by, self.user)
506
        execution = self.rpc_client.TestExecution.update(
507
            self.execution_2.pk, {"tested_by": self.user.pk}
508
        )
509
        self.execution_2.refresh_from_db()
510
511
        self.assertEqual(execution["tested_by"], self.user.pk)
512
        self.assertEqual(self.execution_2.tested_by, self.user)
513
514
    def test_update_with_tested_by_email(self):
515
        self.assertNotEqual(self.execution_2.tested_by, self.user)
516
        execution = self.rpc_client.TestExecution.update(
517
            self.execution_2.pk, {"tested_by": self.user.email}
518
        )
519
        self.execution_2.refresh_from_db()
520
521
        self.assertEqual(execution["tested_by"], self.user.pk)
522
        self.assertEqual(self.execution_2.tested_by, self.user)
523
524
    def test_update_with_tested_by_username(self):
525
        self.assertNotEqual(self.execution_2.tested_by, self.user)
526
        execution = self.rpc_client.TestExecution.update(
527
            self.execution_2.pk, {"tested_by": self.user.username}
528
        )
529
        self.execution_2.refresh_from_db()
530
531
        self.assertEqual(execution["tested_by"], self.user.pk)
532
        self.assertEqual(self.execution_2.tested_by, self.user)
533
534
    def test_update_with_non_existing_build(self):
535
        with self.assertRaisesRegex(XmlRPCFault, "Select a valid choice"):
536
            self.rpc_client.TestExecution.update(
537
                self.execution_1.pk, {"build": 1111111}
538
            )
539
540
    def test_update_with_non_existing_assignee_id(self):
541
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user_id"):
542
            self.rpc_client.TestExecution.update(
543
                self.execution_1.pk, {"assignee": 1111111}
544
            )
545
546
    def test_update_with_non_existing_assignee_email(self):
547
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user"):
548
            self.rpc_client.TestExecution.update(
549
                self.execution_1.pk, {"assignee": "[email protected]"}
550
            )
551
552
    def test_update_with_non_existing_assignee_username(self):
553
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user"):
554
            self.rpc_client.TestExecution.update(
555
                self.execution_1.pk, {"assignee": "nonExistentUsername"}
556
            )
557
558
    def test_update_with_non_existing_tested_by_id(self):
559
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user_id"):
560
            self.rpc_client.TestExecution.update(
561
                self.execution_2.pk, {"tested_by": 1111111}
562
            )
563
564
    def test_update_with_non_existing_tested_by_email(self):
565
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user"):
566
            self.rpc_client.TestExecution.update(
567
                self.execution_2.pk, {"tested_by": "[email protected]"}
568
            )
569
570
    def test_update_with_non_existing_tested_by_username(self):
571
        with self.assertRaisesRegex(XmlRPCFault, "Unknown user:"):
572
            self.rpc_client.TestExecution.update(
573
                self.execution_2.pk, {"tested_by": "nonExistentUsername"}
574
            )
575
576
    def test_update_when_case_text_version_is_integer(self):
577
        initial_case_text_version = self.execution_1.case_text_version
578
        self.update_test_case_text()
579
580
        execution = self.rpc_client.TestExecution.update(
581
            self.execution_1.pk,
582
            {
583
                "case_text_version": str(
584
                    self.execution_1.case.history.latest().history_id
585
                )
586
            },
587
        )
588
        self.execution_1.refresh_from_db()
589
590
        latest_case_text_version = self.execution_1.case_text_version
591
        self.assertNotEqual(initial_case_text_version, latest_case_text_version)
592
        self.assertEqual(execution["case_text_version"], latest_case_text_version)
593
        self.assertEqual(
594
            self.execution_1.case.history.latest().history_id, latest_case_text_version
595
        )
596
597
    def test_update_when_case_text_version_is_string_latest(self):
598
        initial_case_text_version = self.execution_1.case_text_version
599
        self.update_test_case_text()
600
601
        execution = self.rpc_client.TestExecution.update(
602
            self.execution_1.pk, {"case_text_version": "latest"}
603
        )
604
        self.execution_1.refresh_from_db()
605
606
        latest_case_text_version = self.execution_1.case_text_version
607
        self.assertNotEqual(initial_case_text_version, latest_case_text_version)
608
        self.assertEqual(execution["case_text_version"], latest_case_text_version)
609
        self.assertEqual(
610
            self.execution_1.case.history.latest().history_id, latest_case_text_version
611
        )
612
613
    def update_test_case_text(self):
614
        self.execution_1.case.summary = "Summary Updated"
615
        self.execution_1.case.text = "Text Updated"
616
        self.execution_1.case.save()
617
618
    def test_update_with_no_perm(self):
619
        self.rpc_client.Auth.logout()
620
        with self.assertRaisesRegex(ProtocolError, "403 Forbidden"):
621
            self.rpc_client.TestExecution.update(
622
                self.execution_1.pk, {"stop_date": timezone.now()}
623
            )
624
625
626
class TestExecutionUpdateStatus(APITestCase):
627
    def _fixture_setup(self):
628
        super()._fixture_setup()
629
630
        self.user = UserFactory()
631
        self.execution_1 = TestExecutionFactory()
632
        self.status_positive = TestExecutionStatus.objects.filter(weight__gt=0).last()
633
        self.status_negative = TestExecutionStatus.objects.filter(weight__lt=0).last()
634
        self.status_in_progress = TestExecutionStatus.objects.filter(weight=0).last()
635
636
    def test_changes_tested_by(self):
637
        execution = TestExecutionFactory(tested_by=None)
638
639
        self.rpc_client.TestExecution.update(
640
            execution.pk, {"status": self.status_positive.pk}
641
        )
642
        execution.refresh_from_db()
643
644
        self.assertEqual(execution.tested_by, self.api_user)
645
        self.assertEqual(execution.status, self.status_positive)
646
647
    def test_when_tested_by_specified_does_not_change_tested_by(self):
648
        execution = TestExecutionFactory(tested_by=None)
649
650
        self.rpc_client.TestExecution.update(
651
            execution.pk,
652
            {
653
                "status": self.status_positive.pk,
654
                "tested_by": self.user.pk,
655
            },
656
        )
657
        execution.refresh_from_db()
658
659
        self.assertEqual(execution.tested_by, self.user)
660
        self.assertEqual(execution.status, self.status_positive)
661
662
    def test_changes_build(self):
663
        # simulate what happens in reality where TestExeuctions are created
664
        # taking their initial .build values from the parent TestRun
665
        self.execution_1.build = self.execution_1.run.build
666
        self.execution_1.save()
667
668
        # now simulate a re-test scenario where TR.build has already changed
669
        # e.g. longer test cycle covering multiple builds
670
        self.execution_1.run.build = BuildFactory(name="b02")
671
        self.execution_1.run.save()
672
673
        self.rpc_client.TestExecution.update(
674
            self.execution_1.pk, {"status": self.status_positive.pk}
675
        )
676
677
        self.execution_1.refresh_from_db()
678
        self.assertEqual(self.execution_1.status, self.status_positive)
679
        self.assertEqual(self.execution_1.build.name, "b02")
680
681
    def test_when_build_specified_does_not_change_build(self):
682
        # simulate what happens in reality where TestExeuctions are created
683
        # taking their initial .build values from the parent TestRun
684
        self.execution_1.build = self.execution_1.run.build
685
        self.execution_1.save()
686
687
        build03 = BuildFactory(name="b03")
688
689
        self.rpc_client.TestExecution.update(
690
            self.execution_1.pk,
691
            {
692
                "status": self.status_positive.pk,
693
                "build": build03.pk,
694
            },
695
        )
696
697
        self.execution_1.refresh_from_db()
698
        self.assertEqual(self.execution_1.status, self.status_positive)
699
        self.assertEqual(self.execution_1.build.name, "b03")
700
        # these are different b/c the API call (e.g. from a plugin) has
701
        # passed an explicit build value
702
        self.assertNotEqual(self.execution_1.run.build, build03)
703
704
    def test_non_zero_status_changes_stop_date(self):
705
        """
706
        Non-zero weight statuses will set stop_date
707
        """
708
        few_secs_ago = timezone.now()
709
        self.execution_1.stop_date = None
710
        self.execution_1.save()
711
712
        self.rpc_client.TestExecution.update(
713
            self.execution_1.pk,
714
            {
715
                "status": self.status_positive.pk,
716
            },
717
        )
718
719
        self.execution_1.refresh_from_db()
720
        self.assertGreater(self.execution_1.stop_date, few_secs_ago)
721
722
    def test_zero_status_changes_stop_date(self):
723
        """
724
        Zero weight statuses will set stop_date to None,
725
        e.g. re-test the TE!
726
        """
727
        self.execution_1.stop_date = timezone.now()
728
        self.execution_1.save()
729
730
        self.rpc_client.TestExecution.update(
731
            self.execution_1.pk,
732
            {
733
                "status": TestExecutionStatus.objects.filter(weight=0).first().pk,
734
            },
735
        )
736
737
        self.execution_1.refresh_from_db()
738
        self.assertIsNone(self.execution_1.stop_date)
739
740
    def test_update_status_changes_run_status_completed(self):
741
        """
742
        When updating the status of a TestExecution to a completed one (status.weigth != 0),
743
        if all executions for a run are completed, set the run as completed
744
        """
745
        run = TestRunFactory(stop_date=None)
746
747
        TestExecutionFactory(status=self.status_positive, run=run)
748
        TestExecutionFactory(status=self.status_negative, run=run)
749
        execution = TestExecutionFactory(status=self.status_in_progress, run=run)
750
751
        self.rpc_client.TestExecution.update(
752
            execution.pk, {"status": self.status_positive.pk}
753
        )
754
755
        run.refresh_from_db()
756
        self.assertIsNotNone(run.stop_date)
757
758
    def test_update_status_changes_run_neutral(self):
759
        """
760
        When updating the status of a TestExecution to a not completed one (status.weigth == 0),
761
        set the run as not completed
762
        """
763
        run = TestRunFactory(stop_date=timezone.now())
764
765
        execution = TestExecutionFactory(status=self.status_positive, run=run)
766
        _ = TestExecutionFactory(status=self.status_negative, run=run)
767
768
        self.rpc_client.TestExecution.update(
769
            execution.pk, {"status": self.status_in_progress.pk}
770
        )
771
772
        run.refresh_from_db()
773
        self.assertIsNone(run.stop_date)
774