1
|
|
|
#!/usr/bin/env python3 |
2
|
|
|
# -*- coding: utf-8 -*- |
3
|
|
|
""" |
4
|
|
|
Created on Thu Nov 14 16:19:41 2019 |
5
|
|
|
|
6
|
|
|
@author: Paolo Cozzi <[email protected]> |
7
|
|
|
""" |
8
|
|
|
|
9
|
|
|
from unittest.mock import patch, Mock |
10
|
|
|
|
11
|
|
|
from django.core import mail |
12
|
|
|
from django.test import TestCase |
13
|
|
|
from django.utils import timezone |
14
|
|
|
|
15
|
|
|
from common.constants import COMPLETED |
16
|
|
|
|
17
|
|
|
from ..models import Submission |
18
|
|
|
from ..tasks import CleanUpTask, SearchOrphanTask |
19
|
|
|
|
20
|
|
|
|
21
|
|
|
class CleanUpTaskTestCase(TestCase): |
22
|
|
|
|
23
|
|
|
fixtures = [ |
24
|
|
|
"biosample/submission", |
25
|
|
|
"uid/dictcountry", |
26
|
|
|
"uid/dictrole", |
27
|
|
|
"uid/organization", |
28
|
|
|
"uid/submission", |
29
|
|
|
"uid/user", |
30
|
|
|
] |
31
|
|
|
|
32
|
|
|
def setUp(self): |
33
|
|
|
# calling my base setup |
34
|
|
|
super().setUp() |
35
|
|
|
|
36
|
|
|
# fix status for objects |
37
|
|
|
Submission.objects.update(status=COMPLETED) |
38
|
|
|
|
39
|
|
|
# get one objcet and updatete time |
40
|
|
|
Submission.objects.filter(pk=1).update(updated_at=timezone.now()) |
41
|
|
|
|
42
|
|
|
# define my task |
43
|
|
|
self.my_task = CleanUpTask() |
44
|
|
|
|
45
|
|
|
# change lock_id (useful when running test during cron) |
46
|
|
|
self.my_task.lock_id = "test-CleanUpTask" |
47
|
|
|
|
48
|
|
|
def test_clean_up(self): |
49
|
|
|
"""Test clean_up task""" |
50
|
|
|
|
51
|
|
|
for submission in Submission.objects.all(): |
52
|
|
|
print(submission, submission.updated_at) |
53
|
|
|
|
54
|
|
|
# NOTE that I'm calling the function directly, without delay |
55
|
|
|
# (AsyncResult). I've patched the time consuming task |
56
|
|
|
res = self.my_task.run() |
57
|
|
|
|
58
|
|
|
# assert a success with data uploading |
59
|
|
|
self.assertEqual(res, "success") |
60
|
|
|
|
61
|
|
|
# assert one object in the database |
62
|
|
|
self.assertEqual(Submission.objects.count(), 1) |
63
|
|
|
|
64
|
|
|
# Test a non blocking instance |
65
|
|
|
@patch("redis.lock.Lock.acquire", return_value=False) |
66
|
|
|
def test_clean_up_nb(self, my_lock): |
67
|
|
|
"""Test CleanUpTask while a lock is present""" |
68
|
|
|
|
69
|
|
|
res = self.my_task.run() |
70
|
|
|
|
71
|
|
|
# assert database is locked |
72
|
|
|
self.assertEqual(res, "%s already running!" % (self.my_task.name)) |
73
|
|
|
|
74
|
|
|
# assert two object in the database (fake running) |
75
|
|
|
self.assertEqual(Submission.objects.count(), 2) |
76
|
|
|
|
77
|
|
|
|
78
|
|
|
class SearchOrphanTaskTestCase(TestCase): |
79
|
|
|
|
80
|
|
|
fixtures = [ |
81
|
|
|
'biosample/managedteam', |
82
|
|
|
'biosample/orphansample', |
83
|
|
|
'uid/dictspecie', |
84
|
|
|
] |
85
|
|
|
|
86
|
|
|
def setUp(self): |
87
|
|
|
# calling my base setup |
88
|
|
|
super().setUp() |
89
|
|
|
|
90
|
|
|
# ovveride the base patcher method |
91
|
|
|
self.asyncio_mock_patcher = patch( |
92
|
|
|
'asyncio.new_event_loop') |
93
|
|
|
self.asyncio_mock = self.asyncio_mock_patcher.start() |
94
|
|
|
|
95
|
|
|
# mocking asyncio return value |
96
|
|
|
self.run_until = self.asyncio_mock.return_value |
97
|
|
|
self.run_until.run_until_complete = Mock() |
98
|
|
|
|
99
|
|
|
# another patch |
100
|
|
|
self.check_samples_patcher = patch( |
101
|
|
|
'biosample.tasks.cleanup.check_samples') |
102
|
|
|
self.check_samples = self.check_samples_patcher.start() |
103
|
|
|
|
104
|
|
|
# define my task |
105
|
|
|
self.my_task = SearchOrphanTask() |
106
|
|
|
|
107
|
|
|
# change lock_id (useful when running test during cron) |
108
|
|
|
self.my_task.lock_id = "test-SearchOrphanTask" |
109
|
|
|
|
110
|
|
|
def tearDown(self): |
111
|
|
|
# stopping mock objects |
112
|
|
|
self.asyncio_mock_patcher.stop() |
113
|
|
|
self.check_samples_patcher.stop() |
114
|
|
|
|
115
|
|
|
# calling base methods |
116
|
|
|
super().tearDown() |
117
|
|
|
|
118
|
|
|
def test_search_orphan(self): |
119
|
|
|
"""Test SearchOrphanTask task""" |
120
|
|
|
|
121
|
|
|
# NOTE that I'm calling the function directly, without delay |
122
|
|
|
# (AsyncResult). I've patched the time consuming task |
123
|
|
|
res = self.my_task.run() |
124
|
|
|
|
125
|
|
|
# assert a success with data uploading |
126
|
|
|
self.assertEqual(res, "success") |
127
|
|
|
|
128
|
|
|
# assert function called |
129
|
|
|
self.assertEqual(self.asyncio_mock.call_count, 1) |
130
|
|
|
self.assertEqual(self.run_until.run_until_complete.call_count, 1) |
131
|
|
|
self.assertEqual(self.check_samples.call_count, 1) |
132
|
|
|
|
133
|
|
|
# test email sent |
134
|
|
|
self.assertEqual(len(mail.outbox), 1) |
135
|
|
|
|
136
|
|
|
# read email |
137
|
|
|
email = mail.outbox[0] |
138
|
|
|
|
139
|
|
|
self.assertEqual( |
140
|
|
|
"Some entries in BioSamples are orphan", |
141
|
|
|
email.subject) |
142
|
|
|
|
143
|
|
|
# Test a non blocking instance |
144
|
|
|
@patch("redis.lock.Lock.acquire", return_value=False) |
145
|
|
|
def test_search_orphan_nb(self, my_lock): |
146
|
|
|
"""Test SearchOrphanTask while a lock is present""" |
147
|
|
|
|
148
|
|
|
res = self.my_task.run() |
149
|
|
|
|
150
|
|
|
# assert database is locked |
151
|
|
|
self.assertEqual(res, "%s already running!" % (self.my_task.name)) |
152
|
|
|
|
153
|
|
|
# assert function not called |
154
|
|
|
self.assertEqual(self.asyncio_mock.call_count, 0) |
155
|
|
|
self.assertEqual(self.run_until.run_until_complete.call_count, 0) |
156
|
|
|
self.assertEqual(self.check_samples.call_count, 0) |
157
|
|
|
|