Completed
Push — master ( 55dedd...16663f )
by Paolo
30s queued 14s
created

CleanUpTaskTestCase.test_fetch_status_nb()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:19:41 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
from unittest.mock import patch, Mock
10
11
from django.core import mail
12
from django.test import TestCase
13
from django.utils import timezone
14
15
from common.constants import COMPLETED
16
17
from ..models import Submission
18
from ..tasks import CleanUpTask, SearchOrphanTask
19
20
21
class CleanUpTaskTestCase(TestCase):
    """Test CleanUpTask by calling its ``run`` method directly."""

    fixtures = [
        "biosample/submission",
        "uid/dictcountry",
        "uid/dictrole",
        "uid/organization",
        "uid/submission",
        "uid/user",
    ]

    def setUp(self):
        """Set all submissions to COMPLETED and instantiate the task."""

        # calling my base setup
        super().setUp()

        # fix status for objects
        Submission.objects.update(status=COMPLETED)

        # get one object and update its time: presumably this is the
        # submission expected to survive the clean up (see test_clean_up)
        Submission.objects.filter(pk=1).update(updated_at=timezone.now())

        # define my task
        self.my_task = CleanUpTask()

        # change lock_id (useful when running test during cron)
        self.my_task.lock_id = "test-CleanUpTask"

    def test_clean_up(self):
        """Test clean_up task"""

        # NOTE that I'm calling the function directly, without delay
        # (AsyncResult)
        res = self.my_task.run()

        # assert a success with data uploading
        self.assertEqual(res, "success")

        # assert one object in the database
        self.assertEqual(Submission.objects.count(), 1)

    # Test a non blocking instance
    @patch("redis.lock.Lock.acquire", return_value=False)
    def test_clean_up_nb(self, my_lock):
        """Test CleanUpTask while a lock is present"""

        # the lock can't be acquired (patched to return False)
        res = self.my_task.run()

        # assert database is locked
        self.assertEqual(res, "%s already running!" % (self.my_task.name))

        # assert two object in the database (fake running)
        self.assertEqual(Submission.objects.count(), 2)
76
77
78
class SearchOrphanTaskTestCase(TestCase):
    """Test SearchOrphanTask with asyncio and check_samples patched."""

    fixtures = [
        'biosample/managedteam',
        'biosample/orphansample',
        'uid/dictspecie',
    ]

    def setUp(self):
        """Start the patchers and instantiate the task.

        Each patcher's ``stop`` is registered with ``addCleanup`` before
        the patcher is started, so a patch is always undone even if a
        later step of ``setUp`` raises (``tearDown`` is not called in
        that case).
        """

        # calling my base setup
        super().setUp()

        # override the base patcher method
        self.asyncio_mock_patcher = patch(
            'asyncio.new_event_loop')
        self.addCleanup(self.asyncio_mock_patcher.stop)
        self.asyncio_mock = self.asyncio_mock_patcher.start()

        # mocking asyncio return value (the object returned by the
        # patched asyncio.new_event_loop)
        self.run_until = self.asyncio_mock.return_value
        self.run_until.run_until_complete = Mock()

        # another patch
        self.check_samples_patcher = patch(
            'biosample.tasks.cleanup.check_samples')
        self.addCleanup(self.check_samples_patcher.stop)
        self.check_samples = self.check_samples_patcher.start()

        # define my task
        self.my_task = SearchOrphanTask()

        # change lock_id (useful when running test during cron)
        self.my_task.lock_id = "test-SearchOrphanTask"

    def test_search_orphan(self):
        """Test SearchOrphanTask task"""

        # NOTE that I'm calling the function directly, without delay
        # (AsyncResult). I've patched the time consuming task
        res = self.my_task.run()

        # assert a success with data uploading
        self.assertEqual(res, "success")

        # assert function called
        self.assertEqual(self.asyncio_mock.call_count, 1)
        self.assertEqual(self.run_until.run_until_complete.call_count, 1)
        self.assertEqual(self.check_samples.call_count, 1)

        # test email sent
        self.assertEqual(len(mail.outbox), 1)

        # read email
        email = mail.outbox[0]

        self.assertEqual(
            "Some entries in BioSamples are orphan",
            email.subject)

    # Test a non blocking instance
    @patch("redis.lock.Lock.acquire", return_value=False)
    def test_search_orphan_nb(self, my_lock):
        """Test SearchOrphanTask while a lock is present"""

        # the lock can't be acquired (patched to return False)
        res = self.my_task.run()

        # assert database is locked
        self.assertEqual(res, "%s already running!" % (self.my_task.name))

        # assert function not called
        self.assertEqual(self.asyncio_mock.call_count, 0)
        self.assertEqual(self.run_until.run_until_complete.call_count, 0)
        self.assertEqual(self.check_samples.call_count, 0)
157