Completed
Push — master ( 2c16e2...2c16e2 )
by Paolo
13s queued 11s
created

TaskFailureMixin.on_failure()   A

Complexity

Conditions 1

Size

Total Lines 23
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 23
rs 9.75
c 0
b 0
f 0
cc 1
nop 6
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from common.constants import ERROR, READY, SUBMITTED, COMPLETED
22
from common.tasks import BaseTask, NotifyAdminTaskMixin
23
from image.celery import app as celery_app
24
from image_app.models import Animal
25
from submissions.tasks import SubmissionTaskMixin
26
27
from ..helpers import get_auth
28
from ..models import (
29
    Submission as USISubmission, SubmissionData as USISubmissionData)
30
31
# maximum number of samples per USI submission batch. It is checked only
# before adding a new animal, so a batch may slightly exceed this value
MAX_SAMPLES = 100

# module-level celery task logger
logger = get_task_logger(__name__)
36
37
38
class SubmissionError(Exception):
    """Raised when a USI submission cannot be processed, for example when a
    recovered USI submission is not in "Draft" status anymore"""
42
43
44
# HINT: move into helper module?
class SubmissionHelper():
    """
    An helper class for submission task, used to deal with pyUSIrest.

    It wraps a :py:class:`biosample.models.Submission` object (fetched by
    primary key) together with the pyUSIrest objects (auth, root and USI
    submission) needed to create or recover a USI submission and to add
    samples to it.
    """

    def __init__(self, submission_id):
        """Fetch the :py:class:`biosample.models.Submission` object

        Args:
            submission_id (int): primary key of a
                :py:class:`biosample.models.Submission` object
        """

        self.submission_id = submission_id
        self.token = None

        # pyUSIrest objects, set by read_token()
        self.auth = None
        self.root = None

        # the USI submission document, set by start_submission()
        self.usi_submission = None

        # samples already in the USI submission document, keyed by alias
        self.submitted_samples = {}

        # get a submission object (Django raises DoesNotExist if missing)
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission

        Returns:
            :py:attr:`Submission.owner`: a django
            :py:class:`django.contrib.auth.models.User` object
        """

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object

        Returns:
            str: the team name"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get/set biosample submission id from database. Setting the value
        saves the submission object

        Returns:
            str: the biosample USI submission identifier"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set root attribute with a
        pyUSIrest.client.Root instance

        Returns:
            str: the read token

        Raises:
            SubmissionError: if no token is stored in redis for this
                submission and owner
        """

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # redis returns None for a missing key: raise an explicit error
        # instead of failing later on None.decode()
        raw_token = client.get(key)

        if raw_token is None:
            raise SubmissionError(
                "No token found in redis for '%s'" % (key))

        self.token = raw_token.decode("utf8")

        # create a new auth object
        self.auth = get_auth(token=self.token)

        # get a root object with auth
        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error

        Returns:
            :py:class:`pyUSIrest.client.Submission`: the USI submission
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover a USI submission document.

        Returns:
            bool: False if no USI submission name is stored in database,
            True if the submission was recovered

        Raises:
            SubmissionError: if the recovered submission is not in "Draft"
                status
        """

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object and store its name in the
        database

        Returns:
            :py:class:`pyUSIrest.client.Submission` a pyUSIrest submission
            object
        """

        # getting team
        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission_id in table (the setter saves the object)
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read sample in a USI submission document and set submitted_samples
        attribute

        Returns:
            dict: the submitted samples, keyed by alias"""

        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples", len(samples))

        self.submitted_samples.update(
            (sample.alias, sample) for sample in samples)

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. Then mark the model as SUBMITTED

        Args:
            model (:py:class:`image_app.mixins.BioSampleMixin`): An animal or
                sample object"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        if alias in self.submitted_samples:
            # already in USI submission: patch sample
            logger.info("Patching %s", alias)

            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            sample = self.usi_submission.create_sample(
                model.to_biosample())

            self.submitted_samples[alias] = sample

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample (if model is in READY state)"""

        # iterate over sample data, in insertion order
        for submission_data in self.submission_obj.submission_data\
                .order_by('id'):
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s", model._meta.verbose_name, model)

    def mark_submission(self, status, message):
        """Set status and message for
        :py:class:`biosample.models.Submission` and save it

        Args:
            status (int): a status from :py:mod:`common.constants`
            message (str): a message to track in the submission object"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a :py:const:`ERROR <common.constants.ERROR>` status for
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(SUBMITTED, message)
287
288
class SubmitTask(NotifyAdminTaskMixin, BaseTask):
    """Submit a single :py:class:`biosample.models.Submission` batch to
    BioSamples through USI, using :py:class:`SubmissionHelper`"""

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Run task. Instantiate a SubmissionHelper with the provided
        :py:class:`biosample.models.Submission` id. Read token from database,
        start or recover a submission, add samples to it and then mark a
        status for it

        Args:
            usi_submission_id (int): primary key of a
                :py:class:`biosample.models.Submission` object

        Returns:
            tuple: ``("success", usi_submission_id)``. The outcome of the
            submission is tracked in the status/message of the submission
            object, not in the returned value
        """

        # get a submission helper object
        submission_helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            submission_helper.read_token()
            submission_helper.start_submission()
            submission_helper.add_samples()
            submission_helper.mark_success()

        except ConnectionError as exc:
            logger.error("Error in biosample submission: %s", exc)
            message = "Errors in EBI API endpoints. Please try again later"
            logger.error(message)

            # track message in submission object: back to READY state
            submission_helper.mark_submission(READY, message)

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            logger.error("Error in biosample submission: %s", exc)
            message = (
                "Your token is expired: please submit again to resume "
                "submission")
            logger.error(message)

            # track message in submission object: back to READY state
            submission_helper.mark_submission(READY, message)

        except Exception as exc:
            # log the traceback in worker logs too, not only in database
            logger.exception("Unmanaged error: %s", exc)

            # get exception info
            einfo = traceback.format_exc()

            # track traceback in message
            submission_helper.mark_fail(einfo)

        return "success", usi_submission_id
