RetrievalCompleteTask.__generic_validationsummary()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 42
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 42
rs 9.28
c 0
b 0
f 0
cc 4
nop 3
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jul 16 11:25:03 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import os
10
import json
11
from collections import Counter, defaultdict
12
13
from decouple import AutoConfig
14
from celery.utils.log import get_task_logger
15
16
import pyUSIrest.usi
17
import pyUSIrest.exceptions
18
19
from django.conf import settings
20
from django.contrib.contenttypes.models import ContentType
21
from django.db.models import Count, F
22
from django.utils import timezone
23
24
from image.celery import app as celery_app
25
from uid.helpers import parse_image_alias, get_model_object
26
from uid.models import Submission
27
from common.tasks import BaseTask, NotifyAdminTaskMixin, exclusive_task
28
from common.constants import ERROR, NEED_REVISION, SUBMITTED, COMPLETED
29
from submissions.tasks import SubmissionTaskMixin
30
from validation.models import ValidationResult, ValidationSummary
31
32
from ..helpers import get_manager_auth
33
from ..models import Submission as USISubmission
34
35
# module-level logger obtained from celery's task logging helper
logger = get_task_logger(__name__)

# decouple configuration object, searched in the 'image' directory found
# under settings.BASE_DIR
settings_dir = os.path.join(settings.BASE_DIR, 'image')
config = AutoConfig(search_path=settings_dir)

