Passed
Pull Request — master (#97)
by Paolo
03:10
created

biosample.tasks.cleanup.get_orphan_samples()   B

Complexity

Conditions 5

Size

Total Lines 76
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 33
dl 0
loc 76
rs 8.6213
c 0
b 0
f 0
cc 5
nop 1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:06:10 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import asyncio
10
import aiohttp
11
import requests
12
13
from yarl import URL
14
from multidict import MultiDict
15
from itertools import islice
16
17
from datetime import timedelta
18
from celery.utils.log import get_task_logger
19
from django.utils import timezone
20
from django.utils.dateparse import parse_date
21
22
from common.constants import COMPLETED, BIOSAMPLE_URL, READY
23
from common.helpers import format_attribute, send_mail_to_admins
24
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
25
from image.celery import app as celery_app
26
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie
27
28
from ..helpers import get_manager_auth
29
from ..models import Submission, OrphanSample, ManagedTeam
30
31
# module-level task logger (celery-aware)
logger = get_task_logger(__name__)

# remove COMPLETED biosample submissions older than this many days
CLEANUP_DAYS = 30

# the timedelta added to releaseDate when removing a sample: pushing the
# release date ~1000 years into the future effectively unpublishes the record
RELEASE_TIMEDELTA = timedelta(days=365*1000)

# page size used for BioSamples API requests
PAGE_SIZE = 500

# default query parameters for the BioSamples search endpoint: page size
# plus a filter restricting results to the IMAGE project
BIOSAMPLE_PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
    ])
# request the HAL+JSON representation of BioSamples resources
HEADERS = {
        'Accept': 'application/hal+json',
    }

# define the orphan queryset once: samples tracked for removal (not ignored,
# not yet removed, in READY status)
ORPHAN_QS = OrphanSample.objects.filter(
    ignore=False,
    removed=False,
    status=READY
)
57
58
59
class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old completed submission
    and remove them from database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("Clean biosample.database started")

        # everything updated before this threshold is considered old
        threshold = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # completed submissions not touched since the threshold
        old_submissions = Submission.objects.filter(
            status=COMPLETED,
            updated_at__lt=threshold)

        logger.info(
            "Deleting %s biosample.models.Submission objects"
            % old_submissions.count())

        # drop all the stale objects at once
        old_submissions.delete()

        logger.info("Clean biosample.database completed")

        return "success"
96
97
98
async def parse_json(response, url):
    """Read the body of *response* as json; return an empty dict when the
    payload has an unexpected content type"""

    try:
        payload = await response.json()

    except aiohttp.client_exceptions.ContentTypeError as exc:
        logger.error(repr(exc))
        logger.warning(
            "error while getting data from %s" % url)
        return {}

    return payload
109
110
111
async def fetch_url(session, url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS):
    """
    Fetch a generic url, read data as json and return a promise

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    url : str, optional
        the desired url. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.

    Returns
    -------
    dict
        json content of the page (an empty dict on server disconnection)

    """

    # define a URL with yarl and attach the query parameters
    url = URL(url)
    url = url.update_query(params)

    logger.debug(url)

    try:
        async with session.get(url, headers=HEADERS) as response:
            # try to read json data
            return await parse_json(response, url)

    except aiohttp.client_exceptions.ServerDisconnectedError as exc:
        logger.error(repr(exc))
        logger.warning(
            "server disconnected during %s" % url)
        return {}
