Pull Request — devel (#90)
by Paolo · created 06:31 · Completed

biosample.tasks.cleanup.fetch_url() — Grade: A

Complexity
  Conditions: 2

Size
  Total Lines: 27
  Code Lines: 5

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0
Metric  Value
------  -----
eloc    5
dl      0
loc     27
rs      10
c       0
b       0
f       0
cc      2
nop     3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 14 16:06:10 2019

@author: Paolo Cozzi <[email protected]>
"""

import asyncio
import aiohttp
import requests

from yarl import URL
from multidict import MultiDict

from datetime import timedelta
from celery.utils.log import get_task_logger
from django.utils import timezone
from django.utils.dateparse import parse_date

from common.constants import COMPLETED, BIOSAMPLE_URL, READY
from common.helpers import format_attribute, send_mail_to_admins
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
from image.celery import app as celery_app
from uid.models import Animal as UIDAnimal, Sample as UIDSample, DictSpecie

from ..helpers import get_manager_auth
from ..models import Submission, OrphanSample, ManagedTeam

# Get an instance of a logger
logger = get_task_logger(__name__)

# defining constants: clean BioSamples database data older than this
CLEANUP_DAYS = 30

# the timedelta added to releaseDate in order to hide removed samples
RELEASE_TIMEDELTA = timedelta(days=365*1000)
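
# A quick sketch (dates here are illustrative, not part of the task flow) of
# how the releaseDate arithmetic in get_orphan_samples() below works:
# 365*1000 days is roughly 1000 calendar years (leap days are ignored), which
# is enough to hide a record indefinitely.

def _demo_release_date(value='2020-05-04'):
    """Illustrative only: push a hypothetical releaseDate ~1000 years ahead."""
    return str(parse_date(value) + RELEASE_TIMEDELTA)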

# page size for BioSamples requests
PAGE_SIZE = 20

BIOSAMPLE_PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
])

HEADERS = {
    'Accept': 'application/hal+json',
}
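
# A minimal sketch of how fetch_url() below merges BIOSAMPLE_PARAMS into the
# request URL with yarl; the base URL passed here is hypothetical:

def _demo_build_url(base='https://www.ebi.ac.uk/biosamples/samples'):
    """Illustrative only: returns something like
    '...samples?size=20&filter=attr:project:IMAGE'."""
    return str(URL(base).update_query(BIOSAMPLE_PARAMS))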


class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Perform biosample.models cleanup by selecting old COMPLETED
    submissions and removing them from the database"""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in Redis, so these tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if the task
            is already running or an exception is caught"""

        logger.info("Clean biosample.database started")

        # get an interval starting from now
        interval = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # select all COMPLETED objects older than interval
        qs = Submission.objects.filter(
            updated_at__lt=interval,
            status=COMPLETED)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % qs.count())

        # delete all old objects
        qs.delete()

        # debug
        logger.info("Clean biosample.database completed")

        return "success"
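
# CleanUpTask is invoked through Celery's delay() as described in its run()
# docstring; a minimal sketch, assuming a configured broker and worker:

def _demo_schedule_cleanup():
    """Illustrative only: schedule the cleanup task and wait for its result."""
    task = celery_app.tasks[CleanUpTask.name]
    result = task.delay()          # returns an AsyncResult
    return result.get(timeout=60)  # 'success' when the cleanup completes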


async def fetch_url(session, url=BIOSAMPLE_URL, params=BIOSAMPLE_PARAMS):
    """
    Fetch a generic URL, read its data as JSON and return it

    Parameters
    ----------
    session : aiohttp.ClientSession
        an async session object.
    url : str, optional
        the desired url. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for the request. The default is BIOSAMPLE_PARAMS.

    Returns
    -------
    dict
        json content of the page

    """

    # define a URL with yarl
    url = URL(url)
    url = url.update_query(params)

    async with session.get(url, headers=HEADERS) as response:
        return await response.json()
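
# A minimal sketch of driving fetch_url() by hand (network access assumed;
# the page layout follows the BioSamples HAL+JSON format):

async def _demo_fetch_first_page():
    """Illustrative only: fetch page 0 and return the total page count."""
    async with aiohttp.ClientSession() as session:
        data = await fetch_url(session)
        return data['page']['totalPages']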


async def filter_managed_biosamples(data, managed_domains):
    """
    Parse data from a BioSamples results page and yield the samples managed
    by InjectTool users.

    Parameters
    ----------
    data : dict
        biosample data read from BIOSAMPLE_URL.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    # get samples objects
    try:
        samples = data['_embedded']['samples']

        for sample in samples:
            # filter out unmanaged records
            if sample['domain'] not in managed_domains:
                logger.warning("Ignoring %s" % (sample['name']))
                continue

            # otherwise return the sample to the caller
            yield sample

    except KeyError as exc:
        # log the exception. With repr() the exception name is rendered too
        logger.error(repr(exc))
        logger.warning("error while parsing samples")
        logger.warning(data)
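
# filter_managed_biosamples() is an async generator, so it is consumed with
# `async for` (or an async comprehension). A sketch against a hand-built
# payload; both domain names below are hypothetical:

async def _demo_filter_managed():
    data = {'_embedded': {'samples': [
        {'name': 'sample1', 'domain': 'subs.test-team-1'},
        {'name': 'sample2', 'domain': 'subs.another-team'},
    ]}}
    # keeps sample1 only; a warning is logged for sample2
    return [sample async for sample in
            filter_managed_biosamples(data, ['subs.test-team-1'])]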


async def get_biosamples(
        url=BIOSAMPLE_URL,
        params=BIOSAMPLE_PARAMS,
        managed_domains=None):
    """
    Get all records from BioSamples for the IMAGE project. Fetch BioSamples
    once to determine how many pages need to be requested, then yield only
    the sample records managed by InjectTool.

    Parameters
    ----------
    url : str, optional
        The desired URL. The default is BIOSAMPLE_URL.
    params : MultiDict, optional
        Additional params for the request. The default is BIOSAMPLE_PARAMS.
    managed_domains : list
        A list of AAP domains, as returned from
        :py:meth:`pyUSIrest.auth.Auth.get_domains`.

    Yields
    ------
    sample : dict
        a BioSample record.

    """
    # avoid a mutable default argument
    if managed_domains is None:
        managed_domains = []

    async with aiohttp.ClientSession() as session:
        # get data for the first time to determine how many pages I have
        # to request
        data = await fetch_url(session, url, params)

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in filter_managed_biosamples(data, managed_domains):
            # return a managed biosample record
            yield sample

        tasks = []

        # get pages
        totalPages = data['page']['totalPages']

        # generate new awaitable objects
        for page in range(1, totalPages):
            # get a new param object to edit
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch_url(session, url, my_params))

        # run the awaitable objects concurrently; as_completed() yields
        # futures as they finish
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in filter_managed_biosamples(
                    data, managed_domains):
                yield sample
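
# A sketch of consuming get_biosamples(); managed_domains would normally come
# from pyUSIrest (see check_samples() below), the value here is hypothetical:

async def _demo_collect_accessions(managed_domains=('subs.test-team-1',)):
    """Illustrative only: collect the accessions of every managed record."""
    return [sample['accession'] async for sample in
            get_biosamples(managed_domains=list(managed_domains))]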


async def check_samples():
    """
    Get all BioSamples records belonging to the domains managed by the
    InjectTool manager user, and call check_orphan_sample for each of them

    Returns
    -------
    None.

    """
    # I need a pyUSIrest.auth.Auth object to filter out records that don't
    # belong to me
    auth = get_manager_auth()
    managed_domains = auth.get_domains()

    async for sample in get_biosamples(managed_domains=managed_domains):
        check_orphan_sample(sample)


def check_orphan_sample(sample):
    """
    Take a BioSamples record and check whether its BioSamples ID is
    registered in the InjectTool UID. If no such record is present, create a
    new :py:class:`biosample.models.OrphanSample` object in the BioSamples
    orphan table

    Parameters
    ----------
    sample : dict
        a BioSample record.

    Returns
    -------
    None.

    """
    animal_qs = UIDAnimal.objects.filter(
        biosample_id=sample['accession'])

    sample_qs = UIDSample.objects.filter(
        biosample_id=sample['accession'])

    if animal_qs.exists() or sample_qs.exists():
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))

    else:
        # get a managed team
        team = ManagedTeam.objects.get(name=sample["domain"])

        # create an orphan sample
        orphan, created = OrphanSample.objects.get_or_create(
            biosample_id=sample['accession'],
            name=sample['name'],
            team=team,
            status=READY,
        )

        if created:
            logger.warning("Add %s to orphan samples" % sample['accession'])
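
# check_orphan_sample() only needs the accession, name and domain keys of a
# BioSamples record; a sketch with hypothetical values (this touches the
# database, so it is wrapped in a function rather than run on import):

def _demo_check_orphan():
    check_orphan_sample({
        'accession': 'SAMEA0000001',
        'name': 'ANIMAL:::ID:::001',
        'domain': 'subs.test-team-1',
    })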


class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across BioSamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in Redis, so these tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if the task
            is already running or an exception is caught"""

        logger.info("%s started" % (self.name))

        # create a loop object
        loop = asyncio.new_event_loop()

        # execute stuff
        try:
            loop.run_until_complete(check_samples())

        finally:
            # close loop
            loop.close()

        # now count the orphan samples
        orphan_count = sum(1 for orphan in get_orphan_samples())

        if orphan_count > 0:
            email_subject = "Some entries in BioSamples are orphan"
            email_message = (
                "There are %s biosample ids which are not managed by "
                "InjectTool" % orphan_count)

            logger.warning(email_message)

            # notify admins if I have orphan samples
            send_mail_to_admins(email_subject, email_message)

        # debug
        logger.info("%s completed" % (self.name))

        return "success"


def get_orphan_samples():
    """
    Iterate over all orphan BioSamples records which are not yet removed and
    are tracked for removal, get minimal data from BioSamples and yield a
    dictionary which can be used to patch a BioSamples ID with a new
    BioSamples submission in order to remove the record (its releaseDate is
    pushed ~1000 years into the future).

    Yields
    ------
    dict
        a dict with the patch payload ('data'), the owning team ('team')
        and the OrphanSample object ('sample').
    """

    with requests.Session() as session:
        for orphan_sample in OrphanSample.objects.filter(
                ignore=False,
                removed=False,
                status=READY).order_by('team__name', 'id'):

            # define the url I need to check
            url = "/".join([BIOSAMPLE_URL, orphan_sample.biosample_id])

            # read data from url
            response = session.get(url)
            data = response.json()

            # I need a new data dictionary to submit
            new_data = dict()

            # I suppose the accession exists, since I found this sample
            # using its accession [biosample.id]
            new_data['accession'] = data.get(
                'accession', orphan_sample.biosample_id)

            new_data['alias'] = data['name']

            new_data['title'] = data['characteristics']['title'][0]['text']

            # this will be the most important attribute
            new_data['releaseDate'] = str(
                parse_date(data['releaseDate']) + RELEASE_TIMEDELTA)

            new_data['taxonId'] = data['taxId']

            # determine the taxon label from the taxId
            new_data['taxon'] = DictSpecie.objects.get(
                term__endswith=data['taxId']).label

            new_data['attributes'] = dict()

            new_data['description'] = "Removed by InjectTool"

            # set project again
            new_data['attributes']["Project"] = format_attribute(
                value="IMAGE")

            # return new biosample data
            yield {
                'data': new_data,
                'team': orphan_sample.team,
                'sample': orphan_sample,
            }
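
# Each payload yielded by get_orphan_samples() bundles the patch data with
# its database bookkeeping; a sketch of inspecting them (the printed fields
# are illustrative):

def _demo_iter_orphans():
    for item in get_orphan_samples():
        # item['data'] is the submission payload, item['team'] the
        # ManagedTeam and item['sample'] the OrphanSample row itself
        print(item['data']['accession'], item['data']['releaseDate'])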


# register tasks explicitly
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(CleanUpTask)
celery_app.tasks.register(SearchOrphanTask)