Passed
Pull Request — master (#44)
by Paolo
07:04
created

biosample.tasks.retrieval   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 472
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 37
eloc 213
dl 0
loc 472
rs 9.44
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
B FetchStatusHelper.check_submission_status() 0 58 8
A FetchStatusHelper.__init__() 0 21 1
A FetchStatusHelper.submission_has_issues() 0 33 2
A FetchStatusHelper.__sample_has_errors() 0 27 1
A FetchStatusHelper.complete() 0 32 3
A FetchStatusTask.fetch_status() 0 35 3
A RetrievalCompleteTask.__update_message() 0 13 2
A FetchStatusTask.fetch_queryset() 0 32 3
A FetchStatusTask.run() 0 23 3
B RetrievalCompleteTask.run() 0 67 7
A FetchStatusHelper.finalize() 0 52 4
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jul 16 11:25:03 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import os
10
import json
11
12
from decouple import AutoConfig
13
from celery.utils.log import get_task_logger
14
15
import pyUSIrest.client
16
17
from django.conf import settings
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from image.celery import app as celery_app, MyTask
22
from image_app.helpers import parse_image_alias, get_model_object
23
from image_app.models import Submission
24
from common.tasks import redis_lock
25
from common.constants import (
26
    ERROR, NEED_REVISION, SUBMITTED, COMPLETED)
27
from submissions.helpers import send_message
28
29
from ..helpers import get_manager_auth
30
from ..models import Submission as USISubmission
31
from .submission import TaskFailureMixin
32
33
# number of days after which a biosample submission whose record was not
# updated is considered stuck (see FetchStatusHelper.submission_has_issues)
MAX_DAYS = 5

# module logger, shared by every task and helper defined below
logger = get_task_logger(__name__)

# decouple configuration object: settings are searched inside the 'image'
# directory of the Django project BASE_DIR
settings_dir = os.path.join(settings.BASE_DIR, 'image')
config = AutoConfig(search_path=settings_dir)
42
43
44
# HINT: how could this class be made more similar to SubmissionHelper?
45
class FetchStatusHelper():
    """Helper class to check a :py:class:`biosample.models.Submission`
    against the USI submission with the same name, updating the database
    accordingly"""

    # define my class attributes
    def __init__(self, usi_submission):
        """
        Collect info for a biosample.models.Submission and retrieve the
        related USI submission by name

        Args:
            usi_submission (biosample.models.Submission): a biosample
                model Submission instance
        """

        # ok those are my default class attributes
        self.usi_submission = usi_submission
        self.uid_submission = usi_submission.uid_submission

        # here are pyUSIrest objects: authentication and API root
        self.auth = get_manager_auth()
        self.root = pyUSIrest.client.Root(self.auth)

        # here I will track the biosample submission (retrieved by name)
        self.submission_name = self.usi_submission.usi_submission_name
        self.submission = self.root.get_submission_by_name(
            submission_name=self.submission_name)

    def check_submission_status(self):
        """Check the USI submission status and act accordingly:

        * ``Completed``: record biosample ids (see :py:meth:`complete`)
        * ``Draft``: if validation is ``Complete`` for every sample, check
          for errors and eventually finalize (see :py:meth:`finalize`)
        * ``Submitted``: waiting for biosample ids, only log the status
        * any other status is logged as a warning

        ``Draft`` and ``Submitted`` submissions whose record was not updated
        for more than ``MAX_DAYS`` days are marked as ``ERROR`` (see
        :py:meth:`submission_has_issues`) and not processed further.
        """

        logger.debug("Checking status for '%s'" % (
            self.submission_name))

        # Update submission status if completed
        if self.submission.status == 'Completed':
            # fetch biosample ids with a proper function
            self.complete()

        elif self.submission.status == 'Draft':
            # check for a long task
            if self.submission_has_issues():
                # the submission has just been marked as ERROR: nothing else
                # to do here. The owner is emailed later, when
                # RetrievalCompleteTask finds an ERROR status
                return

            # check validation. If it is ok, finalize submission
            status = self.submission.get_status()

            # those are validation statuses: I want to see 'Complete' for
            # all samples
            if len(status) == 1 and 'Complete' in status:
                # check for errors and eventually finalize
                self.finalize()

            else:
                logger.warning(
                    "Biosample validation is not completed yet (%s)" %
                    (status))

        elif self.submission.status == 'Submitted':
            # check for a long task
            if self.submission_has_issues():
                # the submission has just been marked as ERROR: nothing else
                # to do here. The owner is emailed later, when
                # RetrievalCompleteTask finds an ERROR status
                return

            logger.info(
                "Submission '%s' is '%s'. Waiting for biosample ids" % (
                    self.submission_name,
                    self.submission.status))

            # debug submission status
            document = self.submission.follow_url(
                "processingStatusSummary", self.auth)

            logger.debug(
                "Current status for submission '%s' is '%s'" % (
                    self.submission_name, document.data))

        else:
            # HINT: throw an exception?
            logger.warning("Unknown status '%s' for submission '%s'" % (
                self.submission.status,
                self.submission_name))

        logger.debug("Checking status for '%s' completed" % (
            self.submission_name))

    def submission_has_issues(self):
        """
        Check that the biosample submission has no issues, for example that
        its record has not remained unchanged for a long time (more than
        ``MAX_DAYS`` days since ``updated_at``). If an issue is detected, the
        biosample submission is marked as ``ERROR`` and an explanatory
        message is saved.

        Returns:
            bool: True if an issue is detected
        """

        logger.debug(
            "Check if submission '%s' remained in the same status "
            "for a long time" % (
                self.submission_name))

        if (timezone.now() - self.usi_submission.updated_at).days > MAX_DAYS:
            message = (
                "Biosample submission '%s' remained with the same status "
                "for more than %s days. Please report it to InjectTool "
                "team" % (self.submission_name, MAX_DAYS))

            self.usi_submission.status = ERROR
            self.usi_submission.message = message
            self.usi_submission.save()

            logger.error(
                "Errors for submission: %s" % (
                    self.submission_name))
            logger.error(message)

            return True

        else:
            return False

    def __sample_has_errors(self, sample, table, pk):
        """
        Helper method to mark an animal or sample with its own errors.
        ``table`` should be ``Animal`` or ``Sample``, so that the
        appropriate object is updated.

        Args:
            sample (pyUSIrest.client.sample): a USI sample object
            table (str): ``Animal`` or ``Sample``, the table where this
                object should be searched
            pk (int): table primary key

        Returns:
            dict: ``{str(model object): USI validation error messages}``
        """

        # get sample/animal object relying on table name and pk
        sample_obj = get_model_object(table, pk)

        sample_obj.name.status = NEED_REVISION
        sample_obj.name.save()

        # get a USI validation result
        validation_result = sample.get_validation_result()

        # TODO: should I store validation_result error in validation tables?
        errorMessages = validation_result.errorMessages

        # return an error for each object
        return {str(sample_obj): errorMessages}

    def finalize(self):
        """Finalize a submission by closing the document and sending it to
        biosample. If USI reports errors, mark the affected objects and the
        biosample submission as ``NEED_REVISION`` instead, storing the
        collected error messages as JSON"""

        logger.debug("Finalizing submission '%s'" % (
            self.submission_name))

        # get errors for a submission
        errors = self.submission.has_errors()

        # collect all error messages in a list
        messages = []

        if True in errors:
            # get sample with errors then update database
            samples = self.submission.get_samples(has_errors=True)

            for sample in samples:
                # derive pk and table from alias
                table, pk = parse_image_alias(sample.alias)

                # need to check if this sample/animal has errors or not
                if sample.has_errors():
                    logger.warning(
                        "%s in table %s has errors!!!" % (sample, table))

                    # mark this sample since it has problems
                    errorMessages = self.__sample_has_errors(
                        sample, table, pk)

                    # append this into error messages list
                    messages.append(errorMessages)

                # if a sample has no errors, status will be the same

            logger.error(
                "Errors for submission: '%s'" % (self.submission_name))
            logger.error("Fix them, then finalize")

            # report error
            message = json.dumps(messages, indent=2)

            # Update status for biosample.models.Submission
            self.usi_submission.status = NEED_REVISION
            self.usi_submission.message = message
            self.usi_submission.save()

        else:
            # raising an exception while finalizing will result
            # in a failed task.
            # TODO: model and test exception in finalization
            self.submission.finalize()

    def complete(self):
        """Complete a submission and record biosample ids for name objects.

        If any USI sample has no accession, nothing is written to the
        database: the submission is ignored and will be checked again by a
        later task run.
        """

        logger.debug("Completing submission '%s'" % (
            self.submission_name))

        # materialize samples: they are read twice (validation, then update)
        samples = list(self.submission.get_samples())

        # first pass: every sample must have an accession BEFORE touching
        # the database, otherwise some names would be marked as COMPLETED
        # while the submission is still SUBMITTED
        for sample in samples:
            if sample.accession is None:
                logger.error("No accession found for sample '%s'" % (sample))
                logger.error("Ignoring submission '%s'" % (self.submission))
                return

        # second pass: all accessions are there, update statuses
        for sample in samples:
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # get sample/animal object relying on table name and pk
            sample_obj = get_model_object(table, pk)

            # update statuses
            sample_obj.name.status = COMPLETED
            sample_obj.name.biosample_id = sample.accession
            sample_obj.name.save()

        # update submission
        self.usi_submission.status = COMPLETED
        self.usi_submission.message = "Successful submission into biosample"
        self.usi_submission.save()

        logger.info(
            "Submission %s is now completed and recorded into UID" % (
                self.submission))
277
278
279
class FetchStatusTask(MyTask):
    """Celery task that checks pending (SUBMITTED) UID submissions against
    USI, one task at a time (guarded by a redis lock)"""

    name = "Fetch USI status"
    description = """Fetch biosample using USI API"""
    lock_id = "FetchStatusTask"

    def run(self):
        """
        This function is called when delay is called. It tries to acquire a
        lock in redis without waiting, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        # debugging instance
        self.debug_task()

        # NON-blocking lock: if another FetchStatusTask holds the lock, don't
        # wait for it and return a message instead
        with redis_lock(self.lock_id, blocking=False) as acquired:
            if acquired:
                # do stuff and return something
                return self.fetch_status()

        message = "%s already running!" % (self.name)

        logger.warning(message)

        return message

    def fetch_status(self):
        """
        Fetch status from pending submissions. Called from
        :py:meth:`run`, handles exceptions from USI, select
        all :py:class:`Submission <image_app.models.Submission>` objects
        with :py:const:`SUBMITTED <common.constants.SUBMITTED>` status
        from :ref:`UID <The Unified Internal Database>` and call
        :py:meth:`fetch_queryset` with this data

        Returns:
            str: "success"
        """

        logger.info("fetch_status started")

        # search for submission with SUBMITTED status. Other submission are
        # not yet finalized. This function need to be called by exclusives
        # tasks
        qs = Submission.objects.filter(status=SUBMITTED)

        # exists() is cheaper than counting all the matching rows
        if qs.exists():
            try:
                # fetch biosample status
                self.fetch_queryset(qs)

            # retry a task under errors
            # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
            # NOTE(review): this is the builtin ConnectionError; verify that
            # connection errors raised by pyUSIrest (requests) are instances
            # of it, otherwise they won't trigger a retry
            except ConnectionError as exc:
                raise self.retry(exc=exc)

        else:
            logger.debug("No pending submission in UID database")

        # debug
        logger.info("fetch_status completed")

        return "success"

    # a function to retrieve biosample submission
    def fetch_queryset(self, queryset):
        """Fetch biosample against a queryset (a list of
        :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`Submission <image_app.models.Submission>` objects). Iterate
        through submission to get USI info. Calls
        :py:class:`FetchStatusHelper` for each SUBMITTED biosample
        submission, then starts a :py:class:`RetrievalCompleteTask` for
        each UID submission
        """

        logger.info("Searching for submissions into biosample")

        for uid_submission in queryset:
            usi_submissions = USISubmission.objects.filter(
                uid_submission=uid_submission,
                status=SUBMITTED)

            # HINT: fetch statuses using tasks?
            for usi_submission in usi_submissions:
                status_helper = FetchStatusHelper(usi_submission)
                status_helper.check_submission_status()

            # set the final status for a submission like SubmissionCompleteTask
            retrievalcomplete = RetrievalCompleteTask()

            # start the task asynchronously, passing the UID submission id
            # as keyword argument
            res = retrievalcomplete.delay(uid_submission_id=uid_submission.id)

            logger.info(
                "Start RetrievalCompleteTask process for %s with task %s" % (
                    uid_submission,
                    res.task_id))

        logger.info("fetch_queryset completed")
377
378
379
class RetrievalCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after fetching status"""

    name = "Complete Retrieval Process"
    description = """Check submission status after retrieval nd update stuff"""

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status.

        Statuses of all biosample submissions related to the UID submission
        are counted:

        * any ``SUBMITTED``: retrieval not finished, nothing is done
        * any ``ERROR``: UID submission marked as ``ERROR``, owner emailed
        * any ``NEED_REVISION``: UID submission marked as ``NEED_REVISION``,
          owner emailed
        * only ``COMPLETED``: UID submission marked as ``COMPLETED``

        Unless retrieval is still ongoing, an async message is sent.

        Keyword Args:
            uid_submission_id (int): primary key of the
                image_app.models.Submission to update

        Returns:
            str: "success"
        """

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            uid_submission=uid_submission)

        # annotate biosample submission by statuses
        statuses = {}

        for res in submission_qs.values('status').annotate(
                count=Count('status')):
            statuses[res['status']] = res['count']

        if SUBMITTED in statuses:
            # ignoring the other models. No errors thrown until there is
            # as SUBMITTED USISubmission
            logger.info("Submission %s not yet finished" % uid_submission)

            return "success"

        # if there is ANY errors in biosample.models.Submission for a
        # particular submission, I will mark it as ERROR
        elif ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.__update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        # check if submission need revision
        elif NEED_REVISION in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.__update_message(uid_submission, submission_qs, NEED_REVISION)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                "Some items needs revision:\n\n" + uid_submission.message,
            )

        elif COMPLETED in statuses and len(statuses) == 1:
            # if all status are complete, the submission is completed
            logger.info(
                "Submission %s completed with success" % uid_submission)

            self.__update_message(uid_submission, submission_qs, COMPLETED)

        # send async message
        send_message(uid_submission)

        return "success"

    def __update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission messages and set
        image_app.models.Submission status and message.

        Args:
            uid_submission (image_app.models.Submission): the UID submission
                to update (saved by this method)
            submission_qs (QuerySet): biosample.models.Submission objects
                related to ``uid_submission``
            status (int): only messages of biosample submissions with this
                status are collected; it is also the new UID status
        """

        # collect non-empty messages, dropping duplicates. dict.fromkeys
        # keeps the queryset order (a set would give a non-deterministic
        # order), and skipping None avoids a TypeError in str.join
        messages = dict.fromkeys(
            submission.message
            for submission in submission_qs.filter(status=status)
            if submission.message)

        uid_submission.status = status
        uid_submission.message = "\n".join(messages)
        uid_submission.save()
466
467
468
# tasks are registered explicitly, in this order, as suggested in
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (FetchStatusTask, RetrievalCompleteTask):
    celery_app.tasks.register(_task_class)

# don't leave the loop variable in the module namespace
del _task_class
472