biosample.tasks.cleanup.get_orphan_samples()   B
last analyzed

Complexity

Conditions: 5

Size

Total Lines: 76
Code Lines: 33

Duplication

Lines: 0
Ratio: 0 %

Importance

Changes: 0

Metric   Value
eloc     33
dl       0
loc      76
rs       8.6213
c        0
b        0
f        0
cc       5
nop      1
How to fix: Long Method

Small methods make your code easier to understand, particularly when combined with a good name. Moreover, if a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a sign that the commented part should be extracted into a new method, using the comment as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, sketched below.
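A minimal sketch of Extract Method (hypothetical names, not taken from the module under analysis): the commented phases of one long function become small, well-named helpers.

# Before: one long function whose phases are only separated by comments
def process_order(order):
    # validate the order
    if not order.items:
        raise ValueError("empty order")
    # compute the total
    total = sum(item.price * item.quantity for item in order.items)
    # apply the bulk discount
    if total > 100:
        total *= 0.9
    return total

# After: each commented phase extracted into a small, well-named helper
def process_order(order):
    validate(order)
    return apply_bulk_discount(compute_total(order))

def validate(order):
    if not order.items:
        raise ValueError("empty order")

def compute_total(order):
    return sum(item.price * item.quantity for item in order.items)

def apply_bulk_discount(total):
    return total * 0.9 if total > 100 else total

The full source of the analyzed module follows.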

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 14 16:06:10 2019

@author: Paolo Cozzi <[email protected]>
"""

import asyncio
import aiohttp
import requests

from yarl import URL
from multidict import MultiDict
from itertools import islice

from datetime import timedelta
from celery.utils.log import get_task_logger
from django.utils import timezone
from django.utils.dateparse import parse_date

from common.constants import COMPLETED, BIOSAMPLE_URL, READY
from common.helpers import format_attribute, send_mail_to_admins
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
from image.celery import app as celery_app
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie

from ..helpers import get_manager_auth
from ..models import Submission, OrphanSample, ManagedTeam

# Get an instance of a logger
logger = get_task_logger(__name__)

# defining constants: clean up biosample database data after this many days
CLEANUP_DAYS = 30

# the timedelta added to releaseDate in order to remove samples
# (365 * 1000 days, roughly 1,000 years)
RELEASE_TIMEDELTA = timedelta(days=365*1000)

# Setting page size for BioSamples requests
PAGE_SIZE = 500

# a custom BioSamples URL built with yarl,
# ie. https://wwwdev.ebi.ac.uk/biosamples
BIOSAMPLE_BASE_URL = URL(BIOSAMPLE_URL).parent

BIOSAMPLE_SAMPLE_ENDPOINT = BIOSAMPLE_BASE_URL / "samples"
BIOSAMPLE_ACCESSION_ENDPOINT = BIOSAMPLE_BASE_URL / "accessions"

BIOSAMPLE_PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
    ])
HEADERS = {
        'Accept': 'application/hal+json',
    }

# define the orphan queryset once
ORPHAN_QS = OrphanSample.objects.filter(
    ignore=False,
    removed=False,
    status=READY
)
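# NOTE: Django querysets are lazy, so ORPHAN_QS is re-evaluated each time it
# is counted or iterated below, always reflecting the current database state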


class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old completed
    submissions and removing them from the database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in Redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if the task
            is already running or an exception is caught"""

        logger.info("Clean biosample.database started")

        # get an interval starting from now
        interval = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # select all COMPLETED objects older than interval
        qs = Submission.objects.filter(
            updated_at__lt=interval,
            status=COMPLETED)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % qs.count())

        # delete all old objects
        qs.delete()

        # debug
        logger.info("Clean biosample.database completed")

        return "success"

async def parse_json(response, url):
    """Helper function to parse json data"""

    try:
        return await response.json()

    except aiohttp.client_exceptions.ContentTypeError as exc:
        logger.error(repr(exc))
        logger.warning(
            "error while getting data from %s" % url)
        return {}

async def fetch_url(
        session, url, params=BIOSAMPLE_PARAMS, headers=None) -> dict:
    """
    Fetch a generic URL, read data as json and return it

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    url : yarl.URL
        the desired URL
    params : MultiDict, optional
        Additional params for the request. The default is BIOSAMPLE_PARAMS.
    headers : dict
        Additional HEADER information

    Returns
    -------
    dict
        json content of the page
    """

    # update URL params with yarl
    url = url.update_query(params)

    logger.debug(url)

    try:
        async with session.get(url, headers=headers) as response:
            # try to read json data
            return await parse_json(response, url)

    except aiohttp.client_exceptions.ServerDisconnectedError as exc:
        logger.error(repr(exc))
        logger.warning(
            "server disconnected during %s" % url)
        return {}

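# NOTE: each accession below is fetched with its own request; concurrency is
# bounded by the TCPConnector(limit=10) configured in get_biosamples()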
async def fetch_biosample(
        session: aiohttp.ClientSession,
        accession: str,
        base_url: URL = BIOSAMPLE_SAMPLE_ENDPOINT,
        headers: dict = HEADERS) -> dict:
    """
    Collect a single BioSamples object from EBI

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    accession : str
        a BioSamples accession ID.
    base_url : URL, optional
        the BioSamples endpoint. The default is BIOSAMPLE_SAMPLE_ENDPOINT.
    headers : dict, optional
        additional HEADER information. The default is HEADERS.

    Returns
    -------
    dict
        A BioSamples dictionary object
    """

    # define sample location
    url = base_url / accession

    return await fetch_url(session, url, None, headers)

async def filter_managed_biosamples(
        session: aiohttp.ClientSession,
        data: dict,
        managed_domains: list):
    """
    Parse data from a BioSamples results page and yield samples managed
    by InjectTool users.

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    data : dict
        biosample data read from BIOSAMPLE_URL.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSamples record.

    """
    tasks = []

    # get sample objects
    try:
        accessions = data['_embedded']['accessions']

    except KeyError as exc:
        # log the exception. With repr() the exception name is rendered
        logger.error(repr(exc))
        logger.warning("error while parsing accessions")
        logger.warning(data)

    else:
        for accession in accessions:
            tasks.append(fetch_biosample(session, accession))

        # tasks are consumed in completion order, not submission order
        for task in asyncio.as_completed(tasks):
            # read data
            sample = await task

            # filter out unmanaged records
            if sample['domain'] not in managed_domains:
                logger.warning("Ignoring %s (%s)" % (
                    sample['name'], sample['accession']))
                continue

            # otherwise return the sample to the caller
            yield sample

async def get_biosamples(
        url=BIOSAMPLE_ACCESSION_ENDPOINT,
        params=BIOSAMPLE_PARAMS,
        managed_domains=None):
    """
    Get all samples from BioSamples for the IMAGE project. Fetch BioSamples
    once to determine how many pages to request, then yield only the
    accession records managed by InjectTool

    Parameters
    ----------
    url : yarl.URL, optional
        The desired URL. The default is BIOSAMPLE_ACCESSION_ENDPOINT.
    params : MultiDict, optional
        Additional params for the request. The default is BIOSAMPLE_PARAMS.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSamples record.

    """
    # avoid a mutable default argument
    if managed_domains is None:
        managed_domains = []

    # limiting the number of connections
    # https://docs.aiohttp.org/en/stable/client_advanced.html
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)

    # https://stackoverflow.com/a/43857526
    async with aiohttp.ClientSession(connector=connector) as session:
        # get data for the first time to determine how many pages I have
        # to request
        data = await fetch_url(session, url, params)

        # maybe the request had issues
        if data == {}:
            logger.debug("Got a result with no data")
            raise ConnectionError("Can't fetch biosamples for orphan samples")

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in filter_managed_biosamples(
                session, data, managed_domains):
            # return a managed biosample record
            yield sample

        tasks = []

        # get pages
        totalPages = data['page']['totalPages']

        # generate new awaitable objects
        # (page 0 was already fetched above, so start from page 1)
        for page in range(1, totalPages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch_url(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # maybe the request had issues
            if data == {}:
                logger.debug("Got a result with no data")
                continue

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in filter_managed_biosamples(
                    session, data, managed_domains):
                yield sample

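# NOTE: the auth calls below carry no await: they execute synchronously
# inside the coroutine, before the asynchronous BioSamples fetches start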
async def check_samples():
    """
    Get all records from BioSamples belonging to the domains managed by the
    InjectTool manager auth, and call check_orphan_sample for each of them

    Returns
    -------
    None.

    """
    # I need a pyUSIrest.auth.Auth object to filter out records that don't
    # belong to me
    auth = get_manager_auth()
    managed_domains = auth.get_domains()

    async for sample in get_biosamples(managed_domains=managed_domains):
        check_orphan_sample(sample)

def check_orphan_sample(sample):
    """
    Get a BioSamples record and check whether its BioSamples ID is registered
    in the InjectTool UID. If no such record is present, create a new
    :py:class:`biosample.models.OrphanSample` object in the BioSamples
    orphan table

    Parameters
    ----------
    sample : dict
        a BioSamples record.

    Returns
    -------
    None.

    """
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))

    else:
        # get a managed team
        team = ManagedTeam.objects.get(name=sample["domain"])

        # Create an orphan sample: get_or_create makes repeated scans
        # idempotent, so the same sample is never tracked twice
        orphan, created = OrphanSample.objects.get_or_create(
            biosample_id=sample['accession'],
            name=sample['name'],
            team=team,
        )

        if created:
            logger.warning("Add %s to orphan samples" % sample['accession'])

            # set status for new object
            orphan.status = READY
            orphan.save()

class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across BioSamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in Redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if the task
            is already running or an exception is caught"""

        logger.info("%s started" % (self.name))

        # create a loop object
        loop = asyncio.new_event_loop()

        # execute stuff
        try:
            loop.run_until_complete(check_samples())

        finally:
            # close loop
            loop.close()

        # OK, count orphan samples with a query
        orphan_count = ORPHAN_QS.count()

        if orphan_count > 0:
            email_subject = "Some entries in BioSamples are orphan"
            email_message = (
                "There are %s biosample ids which are not managed by "
                "InjectTool" % orphan_count)

            logger.warning(email_message)

            # Notify admins if I have orphan samples
            send_mail_to_admins(email_subject, email_message)

        # debug
        logger.info("%s completed" % (self.name))

        return "success"

def get_orphan_samples(limit=None):
    """
    Iterate over all orphan BioSamples records which are not yet removed and
    are tracked for removal, get minimal data from BioSamples and return a
    dictionary which can be used to patch a BioSamples ID with a new
    BioSamples submission in order to remove the record (by publishing it
    1,000 years from now).

    Yields
    ------
    new_data : dict
        payload to submit to BioSamples in order to remove a record.
    """

    with requests.Session() as session:
        # get all biosamples candidate for removal. Note that they
        # could belong to different users
        qs = ORPHAN_QS.order_by('team__name', 'id')

        if limit:
            qs = islice(qs, limit)

        for orphan_sample in qs:
            # define the url I need to check
            url = "/".join([BIOSAMPLE_URL, orphan_sample.biosample_id])

            # read data from url
            response = session.get(url)
            data = response.json()

            # check status
            if response.status_code == 403:
                logger.error("Error for %s (%s): %s" % (
                    orphan_sample.biosample_id,
                    data['error'],
                    data['message'])
                )

                # this sample seems to be removed already
                continue

            # I need a new data dictionary to submit
            new_data = dict()

            # I suppose the accession exists, since I found this sample
            # using its accession [biosample.id]
            new_data['accession'] = data.get(
                'accession', orphan_sample.biosample_id)

            new_data['alias'] = data['name']

            new_data['title'] = data['characteristics']['title'][0]['text']

            # this will be the most important attribute: push the release
            # date about 1,000 years into the future to hide the record
            new_data['releaseDate'] = str(
                parse_date(data['releaseDate']) + RELEASE_TIMEDELTA)

            new_data['taxonId'] = data['taxId']

            # determine the taxon label from the taxId
            new_data['taxon'] = DictSpecie.objects.get(
                term__endswith=data['taxId']).label

            new_data['attributes'] = dict()

            new_data['description'] = "Removed by InjectTool"

            # set project again
            new_data['attributes']["Project"] = format_attribute(
                value="IMAGE")

            # return new biosample data
            yield {
                'data': new_data,
                'team': orphan_sample.team,
                'sample': orphan_sample,
            }


# register tasks explicitly
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(CleanUpTask)
celery_app.tasks.register(SearchOrphanTask)
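A minimal sketch of how the generator output might be consumed (hypothetical driver code, not part of the module): each yielded payload bundles the patched BioSamples document with the owning team and the OrphanSample row being processed.

# hypothetical driver: inspect the removal payloads for the first 10 orphans
for payload in get_orphan_samples(limit=10):
    new_data = payload['data']    # the document to resubmit to BioSamples
    orphan = payload['sample']    # the OrphanSample row being processed

    print("would patch %s (team %s) with releaseDate %s" % (
        new_data['accession'], payload['team'], new_data['releaseDate']))

    # after a successful resubmission, one would typically flag the row, e.g.
    # orphan.removed = True
    # orphan.save()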