Completed
Push — master ( 2c16e2...2c16e2 )
by Paolo
13s queued 11s
created

RetrievalCompleteTask.update_message()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 12
rs 10
c 0
b 0
f 0
cc 2
nop 4
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jul 16 11:25:03 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import os
10
import json
11
12
from decouple import AutoConfig
13
from celery.utils.log import get_task_logger
14
15
import pyUSIrest.client
16
17
from django.conf import settings
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from image.celery import app as celery_app
22
from image_app.helpers import parse_image_alias, get_model_object
23
from image_app.models import Submission
24
from common.tasks import BaseTask, ExclusiveTask, NotifyAdminTaskMixin
25
from common.constants import (
26
    ERROR, NEED_REVISION, SUBMITTED, COMPLETED)
27
from submissions.tasks import SubmissionTaskMixin
28
29
from ..helpers import get_manager_auth
30
from ..models import Submission as USISubmission
31
32
# celery-aware logger shared by every task and helper of this module
logger = get_task_logger(__name__)

# python-decouple configuration object, searching for its settings file in
# the ``image`` directory below the django BASE_DIR
settings_dir = os.path.join(settings.BASE_DIR, "image")
config = AutoConfig(search_path=settings_dir)

# number of days after which a USI submission that has not been updated is
# considered stuck (used by FetchStatusHelper.submission_has_issues)
MAX_DAYS = 5
41
42
43
# HINT: how this class could be similar to SubmissionHelper?
class FetchStatusHelper():
    """Helper class to deal with submission data.

    Wraps a single biosample.models.Submission together with the matching
    USI submission (fetched by name through pyUSIrest) and updates the
    local objects according to the remote submission status.
    """

    def __init__(self, usi_submission):
        """
        Helper function to have info for a biosample.models.Submission

        Authenticates with USI and fetches the remote submission whose name
        is stored in ``usi_submission.usi_submission_name``.

        Args:
            usi_submission (biosample.models.Submission): a biosample
                model Submission instance
        """

        # local (database) objects
        self.usi_submission = usi_submission
        self.uid_submission = usi_submission.uid_submission

        # pyUSIrest authentication and API entry point
        self.auth = get_manager_auth()
        self.root = pyUSIrest.client.Root(self.auth)

        # the remote (USI) biosample submission tracked by this helper
        self.submission_name = self.usi_submission.usi_submission_name
        self.submission = self.root.get_submission_by_name(
            submission_name=self.submission_name)

    def check_submission_status(self):
        """Check submission status, finalize submission, check errors etc.

        The local object is reloaded from the database first; if its status
        is no longer SUBMITTED it is left untouched. Otherwise the action
        depends on the remote USI submission status:

        - ``Completed``: store biosample ids with :py:meth:`complete`
        - ``Draft``: unless the submission is stale (see
          :py:meth:`submission_has_issues`), call :py:meth:`finalize` once
          the validation statuses are all ``Complete``
        - ``Submitted``: unless the submission is stale, only log the
          processing status while waiting for biosample ids
        - anything else: log a warning
        """

        # reload submission status
        self.usi_submission.refresh_from_db()

        if self.usi_submission.status != SUBMITTED:
            # someone else has taken this task and done something. Ignore!
            logger.warning("Ignoring submission %s current status is %s" % (
                self.usi_submission, self.usi_submission.get_status_display()))
            return

        logger.debug("Checking status for '%s'" % (
            self.submission_name))

        # Update submission status if completed
        if self.submission.status == 'Completed':
            # fetch biosample ids with a proper function
            self.complete()

        elif self.submission.status == 'Draft':
            # check for a long task
            if self.submission_has_issues():
                # return to the caller. The submission has just been marked
                # with errors
                return

            # check validation. If it is ok, finalize submission
            status = self.submission.get_status()

            # these are validation statuses: finalize only when every sample
            # reports 'Complete'
            if len(status) == 1 and 'Complete' in status:
                # check for errors and eventually finalize
                self.finalize()

            else:
                logger.warning(
                    "Biosample validation is not completed yet (%s)" %
                    (status))

        elif self.submission.status == 'Submitted':
            # check for a long task
            if self.submission_has_issues():
                # return to the caller. The submission has just been marked
                # with errors
                return

            logger.info(
                "Submission '%s' is '%s'. Waiting for biosample ids" % (
                    self.submission_name,
                    self.submission.status))

            # debug submission status
            document = self.submission.follow_url(
                "processingStatusSummary", self.auth)

            logger.debug(
                "Current status for submission '%s' is '%s'" % (
                    self.submission_name, document.data))

        else:
            # HINT: throw an exception?
            logger.warning("Unknown status '%s' for submission '%s'" % (
                self.submission.status,
                self.submission_name))

        logger.debug("Checking status for '%s' completed" % (
            self.submission_name))

    def submission_has_issues(self):
        """
        Check that biosample submission has no issues, i.e. that it has not
        remained in the same status for a long time.

        When ``usi_submission.updated_at`` is older than ``MAX_DAYS`` full
        days, the submission is marked as ERROR with an explanatory message
        and saved.

        Returns:
            bool: True if an issue is detected
        """

        logger.debug(
            "Check if submission '%s' remained in the same status "
            "for a long time" % (
                self.submission_name))

        if (timezone.now() - self.usi_submission.updated_at).days > MAX_DAYS:
            message = (
                "Biosample submission '%s' remained with the same status "
                "for more than %s days. Please report it to InjectTool "
                "team" % (self.submission_name, MAX_DAYS))

            self.usi_submission.status = ERROR
            self.usi_submission.message = message
            self.usi_submission.save()

            logger.error(
                "Errors for submission: %s" % (
                    self.submission_name))
            logger.error(message)

            return True

        else:
            return False

    def __sample_has_errors(self, sample, table, pk):
        """
        Helper method to mark an animal/sample as NEED_REVISION and collect
        its USI validation errors. ``table`` should be ``Animal`` or
        ``Sample`` to update the appropriate object.

        Args:
            sample (pyUSIrest.client.Sample): a USI sample object
            table (str): ``Animal`` or ``Sample``, the table where this
                object should be searched
            pk (int): table primary key

        Returns:
            dict: a single-item dict mapping ``str(sample_obj)`` to the
            ``errorMessages`` of the USI validation result
        """

        # get sample/animal object relying on table name and pk
        sample_obj = get_model_object(table, pk)

        sample_obj.name.status = NEED_REVISION
        sample_obj.name.save()

        # get a USI validation result
        validation_result = sample.get_validation_result()

        # TODO: should I store validation_result error in validation tables?
        errorMessages = validation_result.errorMessages

        # return an error for each object
        return {str(sample_obj): errorMessages}

    def finalize(self):
        """Finalize a submission by closing document and send it to
        biosample.

        If USI reports validation errors, every animal/sample with errors is
        marked as NEED_REVISION, and the biosample.models.Submission is set
        to NEED_REVISION with a JSON dump of the collected error messages.
        Otherwise ``finalize()`` is called on the USI submission.
        """

        logger.debug("Finalizing submission '%s'" % (
            self.submission_name))

        # get errors for a submission
        errors = self.submission.has_errors()

        # collect all error messages in a list
        messages = []

        if True in errors:
            # get sample with errors then update database
            samples = self.submission.get_samples(has_errors=True)

            for sample in samples:
                # derive pk and table from alias
                table, pk = parse_image_alias(sample.alias)

                # need to check if this sample/animals has errors or not
                if sample.has_errors():
                    logger.warning(
                        "%s in table %s has errors!!!" % (sample, table))

                    # mark this sample since has problems
                    errorMessages = self.__sample_has_errors(
                        sample, table, pk)

                    # append this into error messages list
                    messages.append(errorMessages)

                # if a sample has no errors, status will be the same

            logger.error(
                "Errors for submission: '%s'" % (self.submission_name))
            logger.error("Fix them, then finalize")

            # report error
            message = json.dumps(messages, indent=2)

            # Update status for biosample.models.Submission
            self.usi_submission.status = NEED_REVISION
            self.usi_submission.message = message
            self.usi_submission.save()

        else:
            # raising an exception while finalizing will result
            # in a failed task.
            # TODO: model and test exception in finalization
            self.submission.finalize()

    def complete(self):
        """Complete a submission and fetch name objects.

        Every USI sample of the remote submission must have an accession.
        If any of them lacks one, nothing is written to the database and the
        submission keeps its status, so a later fetch can check it again.
        Otherwise the ``name`` object of each animal/sample is marked as
        COMPLETED with its ``biosample_id``, and the
        biosample.models.Submission is marked as COMPLETED.
        """

        logger.debug("Completing submission '%s'" % (
            self.submission_name))

        # fetch samples once: accessions must be checked on *every* sample
        # before touching the database, otherwise a missing accession would
        # leave some names COMPLETED while the submission is still SUBMITTED
        samples = list(self.submission.get_samples())

        for sample in samples:
            # if no accession, return without doing anything
            if sample.accession is None:
                logger.error("No accession found for sample '%s'" % (sample))
                logger.error("Ignoring submission '%s'" % (self.submission))
                return

        for sample in samples:
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # get sample/animal object relying on table name and pk
            sample_obj = get_model_object(table, pk)

            # update statuses
            sample_obj.name.status = COMPLETED
            sample_obj.name.biosample_id = sample.accession
            sample_obj.name.save()

        # update submission
        self.usi_submission.status = COMPLETED
        self.usi_submission.message = "Successful submission into biosample"
        self.usi_submission.save()

        logger.info(
            "Submission %s is now completed and recorded into UID" % (
                self.submission))