# number of days without updates after which a submission is treated as
# stuck (see FetchStatusHelper.submission_has_issues)
MAX_DAYS = 5
44
45
46
# HINT: how this class could be similar to SubmissionHelper?
47
class FetchStatusHelper():
    """Helper class to deal with submission data.

    Wraps a single biosample ``Submission`` record together with the
    corresponding pyUSIrest submission object, and updates the local
    records according to the status reported by USI.
    """

    # remote statuses for which the submission is still waiting on USI and
    # for which a "stuck for too long" check is performed
    _PENDING_STATUSES = ('Draft', 'Submitted', 'Processing')

    def __init__(self, usi_submission, auth):
        """
        Collect local and remote info for a biosample.models.Submission

        Args:
            usi_submission (biosample.models.Submission): a biosample
                model Submission instance
            auth: a pyUSIrest.auth.Auth instance
        """

        # local database objects
        self.usi_submission = usi_submission
        self.uid_submission = usi_submission.uid_submission

        # pyUSIrest objects
        self.auth = auth
        self.root = pyUSIrest.usi.Root(self.auth)

        # the name used to look up the submission on the USI side
        self.submission_name = self.usi_submission.usi_submission_name

        logger.info(
            "Getting info for usi submission '%s'", self.submission_name)
        self.submission = self.root.get_submission_by_name(
            submission_name=self.submission_name)

    def check_submission_status(self):
        """Check submission status, finalize submission, check errors etc"""

        # reload the local record: another worker may have changed it
        self.usi_submission.refresh_from_db()

        if self.usi_submission.status != SUBMITTED:
            # someone else has taken this task and done something. Ignore!
            logger.warning(
                "Ignoring submission %s current status is %s",
                self.usi_submission,
                self.usi_submission.get_status_display())
            return

        remote_status = self.submission.status

        logger.info(
            "Submission '%s' is currently '%s'",
            self.submission_name, remote_status)

        if remote_status == 'Completed':
            # fetch biosample ids and mark local objects as completed
            self.complete()

        elif remote_status in self._PENDING_STATUSES:
            # check for a long task: when an issue is detected the local
            # submission has just been marked with ERROR, nothing more to do
            if self.submission_has_issues():
                return

            if remote_status == 'Draft':
                self._check_draft_validation()

            elif remote_status == 'Submitted':
                self._log_processing_summary()

            else:
                logger.debug(
                    "Submission '%s' is '%s'. Still waiting from BioSamples",
                    self.submission_name,
                    remote_status)

        else:
            # HINT: thrown an exception?
            logger.warning(
                "Unknown status '%s' for submission '%s'",
                remote_status,
                self.submission_name)

        logger.debug(
            "Checking status for '%s' completed", self.submission_name)

    def _check_draft_validation(self):
        """Store validation statuses of a draft and finalize it if ready"""

        status = self.submission.get_status()

        # write status into database
        self.usi_submission.samples_status = dict(status)
        self.usi_submission.save()

        # these are validation statuses: finalize only when 'Complete' is
        # the only status reported
        if len(status) == 1 and 'Complete' in status:
            # check for errors and eventually finalize
            self.finalize()

        else:
            logger.warning(
                "Biosample validation is not completed yet (%s)", status)

    def _log_processing_summary(self):
        """Log the processing status summary of a submitted submission"""

        logger.info(
            "Submission '%s' is '%s'. Waiting for biosample ids",
            self.submission_name,
            self.submission.status)

        # debug submission status
        document = self.submission.follow_url(
            "processingStatusSummary", self.auth)

        logger.debug(
            "Current status for submission '%s' is '%s'",
            self.submission_name, document.data)

    def submission_has_issues(self):
        """
        Check that biosample submission has not issues. For example, that
        it will remain in the same status for a long time. When this
        happens the local submission is saved with ERROR status and an
        explanatory message.

        Returns:
            bool: True if an issue is detected
        """

        logger.debug(
            "Check if submission '%s' remained in the same status "
            "for a long time", self.submission_name)

        elapsed = timezone.now() - self.usi_submission.updated_at

        if elapsed.days <= MAX_DAYS:
            return False

        message = (
            "Biosample submission '%s' remained with the same status "
            "for more than %s days. Please report it to InjectTool "
            "team" % (self.submission_name, MAX_DAYS))

        self.usi_submission.status = ERROR
        self.usi_submission.message = message
        self.usi_submission.save()

        logger.error("Errors for submission: %s", self.submission_name)
        logger.error(message)

        return True

    def sample_has_errors(self, sample, table, pk):
        """
        Helper method to mark an animal/sample with its own errors. Table
        should be Animal or Sample to update the appropriate object. Sample
        is a USI sample object

        Args:
            sample (pyUSIrest.usi.sample): a USI sample object
            table (str): ``Animal`` or ``Sample``, mean the table where this
                object should be searched
            pk (int): table primary key

        Returns:
            dict: the string representation of the local object mapped to
            the error messages read from the USI validation result
        """

        # get sample/animal object relying on table name and pk
        sample_obj = get_model_object(table, pk)

        sample_obj.status = NEED_REVISION
        sample_obj.save()

        # read errors from the USI validation result
        validation_result = sample.get_validation_result()
        errorMessages = validation_result.errorMessages

        # since I validated this object, I have already a ValidationResult
        # object associated to my model
        sample_obj.validationresult.status = 'Error'
        sample_obj.validationresult.messages = [
            "%s: %s" % (k, v) for k, v in errorMessages.items()]
        sample_obj.validationresult.save()

        # need to update ValidationSummary table, since here I know if this
        # is a sample or an animal
        summary = self.uid_submission.validationsummary_set.filter(
            type=table.lower()).first()

        if summary is None:
            # .first() returns None on an empty queryset: avoid failing
            # with an AttributeError and keep reporting the errors
            logger.warning(
                "No validation summary of type '%s' for submission %s",
                table.lower(), self.uid_submission)

        else:
            # update counters with django F expressions, so arithmetic is
            # done by the database: decrease pass count and increase error
            # counts for this object
            # HINT: should I define message here?
            summary.pass_count = F('pass_count') - 1
            summary.error_count = F('error_count') + 1
            summary.issues_count = F('issues_count') + 1
            summary.save()

        # return an error for each object
        return {str(sample_obj): errorMessages}

    def finalize(self):
        """Finalize a submission by closing document and send it to
        biosample. If USI reports errors, mark the offending objects and
        set the local submission to NEED_REVISION instead"""

        logger.info("Finalizing submission '%s'", self.submission_name)

        # get errors for a submission
        errors = self.submission.has_errors()

        if True not in errors:
            # raising an exception while finalizing will result
            # in a failed task.
            # TODO: model and test exception in finalization
            self.submission.finalize()
            return

        # collect all error messages in a list
        messages = []

        # get samples with errors then update database
        for sample in self.submission.get_samples(has_errors=True):
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # if a sample has no errors, its status will be the same
            if sample.has_errors():
                logger.warning(
                    "%s in table %s has errors!!!", sample, table)

                # mark this object and track its error messages
                messages.append(self.sample_has_errors(sample, table, pk))

        logger.error(
            "Errors for submission: '%s'", self.submission_name)
        logger.error("Fix them, then finalize")

        # Update status for biosample.models.Submission
        self.usi_submission.status = NEED_REVISION
        self.usi_submission.message = json.dumps(messages, indent=2)
        self.usi_submission.save()

    def complete(self):
        """Complete a submission and fetch biosample names.

        Nothing is written if any sample has no accession yet: the
        submission is left untouched and a later check can retry."""

        logger.info("Completing submission '%s'", self.submission_name)

        samples = list(self.submission.get_samples())

        # check every accession BEFORE touching the database, otherwise a
        # missing accession would leave only some objects marked COMPLETED
        for sample in samples:
            if sample.accession is None:
                logger.error("No accession found for sample '%s'", sample)
                logger.error("Ignoring submission '%s'", self.submission)
                return

        for sample in samples:
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # get sample/animal object relying on table name and pk
            sample_obj = get_model_object(table, pk)

            # update statuses
            sample_obj.status = COMPLETED
            sample_obj.biosample_id = sample.accession
            sample_obj.save()

        # update submission
        self.usi_submission.status = COMPLETED
        self.usi_submission.message = "Successful submission into biosample"
        self.usi_submission.save()

        logger.info(
            "Submission %s is now completed and recorded into UID",
            self.submission)
