Completed
Pull Request — master (#44)
by Paolo
11:03 queued 04:44
created

TaskFailureMixin.on_failure()   A

Complexity

Conditions 1

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 20
rs 9.75
c 0
b 0
f 0
cc 1
nop 6
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from common.constants import ERROR, READY, SUBMITTED, COMPLETED
22
from image.celery import app as celery_app, MyTask
23
from image_app.models import Submission, Animal
24
from submissions.helpers import send_message
25
26
from ..helpers import get_auth
27
from ..models import (
28
    Submission as USISubmission, SubmissionData as USISubmissionData)
29
30
# Get an instance of a logger
31
logger = get_task_logger(__name__)
32
33
# how many sample for submission
34
MAX_SAMPLES = 100
35
36
37
class SubmissionError(Exception):
    """Raised when a biosample submission cannot proceed, e.g. when a
    recovered USI submission is no longer editable or when an object is
    found in more than one opened submission."""
41
42
43
# TODO: promote this to become a common mixin for tasks
class TaskFailureMixin():
    """Mixin candidate to become the common behaviour of task failure which
    marks the UID submission with ERROR status and sends a message to its
    owner.

    The UID submission pk is taken from the ``uid_submission_id`` keyword
    argument when present. SubmissionCompleteTask receives it this way, with
    the chord header results as its first positional argument. Otherwise the
    pk is the first positional argument, as in SplitSubmissionTask."""

    @staticmethod
    def _get_submission_id(args, kwargs):
        """Return the image_app.models.Submission pk from task arguments."""

        # chord callbacks get the header results in args[0], so an explicit
        # keyword argument must take precedence
        if kwargs and 'uid_submission_id' in kwargs:
            return kwargs['uid_submission_id']

        return args[0]

    # Override default on failure method
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Mark the UID submission as failed and notify its owner.

        Args:
            exc: the exception raised by the task
            task_id: id of the failed task
            args: positional arguments of the failed task
            kwargs: keyword arguments of the failed task
            einfo: exception info; its text (the stacktrace) is mailed to
                the submission owner
        """

        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

        # get submission object (see _get_submission_id for why args[0]
        # can't be used blindly)
        submission_obj = Submission.objects.get(
            pk=self._get_submission_id(args, kwargs))

        submission_obj.status = ERROR
        submission_obj.message = (
            "Error in biosample submission: %s" % str(exc))
        submission_obj.save()

        # send async message
        send_message(submission_obj)

        # send a mail to the user with the stacktrace (einfo)
        submission_obj.owner.email_user(
            "Error in biosample submission %s" % (
                submission_obj.id),
            ("Something goes wrong with biosample submission. Please report "
             "this to InjectTool team\n\n %s" % str(einfo)),
        )

        # TODO: submit mail to admin
72
73
74
# HINT: move into helper module?
class SubmissionHelper():
    """
    A helper class for the submission task, used to deal with pyUSIrest.

    It loads a biosample.models.Submission object, reads the user token from
    REDIS, recovers or creates a USI submission document and adds (or
    patches) the READY animals/samples into it.
    """

    # define my class attributes
    def __init__(self, submission_id):
        """Load the biosample.models.Submission with pk ``submission_id``
        (Django's DoesNotExist is raised if it can't be found)."""

        # ok those are my default class attributes
        self.submission_id = submission_id
        self.submission_obj = None
        self.token = None

        # here are pyUSIrest object
        self.auth = None
        self.root = None

        # here I will track a USI submission
        self.usi_submission = None

        # here I will store samples already submitted (alias -> USI sample)
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission"""

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get biosample submission id from database"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        """Set biosample submission id and save it into database"""

        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set root attribute with a
        pyUSIrest.client.Root instance.

        Returns:
            str: the decoded token

        Raises:
            SubmissionError: if no token is stored in REDIS for this
                submission and owner
        """

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infere key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'" % self.owner)

        # getting token from redis db. redis returns None for a missing key:
        # report it explicitly instead of failing on None.decode()
        token = client.get(key)

        if token is None:
            raise SubmissionError(
                "Cannot find a biosample token for '%s' in redis "
                "(key '%s')" % (self.owner, key))

        self.token = token.decode("utf8")

        # get a root object with auth
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover a USI submission document.

        Returns False if no USI submission name is stored, True if the
        submission was recovered (already submitted samples are read too).

        Raises:
            SubmissionError: if the recovered submission is not in Draft
                status
        """

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info("Recovering submission %s for team %s" % (
            self.usi_submission_name,
            self.team_name))

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object and track its name in the
        database"""

        # getting team
        logger.debug("getting team '%s'" % (self.team_name))
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'" % (team.name))
        self.usi_submission = team.create_submission()

        # track submission_id in table
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read sample in a USI submission document and set submitted_samples
        attribute (a dict alias -> USI sample)"""

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples" % (len(samples)))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. The related name object is then marked as
        SUBMITTED"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch sample
            logger.info("Patching %s" % (alias))

            # get usi sample
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            self.usi_submission.create_sample(
                model.to_biosample())

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample on READY objects"""

        # iterate over sample data
        for submission_data in self.submission_obj.submission_data.all():
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug("Adding %s %s to submission %s" % (
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name))
                self.create_or_update_sample(model)

            else:
                logger.debug("Ignoring %s %s" % (
                    model._meta.verbose_name, model))

    def mark_submission(self, status, message):
        """Set status and message for biosample.models.Submission and save
        it"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a ERROR status for biosample.models.Submission and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a SUBMITTED status for biosample.models.Submission and a
        message"""

        self.mark_submission(SUBMITTED, message)
289
290
291
class SubmitTask(MyTask):
    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit a single biosample.models.Submission batch through USI.

        Exceptions are never re-raised; the outcome is recorded on the
        submission object instead. A ConnectionError or a RuntimeError
        (e.g. an expired token) leaves the batch READY with an explanatory
        message. Any other exception marks it ERROR, with the formatted
        traceback as message.

        Returns:
            tuple: ("success", usi_submission_id), whatever happened
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        except (ConnectionError, RuntimeError) as exc:
            logger.error("Error in biosample submission: %s" % exc)

            if isinstance(exc, ConnectionError):
                message = "Errors in EBI API endpoints. Please try again later"

            else:
                # TODO: should I rename this exception with a more
                # informative name when token expires during a submission?
                message = (
                    "Your token is expired: please submit again to resume "
                    "submission")

            logger.error(message)

            # track message in submission object, keeping it READY
            helper.mark_submission(READY, message)

        except Exception as exc:
            logger.error("Unmanaged error: %s" % exc)

            # track the whole traceback as the submission message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id
335
336
337
# HINT: move into helper module?
class SplitSubmissionHelper():
    """Split the READY animals and samples of a UID submission into
    biosample.models.Submission batches.

    Each newly created batch receives at most MAX_SAMPLES objects. The pks
    of the batches to be submitted (new or re-opened ones) are collected in
    ``submission_ids``."""

    def __init__(self, uid_submission):
        # number of objects added to the current batch
        self.counter = 0
        # the image_app.models.Submission being split
        self.uid_submission = uid_submission
        # the current batch (a biosample.models.Submission object)
        self.usi_submission = None
        # pks of batches that need to be submitted
        self.submission_ids = []

    def process_data(self):
        """Add every READY animal of the UID submission, and every READY
        sample of those animals, to submission data"""

        for animal in Animal.objects.filter(
                name__submission=self.uid_submission):

            # add animal if not yet submitted, or patch it
            if animal.name.status == READY:
                self.add_to_submission_data(animal)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s" % (animal))

            # Add their specimen
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status == READY:
                    self.add_to_submission_data(sample)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s" % (sample))

            # end of cycle for animal.

    def create_submission(self):
        """Create a new database object and reset counter"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """Check if model is already in an opened (not COMPLETED) submission.

        If so, that submission is set READY again and its pk is tracked
        (once) in ``submission_ids``, so that it will be submitted again.

        Returns:
            bool: True if model is in an opened submission, False otherwise

        Raises:
            SubmissionError: if model is in more than one opened submission
        """

        logger.debug("Searching %s %s in submissions" % (
            model._meta.verbose_name,
            model))

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # define a queryset
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)

        # exclude completed submissions: only opened ones are relevant
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # count() is a database query: evaluate it only once
        n_submissions = data_qs.count()

        if n_submissions == 1:
            usi_submission = data_qs.first().submission

            logger.debug("Found %s %s in %s" % (
                model._meta.verbose_name,
                model,
                usi_submission))

            # mark this batch to be called like it was created
            if usi_submission.id not in self.submission_ids:
                self.submission_ids.append(usi_submission.id)

                logger.debug(
                    "Reset status for submission %s" % (usi_submission))
                usi_submission.status = READY
                usi_submission.save()

            return True

        elif n_submissions > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model._meta.verbose_name,
                    model))

        # no sample in data. I could append model into submission
        logger.debug("No %s %s in submission data" % (
            model._meta.verbose_name,
            model))

        return False

    def add_to_submission_data(self, model):
        """Append model to the current batch, unless it is already in an
        opened submission. A new batch is created when there is none yet or
        when the current one holds MAX_SAMPLES objects"""

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.info("Ignoring %s %s: already in a submission" % (
                model._meta.verbose_name,
                model))
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task
        if self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info("Appending %s %s to %s" % (
            model._meta.verbose_name,
            model,
            self.usi_submission))

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1
461
462
463
class SplitSubmissionTask(TaskFailureMixin, MyTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""

    def run(self, submission_id):
        """Split the UID submission ``submission_id`` in batches and launch a
        chord: one SubmitTask per batch, then SubmissionCompleteTask as
        callback. This function is called when delay is called"""

        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = Submission.objects.get(pk=submission_id)

        # create database objects (batches) by iterating over animals and
        # samples
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # the callback receives uid_submission_id as a keyword argument
        callback = SubmissionCompleteTask().s(
            uid_submission_id=submission_id)

        # one SubmitTask signature for each batch
        submit_task = SubmitTask()
        header = [submit_task.s(pk) for pk in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks" % len(header))

        # the callback will be called only after all header tasks
        chord_result = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission,
                chord_result.task_id))

        logger.info("%s completed" % self.name)

        return "success"
508
509
510
class SubmissionCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after batch submission"""

    name = "Complete Submission"
    description = """Check submission status and update stuff"""

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status.

        Used as the chord callback of SplitSubmissionTask: ``args[0]`` is the
        list of SubmitTask results and ``kwargs['uid_submission_id']`` is the
        UID submission pk. The UID submission becomes ERROR if any batch is
        ERROR, otherwise READY if any batch is READY (temporary error),
        otherwise SUBMITTED."""

        submission_statuses = args[0]

        logger.debug(args)
        logger.debug(kwargs)
        logger.debug(submission_statuses)

        # submission_statuses will be an array like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # count biosample submissions by status
        statuses = {
            res['status']: res['count']
            for res in submission_qs.values('status').annotate(
                count=Count('status'))}

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.__update_message(uid_submission, submission_qs, ERROR)

        elif READY in statuses:
            # temporary failure: data can be submitted again
            logger.info("Temporary error for %s" % uid_submission)

            self.__update_message(uid_submission, submission_qs, READY)

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success" % uid_submission)

            self.__update_message(uid_submission, submission_qs, SUBMITTED)

        # send async message
        send_message(uid_submission)

        return "success"

    def __update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission messages and set
        image_app.models.Submission status and message.

        Messages of the batches having ``status`` are deduplicated keeping
        their first-seen order, so the resulting text is deterministic
        (a set would make the order arbitrary); empty messages are
        skipped."""

        # unique messages in query order (dict keys keep insertion order)
        messages = dict.fromkeys(
            submission.message
            for submission in submission_qs.filter(status=status)
            if submission.message)

        uid_submission.status = status
        uid_submission.message = "\n".join(messages)
        uid_submission.save()
583
584
585
# register explicitly tasks: class-based tasks are not picked up
# automatically, see
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (SubmitTask, SplitSubmissionTask, SubmissionCompleteTask):
    celery_app.tasks.register(_task_class)

# don't leave the loop variable in the module namespace
del _task_class
590