Passed
Pull Request — devel (#90)
by Paolo, created 06:42

biosample.tasks.cleanup.purge_orphan_samples()   A

Complexity

Conditions 3

Size

Total Lines 44
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 44
rs 9.376
c 0
b 0
f 0
cc 3
nop 0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 14 16:06:10 2019

@author: Paolo Cozzi <[email protected]>
"""

import asyncio
import aiohttp
import requests

from yarl import URL
from multidict import MultiDict

from datetime import timedelta
from celery.utils.log import get_task_logger
from django.utils import timezone
from django.utils.dateparse import parse_date

from common.constants import COMPLETED, BIOSAMPLE_URL
from common.helpers import format_attribute
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
from image.celery import app as celery_app
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie

from ..helpers import get_manager_auth
from ..models import Submission, OrphanSample, ManagedTeam

# Get an instance of a logger
logger = get_task_logger(__name__)

# defining constants. Clean biosample database data after this many days
CLEANUP_DAYS = 30

# this is the timedelta which I want to add to releaseDate to remove samples
RELEASE_TIMEDELTA = timedelta(days=365*1000)

# Setting page size for biosample requests
PAGE_SIZE = 20

PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
    ])
HEADERS = {
        'Accept': 'application/hal+json',
    }
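
# A hedged illustration (an assumption, not part of the submitted code): with
# the constants above, requests to BIOSAMPLE_URL carry a query string
# equivalent to
#
#   ?size=20&filter=attr:project:IMAGE
#
# plus an 'Accept: application/hal+json' header; the pagination loop in
# get_samples() below adds an extra page=N parameter for each further page.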


class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old completed submissions
    and removing them from the database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("Clean biosample.database started")

        # get an interval starting from now
        interval = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # select all COMPLETED objects older than interval
        qs = Submission.objects.filter(
            updated_at__lt=interval,
            status=COMPLETED)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % qs.count())

        # delete all old objects
        qs.delete()

        # debug
        logger.info("Clean biosample.database completed")

        return "success"


async def fetch(session, url=BIOSAMPLE_URL, params=PARAMS):
    """Get a page from biosamples"""

    # define a URL with yarl
    url = URL(url)
    url = url.update_query(params)

    async with session.get(url, headers=HEADERS) as response:
        return await response.json()


async def parse_samples_data(data, managed_domains):
    # get sample objects
    try:
        samples = data['_embedded']['samples']

        for sample in samples:
            # filter out unmanaged records
            if sample['domain'] not in managed_domains:
                logger.warning("Ignoring %s" % (sample['name']))
                continue

            # otherwise return to the caller the sample
            yield sample

    except KeyError as exc:
        # log the exception. With repr() the exception name is rendered
        logger.error(repr(exc))
        logger.warning("error while parsing samples")
        logger.warning(data)


async def get_samples(
        url=BIOSAMPLE_URL,
        params=PARAMS,
        managed_domains=[]):
    async with aiohttp.ClientSession() as session:
        data = await fetch(session, url, params)

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in parse_samples_data(data, managed_domains):
            yield sample

        tasks = []

        # get pages
        totalPages = data['page']['totalPages']

        # generate new awaitable objects
        for page in range(1, totalPages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in parse_samples_data(data, managed_domains):
                yield sample
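        # A brief worked illustration with hypothetical numbers: with
        # PAGE_SIZE = 20, a result set of 95 IMAGE samples would report
        # data['page']['totalPages'] == 5; the first page was already consumed
        # above, so range(1, totalPages) schedules pages 1-4 and
        # asyncio.as_completed() yields each page's JSON as soon as its
        # request completes.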


async def check_samples():
    # I need a pyUSIrest.auth.Auth object to filter out records that don't
    # belong to me
    auth = get_manager_auth()
    managed_domains = auth.get_domains()

    async for sample in get_samples(managed_domains=managed_domains):
        check_orphan_sample(sample)


def check_orphan_sample(sample):
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))

    else:
        # get a managed team
        team = ManagedTeam.objects.get(name=sample["domain"])

        # Create an orphan sample
        orphan, created = OrphanSample.objects.get_or_create(
            biosample_id=sample['accession'],
            name=sample['name'],
            team=team)

        if created:
            logger.warning("Add %s to orphan samples" % sample['accession'])


class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across biosamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("%s started" % (self.name))

        # create a loop object
        loop = asyncio.new_event_loop()

        # execute stuff
        try:
            loop.run_until_complete(check_samples())

        finally:
            # close loop
            loop.close()

        # debug
        logger.info("%s completed" % (self.name))

        return "success"


def purge_orphan_samples():
    """A function to remove objects from OrphanSample table"""

    with requests.Session() as session:
        for orphan_sample in OrphanSample.objects.filter(
                ignore=False, removed=False):

            # define the url I need to check
            url = "/".join([BIOSAMPLE_URL, orphan_sample.biosample_id])

            # read data from url
            response = session.get(url)
            data = response.json()

            # I need a new data dictionary to submit
            new_data = dict()

            # I suppose the accession exists, since I found this sample
            # using its accession (biosample_id)
            new_data['accession'] = data.get(
                'accession', orphan_sample.biosample_id)

            new_data['alias'] = data['name']

            new_data['title'] = data['characteristics']['title'][0]['text']

            # this will be the most important attribute
            new_data['releaseDate'] = str(
                parse_date(data['releaseDate']) + RELEASE_TIMEDELTA)

            new_data['taxonId'] = data['taxId']

            # need to determine the taxon label starting from taxId
            new_data['taxon'] = DictSpecie.objects.get(
                term__endswith=data['taxId']).label

            new_data['attributes'] = dict()

            # set project again
            new_data['attributes']["Project"] = format_attribute(
                value="IMAGE")

            # return new biosample data
            yield new_data
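
# A minimal usage sketch (hypothetical, not part of the submitted module): the
# generator above only prepares the patched payloads; a caller is expected to
# iterate it and submit each document back to BioSamples (for instance through
# the USI client used elsewhere in this app), so that the far-future
# releaseDate should keep the orphan record private:
#
#   for new_data in purge_orphan_samples():
#       logger.debug("prepared patch for %s" % new_data['accession'])
#       # ...submit new_data to the archive here (the client call is assumed)
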
# register tasks explicitly
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(CleanUpTask)
celery_app.tasks.register(SearchOrphanTask)
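
# A hypothetical invocation sketch (not in the submitted code): once
# registered, the tasks can be launched asynchronously, typically from a
# periodic schedule, relying on the redis lock taken by @exclusive_task to
# avoid overlapping runs:
#
#   celery_app.tasks[CleanUpTask.name].delay()
#   celery_app.tasks[SearchOrphanTask.name].delay()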