338
339
340
# HINT: move into helper module?
class SplitSubmissionHelper():
    """
    helper class to split :py:class:`image_app.models.Submission` data in
    batches limited in size (see ``MAX_SAMPLES``). Each batch is a
    :py:class:`biosample.models.Submission` object; its data are tracked
    with :py:class:`biosample.models.SubmissionData` objects"""

    def __init__(self, uid_submission):
        """
        Args:
            uid_submission (:py:class:`image_app.models.Submission`): the
                UID submission whose data will be split in batches
        """

        # number of objects added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission

        # the current batch (a biosample.models.Submission object)
        self.usi_submission = None

        # primary keys of the batches that need to be submitted
        self.submission_ids = []

    def process_data(self):
        """Add animals and their samples to submission batches. Only objects
        in READY status are added, the others are ignored"""

        for animal in Animal.objects.filter(
                name__submission=self.uid_submission):

            # add animal if not yet submitted, or patch it
            if animal.name.status == READY:
                self.add_to_submission_data(animal)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s", animal)

            # Add their specimen (checked even if the animal was ignored)
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status == READY:
                    self.add_to_submission_data(sample)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s", sample)

            # end of cycle for animal.

    def create_submission(self):
        """
        Create a new :py:class:`biosample.models.Submission` object in READY
        status, track its primary key and set sample counter to 0"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s", self.usi_submission)

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """
        Check if :py:class:`image_app.mixins.BioSampleMixin` is already in an
        opened (not COMPLETED) submission. If so, the submission is put in
        READY status and tracked in ``submission_ids`` (once)

        Returns:
            bool: True if model is in an opened submission

        Raises:
            SubmissionError: if model is in more than one opened submission
        """

        logger.debug(
            "Searching %s %s in submissions",
            model._meta.verbose_name,
            model)

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # define a queryset
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)

        # exclude completed submissions: keep only the opened ones
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # evaluate the count query only once
        n_opened = data_qs.count()

        if n_opened == 1:
            usi_submission = data_qs.first().submission

            logger.debug(
                "Found %s %s in %s",
                model._meta.verbose_name,
                model,
                usi_submission)

            # mark this batch to be called like it was created
            if usi_submission.id not in self.submission_ids:
                self.submission_ids.append(usi_submission.id)

                logger.debug(
                    "Reset status for submission %s", usi_submission)
                usi_submission.status = READY
                usi_submission.save()

            return True

        elif n_opened > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model._meta.verbose_name,
                    model))

        # no sample in data. I could append model into submission
        logger.debug(
            "No %s %s in submission data",
            model._meta.verbose_name,
            model)

        return False

    def add_to_submission_data(self, model):
        """Add a :py:class:`image_app.mixins.BioSampleMixin` to a
        :py:class:`biosample.models.Submission` object, or create a new
        one if there are more samples than required. Models already in an
        opened submission are ignored"""

        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.info(
                "Ignoring %s %s: already in a submission",
                model_type,
                model)
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task. Do it only on animals, to prevent
        # to put samples in a different submission
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info(
            "Appending %s %s to %s",
            model._meta.verbose_name,
            model,
            self.usi_submission)

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1
484
485
486
class SplitSubmissionTask(SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Split the data of a UID submission in batches, submit each batch
    with its own :py:class:`SubmitTask` and finally update the UID
    submission status with :py:class:`SubmissionCompleteTask`"""

    name = "Split submission data"
    description = """Split submission data in chunks"""
    action = "biosample submission"

    def run(self, submission_id):
        """Call :py:class:`SplitSubmissionHelper` to split
        :py:class:`image_app.models.Submission` data, then launch a celery
        chord: one :py:class:`SubmitTask` per batch as header and a
        :py:class:`SubmissionCompleteTask` as callback

        Args:
            submission_id (int): primary key of the
                :py:class:`image_app.models.Submission` object

        Returns:
            str: "success"
        """

        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = self.get_uid_submission(submission_id)

        # create the batches (database objects) for animals and samples
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # the chord callback, with the UID submission id as keyword
        complete_task = SubmissionCompleteTask()
        on_complete = complete_task.s(uid_submission_id=submission_id)

        # one SubmitTask signature for each batch
        submit_task = SubmitTask()
        batch_signatures = [
            submit_task.s(batch_id) for batch_id in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks" % len(batch_signatures))

        # the callback is called only after all header tasks
        chord_result = chord(batch_signatures)(on_complete)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission,
                chord_result.task_id))

        logger.info("%s completed" % self.name)

        return "success"
