Completed
Pull Request — devel (#90)
by Paolo
18:48
created

SearchOrphanTaskTestCase.test_search_orphan()   A

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 14
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:19:41 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
from unittest.mock import patch, Mock
10
11
from django.test import TestCase
12
from django.utils import timezone
13
14
from common.constants import COMPLETED
15
from common.tests import AsyncIOMixin
16
17
from ..models import Submission
18
from ..tasks import CleanUpTask, SearchOrphanTask
19
20
21
class CleanUpTaskTestCase(TestCase):
22
23
    fixtures = [
24
        "biosample/submission",
25
        "uid/dictcountry",
26
        "uid/dictrole",
27
        "uid/organization",
28
        "uid/submission",
29
        "uid/user",
30
    ]
31
32
    def setUp(self):
33
        # calling my base setup
34
        super().setUp()
35
36
        # fix status for objects
37
        Submission.objects.update(status=COMPLETED)
38
39
        # get one objcet and updatete time
40
        Submission.objects.filter(pk=1).update(updated_at=timezone.now())
41
42
        # define my task
43
        self.my_task = CleanUpTask()
44
45
        # change lock_id (useful when running test during cron)
46
        self.my_task.lock_id = "test-CleanUpTask"
47
48
    def test_clean_up(self):
49
        """Test clean_up task"""
50
51
        for submission in Submission.objects.all():
52
            print(submission, submission.updated_at)
53
54
        # NOTE that I'm calling the function directly, without delay
55
        # (AsyncResult). I've patched the time consuming task
56
        res = self.my_task.run()
57
58
        # assert a success with data uploading
59
        self.assertEqual(res, "success")
60
61
        # assert one object in the database
62
        self.assertEqual(Submission.objects.count(), 1)
63
64
    # Test a non blocking instance
65
    @patch("redis.lock.Lock.acquire", return_value=False)
66
    def test_clean_up_nb(self, my_lock):
67
        """Test CleanUpTask while a lock is present"""
68
69
        res = self.my_task.run()
70
71
        # assert database is locked
72
        self.assertEqual(res, "%s already running!" % (self.my_task.name))
73
74
        # assert two object in the database (fake running)
75
        self.assertEqual(Submission.objects.count(), 2)
76
77
78
class SearchOrphanTaskTestCase(AsyncIOMixin, TestCase):
79
80
    def setUp(self):
81
        # calling my base setup
82
        super().setUp()
83
84
        # ovveride the base patcher method
85
        self.asyncio_mock_patcher = patch(
86
            'asyncio.new_event_loop')
87
        self.asyncio_mock = self.asyncio_mock_patcher.start()
88
89
        # mocking asyncio return value
90
        self.run_until = self.asyncio_mock.return_value
91
        self.run_until.run_until_complete = Mock()
92
93
        # another patch
94
        self.check_samples_patcher = patch(
95
            'biosample.tasks.cleanup.check_samples')
96
        self.check_samples = self.check_samples_patcher.start()
97
98
        # define my task
99
        self.my_task = SearchOrphanTask()
100
101
        # change lock_id (useful when running test during cron)
102
        self.my_task.lock_id = "test-SearchOrphanTask"
103
104
    def tearDown(self):
105
        # stopping mock objects
106
        self.check_samples_patcher.stop()
107
108
        # calling base methods
109
        super().tearDown()
110
111
    def test_search_orphan(self):
112
        """Test SearchOrphanTask task"""
113
114
        # NOTE that I'm calling the function directly, without delay
115
        # (AsyncResult). I've patched the time consuming task
116
        res = self.my_task.run()
117
118
        # assert a success with data uploading
119
        self.assertEqual(res, "success")
120
121
        # assert function called
122
        self.assertEqual(self.asyncio_mock.call_count, 1)
123
        self.assertEqual(self.run_until.run_until_complete.call_count, 1)
124
        self.assertEqual(self.check_samples.call_count, 1)
125
126
    # Test a non blocking instance
127
    @patch("redis.lock.Lock.acquire", return_value=False)
128
    def test_search_orphan_nb(self, my_lock):
129
        """Test SearchOrphanTask while a lock is present"""
130
131
        res = self.my_task.run()
132
133
        # assert database is locked
134
        self.assertEqual(res, "%s already running!" % (self.my_task.name))
135
136
        # assert function not called
137
        self.assertEqual(self.asyncio_mock.call_count, 0)
138
        self.assertEqual(self.run_until.run_until_complete.call_count, 0)
139
        self.assertEqual(self.check_samples.call_count, 0)
140