Completed
Pull Request — devel (#90)
by Paolo
created 06:18

biosample.tasks.cleanup.SearchOrphanTask.run()   A

Complexity

Conditions 2

Size

Total Lines 42
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 42
rs 9.5
c 0
b 0
f 0
cc 2
nop 1
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 14 16:06:10 2019

@author: Paolo Cozzi <[email protected]>
"""

import asyncio
import aiohttp
import requests

from yarl import URL
from multidict import MultiDict

from datetime import timedelta
from celery.utils.log import get_task_logger
from django.utils import timezone
from django.utils.dateparse import parse_date

from common.constants import COMPLETED, BIOSAMPLE_URL
from common.helpers import format_attribute, send_mail_to_admins
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
from image.celery import app as celery_app
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie

from ..helpers import get_manager_auth
from ..models import Submission, OrphanSample, ManagedTeam

# Get an instance of a logger
logger = get_task_logger(__name__)

# defining constants. Clean biosample database data older than this many days
CLEANUP_DAYS = 30

# this is the timedelta which I want to add to releaseDate to remove samples
RELEASE_TIMEDELTA = timedelta(days=365*1000)

# Setting page size for biosample requests
PAGE_SIZE = 20

BIOSAMPLE_PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
    ])
HEADERS = {
        'Accept': 'application/hal+json',
    }
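
# With the values above, the first page request resolves to something like
# this (assuming BIOSAMPLE_URL points at the BioSamples /samples endpoint):
#
#   GET <BIOSAMPLE_URL>?size=20&filter=attr:project:IMAGE
#   Accept: application/hal+json
#
# and BioSamples replies with a paginated HAL+JSON document.
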
class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old completed
    submissions and removing them from the database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("Clean biosample.database started")

        # get an interval starting from now
        interval = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # select all COMPLETED objects older than interval
        qs = Submission.objects.filter(
            updated_at__lt=interval,
            status=COMPLETED)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % qs.count())

        # delete all old objects
        qs.delete()

        # debug
        logger.info("Clean biosample.database completed")

        return "success"
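
# A minimal scheduling sketch (an assumption; the project's real beat
# schedule is defined elsewhere): the task can be triggered asynchronously
# like any other registered Celery task, e.g.
#
#   from biosample.tasks.cleanup import CleanUpTask
#   CleanUpTask().delay()
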
async def fetch_url(session, url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS):
    """
    Fetch a generic url, read data as json and return the parsed content

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    url : str, optional
        the desired url. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.

    Returns
    -------
    dict
        json content of the page

    """

    # define a URL with yarl
    url = URL(url)
    url = url.update_query(params)

    async with session.get(url, headers=HEADERS) as response:
        return await response.json()
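
# A minimal call sketch (assuming it is awaited from a running event loop):
#
#   async with aiohttp.ClientSession() as session:
#       first_page = await fetch_url(session)
#       total_pages = first_page['page']['totalPages']
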
async def filter_managed_biosamples(data, managed_domains):
    """
    Parse data from a BioSample results page and yield samples managed
    by InjectTool users.

    Parameters
    ----------
    data : dict
        biosample data read from BIOSAMPLE_URL.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    # get samples objects
    try:
        samples = data['_embedded']['samples']

        for sample in samples:
            # filter out unmanaged records
            if sample['domain'] not in managed_domains:
                logger.warning("Ignoring %s" % (sample['name']))
                continue

            # otherwise return the sample to the caller
            yield sample

    except KeyError as exc:
        # log the exception. With repr() the exception name is rendered
        logger.error(repr(exc))
        logger.warning("error while parsing samples")
        logger.warning(data)
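
# filter_managed_biosamples() expects a HAL+JSON page shaped roughly like
# this (a sketch with placeholder values, derived from the keys it accesses):
#
#   {
#       "_embedded": {
#           "samples": [
#               {"name": "...", "domain": "...", "accession": "..."},
#           ]
#       },
#       "page": {"totalPages": ...}
#   }
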
async def get_biosamples(
        url=BIOSAMPLE_URL,
        params=BIOSAMPLE_PARAMS,
        managed_domains=[]):
    """
    Get all records from BioSamples for the IMAGE project. Fetch BioSamples
    once to determine how many pages to request, then return only the sample
    records managed by InjectTool

    Parameters
    ----------
    url : str, optional
        The desired URL. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    async with aiohttp.ClientSession() as session:
        # get data for the first time to determine how many pages I have
        # to request
        data = await fetch_url(session, url, params)

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in filter_managed_biosamples(data, managed_domains):
            # return a managed biosample record
            yield sample

        tasks = []

        # get pages
        totalPages = data['page']['totalPages']

        # generate new awaitable objects
        for page in range(1, totalPages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch_url(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in filter_managed_biosamples(
                    data, managed_domains):
                yield sample
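
# Paging note: page 0 is consumed by the first fetch_url() call, so the loop
# above schedules pages 1..totalPages-1 concurrently and yields records in
# completion order. A minimal consumption sketch (the domain below is
# hypothetical):
#
#   async for sample in get_biosamples(managed_domains=['subs.test-team-1']):
#       check_orphan_sample(sample)
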
async def check_samples():
    """
    Get all the BioSamples records belonging to domains managed by the
    InjectTool manager auth, and call check_orphan_sample for each of them

    Returns
    -------
    None.

    """
    # I need a pyUSIrest.auth.Auth object to filter out records that don't
    # belong to me
    auth = get_manager_auth()
    managed_domains = auth.get_domains()

    async for sample in get_biosamples(managed_domains=managed_domains):
        check_orphan_sample(sample)
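
# Outside the Celery task below, the coroutine could be driven directly
# (assuming Python 3.7+ and a configured Django environment):
#
#   asyncio.run(check_samples())
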
def check_orphan_sample(sample):
    """
    Get a BioSample record and check if such a BioSample id is registered in
    the InjectTool UID. If such a record is not present, create a new
    :py:class:`biosample.models.OrphanSample` record object in the BioSample
    orphan table

    Parameters
    ----------
    sample : dict
        a BioSample record.

    Returns
    -------
    None.

    """
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))

    else:
        # get a managed team
        team = ManagedTeam.objects.get(name=sample["domain"])

        # Create an orphan sample
        orphan, created = OrphanSample.objects.get_or_create(
            biosample_id=sample['accession'],
            name=sample['name'],
            team=team)

        if created:
            logger.warning("Add %s to orphan samples" % sample['accession'])
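
# Note: get_or_create() makes the scan idempotent. Re-running the task over
# the same BioSamples records will not insert duplicated OrphanSample rows,
# and only newly discovered orphans are logged.
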
class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across BioSamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("%s started" % (self.name))

        # create a loop object
        loop = asyncio.new_event_loop()

        # execute stuff
        try:
            loop.run_until_complete(check_samples())

        finally:
            # close loop
            loop.close()

        # Ok, count orphan samples
        orphan_count = sum(1 for _ in get_orphan_samples())

        if orphan_count > 0:
            email_subject = "Some entries in BioSamples are orphan"
            email_message = (
                "There are %s biosample ids which are not managed by "
                "InjectTool" % orphan_count)

            logger.warning(email_message)

            # Notify admins if I have orphan samples
            send_mail_to_admins(email_subject, email_message)

        # debug
        logger.info("%s completed" % (self.name))

        return "success"
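
# Note: get_orphan_samples() issues one GET request to BioSamples per orphan
# record, so the count above costs one request per orphan; it is computed
# here only to decide whether admins need to be notified.
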
def get_orphan_samples():
    """
    Iterate over all BioSample orphaned records which are not yet removed and
    are tracked for removal, get minimal data from BioSamples and return a
    dictionary which can be used to patch a BioSample id with a new
    BioSample submission in order to remove a BioSamples record
    (publish the BioSample record 1000 years from now).

    Yields
    ------
    new_data : dict
        payload to submit to BioSamples in order to remove a BioSamples
        record.
    """

    with requests.Session() as session:
        for orphan_sample in OrphanSample.objects.filter(
                ignore=False, removed=False).order_by('team__name'):

            # define the url I need to check
            url = "/".join([BIOSAMPLE_URL, orphan_sample.biosample_id])

            # read data from url
            response = session.get(url)
            data = response.json()

            # I need a new data dictionary to submit
            new_data = dict()

            # I suppose the accession exists, since I found this sample
            # using accession [biosample.id]
            new_data['accession'] = data.get(
                'accession', orphan_sample.biosample_id)

            new_data['alias'] = data['name']

            new_data['title'] = data['characteristics']['title'][0]['text']

            # this will be the most important attribute
            new_data['releaseDate'] = str(
                parse_date(data['releaseDate']) + RELEASE_TIMEDELTA)

            new_data['taxonId'] = data['taxId']

            # determine the taxon label from the taxId
            new_data['taxon'] = DictSpecie.objects.get(
                term__endswith=data['taxId']).label

            new_data['attributes'] = dict()

            new_data['description'] = "Removed by InjectTool"

            # set project again
            new_data['attributes']["Project"] = format_attribute(
                value="IMAGE")

            # return new biosample data
            yield {
                'data': new_data,
                'team': orphan_sample.team
            }
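
# The yielded payload looks roughly like this (a sketch, with hypothetical
# values):
#
#   {
#       'data': {
#           'accession': 'SAMEA0000000',
#           'alias': '...',
#           'title': '...',
#           'releaseDate': '3019-11-14',
#           'taxonId': 9940,
#           'taxon': 'Ovis aries',
#           'attributes': {'Project': format_attribute(value="IMAGE")},
#           'description': 'Removed by InjectTool',
#       },
#       'team': <a biosample.models.ManagedTeam instance>,
#   }
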

# explicitly register tasks
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(CleanUpTask)
celery_app.tasks.register(SearchOrphanTask)