Passed
Pull Request — devel (#90)
by Paolo
06:10
created

biosample.tasks.cleanup.check_samples()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 2
nop 0
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Nov 14 16:06:10 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import asyncio
10
import aiohttp
11
12
from yarl import URL
13
from multidict import MultiDict
14
15
from datetime import timedelta
16
from celery.utils.log import get_task_logger
17
from django.utils import timezone
18
19
from common.constants import COMPLETED, BIOSAMPLE_URL
20
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
21
from image.celery import app as celery_app
22
from uid.models import Animal as UIDAnimal, Sample as UIDSample
23
24
from ..helpers import get_manager_auth
25
from ..models import Submission, OrphanSample
26
27
# Get an instance of a logger
28
logger = get_task_logger(__name__)
29
30
# defining constants. Clean biosample database data after
# this many days have elapsed since a submission was completed
CLEANUP_DAYS = 30

# Setting page size for biosample requests
# (number of records per page in the paginated BioSamples REST API)
PAGE_SIZE = 20

# Base query string for BioSamples: page size plus a filter limiting
# results to the IMAGE project. MultiDict because query keys may repeat.
PARAMS = MultiDict([
    ('size', PAGE_SIZE),
    ('filter', 'attr:project:IMAGE'),
    ])
# Ask the BioSamples endpoint for HAL+JSON responses
HEADERS = {
        'Accept': 'application/hal+json',
    }
43
44
45
class CleanUpTask(NotifyAdminTaskMixin, BaseTask):
    """Delete biosample.models.Submission records that reached the
    COMPLETED status more than ``CLEANUP_DAYS`` days ago."""

    name = "Clean biosample models"
    description = """Clean biosample models"""

    @exclusive_task(task_name="Clean biosample models", lock_id="CleanUpTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("Clean biosample.database started")

        # records untouched since this moment are eligible for removal
        threshold = timezone.now() - timedelta(days=CLEANUP_DAYS)

        # completed submissions last updated before the threshold
        stale = Submission.objects.filter(
            status=COMPLETED,
            updated_at__lt=threshold)

        logger.info(
            "Deleting %s biosample.models.Submission objects" % stale.count())

        # drop every stale record in one go
        stale.delete()

        logger.info("Clean biosample.database completed")

        return "success"
82
83
84
async def fetch(session, url=BIOSAMPLE_URL, params=PARAMS):
    """Request one page of results from BioSamples and return the parsed
    JSON body."""

    # build the final request URL with yarl, merging in query parameters
    target = URL(url).update_query(params)

    async with session.get(target, headers=HEADERS) as response:
        return await response.json()
93
94
95
async def parse_samples_data(data, managed_domains):
    """Yield the samples embedded in a BioSamples page payload, skipping
    any record whose domain is not one of ours."""

    for sample in data['_embedded']['samples']:
        if sample['domain'] in managed_domains:
            # a record I manage: hand it back to the caller
            yield sample
        else:
            # not one of my domains: log and move on
            logger.warning("Ignoring %s" % (sample['name']))
107
108
109
async def get_samples(
        url=BIOSAMPLE_URL,
        params=PARAMS,
        managed_domains=()):
    """Async generator over all project samples in BioSamples belonging to
    one of *managed_domains*.

    Fetches the first page to learn the total page count, then requests the
    remaining pages concurrently, yielding managed samples as each page
    arrives (so ordering across pages is not guaranteed).

    Args:
        url (str): BioSamples endpoint to query.
        params (MultiDict): base query parameters (page size, filter).
        managed_domains: domains whose samples are yielded. The default is
            an immutable empty tuple — the previous ``[]`` default was a
            mutable default argument, shared across calls.

    Yields:
        dict: a sample record owned by one of the managed domains.
    """

    async with aiohttp.ClientSession() as session:
        # first request (implicit page 0) also tells us the page count
        data = await fetch(session, url, params)

        # process data and filter samples I own
        # https://stackoverflow.com/a/47378063
        async for sample in parse_samples_data(data, managed_domains):
            yield sample

        # get pages
        totalPages = data['page']['totalPages']

        # generate new awaitable objects for pages 1..totalPages-1
        tasks = []

        for page in range(1, totalPages):
            # copy so the caller's MultiDict is never mutated
            my_params = params.copy()

            # edit a multidict object
            my_params.update(page=page)

            # track the new awaitable object
            tasks.append(fetch(session, url, my_params))

        # Run awaitable objects in the aws set concurrently.
        # Return an iterator of Future objects.
        for task in asyncio.as_completed(tasks):
            # read data
            data = await task

            # process data and filter samples I own
            # https://stackoverflow.com/a/47378063
            async for sample in parse_samples_data(data, managed_domains):
                yield sample
147
148
149
async def check_samples():
    """Walk every managed BioSamples record and test it against UID.

    A pyUSIrest.auth.Auth object is required to filter out records that
    don't belong to me."""

    auth = get_manager_auth()
    domains = auth.get_domains()

    async for sample in get_samples(managed_domains=domains):
        check_orphan_sample(sample)
157
158
159
def check_orphan_sample(sample):
    """Record a BioSamples entry as orphan when no UID animal or sample
    references its accession."""

    accession = sample['accession']

    # querysets are lazy; each .exists() is a cheap EXISTS query and the
    # second one is skipped when the first matches
    tracked = (
        UIDAnimal.objects.filter(biosample_id=accession).exists() or
        UIDSample.objects.filter(biosample_id=accession).exists())

    if tracked:
        logger.debug("Sample %s is tracked in UID" % (sample['accession']))
        return

    # not referenced in UID: remember it as an orphan (idempotent)
    _, created = OrphanSample.objects.get_or_create(
        biosample_id=sample['accession'],
        name=sample['name'])

    if created:
        logger.warning("Add %s to orphan samples" % sample['accession'])
177
178
179
class SearchOrphanTask(NotifyAdminTaskMixin, BaseTask):
    """Search across biosamples for objects not present in UID"""

    name = "Search Orphan BioSamples IDs"
    description = """Track BioSamples IDs not present in UID"""

    @exclusive_task(
        task_name=name, lock_id="SearchOrphanTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        logger.info("%s started" % (self.name))

        # Use a private event loop: the previous implementation obtained
        # the default loop via asyncio.get_event_loop() and then closed
        # it, which makes any later asyncio work in this worker process
        # fail with "Event loop is closed".
        loop = asyncio.new_event_loop()

        try:
            # execute the async scan to completion
            loop.run_until_complete(check_samples())

        finally:
            # always release loop resources, even if check_samples raises
            loop.close()

        # debug
        logger.info("%s completed" % (self.name))

        return "success"
211
212
213
# register explicitly tasks
214
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
215
celery_app.tasks.register(CleanUpTask)
216
celery_app.tasks.register(SearchOrphanTask)
217