Completed
Pull Request — devel (#90)
by Paolo
06:25
created

biosample.tasks.cleanup.get_biosamples()   B

Complexity

Conditions 8

Size

Total Lines 73
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 73
rs 7.3333
c 0
b 0
f 0
cc 8
nop 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:06:10 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import asyncio
10
import aiohttp
11
import requests
12
13
from yarl import URL
14
from multidict import MultiDict
15
from itertools import islice
16
17
from datetime import timedelta
18
from celery.utils.log import get_task_logger
19
from django.utils import timezone
20
from django.utils.dateparse import parse_date
21
22
from common.constants import COMPLETED, BIOSAMPLE_URL, READY
23
from common.helpers import format_attribute, send_mail_to_admins
24
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
25
from image.celery import app as celery_app
26
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie
27
28
from ..helpers import get_manager_auth
29
from ..models import Submission, OrphanSample, ManagedTeam
30
31
# Get an instance of a logger
logger = get_task_logger(__name__)

# defining constants. Clean biosample database data after this many days
CLEANUP_DAYS = 30

# the timedelta added to 'releaseDate' in order to remove samples
# (publish the record ~1000 years from now)
RELEASE_TIMEDELTA = timedelta(days=365*1000)

# Setting page size for biosample requests
PAGE_SIZE = 100

# default query string for the BioSamples search endpoint
BIOSAMPLE_PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
    ])

# ask BioSamples for HAL+JSON responses
HEADERS = {
        'Accept': 'application/hal+json',
    }
50
51
52
class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old completed submission
    and remove them from database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    # use the class-level ``name`` attribute instead of duplicating the
    # string literal, consistently with SearchOrphanTask
    @exclusive_task(task_name=name, lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("Clean biosample.database started")

        # get an interval starting from now
        interval = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # select all COMPLETED object older than interval
        qs = Submission.objects.filter(
            updated_at__lt=interval,
            status=COMPLETED)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % qs.count())

        # delete all old objects
        qs.delete()

        # debug
        logger.info("Clean biosample.database completed")

        return "success"
89
90
91
async def fetch_url(session, url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS):
    """
    Fetch a generic url, read data as json and return a promise

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    url : str, optional
        the desired url. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.

    Returns
    -------
    dict
        json content of the page (an empty dict if the response body
        could not be decoded as JSON)

    """
    # define a URL with yarl and attach the query parameters
    url = URL(url)
    url = url.update_query(params)

    logger.debug(url)

    async with session.get(url, headers=HEADERS) as response:
        try:
            return await response.json()

        except aiohttp.client_exceptions.ContentTypeError as exc:
            # the server replied with a non-JSON payload (e.g. an HTML
            # error page): log the issue and return an empty dict, which
            # callers treat as "request had issues"
            logger.error(repr(exc))
            logger.warning(
                "error while getting data from %s" % url)
            return {}
