Passed
Pull Request — devel (#90)
by Paolo, created 07:13

biosample.tasks.cleanup.get_biosamples() (grade B)

Complexity

Conditions 8

Size

Total Lines 78
Code Lines 26

Duplication

Lines 0
Ratio 0%

Importance

Changes 0
Metric  Value     Notes
eloc    26        (effective lines of code)
dl      0         (duplicated lines)
loc     78        (total lines)
rs      7.3333
c       0
b       0
f       0
cc      8         (cyclomatic complexity)
nop     3         (number of parameters)
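
The report does not name the engine behind these figures. As a cross-check under that assumption, the radon package computes comparable numbers locally, and its rank for a cyclomatic complexity of 8 is likewise B (radon ranks 1-5 as A, 6-10 as B). A minimal sketch; the file path is inferred from the dotted name above:

# a minimal sketch, assuming the radon package (pip install radon);
# the report's own engine is not identified, so figures may differ
from radon.complexity import cc_visit, cc_rank

with open("biosample/tasks/cleanup.py") as handle:
    code = handle.read()

for block in cc_visit(code):
    # block.complexity is the cyclomatic complexity (cc); cc_rank(8) == 'B'
    print(block.name, block.complexity, cc_rank(block.complexity))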

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. Moreover, when a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment as a starting point for naming it.

Commonly applied refactorings include Extract Method: split the long method into smaller, well-named helpers. A sketch of how this could apply to the flagged get_biosamples() follows, ahead of the full source listing.
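
A minimal Extract Method sketch, meant to slot into the module shown below: fetch_url, filter_managed_biosamples, logger and the constants are the module's own names, while fetch_first_page and build_page_tasks are hypothetical helpers, not part of the reviewed code.

# a minimal Extract Method sketch against the module listed below;
# fetch_first_page and build_page_tasks are hypothetical helper names
import asyncio
import aiohttp


async def fetch_first_page(session, url, params):
    """Fetch page 0 or raise if BioSamples could not be reached."""
    data = await fetch_url(session, url, params)
    if data == {}:
        raise ConnectionError("Can't fetch biosamples for orphan samples")
    return data


def build_page_tasks(session, url, params, total_pages):
    """Create one awaitable per remaining page."""
    tasks = []
    for page in range(1, total_pages):
        my_params = params.copy()
        my_params.update(page=page)
        tasks.append(fetch_url(session, url, my_params))
    return tasks


async def get_biosamples(url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS,
                         managed_domains=[]):
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)

    async with aiohttp.ClientSession(connector=connector) as session:
        data = await fetch_first_page(session, url, params)

        async for sample in filter_managed_biosamples(data, managed_domains):
            yield sample

        tasks = build_page_tasks(
            session, url, params, data['page']['totalPages'])

        for task in asyncio.as_completed(tasks):
            data = await task

            if data == {}:
                logger.debug("Got a result with no data")
                continue

            async for sample in filter_managed_biosamples(
                    data, managed_domains):
                yield sample

Each helper now has a single responsibility, which would lower the conditional count of get_biosamples itself.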

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 14 16:06:10 2019

@author: Paolo Cozzi <[email protected]>
"""

import asyncio
import aiohttp
import requests

from yarl import URL
from multidict import MultiDict
from itertools import islice

from datetime import timedelta
from celery.utils.log import get_task_logger
from django.utils import timezone
from django.utils.dateparse import parse_date

from common.constants import COMPLETED, BIOSAMPLE_URL, READY
from common.helpers import format_attribute, send_mail_to_admins
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
from image.celery import app as celery_app
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie

from ..helpers import get_manager_auth
from ..models import Submission, OrphanSample, ManagedTeam

# Get an instance of a logger
logger = get_task_logger(__name__)

# defining constants: clean biosample database data after CLEANUP_DAYS days
CLEANUP_DAYS = 30

# the timedelta to add to releaseDate in order to "remove" samples
RELEASE_TIMEDELTA = timedelta(days=365*1000)

# Setting page size for biosample requests
PAGE_SIZE = 500

BIOSAMPLE_PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
])
HEADERS = {
    'Accept': 'application/hal+json',
}


class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old completed
    submissions and removing them from the database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or an exception is caught"""

        logger.info("Clean biosample.database started")

        # compute the cutoff date (now minus CLEANUP_DAYS)
        interval = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # select all COMPLETED objects older than the cutoff
        qs = Submission.objects.filter(
            updated_at__lt=interval,
            status=COMPLETED)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % qs.count())

        # delete all old objects
        qs.delete()

        # debug
        logger.info("Clean biosample.database completed")

        return "success"


async def parse_json(response, url):
    """Helper function to parse json data"""

    try:
        return await response.json()

    except aiohttp.client_exceptions.ContentTypeError as exc:
        logger.error(repr(exc))
        logger.warning(
            "error while getting data from %s" % url)
        return {}


async def fetch_url(session, url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS):
    """
    Fetch a generic url and read its data as json

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    url : str, optional
        the desired url. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for the request. The default is BIOSAMPLE_PARAMS.

    Returns
    -------
    dict
        json content of the page
    """

    # define a URL with yarl
    url = URL(url)
    url = url.update_query(params)

    logger.debug(url)

    try:
        async with session.get(url, headers=HEADERS) as response:
            # try to read json data
            return await parse_json(response, url)

    except aiohttp.client_exceptions.ServerDisconnectedError as exc:
        logger.error(repr(exc))
        logger.warning(
            "server disconnected during %s" % url)
        return {}


async def filter_managed_biosamples(data, managed_domains):
    """
    Parse data from a BioSamples results page and yield the samples managed
    by InjectTool users.

    Parameters
    ----------
    data : dict
        biosample data read from BIOSAMPLE_URL.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSamples record.
    """
    # get samples objects
    try:
        samples = data['_embedded']['samples']

        for sample in samples:
            # filter out unmanaged records
            if sample['domain'] not in managed_domains:
                logger.warning("Ignoring %s" % (sample['name']))
                continue

            # otherwise return the sample to the caller
            yield sample

    except KeyError as exc:
        # log exception. With repr() the exception name is rendered
        logger.error(repr(exc))
        logger.warning("error while parsing samples")
        logger.warning(data)


async def get_biosamples(
        url=BIOSAMPLE_URL,
        params=BIOSAMPLE_PARAMS,
        managed_domains=[]):
    """
    Get all the records in BioSamples for the IMAGE project. Fetch BioSamples
    once to determine how many pages to request, then yield only the sample
    records managed by InjectTool.

    Parameters
    ----------
    url : str, optional
        The desired URL. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for the request. The default is BIOSAMPLE_PARAMS.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSamples record.
    """
    # limiting the number of connections
    # https://docs.aiohttp.org/en/stable/client_advanced.html
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)

    # https://stackoverflow.com/a/43857526
    async with aiohttp.ClientSession(connector=connector) as session:
        # fetch data for the first time to determine how many pages
        # have to be requested
        data = await fetch_url(session, url, params)

        # maybe the request had issues
        if data == {}:
            logger.debug("Got a result with no data")
            raise ConnectionError("Can't fetch biosamples for orphan samples")

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in filter_managed_biosamples(data, managed_domains):
            # return a managed biosample record
            yield sample

        tasks = []

        # get pages
        totalPages = data['page']['totalPages']

        # generate new awaitable objects
        for page in range(1, totalPages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch_url(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # maybe the request had issues
            if data == {}:
                logger.debug("Got a result with no data")
                continue

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in filter_managed_biosamples(
                    data, managed_domains):
                yield sample


async def check_samples():
    """
    Get all the BioSamples records submitted through the domains managed by
    the InjectTool manager auth, and call check_orphan_sample on each of them

    Returns
    -------
    None.
    """
    # I need a pyUSIrest.auth.Auth object to filter out records that don't
    # belong to me
    auth = get_manager_auth()
    managed_domains = auth.get_domains()

    async for sample in get_biosamples(managed_domains=managed_domains):
        check_orphan_sample(sample)


def check_orphan_sample(sample):
    """
    Get a BioSamples record and check whether its BioSample id is registered
    in the InjectTool UID. If such a record is not present, create a new
    :py:class:`biosample.models.OrphanSample` record object in the BioSample
    orphan table

    Parameters
    ----------
    sample : dict
        a BioSamples record.

    Returns
    -------
    None.
    """
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))

    else:
        # get a managed team
        team = ManagedTeam.objects.get(name=sample["domain"])

        # Create an orphan sample
        orphan, created = OrphanSample.objects.get_or_create(
            biosample_id=sample['accession'],
            name=sample['name'],
            team=team,
            status=READY,
        )

        if created:
            logger.warning("Add %s to orphan samples" % sample['accession'])


class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across BioSamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or an exception is caught"""

        logger.info("%s started" % (self.name))

        # create a loop object
        loop = asyncio.new_event_loop()

        # execute stuff
        try:
            loop.run_until_complete(check_samples())

        finally:
            # close loop
            loop.close()

        # Ok, count orphan samples
        orphan_count = sum(1 for orphan in get_orphan_samples())

        if orphan_count > 0:
            email_subject = "Some entries in BioSamples are orphan"
            email_message = (
                "There are %s biosample ids which are not managed by "
                "InjectTool" % orphan_count)

            logger.warning(email_message)

            # Notify admins if I have orphan samples
            send_mail_to_admins(email_subject, email_message)

        # debug
        logger.info("%s completed" % (self.name))

        return "success"


def get_orphan_samples(limit=None):
    """
    Iterate over all the orphan BioSamples records which are not yet removed
    and are tracked for removal, fetch minimal data from BioSamples and yield
    a dictionary which can be used to patch a BioSample id with a new
    BioSamples submission in order to remove the record (by publishing it
    1000 years from now).

    Yields
    ------
    new_data : dict
        payload to submit to BioSamples in order to remove a record.
    """

    with requests.Session() as session:
        # get all biosamples candidate for removal. Note that they
        # could belong to different users
        qs = OrphanSample.objects.filter(
            ignore=False,
            removed=False,
            status=READY
        ).order_by('team__name', 'id')

        if limit:
            qs = islice(qs, limit)

        for orphan_sample in qs:
            # define the url I need to check
            url = "/".join([BIOSAMPLE_URL, orphan_sample.biosample_id])

            # read data from url
            response = session.get(url)
            data = response.json()

            # I need a new data dictionary to submit
            new_data = dict()

            # I suppose the accession exists, since I found this sample
            # using its accession [biosample.id]
            new_data['accession'] = data.get(
                'accession', orphan_sample.biosample_id)

            new_data['alias'] = data['name']

            new_data['title'] = data['characteristics']['title'][0]['text']

            # this will be the most important attribute: push the release
            # date 1000 years into the future
            new_data['releaseDate'] = str(
                parse_date(data['releaseDate']) + RELEASE_TIMEDELTA)

            new_data['taxonId'] = data['taxId']

            # determine the taxon label from the taxId
            new_data['taxon'] = DictSpecie.objects.get(
                term__endswith=data['taxId']).label

            new_data['attributes'] = dict()

            new_data['description'] = "Removed by InjectTool"

            # set project again
            new_data['attributes']["Project"] = format_attribute(
                value="IMAGE")

            # return new biosample data
            yield {
                'data': new_data,
                'team': orphan_sample.team,
                'sample': orphan_sample,
            }


# register tasks explicitly
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(CleanUpTask)
celery_app.tasks.register(SearchOrphanTask)
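
For reference, a minimal sketch (not part of the reviewed file) of how the registered tasks could be triggered through the Celery task registry; the registry keys are the `name` attributes of the classes above:

# a minimal sketch: trigger the registered tasks via the Celery registry;
# the keys are the `name` attributes defined on the task classes above
from image.celery import app as celery_app

celery_app.tasks["Clean biosample models"].delay()
celery_app.tasks["Search Orphan BioSamples IDs"].delay()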