Passed
Push — master ( bd2622...f167a9 )
by Paolo
06:51
created

RetrievalCompleteTask.update_validationsummary()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jul 16 11:25:03 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import os
10
import json
11
from collections import Counter, defaultdict
12
13
from decouple import AutoConfig
14
from celery.utils.log import get_task_logger
15
16
import pyUSIrest.usi
17
import pyUSIrest.exceptions
18
19
from django.conf import settings
20
from django.contrib.contenttypes.models import ContentType
21
from django.db.models import Count, F
22
from django.utils import timezone
23
24
from image.celery import app as celery_app
25
from uid.helpers import parse_image_alias, get_model_object
26
from uid.models import Submission
27
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
28
from common.constants import ERROR, NEED_REVISION, SUBMITTED, COMPLETED
29
from submissions.tasks import SubmissionTaskMixin
30
from validation.models import ValidationResult, ValidationSummary
31
32
from ..helpers import get_manager_auth
33
from ..models import Submission as USISubmission
34
35
# celery-aware logger for this module
logger = get_task_logger(__name__)

# decouple configuration object; it looks up its configuration starting
# from the "image" directory placed under the django BASE_DIR
settings_dir = os.path.join(settings.BASE_DIR, "image")
config = AutoConfig(search_path=settings_dir)