535
536
537
class SubmissionCompleteTask(
        SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""
    action = "biosample submission"

    def run(self, *args, **kwargs):
        """Fetch submission data and then update
        :py:class:`image_app.models.Submission` status

        Args:
            args: the first positional argument is the list of results
                returned by the :py:class:`SubmitTask` chord header
            kwargs: must contain ``uid_submission_id``, the id of the
                :py:class:`image_app.models.Submission` object

        Returns:
            str: "success"
        """

        submission_statuses = args[0]

        # submission_statuses will be a list like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # get UID submission
        uid_submission = self.get_uid_submission(kwargs['uid_submission_id'])

        # count biosample submissions by status. The empty order_by() clears
        # any default ordering, which Django would add to the GROUP BY
        statuses = {
            res['status']: res['count']
            for res in submission_qs.values('status').annotate(
                count=Count('status')).order_by()}

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            self.update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        elif READY in statuses:
            # temporary failure: submission could be tried again
            logger.info("Temporary error for %s", uid_submission)

            self.update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Temporary error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "try again\n\n"
                 "%s" % uid_submission.message),
            )

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success", uid_submission)

            self.update_message(uid_submission, submission_qs, SUBMITTED)

        return "success"

    def update_message(self, uid_submission, submission_qs, status):
        """Read :py:class:`biosample.models.Submission` messages for the
        given status and set :py:class:`image_app.models.Submission` status
        and message (unique, non-empty messages joined by newlines)"""

        # collect messages for the submissions with this status, skipping
        # empty ones (None would break str.join)
        messages = [
            submission.message
            for submission in submission_qs.filter(status=status)
            if submission.message]

        # remove duplicates preserving order (a set would shuffle them)
        unique_messages = list(dict.fromkeys(messages))

        self.update_submission_status(
            uid_submission, status, "\n".join(unique_messages))
622
623
624
# class-based tasks need to be registered explicitly, see:
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (SubmitTask, SplitSubmissionTask, SubmissionCompleteTask):
    celery_app.tasks.register(_task_class)

# don't leave the loop variable in module namespace
del _task_class
629