287
288
289
class FetchStatusTask(NotifyAdminTaskMixin, ExclusiveTask):
    """Exclusive task fetching the status of pending biosample submissions
    from USI"""

    name = "Fetch USI status"
    description = """Fetch biosample using USI API"""
    lock_id = "FetchStatusTask"

    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        # debugging instance
        self.debug_task()

        # do stuff and return something
        return self.fetch_status()

    def fetch_status(self):
        """
        Fetch status from pending submissions. Called from
        :py:meth:`run`, handles exceptions from USI, select
        all :py:class:`Submission <image_app.models.Submission>` objects
        with :py:const:`SUBMITTED <common.constants.SUBMITTED>` status
        from :ref:`UID <The Unified Internal Database>` and call
        :py:meth:`fetch_queryset` with this data

        Returns:
            str: ``"success"`` (a ConnectionError triggers a task retry)
        """

        logger.info("fetch_status started")

        # search for submission with SUBMITTED status. Other submission are
        # not yet finalized. This function need to be called by exclusives
        # tasks
        qs = Submission.objects.filter(status=SUBMITTED)

        # only emptiness matters here: exists() avoids a COUNT(*) query
        if qs.exists():
            try:
                # fetch biosample status
                self.fetch_queryset(qs)

            # retry a task under errors
            # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
            # NOTE(review): this catches the builtin ConnectionError only;
            # requests.exceptions.ConnectionError derives from IOError, not
            # from the builtin ConnectionError, so it would not trigger a
            # retry - confirm which exceptions pyUSIrest raises
            except ConnectionError as exc:
                raise self.retry(exc=exc)

        else:
            logger.debug("No pending submission in UID database")

        # debug
        logger.info("fetch_status completed")

        return "success"

    # a function to retrieve biosample submission
    def fetch_queryset(self, queryset):
        """Fetch biosample against a queryset (a list of
        :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`Submission <image_app.models.Submission>` objects). Iterate
        through submission to get USI info. Calls
        :py:class:`FetchStatusHelper` for every SUBMITTED
        biosample.models.Submission, then schedules a
        :py:class:`RetrievalCompleteTask` for each UID submission
        """

        logger.info("Searching for submissions into biosample")

        for uid_submission in queryset:
            usi_submissions = USISubmission.objects.filter(
                uid_submission=uid_submission,
                status=SUBMITTED)

            # HINT: fetch statuses using tasks?
            for usi_submission in usi_submissions:
                status_helper = FetchStatusHelper(usi_submission)
                status_helper.check_submission_status()

            # set the final status for a submission like SubmissionCompleteTask
            retrievalcomplete = RetrievalCompleteTask()

            # run it asynchronously, passing the UID submission id as kwarg
            res = retrievalcomplete.delay(uid_submission_id=uid_submission.id)

            logger.info(
                "Start RetrievalCompleteTask process for %s with task %s" % (
                    uid_submission,
                    res.task_id))

        logger.info("fetch_queryset completed")