# number of days a USI submission may keep the same status before being
# considered a very long (stuck) task
MAX_DAYS = 5
44
45
46
# HINT: how this class could be similar to SubmissionHelper?
47
class FetchStatusHelper():
    """Helper class to check and update a single USI (BioSamples) submission

    Given a :py:class:`biosample.models.Submission` instance, it reads the
    status of the remote USI submission with the same name (through
    pyUSIrest) and updates the local database objects (animals/samples,
    validation tables and the submission itself) accordingly.
    """

    def __init__(self, usi_submission):
        """
        Collect info for a biosample.models.Submission and fetch the
        matching USI submission object

        Args:
            usi_submission (biosample.models.Submission): a biosample
                model Submission instance

        Note:
            this talks to USI (authentication and submission lookup by
            name), so pyUSIrest connection errors may be raised here
        """

        # ok those are my default class attributes
        self.usi_submission = usi_submission
        self.uid_submission = usi_submission.uid_submission

        # here are pyUSIrest object
        self.auth = get_manager_auth()
        self.root = pyUSIrest.usi.Root(self.auth)

        # here I will track the biosample submission
        self.submission_name = self.usi_submission.usi_submission_name

        logger.info(
            "Getting info for usi submission '%s'", self.submission_name)
        self.submission = self.root.get_submission_by_name(
            submission_name=self.submission_name)

    def check_submission_status(self):
        """Check submission status, finalize submission, check errors etc

        Depending on the USI submission status:

        * ``Completed``: record biosample accessions (:py:meth:`complete`)
        * ``Draft``: store per-sample validation statuses and, when all
          samples are validated, try to finalize (:py:meth:`finalize`)
        * ``Submitted`` / ``Processing``: only log, waiting for BioSamples

        For ``Draft``, ``Submitted`` and ``Processing`` a submission stuck
        for too long is marked as ERROR (:py:meth:`submission_has_issues`).
        Nothing is done if the local submission is no longer SUBMITTED.
        """

        # reload submission status
        self.usi_submission.refresh_from_db()

        if self.usi_submission.status != SUBMITTED:
            # someone else has taken this task and done something. Ignore!
            logger.warning(
                "Ignoring submission %s current status is %s",
                self.usi_submission,
                self.usi_submission.get_status_display())
            return

        logger.info(
            "Submission '%s' is currently '%s'",
            self.submission_name, self.submission.status)

        # Update submission status if completed
        if self.submission.status == 'Completed':
            # fetch biosample ids with a proper function
            self.complete()

        elif self.submission.status == 'Draft':
            # check for a long task
            if self.submission_has_issues():
                # return to the caller. I've just marked the submission with
                # errors
                return

            # check validation. If it is ok, finalize submission
            status = self.submission.get_status()

            # write status into database
            self.usi_submission.samples_status = dict(status)
            self.usi_submission.save()

            # this mean validation statuses, I want to see completed in all
            # samples
            if len(status) == 1 and 'Complete' in status:
                # check for errors and eventually finalize
                self.finalize()

            else:
                logger.warning(
                    "Biosample validation is not completed yet (%s)" %
                    (status))

        elif self.submission.status == 'Submitted':
            # check for a long task
            if self.submission_has_issues():
                # return to the caller. I've just marked the submission with
                # errors
                return

            logger.info(
                "Submission '%s' is '%s'. Waiting for biosample ids",
                self.submission_name,
                self.submission.status)

            # debug submission status
            document = self.submission.follow_url(
                "processingStatusSummary", self.auth)

            logger.debug(
                "Current status for submission '%s' is '%s'",
                self.submission_name, document.data)

        elif self.submission.status == 'Processing':
            # check for a long task
            if self.submission_has_issues():
                # return to the caller. I've just marked the submission with
                # errors
                return

            logger.debug(
                "Submission '%s' is '%s'. Still waiting from BioSamples",
                self.submission_name,
                self.submission.status)

        else:
            # HINT: thrown an exception?
            logger.warning(
                "Unknown status '%s' for submission '%s'",
                self.submission.status,
                self.submission_name)

        logger.debug(
            "Checking status for '%s' completed", self.submission_name)

    def submission_has_issues(self):
        """
        Check that biosample submission has no issues. Currently it checks
        that the submission did not remain in the same status for more than
        ``MAX_DAYS`` days (relying on ``usi_submission.updated_at``). If so,
        the submission is marked as ERROR with an explanatory message

        Returns:
            bool: True if an issue is detected
        """

        logger.debug(
            "Check if submission '%s' remained in the same status "
            "for a long time", self.submission_name)

        if (timezone.now() - self.usi_submission.updated_at).days > MAX_DAYS:
            message = (
                "Biosample submission '%s' remained with the same status "
                "for more than %s days. Please report it to InjectTool "
                "team" % (self.submission_name, MAX_DAYS))

            self.usi_submission.status = ERROR
            self.usi_submission.message = message
            self.usi_submission.save()

            logger.error(
                "Errors for submission: %s", self.submission_name)
            logger.error(message)

            return True

        return False

    def sample_has_errors(self, sample, table, pk):
        """
        Helper method to mark an (animal/sample) with its own errors. Table
        should be Animal or Sample to update the appropriate object. Sample
        is a USI sample object

        The UID object is set to NEED_REVISION, its ValidationResult is set
        to 'Error' with the USI messages, and the matching ValidationSummary
        counters are updated (one less pass, one more error and issue).

        Args:
            sample (pyUSIrest.usi.sample): a USI sample object
            table (str): ``Animal`` or ``Sample``, mean the table where this
                object should be searched
            pk (int): table primary key

        Returns:
            dict: ``{str(uid_object): USI error messages}``
        """

        # get sample/animal object relying on table name and pk
        sample_obj = get_model_object(table, pk)

        sample_obj.status = NEED_REVISION
        sample_obj.save()

        # get a USI validation result
        validation_result = sample.get_validation_result()

        # track errors in validation tables
        error_messages = validation_result.errorMessages

        # since I validated this object, I have already a ValidationResult
        # object associated to my model
        sample_obj.validationresult.status = 'Error'
        sample_obj.validationresult.messages = [
            "%s: %s" % (k, v) for k, v in error_messages.items()]
        sample_obj.validationresult.save()

        # need to update ValidationSummary table, since here I know if this
        # is a sample or an animal
        summary = self.uid_submission.validationsummary_set.filter(
            type=table.lower()).first()

        if summary is None:
            # .first() returns None when no summary exists: don't crash
            # while recording errors, but leave a trace in logs
            logger.warning(
                "No ValidationSummary of type '%s' for submission '%s'",
                table.lower(), self.uid_submission)

        else:
            # now update query using django F function. Decrease pass count
            # an increase error count for this object (atomic in the DB)
            # HINT: should I define message here?
            summary.pass_count = F('pass_count') - 1
            summary.error_count = F('error_count') + 1
            summary.issues_count = F('issues_count') + 1
            summary.save()

        # return an error for each object
        return {str(sample_obj): error_messages}

    def finalize(self):
        """Finalize a submission by closing document and send it to
        biosample

        If USI reports errors, every sample with errors is marked via
        :py:meth:`sample_has_errors` and the submission is set to
        NEED_REVISION with a JSON dump of the error messages. Otherwise the
        USI submission is finalized.
        """

        logger.info("Finalizing submission '%s'", self.submission_name)

        # get errors for a submission
        errors = self.submission.has_errors()

        # collect all error messages in a list
        messages = []

        if True in errors:
            # get sample with errors then update database
            samples = self.submission.get_samples(has_errors=True)

            for sample in samples:
                # derive pk and table from alias
                table, pk = parse_image_alias(sample.alias)

                # need to check if this sample/animals has errors or not
                # (if a sample has no errors, status will be the same)
                if sample.has_errors():
                    logger.warning(
                        "%s in table %s has errors!!!", sample, table)

                    # mark this sample since has problems and track its
                    # error messages
                    messages.append(
                        self.sample_has_errors(sample, table, pk))

            logger.error(
                "Errors for submission: '%s'", self.submission_name)
            logger.error("Fix them, then finalize")

            # report error
            message = json.dumps(messages, indent=2)

            # Update status for biosample.models.Submission
            self.usi_submission.status = NEED_REVISION
            self.usi_submission.message = message
            self.usi_submission.save()

        else:
            # raising an exception while finalizing will result
            # in a failed task.
            # TODO: model and test exception in finalization
            self.submission.finalize()

    def complete(self):
        """Complete a submission and fetch biosample names

        Every USI sample is checked for an accession *before* any database
        object is modified: if at least one accession is missing, nothing
        is saved and the submission stays SUBMITTED, so it will be picked
        up again by a later fetch. Otherwise each animal/sample is marked
        COMPLETED with its ``biosample_id`` and the submission is completed.
        """

        logger.info("Completing submission '%s'", self.submission_name)

        # first pass: collect (table, pk, accession) for every sample. Bail
        # out before touching the database if any accession is missing, to
        # avoid leaving some objects COMPLETED and others not
        accessions = []

        for sample in self.submission.get_samples():
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # if no accession, return without doing anything
            if sample.accession is None:
                logger.error("No accession found for sample '%s'", sample)
                logger.error("Ignoring submission '%s'", self.submission)
                return

            accessions.append((table, pk, sample.accession))

        # second pass: all accessions are there, update sample/animal objects
        for table, pk, accession in accessions:
            # get sample/animal object relying on table name and pk
            sample_obj = get_model_object(table, pk)

            # update statuses
            sample_obj.status = COMPLETED
            sample_obj.biosample_id = accession
            sample_obj.save()

        # update submission
        self.usi_submission.status = COMPLETED
        self.usi_submission.message = "Successful submission into biosample"
        self.usi_submission.save()

        logger.info(
            "Submission %s is now completed and recorded into UID",
            self.submission)
