Completed
Pull Request — master (#44)
by Paolo
05:54
created

biosample.tasks.fetch   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 355
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 26
eloc 150
dl 0
loc 355
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
B FetchStatusTask.fetch_submission_obj() 0 58 8
A FetchStatusTask.run() 0 23 3
A FetchStatusTask.__sample_has_errors() 0 27 1
A FetchStatusTask.submission_has_issues() 0 41 2
A FetchStatusTask.complete() 0 31 3
A FetchStatusTask.fetch_status() 0 35 3
A FetchStatusTask.finalize() 0 56 4
A FetchStatusTask.fetch_queryset() 0 24 2
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jul 16 11:25:03 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import os
10
import json
11
12
from decouple import AutoConfig
13
from celery.utils.log import get_task_logger
14
15
import pyUSIrest.client
16
17
from django.conf import settings
18
from django.utils import timezone
19
20
from image.celery import app as celery_app, MyTask
21
from image_app.helpers import parse_image_alias, get_model_object
22
from image_app.models import Submission
23
from common.tasks import redis_lock
24
from common.constants import (
25
    ERROR, NEED_REVISION, SUBMITTED, COMPLETED)
26
from submissions.helpers import send_message
27
28
from ..helpers import get_manager_auth
29
30
# celery task logger, named after this module
logger = get_task_logger(__name__)

# decouple configuration object: settings files are searched inside the
# ``image`` directory located under Django's BASE_DIR
settings_dir = os.path.join(settings.BASE_DIR, "image")
config = AutoConfig(search_path=settings_dir)

# a UID submission whose status has not been updated for more than this
# number of days is considered stuck and is marked with errors
MAX_DAYS = 5
39
40
41
class FetchStatusTask(MyTask):
    """Celery task polling BioSamples (through the USI API) for every UID
    :py:class:`Submission <image_app.models.Submission>` in
    :py:const:`SUBMITTED <common.constants.SUBMITTED>` status and updating
    the UID database according to the remote status."""

    name = "Fetch USI status"
    description = """Fetch biosample using USI API"""
    lock_id = "FetchStatusTask"

    def run(self):
        """
        This function is called when delay is called. It tries to acquire a
        lock in redis *without blocking*, so those tasks are mutually
        exclusive: if another instance holds the lock, this run is skipped.

        Returns:
            str: the result of :py:meth:`fetch_status` (``"success"``) when
            the lock is acquired, otherwise an ``"... already running!"``
            message"""

        # debugging instance
        self.debug_task()

        # non-blocking acquisition: if the lock is already taken, don't wait
        # for it, simply report that another task is running
        with redis_lock(self.lock_id, blocking=False) as acquired:
            if acquired:
                # do stuff and return something
                return self.fetch_status()

        message = "%s already running!" % (self.name)

        logger.warning(message)

        return message

    def fetch_status(self):
        """
        Fetch status from pending submissions. Called from
        :py:meth:`run`, handles exceptions from USI, select
        all :py:class:`Submission <image_app.models.Submission>` objects
        with :py:const:`SUBMITTED <common.constants.SUBMITTED>` status
        from :ref:`UID <The Unified Internal Database>` and call
        :py:meth:`fetch_queryset` with this data

        Returns:
            str: ``"success"`` when every pending submission was processed

        Raises:
            celery.exceptions.Retry: (via ``self.retry``) when a
                ``ConnectionError`` is raised while talking to USI
        """

        logger.info("fetch_status started")

        # search for submission with SUBMITTED status. Other submission are
        # not yet finalized. This function need to be called by exclusive
        # tasks
        qs = Submission.objects.filter(status=SUBMITTED)

        # exists() is enough to know if there's something to do: there's
        # no need to count all the matching rows
        if qs.exists():
            try:
                # fetch biosample status
                self.fetch_queryset(qs)

            # retry a task under errors
            # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
            # NOTE(review): only the builtin ConnectionError is caught here;
            # confirm it matches what pyUSIrest raises on network failures
            # (requests has its own, unrelated, ConnectionError class)
            except ConnectionError as exc:
                raise self.retry(exc=exc)

        else:
            logger.debug("No pending submission in UID database")

        # debug
        logger.info("fetch_status completed")

        return "success"

    # a function to retrieve biosample submission
    def fetch_queryset(self, queryset):
        """Fetch biosample against a queryset (a list of
        :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`Submission <image_app.models.Submission>` objects). Iterate
        through submission to get USI info. Calls
        :py:meth:`fetch_submission_obj`

        A single USI manager token and USI root object are created once and
        shared by all submissions in ``queryset``.

        Args:
            queryset (iterable): UID submission objects to be checked
        """

        logger.info("Searching for submissions into biosample")

        # USI objects shared across all submissions of this run
        usi_objs = {}

        # create a new auth object
        logger.debug("Generate a token for 'USI_MANAGER'")
        usi_objs['auth'] = get_manager_auth()

        logger.debug("Getting root")
        usi_objs['root'] = pyUSIrest.client.Root(usi_objs['auth'])

        for submission_obj in queryset:
            self.fetch_submission_obj(submission_obj, usi_objs)

        logger.info("fetch_queryset completed")

    def fetch_submission_obj(self, submission_obj, usi_objs):
        """Fetch the USI submission bound to a UID submission and act on its
        status:

        * ``Completed``: record BioSample accessions with :py:meth:`complete`
        * ``Draft``: once validation is ``Complete`` for all samples, check
          errors and finalize with :py:meth:`finalize`
        * ``Submitted``: log the current processing status summary

        ``Draft`` and ``Submitted`` submissions are first checked with
        :py:meth:`submission_has_issues`: a submission stuck for too long is
        marked with errors and not processed any further.

        Args:
            submission_obj (image_app.models.Submission): an UID submission
                object
            usi_objs (dict): USI objects built by :py:meth:`fetch_queryset`
                (``auth`` and ``root`` keys)
        """

        logger.info("Processing submission %s" % (submission_obj))

        # fetch a biosample object
        submission = usi_objs['root'].get_submission_by_name(
            submission_name=submission_obj.biosample_submission_id)

        # Update submission status if completed
        if submission.status == 'Completed':
            # fetch biosample ids with a proper function
            self.complete(submission, submission_obj)

        elif submission.status == 'Draft':
            # check for a long task
            if self.submission_has_issues(submission, submission_obj):
                # return to the caller. I've just marked the submission with
                # errors and sent a mail to the user
                return

            # check validation. If it is ok, finalize submission
            status = submission.get_status()

            # this mean validation statuses, I want to see completed in all
            # samples
            if len(status) == 1 and 'Complete' in status:
                # check for errors and eventually finalize
                self.finalize(submission, submission_obj)

            else:
                logger.warning(
                    "Biosample validation is not completed yet (%s)" %
                    (status))

        elif submission.status == 'Submitted':
            # check for a long task
            if self.submission_has_issues(submission, submission_obj):
                # return to the caller. I've just marked the submission with
                # errors and sent a mail to the user
                return

            logger.info(
                "Submission %s is '%s'. Waiting for biosample "
                "ids" % (submission.id, submission.status))

            # debug submission status
            document = submission.follow_url(
                "processingStatusSummary", usi_objs['auth'])

            logger.debug(
                "Current status for submission %s is %s" % (
                    submission.id, document.data))

        else:
            # HINT: thrown an exception?
            logger.warning("Unknown status %s for submission %s" % (
                submission.status, submission.name))

    def submission_has_issues(self, submission, submission_obj):
        """
        Check that biosample submission has no issues. For example, that
        it has not remained in the same status for a long time (more than
        ``MAX_DAYS`` days since the UID object was last updated).

        When an issue is detected, the UID submission is set to ``ERROR``
        with an explanatory message, an async message is sent and the owner
        is notified by email.

        Args:
            submission (pyUSIrest.client.Submission): a USI submission object
            submission_obj (image_app.models.Submission): an UID submission
                object

        Returns:
            bool: True if an issue is detected
        """

        if (timezone.now() - submission_obj.updated_at).days > MAX_DAYS:
            message = (
                "Biosample subission %s remained with the same status "
                "for more than %s days. Please report it to InjectTool "
                "team" % (submission_obj, MAX_DAYS))
            submission_obj.status = ERROR
            submission_obj.message = message
            submission_obj.save()

            # send async message
            send_message(submission_obj)

            logger.error("Errors for submission: %s" % (submission))
            logger.error(message)

            # send a mail to the user
            submission_obj.owner.email_user(
                "Error in biosample submission %s" % (
                    submission_obj.id),
                ("Something goes wrong: %s" % message),
            )

            return True

        else:
            return False

    def __sample_has_errors(self, sample, table, pk):
        """
        Helper method to mark an (animal/sample) object with its own errors.
        Table should be Animal or Sample to update the appropriate object.
        Sample is a USI sample object

        Args:
            sample (pyUSIrest.client.sample): a USI sample object
            table (str): ``Animal`` or ``Sample``, mean the table where this
                object should be searched
            pk (int): table primary key

        Returns:
            dict: a single ``{str(uid_object): errorMessages}`` item, where
            ``errorMessages`` comes from the USI validation result
        """

        # get sample/animal object relying on table name and pk
        sample_obj = get_model_object(table, pk)

        sample_obj.name.status = NEED_REVISION
        sample_obj.name.save()

        # get a USI validation result
        validation_result = sample.get_validation_result()

        # TODO: should I store validation_result error in validation tables?
        errorMessages = validation_result.errorMessages

        # return an error for each object
        return {str(sample_obj): errorMessages}

    # a function to finalize a submission
    def finalize(self, submission, submission_obj):
        """Finalize a validated USI submission, or report its errors.

        If USI reports errors, every sample with errors is marked as
        ``NEED_REVISION`` in UID, the owner receives an email with the
        collected error messages, and the UID submission is set to
        ``NEED_REVISION``. Otherwise the USI submission is finalized.

        Args:
            submission (pyUSIrest.client.Submission): a USI submission object
            submission_obj (image_app.models.Submission): an UID submission
                object
        """

        # get errors for a submission. NOTE(review): presumably a collection
        # of per-sample booleans, so ``True in errors`` means "at least one
        # sample has errors" — confirm against pyUSIrest
        errors = submission.has_errors()

        # collect all error messages in a list
        messages = []

        if True in errors:
            # get sample with errors then update database
            samples = submission.get_samples(has_errors=True)

            for sample in samples:
                # derive pk and table from alias
                table, pk = parse_image_alias(sample.alias)

                # need to check if this sample/animals has errors or not
                if sample.has_errors():
                    logger.warning(
                        "%s in table %s has errors!!!" % (sample, table))

                    # mark this sample since has problems
                    errorMessages = self.__sample_has_errors(
                        sample, table, pk)

                    # append this into error messages list
                    messages.append(errorMessages)

                # if a sample has no errors, status will be the same

            logger.error("Errors for submission: %s" % (submission))
            logger.error("Fix them, then finalize")

            # report errors via mail
            email_body = "Some items needs revision:\n\n" + \
                json.dumps(messages, indent=2)

            # send a mail for this submission
            submission_obj.owner.email_user(
                "Error in biosample submission %s" % (submission_obj.id),
                email_body,
            )

            # Update status for submission
            submission_obj.status = NEED_REVISION
            submission_obj.message = "Error in biosample submission"
            submission_obj.save()

            # send async message
            send_message(submission_obj)

        else:
            # raising an exception while finalizing will result
            # in a failed task.
            # TODO: model and test exception in finalization
            logger.info("Finalizing submission %s" % (submission.name))
            submission.finalize()

    def complete(self, submission, submission_obj):
        """Record BioSample accessions and mark a submission as completed.

        Every USI sample is checked for an accession *before* any UID object
        is modified. If at least one sample lacks an accession, nothing is
        written: the submission keeps its status and will be checked again
        on the next run. This avoids leaving some animals/samples as
        ``COMPLETED`` while the submission itself is not.

        Args:
            submission (pyUSIrest.client.Submission): a USI submission object
            submission_obj (image_app.models.Submission): an UID submission
                object
        """

        # first pass: resolve UID references and make sure every sample has
        # an accession. Nothing is written to the database here
        accessions = []

        for sample in submission.get_samples():
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # if no accession, return without doing anything
            if sample.accession is None:
                logger.error("No accession found for sample %s" % (sample))
                logger.error("Ignoring submission %s" % (submission))
                return

            accessions.append((table, pk, sample.accession))

        # second pass: all accessions are available, update UID objects
        for table, pk, accession in accessions:
            # get sample/animal object relying on table name and pk
            sample_obj = get_model_object(table, pk)

            # update statuses
            sample_obj.name.status = COMPLETED
            sample_obj.name.biosample_id = accession
            sample_obj.name.save()

        # update submission
        submission_obj.status = COMPLETED
        submission_obj.message = "Successful submission into biosample"
        submission_obj.save()

        # send async message
        send_message(submission_obj)

        logger.info(
            "Submission %s is now completed and recorded into UID" % (
                submission))
350
351
352
# explicitly register the task class with the celery application
353
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
354
# the task class itself (not an instance) is passed to the application's
# task registry
celery_app.tasks.register(FetchStatusTask)
355