378
379
380
class RetrievalCompleteTask(SubmissionTaskMixin, BaseTask):
    """Update submission status after fetching status"""

    name = "Complete Retrieval Process"
    description = """Check submission status after retrieval nd update stuff"""
    action = "biosample retrieval"

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status

        Expects ``uid_submission_id`` in kwargs. The statuses of the related
        biosample.models.Submission objects are checked in this order:

        - any SUBMITTED: retrieval not finished yet, nothing changes
        - any ERROR: UID submission set to ERROR, owner notified by mail
        - any NEED_REVISION: UID submission set to NEED_REVISION, owner
          notified by mail
        - only COMPLETED: UID submission set to COMPLETED

        Any other combination leaves the UID submission untouched.

        Returns:
            str: always ``"success"``
        """

        # get UID submission
        uid_submission = self.get_uid_submission(kwargs['uid_submission_id'])

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            uid_submission=uid_submission)

        # annotate biosample submission by statuses
        statuses = {}

        for res in submission_qs.values('status').annotate(
                count=Count('status')):
            statuses[res['status']] = res['count']

        if SUBMITTED in statuses:
            # ignoring the other models. No errors thrown until there is
            # as SUBMITTED USISubmission
            logger.info("Submission %s not yet finished" % uid_submission)

            return "success"

        # if there is ANY errors in biosample.models.Submission for a
        # particular submission, I will mark it as ERROR
        elif ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        # check if submission need revision
        elif NEED_REVISION in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.update_message(uid_submission, submission_qs, NEED_REVISION)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                "Some items needs revision:\n\n" + uid_submission.message,
            )

        elif COMPLETED in statuses and len(statuses) == 1:
            # if all status are complete, the submission is completed
            logger.info(
                "Submission %s completed with success" % uid_submission)

            self.update_message(uid_submission, submission_qs, COMPLETED)

        return "success"

    def update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission message and set
        image_app.models.Submission message

        The messages of the objects in ``submission_qs`` having ``status``
        are de-duplicated (keeping their first occurrence, in queryset
        order), empty ones are skipped, and the rest are joined by newlines
        and passed to ``update_submission_status`` together with ``status``.

        Args:
            uid_submission (image_app.models.Submission): the UID submission
                to update
            submission_qs (QuerySet): biosample.models.Submission objects
                related to ``uid_submission``
            status: the status used both to filter ``submission_qs`` and
                to update ``uid_submission``
        """

        # get error messages for submission. dict.fromkeys removes
        # duplicates while keeping order: a set would order lines by string
        # hash (randomized per process). Falsy messages (e.g. None) are
        # skipped, since None would make str.join raise TypeError
        messages = dict.fromkeys(
            submission.message
            for submission in submission_qs.filter(status=status)
            if submission.message)

        self.update_submission_status(
            uid_submission, status, "\n".join(messages))
463
464
465
# class-based tasks have to be registered on the celery app by hand, see
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (FetchStatusTask, RetrievalCompleteTask):
    celery_app.tasks.register(_task_class)

# don't leave the loop variable around as a module attribute
del _task_class
469