Passed
Pull Request — master (#97)
by Paolo
03:10
created

CleanUpTaskTestCase.test_clean_up()   A

Complexity

Conditions 2

Size

Total Lines 15
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 15
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:19:41 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
from unittest.mock import patch, Mock
10
11
from django.core import mail
12
from django.test import TestCase
13
from django.utils import timezone
14
15
from common.constants import COMPLETED
16
17
from ..models import Submission
18
from ..tasks import CleanUpTask, SearchOrphanTask
19
from .common import BioSamplesMixin
20
21
22
class CleanUpTaskTestCase(TestCase):
    """Exercise ``CleanUpTask.run()`` against the fixture submissions.

    ``setUp`` marks every submission as COMPLETED and gives only the one
    with ``pk=1`` a fresh ``updated_at``. The tests then check the value
    returned by the task and how many submissions remain afterwards.
    """

    fixtures = [
        "biosample/submission",
        "uid/dictcountry",
        "uid/dictrole",
        "uid/organization",
        "uid/submission",
        "uid/user",
    ]

    def setUp(self):
        """Prepare the submissions and instantiate the task under test."""
        # calling my base setup
        super().setUp()

        # fix status for objects
        Submission.objects.update(status=COMPLETED)

        # get one object and refresh its update time
        Submission.objects.filter(pk=1).update(updated_at=timezone.now())

        # define my task
        self.my_task = CleanUpTask()

        # change lock_id (useful when running test during cron)
        self.my_task.lock_id = "test-CleanUpTask"

    def test_clean_up(self):
        """Test clean_up task"""

        # NOTE that I'm calling the function directly, without delay
        # (AsyncResult)
        res = self.my_task.run()

        # assert the task reports a success
        self.assertEqual(res, "success")

        # assert one object in the database
        self.assertEqual(Submission.objects.count(), 1)

    # Test a non blocking instance
    @patch("redis.lock.Lock.acquire", return_value=False)
    def test_clean_up_nb(self, my_lock):
        """Test CleanUpTask while a lock is present"""

        res = self.my_task.run()

        # assert the task reports it is already running (lock not acquired)
        self.assertEqual(res, "%s already running!" % (self.my_task.name))

        # assert two objects in the database (nothing was removed)
        self.assertEqual(Submission.objects.count(), 2)
77
78
79
class SearchOrphanTaskTestCase(TestCase):
    """Exercise ``SearchOrphanTask.run()`` with asyncio and sample checks mocked.

    ``asyncio.new_event_loop`` and ``biosample.tasks.cleanup.check_samples``
    are patched for every test, so the tests only verify how often they are
    called, the value returned by the task and the e-mail placed in the
    Django test outbox.
    """

    fixtures = [
        'biosample/managedteam',
        'biosample/orphansample',
        'uid/dictspecie',
    ]

    def setUp(self):
        """Start the patchers and instantiate the task under test.

        Each patcher's ``stop`` is registered with ``addCleanup`` right after
        ``start()``: cleanups run even when a later statement in ``setUp``
        raises, whereas ``tearDown`` would be skipped in that case and the
        patch would leak into the following tests.
        """
        # calling my base setup
        super().setUp()

        # patch the creation of the asyncio event loop
        self.asyncio_mock_patcher = patch(
            'asyncio.new_event_loop')
        self.asyncio_mock = self.asyncio_mock_patcher.start()
        self.addCleanup(self.asyncio_mock_patcher.stop)

        # mocking asyncio return value (the fake loop object)
        self.run_until = self.asyncio_mock.return_value
        self.run_until.run_until_complete = Mock()

        # another patch
        self.check_samples_patcher = patch(
            'biosample.tasks.cleanup.check_samples')
        self.check_samples = self.check_samples_patcher.start()
        self.addCleanup(self.check_samples_patcher.stop)

        # define my task
        self.my_task = SearchOrphanTask()

        # change lock_id (useful when running test during cron)
        self.my_task.lock_id = "test-SearchOrphanTask"

    def test_search_orphan(self):
        """Test SearchOrphanTask task"""

        # NOTE that I'm calling the function directly, without delay
        # (AsyncResult). I've patched the time consuming task
        res = self.my_task.run()

        # assert the task reports a success
        self.assertEqual(res, "success")

        # assert function called
        self.assertEqual(self.asyncio_mock.call_count, 1)
        self.assertEqual(self.run_until.run_until_complete.call_count, 1)
        self.assertEqual(self.check_samples.call_count, 1)

        # test email sent
        self.assertEqual(len(mail.outbox), 1)

        # read email
        email = mail.outbox[0]

        self.assertEqual(
            "Some entries in BioSamples are orphan",
            email.subject)

    # Test a non blocking instance
    @patch("redis.lock.Lock.acquire", return_value=False)
    def test_search_orphan_nb(self, my_lock):
        """Test SearchOrphanTask while a lock is present"""

        res = self.my_task.run()

        # assert the task reports it is already running (lock not acquired)
        self.assertEqual(res, "%s already running!" % (self.my_task.name))

        # assert function not called
        self.assertEqual(self.asyncio_mock.call_count, 0)
        self.assertEqual(self.run_until.run_until_complete.call_count, 0)
        self.assertEqual(self.check_samples.call_count, 0)
158