1
|
|
|
#!/usr/bin/env python3 |
2
|
|
|
# -*- coding: utf-8 -*- |
3
|
|
|
""" |
4
|
|
|
Created on Thu Nov 14 16:19:41 2019 |
5
|
|
|
|
6
|
|
|
@author: Paolo Cozzi <[email protected]> |
7
|
|
|
""" |
8
|
|
|
|
9
|
|
|
from unittest.mock import patch, Mock |
10
|
|
|
|
11
|
|
|
from django.core import mail |
12
|
|
|
from django.test import TestCase |
13
|
|
|
from django.utils import timezone |
14
|
|
|
|
15
|
|
|
from common.constants import COMPLETED |
16
|
|
|
|
17
|
|
|
from ..models import Submission |
18
|
|
|
from ..tasks import CleanUpTask, SearchOrphanTask |
19
|
|
|
from .common import BioSamplesMixin |
20
|
|
|
|
21
|
|
|
|
22
|
|
|
class CleanUpTaskTestCase(TestCase): |
23
|
|
|
|
24
|
|
|
fixtures = [ |
25
|
|
|
"biosample/submission", |
26
|
|
|
"uid/dictcountry", |
27
|
|
|
"uid/dictrole", |
28
|
|
|
"uid/organization", |
29
|
|
|
"uid/submission", |
30
|
|
|
"uid/user", |
31
|
|
|
] |
32
|
|
|
|
33
|
|
|
def setUp(self): |
34
|
|
|
# calling my base setup |
35
|
|
|
super().setUp() |
36
|
|
|
|
37
|
|
|
# fix status for objects |
38
|
|
|
Submission.objects.update(status=COMPLETED) |
39
|
|
|
|
40
|
|
|
# get one objcet and updatete time |
41
|
|
|
Submission.objects.filter(pk=1).update(updated_at=timezone.now()) |
42
|
|
|
|
43
|
|
|
# define my task |
44
|
|
|
self.my_task = CleanUpTask() |
45
|
|
|
|
46
|
|
|
# change lock_id (useful when running test during cron) |
47
|
|
|
self.my_task.lock_id = "test-CleanUpTask" |
48
|
|
|
|
49
|
|
|
def test_clean_up(self): |
50
|
|
|
"""Test clean_up task""" |
51
|
|
|
|
52
|
|
|
for submission in Submission.objects.all(): |
53
|
|
|
print(submission, submission.updated_at) |
54
|
|
|
|
55
|
|
|
# NOTE that I'm calling the function directly, without delay |
56
|
|
|
# (AsyncResult). I've patched the time consuming task |
57
|
|
|
res = self.my_task.run() |
58
|
|
|
|
59
|
|
|
# assert a success with data uploading |
60
|
|
|
self.assertEqual(res, "success") |
61
|
|
|
|
62
|
|
|
# assert one object in the database |
63
|
|
|
self.assertEqual(Submission.objects.count(), 1) |
64
|
|
|
|
65
|
|
|
# Test a non blocking instance |
66
|
|
|
@patch("redis.lock.Lock.acquire", return_value=False) |
67
|
|
|
def test_clean_up_nb(self, my_lock): |
68
|
|
|
"""Test CleanUpTask while a lock is present""" |
69
|
|
|
|
70
|
|
|
res = self.my_task.run() |
71
|
|
|
|
72
|
|
|
# assert database is locked |
73
|
|
|
self.assertEqual(res, "%s already running!" % (self.my_task.name)) |
74
|
|
|
|
75
|
|
|
# assert two object in the database (fake running) |
76
|
|
|
self.assertEqual(Submission.objects.count(), 2) |
77
|
|
|
|
78
|
|
|
|
79
|
|
|
class SearchOrphanTaskTestCase(TestCase): |
80
|
|
|
|
81
|
|
|
fixtures = [ |
82
|
|
|
'biosample/managedteam', |
83
|
|
|
'biosample/orphansample', |
84
|
|
|
'uid/dictspecie', |
85
|
|
|
] |
86
|
|
|
|
87
|
|
|
def setUp(self): |
88
|
|
|
# calling my base setup |
89
|
|
|
super().setUp() |
90
|
|
|
|
91
|
|
|
# ovveride the base patcher method |
92
|
|
|
self.asyncio_mock_patcher = patch( |
93
|
|
|
'asyncio.new_event_loop') |
94
|
|
|
self.asyncio_mock = self.asyncio_mock_patcher.start() |
95
|
|
|
|
96
|
|
|
# mocking asyncio return value |
97
|
|
|
self.run_until = self.asyncio_mock.return_value |
98
|
|
|
self.run_until.run_until_complete = Mock() |
99
|
|
|
|
100
|
|
|
# another patch |
101
|
|
|
self.check_samples_patcher = patch( |
102
|
|
|
'biosample.tasks.cleanup.check_samples') |
103
|
|
|
self.check_samples = self.check_samples_patcher.start() |
104
|
|
|
|
105
|
|
|
# define my task |
106
|
|
|
self.my_task = SearchOrphanTask() |
107
|
|
|
|
108
|
|
|
# change lock_id (useful when running test during cron) |
109
|
|
|
self.my_task.lock_id = "test-SearchOrphanTask" |
110
|
|
|
|
111
|
|
|
def tearDown(self): |
112
|
|
|
# stopping mock objects |
113
|
|
|
self.asyncio_mock_patcher.stop() |
114
|
|
|
self.check_samples_patcher.stop() |
115
|
|
|
|
116
|
|
|
# calling base methods |
117
|
|
|
super().tearDown() |
118
|
|
|
|
119
|
|
|
def test_search_orphan(self): |
120
|
|
|
"""Test SearchOrphanTask task""" |
121
|
|
|
|
122
|
|
|
# NOTE that I'm calling the function directly, without delay |
123
|
|
|
# (AsyncResult). I've patched the time consuming task |
124
|
|
|
res = self.my_task.run() |
125
|
|
|
|
126
|
|
|
# assert a success with data uploading |
127
|
|
|
self.assertEqual(res, "success") |
128
|
|
|
|
129
|
|
|
# assert function called |
130
|
|
|
self.assertEqual(self.asyncio_mock.call_count, 1) |
131
|
|
|
self.assertEqual(self.run_until.run_until_complete.call_count, 1) |
132
|
|
|
self.assertEqual(self.check_samples.call_count, 1) |
133
|
|
|
|
134
|
|
|
# test email sent |
135
|
|
|
self.assertEqual(len(mail.outbox), 1) |
136
|
|
|
|
137
|
|
|
# read email |
138
|
|
|
email = mail.outbox[0] |
139
|
|
|
|
140
|
|
|
self.assertEqual( |
141
|
|
|
"Some entries in BioSamples are orphan", |
142
|
|
|
email.subject) |
143
|
|
|
|
144
|
|
|
# Test a non blocking instance |
145
|
|
|
@patch("redis.lock.Lock.acquire", return_value=False) |
146
|
|
|
def test_search_orphan_nb(self, my_lock): |
147
|
|
|
"""Test SearchOrphanTask while a lock is present""" |
148
|
|
|
|
149
|
|
|
res = self.my_task.run() |
150
|
|
|
|
151
|
|
|
# assert database is locked |
152
|
|
|
self.assertEqual(res, "%s already running!" % (self.my_task.name)) |
153
|
|
|
|
154
|
|
|
# assert function not called |
155
|
|
|
self.assertEqual(self.asyncio_mock.call_count, 0) |
156
|
|
|
self.assertEqual(self.run_until.run_until_complete.call_count, 0) |
157
|
|
|
self.assertEqual(self.check_samples.call_count, 0) |
158
|
|
|
|