330
331
332
class FetchStatusTask(NotifyAdminTaskMixin, BaseTask):
    """Poll USI for every UID submission which is in SUBMITTED status"""

    name = "Fetch USI status"
    description = """Fetch biosample using USI API"""

    @exclusive_task(task_name="Fetch USI status", lock_id="FetchStatusTask")
    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        # debugging instance
        self.debug_task()

        # do stuff and return something
        return self.fetch_status()

    def fetch_status(self):
        """
        Fetch status from pending submissions. Called from
        :py:meth:`run`, handles exceptions from USI, select
        all :py:class:`Submission <uid.models.Submission>` objects
        with :py:const:`SUBMITTED <common.constants.SUBMITTED>` status
        from :ref:`UID <The Unified Internal Database>` and call
        :py:meth:`fetch_queryset` with this data

        Returns:
            str: ``"success"`` when no retry is requested
        """

        logger.info("fetch_status started")

        # search for submission with SUBMITTED status. Other submission are
        # not yet finalized. This function need to be called by exclusives
        # tasks
        qs = Submission.objects.filter(status=SUBMITTED)

        # exists() only asks the database whether at least one row matches
        if qs.exists():
            try:
                # fetch biosample status
                self.fetch_queryset(qs)

            # retry a task under errors
            # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
            except pyUSIrest.exceptions.USIConnectionError as exc:
                raise self.retry(exc=exc)

        else:
            logger.debug("No pending submission in UID database")

        # debug
        logger.info("fetch_status completed")

        return "success"

    # a function to retrieve biosample submission
    def fetch_queryset(self, queryset):
        """Fetch biosample against a queryset (a list of
        :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`Submission <uid.models.Submission>` objects). Iterate
        through submission to get USI info. Calls
        :py:class:`FetchStatusHelper` for each SUBMITTED USI submission and
        then starts a :py:class:`RetrievalCompleteTask` for each UID
        submission
        """

        logger.debug("get an pyUSIrest.auth.Auth object")

        auth = get_manager_auth()

        logger.info("Searching for submissions into biosample")

        for uid_submission in queryset:
            logger.info(
                "getting USI submission for UID '%s'", uid_submission)

            usi_submissions = USISubmission.objects.filter(
                uid_submission=uid_submission,
                status=SUBMITTED)

            # HINT: fetch statuses using tasks?
            for usi_submission in usi_submissions:
                status_helper = FetchStatusHelper(usi_submission, auth)
                status_helper.check_submission_status()

            # set the final status for a submission like SubmissionCompleteTask
            retrievalcomplete = RetrievalCompleteTask()

            # the UID submission id is passed as a keyword argument
            res = retrievalcomplete.delay(uid_submission_id=uid_submission.id)

            logger.info(
                "Start RetrievalCompleteTask process for '%s' "
                "with task '%s'", uid_submission, res.task_id)

        logger.info("fetch_queryset completed")
