Passed
Pull Request — devel (#90)
by Paolo
06:31
created

SearchOrphanTaskTestCase.setUp()   A

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 14
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:19:41 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
from unittest.mock import patch
10
11
from django.test import TestCase
12
from django.utils import timezone
13
14
from common.constants import COMPLETED
15
from common.tests import AsyncIOMixin
16
17
from ..models import Submission
18
from ..tasks import CleanUpTask, SearchOrphanTask
19
20
21
class CleanUpTaskTestCase(TestCase):
22
23
    fixtures = [
24
        "biosample/submission",
25
        "uid/dictcountry",
26
        "uid/dictrole",
27
        "uid/organization",
28
        "uid/submission",
29
        "uid/user",
30
    ]
31
32
    def setUp(self):
33
        # calling my base setup
34
        super().setUp()
35
36
        # fix status for objects
37
        Submission.objects.update(status=COMPLETED)
38
39
        # get one objcet and updatete time
40
        Submission.objects.filter(pk=1).update(updated_at=timezone.now())
41
42
        # define my task
43
        self.my_task = CleanUpTask()
44
45
        # change lock_id (useful when running test during cron)
46
        self.my_task.lock_id = "test-CleanUpTask"
47
48
    def test_clean_up(self):
49
        """Test clean_up task"""
50
51
        for submission in Submission.objects.all():
52
            print(submission, submission.updated_at)
53
54
        # NOTE that I'm calling the function directly, without delay
55
        # (AsyncResult). I've patched the time consuming task
56
        res = self.my_task.run()
57
58
        # assert a success with data uploading
59
        self.assertEqual(res, "success")
60
61
        # assert one object in the database
62
        self.assertEqual(Submission.objects.count(), 1)
63
64
    # Test a non blocking instance
65
    @patch("redis.lock.Lock.acquire", return_value=False)
66
    def test_clean_up_nb(self, my_lock):
67
        """Test CleanUpTask while a lock is present"""
68
69
        res = self.my_task.run()
70
71
        # assert database is locked
72
        self.assertEqual(res, "%s already running!" % (self.my_task.name))
73
74
        # assert two object in the database (fake running)
75
        self.assertEqual(Submission.objects.count(), 2)
76
77
78
class SearchOrphanTaskTestCase(AsyncIOMixin, TestCase):
79
80
    def setUp(self):
81
        # calling my base setup
82
        super().setUp()
83
84
        # another patch
85
        self.check_samples_patcher = patch(
86
            'biosample.tasks.cleanup.check_samples')
87
        self.check_samples = self.check_samples_patcher.start()
88
89
        # define my task
90
        self.my_task = SearchOrphanTask()
91
92
        # change lock_id (useful when running test during cron)
93
        self.my_task.lock_id = "test-SearchOrphanTask"
94
95
    def tearDown(self):
96
        # stopping mock objects
97
        self.check_samples_patcher.stop()
98
99
        # calling base methods
100
        super().tearDown()
101
102
    def test_search_orphan(self):
103
        """Test SearchOrphanTask task"""
104
105
        # NOTE that I'm calling the function directly, without delay
106
        # (AsyncResult). I've patched the time consuming task
107
        res = self.my_task.run()
108
109
        # assert a success with data uploading
110
        self.assertEqual(res, "success")
111
112
        # assert function called
113
        self.assertEqual(self.asyncio_mock.call_count, 1)
114
        self.assertEqual(self.run_until.run_until_complete.call_count, 1)
115
        self.assertEqual(self.check_samples.call_count, 1)
116
117
    # Test a non blocking instance
118
    @patch("redis.lock.Lock.acquire", return_value=False)
119
    def test_search_orphan_nb(self, my_lock):
120
        """Test SearchOrphanTask while a lock is present"""
121
122
        res = self.my_task.run()
123
124
        # assert database is locked
125
        self.assertEqual(res, "%s already running!" % (self.my_task.name))
126
127
        # assert function not called
128
        self.assertEqual(self.asyncio_mock.call_count, 0)
129
        self.assertEqual(self.run_until.run_until_complete.call_count, 0)
130
        self.assertEqual(self.check_samples.call_count, 0)
131