329
330
331
class FetchStatusTask(NotifyAdminTaskMixin, BaseTask):
    """Fetch the status of pending USI (BioSamples) submissions"""

    name = "Fetch USI status"
    description = """Fetch biosample using USI API"""

    @exclusive_task(task_name="Fetch USI status", lock_id="FetchStatusTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        # debugging instance
        self.debug_task()

        # do stuff and return something
        return self.fetch_status()

    def fetch_status(self):
        """
        Fetch status from pending submissions. Called from
        :py:meth:`run`, handles exceptions from USI, select
        all :py:class:`Submission <uid.models.Submission>` objects
        with :py:const:`SUBMITTED <common.constants.SUBMITTED>` status
        from :ref:`UID <The Unified Internal Database>` and call
        :py:meth:`fetch_queryset` with this data. A USI connection error
        causes the task to be retried

        Returns:
            str: "success"
        """

        logger.info("fetch_status started")

        # search for submission with SUBMITTED status. Other submission are
        # not yet finalized. This function need to be called by exclusives
        # tasks
        qs = Submission.objects.filter(status=SUBMITTED)

        # only a presence check is needed here: exists() is cheaper than
        # counting every row
        if qs.exists():
            try:
                # fetch biosample status
                self.fetch_queryset(qs)

            # retry a task under errors
            # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
            except pyUSIrest.exceptions.USIConnectionError as exc:
                raise self.retry(exc=exc)

        else:
            logger.debug("No pending submission in UID database")

        # debug
        logger.info("fetch_status completed")

        return "success"

    # a function to retrieve biosample submission
    def fetch_queryset(self, queryset):
        """Fetch biosample against a queryset (a list of
        :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`Submission <uid.models.Submission>` objects). Iterate
        through submission to get USI info. Calls
        :py:class:`FetchStatusHelper` for each SUBMITTED USI submission,
        then schedules a :py:class:`RetrievalCompleteTask` for each UID
        submission
        """

        logger.info("Searching for submissions into biosample")

        for uid_submission in queryset:
            logger.info(
                "getting USI submission for UID '%s'", uid_submission)
            usi_submissions = USISubmission.objects.filter(
                uid_submission=uid_submission,
                status=SUBMITTED)

            # HINT: fetch statuses using tasks?
            for usi_submission in usi_submissions:
                status_helper = FetchStatusHelper(usi_submission)
                status_helper.check_submission_status()

            # set the final status for a submission like SubmissionCompleteTask
            retrievalcomplete = RetrievalCompleteTask()

            # assign kwargs to chord
            res = retrievalcomplete.delay(uid_submission_id=uid_submission.id)

            logger.info(
                "Start RetrievalCompleteTask process for '%s' "
                "with task '%s'", uid_submission, res.task_id)

        logger.info("fetch_queryset completed")
421
422
423
class RetrievalCompleteTask(SubmissionTaskMixin, BaseTask):
    """Update submission status after fetching status"""

    name = "Complete Retrieval Process"
    description = """Check submission status after retrieval and update stuff"""
    action = "biosample retrieval"

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status

        The UID submission is read from ``kwargs['uid_submission_id']``. Its
        status is derived from the statuses of all its USI submissions:

        * any SUBMITTED: nothing to do yet
        * any ERROR: UID submission is ERROR, owner is notified
        * any NEED_REVISION: UID submission needs revision, owner notified
        * only COMPLETED: UID submission is COMPLETED

        Returns:
            str: "success"
        """

        logger.info("RetrievalCompleteTask started")

        # get UID submission
        uid_submission = self.get_uid_submission(kwargs['uid_submission_id'])

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            uid_submission=uid_submission)

        # count USI submissions by status
        statuses = {}

        for res in submission_qs.values('status').annotate(
                count=Count('status')):
            statuses[res['status']] = res['count']

        if SUBMITTED in statuses:
            # ignoring the other models. No errors thrown until there is
            # as SUBMITTED USISubmission
            logger.info("Submission %s not yet finished", uid_submission)

            return "success"

        # if there is ANY errors in biosample.models.Submission for a
        # particular submission, I will mark it as ERROR
        elif ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            # update validationsummary
            self.update_validationsummary(uid_submission)

            self.update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            subject = "Error in biosample submission %s" % (
                uid_submission.id)
            body = (
                "Something goes wrong with biosample submission. Please "
                "report this to InjectTool team\n\n"
                "%s" % uid_submission.message)

            self.mail_to_owner(uid_submission, subject, body)

        # check if submission need revision
        elif NEED_REVISION in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            # update validationsummary
            self.update_validationsummary(uid_submission)

            self.update_message(uid_submission, submission_qs, NEED_REVISION)

            # send a mail to the user
            subject = "Error in biosample submission %s" % (
                uid_submission.id)
            body = "Some items needs revision:\n\n" + uid_submission.message

            self.mail_to_owner(uid_submission, subject, body)

        elif COMPLETED in statuses and len(statuses) == 1:
            # if all status are complete, the submission is completed
            logger.info(
                "Submission %s completed with success", uid_submission)

            self.update_message(uid_submission, submission_qs, COMPLETED)

        logger.info("RetrievalCompleteTask completed")

        return "success"

    def update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission message and set
        uid.models.Submission message relying on status

        Messages of the USI submissions having ``status`` are deduplicated
        (empty/None messages are skipped), joined with newlines and passed
        to ``update_submission_status`` with ``construct_message=True``.
        """

        # get messages for submissions in this status. Empty messages are
        # dropped (a None would make str.join fail)
        messages = [
            submission.message
            for submission in submission_qs.filter(status=status)
            if submission.message]

        # dict.fromkeys removes duplicates keeping the queryset order; a set
        # would give a different (hash-randomized) order at every run
        self.update_submission_status(
            uid_submission,
            status,
            "\n".join(dict.fromkeys(messages)),
            construct_message=True)

    def update_validationsummary(self, uid_submission):
        """Update validationsummary message after our USI submission is
        completed (with errors or not)"""

        self.__generic_validationsummary(uid_submission, "animal")
        self.__generic_validationsummary(uid_submission, "sample")

    def __generic_validationsummary(self, uid_submission, model):
        """Rebuild ``ValidationSummary.messages`` of type ``model`` for a
        submission from the 'Error' ValidationResult objects of that model

        Args:
            uid_submission (uid.models.Submission): the UID submission
            model (str): ``animal`` or ``sample`` (a ``uid`` app model name)
        """

        # when arriving here, I have processed the USI results and maybe
        # i have update validationresult accordingly
        logger.debug(
            "Update validationsummary(%s) for %s", model, uid_submission)

        model_type = ContentType.objects.get(app_label='uid', model=model)

        # get validation results querysets
        model_qs = ValidationResult.objects.filter(
            submission=uid_submission,
            content_type=model_type,
            status="Error")

        # count each message and track which objects raised it
        messages_counter = Counter()
        messages_ids = defaultdict(list)

        for item in model_qs:
            for message in item.messages:
                messages_counter[message] += 1
                messages_ids[message].append(item.content_object.id)

        # ok preparing to update summary object
        model_summary = ValidationSummary.objects.get(
            submission=uid_submission, type=model)

        # create messages. No offending column since is not always possible
        # to determine a column from USI error. With this, I have a record
        # in ValidationSummary page, but I don't have a link to batch update
        # since is not possible to determine and change a column in my
        # data
        model_summary.messages = [
            {
                'message': message,
                'count': count,
                'ids': messages_ids[message],
                'offending_column': ''}
            for message, count in messages_counter.items()]

        model_summary.save()
570
571
572
# tasks need to be registered explicitly with the celery app, see
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (FetchStatusTask, RetrievalCompleteTask):
    celery_app.tasks.register(_task_class)

# don't leak the loop variable as a module-level name
del _task_class
576