426
427
428
class RetrievalCompleteTask(SubmissionTaskMixin, BaseTask):
    """Update submission status after fetching status"""

    name = "Complete Retrieval Process"
    description = """Check submission status after retrieval nd update stuff"""
    action = "biosample retrieval"

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status

        Args:
            uid_submission_id (int): required keyword argument, the id of
                the UID submission to update

        Returns:
            str: always ``"success"``
        """

        logger.info("RetrievalCompleteTask started")

        # get UID submission
        uid_submission = self.get_uid_submission(kwargs['uid_submission_id'])

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            uid_submission=uid_submission)

        # count biosample submissions by status
        statuses = {
            res['status']: res['count']
            for res in submission_qs.values('status').annotate(
                count=Count('status'))}

        if SUBMITTED in statuses:
            # ignoring the other models. No errors thrown until there is
            # a SUBMITTED USISubmission
            logger.info("Submission %s not yet finished", uid_submission)

            return "success"

        # if there is ANY error in biosample.models.Submission for a
        # particular submission, I will mark it as ERROR
        elif ERROR in statuses:
            self._report_failure(
                uid_submission,
                submission_qs,
                ERROR,
                "Something goes wrong with biosample submission. Please "
                "report this to InjectTool team\n\n")

        # check if submission need revision
        elif NEED_REVISION in statuses:
            self._report_failure(
                uid_submission,
                submission_qs,
                NEED_REVISION,
                "Some items needs revision:\n\n")

        elif COMPLETED in statuses and len(statuses) == 1:
            # if all status are complete, the submission is completed
            logger.info(
                "Submission %s completed with success", uid_submission)

            self.update_message(uid_submission, submission_qs, COMPLETED)

        else:
            # no branch applies: leave the UID submission as it is, but
            # leave a trace of this in logs
            logger.warning(
                "Unexpected statuses %s for submission %s: nothing updated",
                statuses, uid_submission)

        logger.info("RetrievalCompleteTask completed")

        return "success"

    def _report_failure(self, uid_submission, submission_qs, status, intro):
        """Set a failure status on the UID submission and mail its owner

        Args:
            uid_submission: the UID submission to update
            submission_qs: queryset of the related biosample submissions
            status: the status to set (ERROR or NEED_REVISION)
            intro (str): text placed before the submission message in the
                mail body
        """

        # submission failed
        logger.info("Submission %s failed", uid_submission)

        # update validationsummary
        self.update_validationsummary(uid_submission)

        self.update_message(uid_submission, submission_qs, status)

        # send a mail to the user
        subject = "Error in biosample submission %s" % (
            uid_submission.id)
        body = "%s%s" % (intro, uid_submission.message)

        self.mail_to_owner(uid_submission, subject, body)

    def update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission message and set
        uid.models.Submission message relying on status"""

        # collect distinct messages keeping the order they are read: a dict
        # preserves insertion order, while joining a set gives a different
        # text from run to run. Empty messages are skipped, since a None
        # can't be joined
        distinct_messages = dict.fromkeys(
            submission.message
            for submission in submission_qs.filter(
                status=status).order_by('pk')
            if submission.message)

        self.update_submission_status(
            uid_submission,
            status,
            "\n".join(distinct_messages),
            construct_message=True)

    def update_validationsummary(self, uid_submission):
        """Update validationsummary message after our USI submission is
        completed (with errors or not)"""

        self.__generic_validationsummary(uid_submission, "animal")
        self.__generic_validationsummary(uid_submission, "sample")

    def __generic_validationsummary(self, uid_submission, model):
        """Rebuild ValidationSummary messages for a model type

        Args:
            uid_submission: the UID submission the summary belongs to
            model (str): the model name, used both to get the ContentType
                and the ValidationSummary (``animal`` or ``sample``)
        """

        # when arriving here, I have processed the USI results and maybe
        # I have updated validationresult accordingly
        logger.debug(
            "Update validationsummary(%s) for %s", model, uid_submission)

        model_type = ContentType.objects.get(app_label='uid', model=model)

        # get validation results with errors for this model type
        model_qs = ValidationResult.objects.filter(
            submission=uid_submission,
            content_type=model_type,
            status="Error")

        # count each message and track the ids of the objects having it
        messages_counter = Counter()
        messages_ids = defaultdict(list)

        for item in model_qs:
            for message in item.messages:
                messages_counter[message] += 1
                messages_ids[message].append(item.content_object.id)

        # ok preparing to update summary object
        model_summary = ValidationSummary.objects.get(
            submission=uid_submission, type=model)

        # create messages. No offending column since is not always possible
        # to determine a column from USI error. With this, I have a record
        # in ValidationSummary page, but I don't have a link to batch update
        # since is not possible to determine and change a column in my
        # data
        model_summary.messages = [
            {
                'message': message,
                'count': count,
                'ids': messages_ids[message],
                'offending_column': '',
            }
            for message, count in messages_counter.items()]
        model_summary.save()
575
576
577
# class-based tasks are registered explicitly in the celery application
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(FetchStatusTask)
celery_app.tasks.register(RetrievalCompleteTask)
581