127
128
129
async def filter_managed_biosamples(data, managed_domains):
    """
    Parse data from a BioSample results page and yield samples managed
    by InjectTool users.

    Parameters
    ----------
    data : dict
        biosample data read from BIOSAMPLE_URL.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    try:
        # records live under the '_embedded' key of a HAL result page
        for record in data['_embedded']['samples']:
            if record['domain'] in managed_domains:
                # this record belongs to an InjectTool user
                yield record

            else:
                # skip records owned by other AAP domains
                logger.warning("Ignoring %s" % (record['name']))

    except KeyError as exc:
        # logger exception. With repr() the exception name is rendered
        logger.error(repr(exc))
        logger.warning("error while parsing samples")
        logger.warning(data)
166
167
168
async def get_biosamples(
        url=BIOSAMPLE_URL,
        params=BIOSAMPLE_PARAMS,
        managed_domains=None):
    """
    Get all records from BioSamples for the IMAGE project. Fetch BioSamples
    once, determine how many pages to request and return only sample records
    managed by InjectTool

    Parameters
    ----------
    url : str, optional
        The desired URL. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for request. The default is BIOSAMPLE_PARAMS.
    managed_domains : list, optional
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`. The default is an
        empty list (nothing will be yielded).

    Raises
    ------
    ConnectionError
        if the first BioSamples request returns no data.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    # avoid the mutable-default-argument pitfall: a list default object
    # would be shared across every call of this coroutine
    if managed_domains is None:
        managed_domains = []

    async with aiohttp.ClientSession() as session:
        # get data for the first time to determine how many pages I have
        # to requests
        data = await fetch_url(session, url, params)

        # maybe the request had issues
        if data == {}:
            logger.debug("Got a result with no data")
            raise ConnectionError("Can't fetch biosamples for orphan samples")

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in filter_managed_biosamples(data, managed_domains):
            # return a managed biosample record
            yield sample

        tasks = []

        # get pages
        total_pages = data['page']['totalPages']

        # generate new awaitable objects, one per remaining page
        for page in range(1, total_pages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch_url(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # maybe the request had issues
            if data == {}:
                logger.debug("Got a result with no data")
                continue

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in filter_managed_biosamples(
                    data, managed_domains):
                yield sample
241
242
243
async def check_samples():
    """
    Get all records from BioSamples submitted by the InjectTool manager auth
    managed domains, and call check_orphan_sample for each of them

    Returns
    -------
    None.

    """
    # a pyUSIrest.auth.Auth object is required to filter out records that
    # don't belong to InjectTool
    manager_auth = get_manager_auth()
    domains = manager_auth.get_domains()

    # inspect every managed BioSamples record for presence in UID
    async for biosample_record in get_biosamples(managed_domains=domains):
        check_orphan_sample(biosample_record)
260
261
262
def check_orphan_sample(sample):
    """
    Get a BioSample record and check if such BioSampleId is registered into
    InjectTool UID. If Such record is not present, create a new
    :py:class:`biosample.models.OrphanSample` record object in the BioSample
    orphan table

    Parameters
    ----------
    sample : dict
        a BioSample record.

    Returns
    -------
    None.

    """
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    # guard clause: nothing to do if the accession is already tracked in UID
    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))
        return

    # get a managed team
    team = ManagedTeam.objects.get(name=sample["domain"])

    # Create an orphan sample; only the 'created' flag is needed, the
    # returned instance is unused
    _, created = OrphanSample.objects.get_or_create(
        biosample_id=sample['accession'],
        name=sample['name'],
        team=team,
        status=READY,
    )

    if created:
        logger.warning("Add %s to orphan samples" % sample['accession'])
302
303
304
class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across biosamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("%s started" % (self.name))

        # create a loop object
        loop = asyncio.new_event_loop()

        # execute stuff
        try:
            loop.run_until_complete(check_samples())

        finally:
            # close loop
            loop.close()

        # Ok count orphan samples (the yielded payloads themselves are
        # not needed here, only how many there are)
        orphan_count = sum(1 for _ in get_orphan_samples())

        if orphan_count > 0:
            email_subject = "Some entries in BioSamples are orphan"
            email_message = (
                "There are %s biosample ids which are not managed by "
                "InjectTool" % orphan_count)

            logger.warning(email_message)

            # Notify admins if I have orphan samples
            send_mail_to_admins(email_subject, email_message)

        # debug
        logger.info("%s completed" % (self.name))

        return "success"
352
353
354
def get_orphan_samples(limit=None):
    """
    Iterate for all BioSample orphaned records which are not yet removed and
    are tracked for removal, get minimal data from BioSample and return a
    dictionary which can be used to patch a BioSample id with a new
    BioSample submission in order to remove a BioSamples record
    (publish the BioSample record after 1000 years from Now).

    Parameters
    ----------
    limit : int, optional
        process at most this many orphan records. The default is None
        (no limit).

    Yields
    ------
    new_data : dict
        payload to submit to BioSample in order to remove a BioSamples record.
    """

    with requests.Session() as session:
        # candidates for a removal; mind that they may belong to
        # different users/teams
        candidates = OrphanSample.objects.filter(
            ignore=False,
            removed=False,
            status=READY
        ).order_by('team__name', 'id')

        # optionally cap how many records are processed
        if limit:
            candidates = islice(candidates, limit)

        for orphan in candidates:
            # fetch the current BioSamples record for this accession
            url = "/".join([BIOSAMPLE_URL, orphan.biosample_id])
            record = session.get(url).json()

            # build the patch payload from the fetched record. The
            # accession is supposed to exist, since this sample was
            # found through it
            payload = {
                'accession': record.get('accession', orphan.biosample_id),
                'alias': record['name'],
                'title': record['characteristics']['title'][0]['text'],
                # pushing the release date ~1000 years into the future is
                # what effectively removes the record from public view
                'releaseDate': str(
                    parse_date(record['releaseDate']) + RELEASE_TIMEDELTA),
                'taxonId': record['taxId'],
                # need to determine taxon from the taxId
                'taxon': DictSpecie.objects.get(
                    term__endswith=record['taxId']).label,
                'attributes': {
                    # set project again
                    "Project": format_attribute(value="IMAGE"),
                },
                'description': "Removed by InjectTool",
            }

            # return new biosample data
            yield {
                'data': payload,
                'team': orphan.team,
                'sample': orphan,
            }
424
425
426
# register explicitly tasks
427
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
428
celery_app.tasks.register(CleanUpTask)
429
celery_app.tasks.register(SearchOrphanTask)
430