148
149
150
async def filter_managed_biosamples(data, managed_domains):
    """
    Parse one page of BioSamples results and yield only the records that
    belong to an InjectTool managed domain.

    Parameters
    ----------
    data : dict
        biosample data read from BIOSAMPLE_URL.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    try:
        # walk through every sample record of this results page
        for record in data['_embedded']['samples']:
            if record['domain'] in managed_domains:
                # managed by an InjectTool user: hand it back to the caller
                yield record
            else:
                logger.warning("Ignoring %s" % (record['name']))

    except KeyError as exc:
        # with repr() the exception name is rendered in the log message
        logger.error(repr(exc))
        logger.warning("error while parsing samples")
        logger.warning(data)
187
188
189
async def get_biosamples(
        url=BIOSAMPLE_URL,
        params=BIOSAMPLE_PARAMS,
        managed_domains=None):
    """
    Get all records from BioSamples for the IMAGE project. Fetch BioSamples
    once, determine how many pages to request and return only sample records
    managed by InjectTool

    Parameters
    ----------
    url : str, optional
        The desired URL. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.
    managed_domains : list, optional
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`. The default (None) is
        treated as an empty list.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    # avoid the mutable default argument pitfall: a shared default list
    # would leak state between calls
    if managed_domains is None:
        managed_domains = []

    # limiting the number of connections
    # https://docs.aiohttp.org/en/stable/client_advanced.html
    connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300)

    # https://stackoverflow.com/a/43857526
    async with aiohttp.ClientSession(connector=connector) as session:
        # get data for the first time to determine how many pages I have
        # to requests
        data = await fetch_url(session, url, params)

        # maybe the request had issues
        if data == {}:
            logger.debug("Got a result with no data")
            raise ConnectionError("Can't fetch biosamples for orphan samples")

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in filter_managed_biosamples(data, managed_domains):
            # return a managed biosample record
            yield sample

        tasks = []

        # how many result pages BioSamples reports in total
        totalPages = data['page']['totalPages']

        # generate new awaitable objects, one per remaining page
        for page in range(1, totalPages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch_url(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # maybe the request had issues
            if data == {}:
                logger.debug("Got a result with no data")
                continue

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in filter_managed_biosamples(
                    data, managed_domains):
                yield sample
267
268
269
async def check_samples():
    """
    Fetch every BioSamples record belonging to the InjectTool manager's
    managed domains and pass each one to check_orphan_sample

    Returns
    -------
    None.

    """
    # a pyUSIrest.auth.Auth object is required to filter out records that
    # don't belong to me
    manager_auth = get_manager_auth()
    domains = manager_auth.get_domains()

    async for record in get_biosamples(managed_domains=domains):
        check_orphan_sample(record)
286
287
288
def check_orphan_sample(sample):
    """
    Get a BioSample record and check if such BioSampleId is registered into
    InjectTool UID. If such record is not present, create a new
    :py:class:`biosample.models.OrphanSample` record object in the BioSample
    orphan table

    Parameters
    ----------
    sample : dict
        a BioSample record.

    Returns
    -------
    None.

    """
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    # nothing to do when the accession is already tracked in UID
    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))
        return

    # get a managed team
    team = ManagedTeam.objects.get(name=sample["domain"])

    # Create an orphan sample; the instance itself is not needed, only
    # the created flag is used to decide whether to log
    _, created = OrphanSample.objects.get_or_create(
        biosample_id=sample['accession'],
        name=sample['name'],
        team=team,
        status=READY,
    )

    if created:
        logger.warning("Add %s to orphan samples" % sample['accession'])
328
329
330
class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across biosamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("%s started" % (self.name))

        self._scan_biosamples()
        self._notify_if_orphans()

        logger.info("%s completed" % (self.name))

        return "success"

    def _scan_biosamples(self):
        """Run the async BioSamples scan on a dedicated event loop."""

        loop = asyncio.new_event_loop()

        try:
            loop.run_until_complete(check_samples())

        finally:
            # always release the loop resources
            loop.close()

    def _notify_if_orphans(self):
        """Count orphan samples and mail the admins when any are found."""

        orphan_count = ORPHAN_QS.count()

        if orphan_count == 0:
            return

        email_subject = "Some entries in BioSamples are orphan"
        email_message = (
            "There are %s biosample ids which are not managed by "
            "InjectTool" % orphan_count)

        logger.warning(email_message)

        # Notify admins if I have orphan samples
        send_mail_to_admins(email_subject, email_message)
378
379
380
def _fetch_biosample(session, orphan_sample):
    """
    Fetch a single BioSamples record for an orphan sample. Return the parsed
    json data, or None when access is forbidden (the record seems to be
    already removed).
    """
    # define the url I need to check
    url = "/".join([BIOSAMPLE_URL, orphan_sample.biosample_id])

    # read data from url
    response = session.get(url)
    data = response.json()

    # check status
    if response.status_code == 403:
        logger.error("Error for %s (%s): %s" % (
            orphan_sample.biosample_id,
            data['error'],
            data['message'])
        )
        return None

    return data


def _build_removal_payload(data, orphan_sample):
    """
    Derive from a BioSamples record the minimal payload needed to patch it
    with a release date 1000 years from now, which removes the record.
    """
    # I need a new data dictionary to submit
    new_data = dict()

    # I suppose the accession exists, since I found this sample
    # using accession [biosample.id]
    new_data['accession'] = data.get(
        'accession', orphan_sample.biosample_id)

    new_data['alias'] = data['name']

    new_data['title'] = data['characteristics']['title'][0]['text']

    # this will be the most important attribute
    new_data['releaseDate'] = str(
        parse_date(data['releaseDate']) + RELEASE_TIMEDELTA)

    new_data['taxonId'] = data['taxId']

    # need to determine taxon as well
    new_data['taxon'] = DictSpecie.objects.get(
        term__endswith=data['taxId']).label

    new_data['attributes'] = dict()

    new_data['description'] = "Removed by InjectTool"

    # set project again
    new_data['attributes']["Project"] = format_attribute(
        value="IMAGE")

    return new_data


def get_orphan_samples(limit=None):
    """
    Iterate for all BioSample orphaned records which are not yet removed and
    are tracked for removal, get minimal data from BioSample and return a
    dictionary which can be used to patch a BioSample id with a new
    BioSample submission in order to remove a BioSamples record
    (publish the BioSample record after 1000 years from Now).

    Parameters
    ----------
    limit : int, optional
        when provided, process at most this many orphan samples.

    Yields
    ------
    new_data : dict
        payload to submit to BioSample in order to remove a BioSamples record.
    """

    with requests.Session() as session:
        # get all biosamples candidate for a removal. Pay attention that
        # could be removed from different users
        qs = ORPHAN_QS.order_by('team__name', 'id')

        if limit:
            qs = islice(qs, limit)

        for orphan_sample in qs:
            data = _fetch_biosample(session, orphan_sample)

            if data is None:
                # this sample seems already removed
                continue

            # return new biosample data
            yield {
                'data': _build_removal_payload(data, orphan_sample),
                'team': orphan_sample.team,
                'sample': orphan_sample,
            }
457
458
459
# register explicitly tasks
460
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
461
celery_app.tasks.register(CleanUpTask)
462
celery_app.tasks.register(SearchOrphanTask)
463