Completed
Push — master (55dedd...16663f) by Paolo
30s queued 14s

biosample.tasks.cleanup.get_biosamples()   B

Complexity
    Conditions: 8

Size
    Total Lines: 78
    Code Lines: 26

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric    Value
eloc      26
dl        0
loc       78
rs        7.3333
c         0
b         0
f         0
cc        8
nop       3

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. And if a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method; the comment then serves as a starting point for naming the new method.

Commonly applied refactorings include:

- Extract Method
- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object
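For instance, here is a minimal sketch of Extract Method (the order-total example below is hypothetical and not taken from this codebase): each comment-separated section of a long function becomes a small, well-named helper.

from dataclasses import dataclass


@dataclass
class Item:
    price: float
    quantity: int


# Before: one long function whose sections are separated by comments
def order_total_long(items):
    # validate the order
    if not items:
        raise ValueError("empty order")
    # compute the raw total
    total = sum(item.price * item.quantity for item in items)
    # apply a bulk discount
    if total > 100:
        total *= 0.9
    return total


# After: each commented section extracted into a named helper
def validate_order(items):
    if not items:
        raise ValueError("empty order")


def raw_total(items):
    return sum(item.price * item.quantity for item in items)


def apply_bulk_discount(total):
    return total * 0.9 if total > 100 else total


def order_total(items):
    validate_order(items)
    return apply_bulk_discount(raw_total(items))


print(order_total([Item(60.0, 2)]))  # 108.0 (discount applied)

Each helper now fits in a single thought, and the comments have become function names.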

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 14 16:06:10 2019

@author: Paolo Cozzi <[email protected]>
"""

import asyncio
import aiohttp
import requests

from yarl import URL
from multidict import MultiDict
from itertools import islice

from datetime import timedelta
from celery.utils.log import get_task_logger
from django.utils import timezone
from django.utils.dateparse import parse_date

from common.constants import COMPLETED, BIOSAMPLE_URL, READY
from common.helpers import format_attribute, send_mail_to_admins
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
from image.celery import app as celery_app
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie

from ..helpers import get_manager_auth
from ..models import Submission, OrphanSample, ManagedTeam

# Get an instance of a logger
logger = get_task_logger(__name__)

# defining constants. Clean biosample database data after this many days
CLEANUP_DAYS = 30

# this is the timedelta which I want to add to releaseDate to remove samples
RELEASE_TIMEDELTA = timedelta(days=365*1000)

# Setting page size for biosample requests
PAGE_SIZE = 500

BIOSAMPLE_PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
    ])
HEADERS = {
    'Accept': 'application/hal+json',
}

# define the orphan queryset once
ORPHAN_QS = OrphanSample.objects.filter(
    ignore=False,
    removed=False,
    status=READY
)


class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old completed
    submissions and removing them from the database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in Redis, so these tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if the task
            is already running or an exception is caught"""

        logger.info("Clean biosample.database started")

        # get an interval starting from now
        interval = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # select all COMPLETED objects older than interval
        qs = Submission.objects.filter(
            updated_at__lt=interval,
            status=COMPLETED)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % qs.count())

        # delete all old objects
        qs.delete()

        # debug
        logger.info("Clean biosample.database completed")

        return "success"


async def parse_json(response, url):
    """Helper function to parse json data"""

    try:
        return await response.json()

    except aiohttp.client_exceptions.ContentTypeError as exc:
        logger.error(repr(exc))
        logger.warning(
            "error while getting data from %s" % url)
        return {}


async def fetch_url(session, url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS):
    """
    Fetch a generic url and read its data as json

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    url : str, optional
        the desired url. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.

    Returns
    -------
    dict
        json content of the page

    """

    # define a URL with yarl
    url = URL(url)
    url = url.update_query(params)

    logger.debug(url)

    try:
        async with session.get(url, headers=HEADERS) as response:
            # try to read json data
            return await parse_json(response, url)

    except aiohttp.client_exceptions.ServerDisconnectedError as exc:
        logger.error(repr(exc))
        logger.warning(
            "server disconnected during %s" % url)
        return {}


async def filter_managed_biosamples(data, managed_domains):
    """
    Parse data from a BioSamples results page and yield samples managed
    by InjectTool users.

    Parameters
    ----------
    data : dict
        biosample data read from BIOSAMPLE_URL.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    # get samples objects
    try:
        samples = data['_embedded']['samples']

        for sample in samples:
            # filter out unmanaged records
            if sample['domain'] not in managed_domains:
                logger.warning("Ignoring %s" % (sample['name']))
                continue

            # otherwise return the sample to the caller
            yield sample

    except KeyError as exc:
        # log exception. With repr() the exception name is rendered
        logger.error(repr(exc))
        logger.warning("error while parsing samples")
        logger.warning(data)


async def get_biosamples(
        url=BIOSAMPLE_URL,
        params=BIOSAMPLE_PARAMS,
        managed_domains=[]):
    """
    Get all records from BioSamples for the IMAGE project. Fetch BioSamples
    once to determine how many pages to request, then yield only the sample
    records managed by InjectTool

    Parameters
    ----------
    url : str, optional
        The desired URL. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    # limiting the number of connections
    # https://docs.aiohttp.org/en/stable/client_advanced.html
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)

    # https://stackoverflow.com/a/43857526
    async with aiohttp.ClientSession(connector=connector) as session:
        # get data for the first time to determine how many pages I have
        # to request
        data = await fetch_url(session, url, params)

        # maybe the request had issues
        if data == {}:
            logger.debug("Got a result with no data")
            raise ConnectionError("Can't fetch biosamples for orphan samples")

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in filter_managed_biosamples(data, managed_domains):
            # return a managed biosample record
            yield sample

        tasks = []

        # get pages
        totalPages = data['page']['totalPages']

        # generate new awaitable objects
        for page in range(1, totalPages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch_url(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # maybe the request had issues
            if data == {}:
                logger.debug("Got a result with no data")
                continue

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in filter_managed_biosamples(
                    data, managed_domains):
                yield sample


async def check_samples():
    """
    Get all BioSamples records belonging to the domains managed by the
    InjectTool manager auth, and call check_orphan_sample for each of them

    Returns
    -------
    None.

    """
    # I need a pyUSIrest.auth.Auth object to filter out records that don't
    # belong to me
    auth = get_manager_auth()
    managed_domains = auth.get_domains()

    async for sample in get_biosamples(managed_domains=managed_domains):
        check_orphan_sample(sample)


def check_orphan_sample(sample):
    """
    Get a BioSample record and check whether its BioSample id is registered
    in the InjectTool UID. If the record is not present, create a new
    :py:class:`biosample.models.OrphanSample` record object in the BioSample
    orphan table

    Parameters
    ----------
    sample : dict
        a BioSample record.

    Returns
    -------
    None.

    """
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))

    else:
        # get a managed team
        team = ManagedTeam.objects.get(name=sample["domain"])

        # Create an orphan sample
        orphan, created = OrphanSample.objects.get_or_create(
            biosample_id=sample['accession'],
            name=sample['name'],
            team=team,
        )

        if created:
            logger.warning("Add %s to orphan samples" % sample['accession'])

            # set status for new object
            orphan.status = READY
            orphan.save()


class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across BioSamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in Redis, so these tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if the task
            is already running or an exception is caught"""

        logger.info("%s started" % (self.name))

        # create a loop object
        loop = asyncio.new_event_loop()

        # execute stuff
        try:
            loop.run_until_complete(check_samples())

        finally:
            # close loop
            loop.close()

        # count orphan samples with a query
        orphan_count = ORPHAN_QS.count()

        if orphan_count > 0:
            email_subject = "Some entries in BioSamples are orphan"
            email_message = (
                "There are %s biosample ids which are not managed by "
                "InjectTool" % orphan_count)

            logger.warning(email_message)

            # Notify admins if I have orphan samples
            send_mail_to_admins(email_subject, email_message)

        # debug
        logger.info("%s completed" % (self.name))

        return "success"


def get_orphan_samples(limit=None):
    """
    Iterate over all orphan BioSamples records which are not yet removed and
    are tracked for removal, get minimal data from BioSamples and yield a
    dictionary which can be used to patch a BioSample id with a new
    BioSample submission, in order to remove the BioSamples record
    (by publishing it 1000 years from now).

    Yields
    ------
    new_data : dict
        payload to submit to BioSamples in order to remove a record.
    """

    with requests.Session() as session:
        # get all biosample candidates for removal. Pay attention: they
        # could belong to different users
        qs = ORPHAN_QS.order_by('team__name', 'id')

        if limit:
            qs = islice(qs, limit)

        for orphan_sample in qs:
            # define the url I need to check
            url = "/".join([BIOSAMPLE_URL, orphan_sample.biosample_id])

            # read data from url
            response = session.get(url)
            data = response.json()

            # check status
            if response.status_code == 403:
                logger.error("Error for %s (%s): %s" % (
                    orphan_sample.biosample_id,
                    data['error'],
                    data['message'])
                )

                # this sample seems already removed
                continue

            # I need a new data dictionary to submit
            new_data = dict()

            # I suppose the accession exists, since I found this sample
            # using its accession (biosample_id)
            new_data['accession'] = data.get(
                'accession', orphan_sample.biosample_id)

            new_data['alias'] = data['name']

            new_data['title'] = data['characteristics']['title'][0]['text']

            # this will be the most important attribute
            new_data['releaseDate'] = str(
                parse_date(data['releaseDate']) + RELEASE_TIMEDELTA)

            new_data['taxonId'] = data['taxId']

            # determine the taxon label from the taxId
            new_data['taxon'] = DictSpecie.objects.get(
                term__endswith=data['taxId']).label

            new_data['attributes'] = dict()

            new_data['description'] = "Removed by InjectTool"

            # set project again
            new_data['attributes']["Project"] = format_attribute(
                value="IMAGE")

            # return new biosample data
            yield {
                'data': new_data,
                'team': orphan_sample.team,
                'sample': orphan_sample,
            }


# explicitly register tasks
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(CleanUpTask)
celery_app.tasks.register(SearchOrphanTask)
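Applied to the flagged function, Extract Method might look like the sketch below. It assumes the module context above (fetch_url, filter_managed_biosamples, logger, and the BIOSAMPLE_URL/BIOSAMPLE_PARAMS constants keep their definitions); the helper names build_page_tasks and yield_managed_samples are hypothetical, not part of the codebase.

def build_page_tasks(session, url, params, total_pages):
    """Create one fetch_url awaitable per remaining result page."""
    tasks = []
    for page in range(1, total_pages):
        # copy the MultiDict so each request gets its own page number
        my_params = params.copy()
        my_params.update(page=page)
        tasks.append(fetch_url(session, url, my_params))
    return tasks


async def yield_managed_samples(data, managed_domains):
    """Yield the managed samples from a single result page, if any."""
    if data == {}:
        logger.debug("Got a result with no data")
        return
    async for sample in filter_managed_biosamples(data, managed_domains):
        yield sample


async def get_biosamples(url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS,
                         managed_domains=[]):
    """Yield all IMAGE project samples managed by InjectTool."""
    # limiting the number of connections (signature kept as in the original)
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)

    async with aiohttp.ClientSession(connector=connector) as session:
        # fetch the first page to learn how many pages exist in total
        data = await fetch_url(session, url, params)

        if data == {}:
            raise ConnectionError("Can't fetch biosamples for orphan samples")

        async for sample in filter_managed_biosamples(data, managed_domains):
            yield sample

        # fan out over the remaining pages concurrently
        tasks = build_page_tasks(session, url, params,
                                 data['page']['totalPages'])

        for task in asyncio.as_completed(tasks):
            async for sample in yield_managed_samples(
                    await task, managed_domains):
                yield sample

With the pagination fan-out and the per-page filtering extracted, the 8 conditions that triggered the finding are spread across three small coroutines, each short enough to name well, and the cyclomatic complexity of get_biosamples itself should fall well below the flagged value.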