Passed
Pull Request — devel (#90)
by Paolo
06:42
created

CleanUpTaskTestCase.test_clean_up_nb()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:19:41 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
from unittest.mock import patch, Mock
10
11
from django.test import TestCase
12
from django.utils import timezone
13
14
from common.constants import COMPLETED
15
from common.tests import AsyncIOMixin
16
17
from ..models import Submission
18
from ..tasks import CleanUpTask, SearchOrphanTask
19
20
21
class CleanUpTaskTestCase(TestCase):
    """Run CleanUpTask directly, with and without the redis lock available.

    The fixtures are expected to provide two Submission objects (the
    locked-task test asserts that two are still present when nothing runs).
    """

    fixtures = [
        "biosample/submission",
        "uid/dictcountry",
        "uid/dictrole",
        "uid/organization",
        "uid/submission",
        "uid/user",
    ]

    def setUp(self):
        """Mark all submissions COMPLETED, refresh one and build the task."""
        # calling my base setup
        super().setUp()

        # fix status for objects
        Submission.objects.update(status=COMPLETED)

        # get one object and update its time: this is the submission the
        # clean up test expects to find after the task has run
        # (presumably the other fixture rows are old enough to be removed)
        Submission.objects.filter(pk=1).update(updated_at=timezone.now())

        # define my task
        self.my_task = CleanUpTask()

        # change lock_id (useful when running test during cron)
        self.my_task.lock_id = "test-CleanUpTask"

    def test_clean_up(self):
        """Test clean_up task"""

        # NOTE that I'm calling the function directly, without delay
        # (AsyncResult)
        res = self.my_task.run()

        # assert the task reports success
        self.assertEqual(res, "success")

        # assert one object in the database
        self.assertEqual(Submission.objects.count(), 1)

    # Test a non blocking instance
    @patch("redis.lock.Lock.acquire", return_value=False)
    def test_clean_up_nb(self, my_lock):
        """Test CleanUpTask while a lock is present"""

        # my_lock is the mock injected by the patch decorator: acquiring
        # the lock always fails, as if another instance were running
        res = self.my_task.run()

        # assert database is locked
        self.assertEqual(res, "%s already running!" % (self.my_task.name))

        # assert two object in the database (fake running)
        self.assertEqual(Submission.objects.count(), 2)
76
77
78
class SearchOrphanTaskTestCase(TestCase):
    """Run SearchOrphanTask directly with asyncio and check_samples mocked.

    Both patches are registered with ``addCleanup`` right after being
    started, so they are always undone, even when ``setUp`` fails halfway
    (``tearDown`` is not called in that case).
    """

    def setUp(self):
        """Start the patchers and build the task under test."""
        # calling my base setup
        super().setUp()

        # patch asyncio.new_event_loop, so no real event loop is created
        self.asyncio_mock_patcher = patch(
            'asyncio.new_event_loop')
        self.asyncio_mock = self.asyncio_mock_patcher.start()
        self.addCleanup(self.asyncio_mock_patcher.stop)

        # mocking asyncio return value: this is the (fake) loop object
        # on which run_until_complete is tracked
        self.run_until = self.asyncio_mock.return_value
        self.run_until.run_until_complete = Mock()

        # another patch
        self.check_samples_patcher = patch(
            'biosample.tasks.cleanup.check_samples')
        self.check_samples = self.check_samples_patcher.start()
        self.addCleanup(self.check_samples_patcher.stop)

        # define my task
        self.my_task = SearchOrphanTask()

        # change lock_id (useful when running test during cron)
        self.my_task.lock_id = "test-SearchOrphanTask"

    def test_search_orphan(self):
        """Test SearchOrphanTask task"""

        # NOTE that I'm calling the function directly, without delay
        # (AsyncResult). I've patched the time consuming task
        res = self.my_task.run()

        # assert the task reports success
        self.assertEqual(res, "success")

        # assert function called
        self.assertEqual(self.asyncio_mock.call_count, 1)
        self.assertEqual(self.run_until.run_until_complete.call_count, 1)
        self.assertEqual(self.check_samples.call_count, 1)

    # Test a non blocking instance
    @patch("redis.lock.Lock.acquire", return_value=False)
    def test_search_orphan_nb(self, my_lock):
        """Test SearchOrphanTask while a lock is present"""

        # my_lock is the mock injected by the patch decorator: acquiring
        # the lock always fails, as if another instance were running
        res = self.my_task.run()

        # assert database is locked
        self.assertEqual(res, "%s already running!" % (self.my_task.name))

        # assert function not called
        self.assertEqual(self.asyncio_mock.call_count, 0)
        self.assertEqual(self.run_until.run_until_complete.call_count, 0)
        self.assertEqual(self.check_samples.call_count, 0)
141