Completed
Pull Request — master (#44)
by Paolo
05:50
created

biosample.tasks.submission.SubmitTask.run()   A

Complexity

Conditions 4

Size

Total Lines 40
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 40
rs 9.328
c 0
b 0
f 0
cc 4
nop 2
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from common.constants import ERROR, READY, SUBMITTED, COMPLETED
22
from image.celery import app as celery_app, MyTask
23
from image_app.models import Submission, Animal
24
from submissions.helpers import send_message
25
26
from ..helpers import get_auth
27
from ..models import (
28
    Submission as USISubmission, SubmissionData as USISubmissionData)
29
30
# module-level logger shared by the celery tasks of this module
logger = get_task_logger(__name__)

# threshold on the number of records (animals and samples) collected into a
# single USI submission: a new submission is opened only before an animal,
# so a batch may slightly exceed this value
MAX_SAMPLES: int = 100


class SubmissionError(Exception):
    """Raised when a biosample submission cannot be carried on (e.g. a
    recovered USI submission is no longer editable)."""


# TODO: promote this to become a common mixin for tasks
class TaskFailureMixin():
    """Mixin candidate to become the common behaviour of task failure: it
    marks the image_app Submission with ERROR status and notifies its owner.

    It has to be listed before the celery task base class, so that its
    ``on_failure`` takes precedence in the method resolution order.
    """

    def _get_submission_id(self, args, kwargs):
        """Return the image_app Submission pk the failed task worked on.

        A task used as chord callback receives the header results as its
        first positional argument and the submission pk as the
        ``uid_submission_id`` keyword, so keywords are inspected first;
        otherwise the pk is expected as first positional argument.
        """

        kwargs = kwargs or {}

        for key in ("uid_submission_id", "submission_id"):
            if key in kwargs:
                return kwargs[key]

        return args[0]

    # Override default on failure method
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Mark the submission as failed, send an async message and mail the
        owner with the stack trace (``einfo``)."""

        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

        # get submission object
        submission_obj = Submission.objects.get(
            pk=self._get_submission_id(args, kwargs))

        submission_obj.status = ERROR
        submission_obj.message = (
            "Error in biosample submission: %s" % str(exc))
        submission_obj.save()

        # send async message
        send_message(submission_obj)

        # send a mail to the user with the stacktrace (einfo)
        submission_obj.owner.email_user(
            "Error in biosample submission %s" % (
                submission_obj.id),
            ("Something goes wrong with biosample submission. Please report "
             "this to InjectTool team\n\n %s" % str(einfo)),
        )

        # TODO: submit mail to admin


# HINT: move into helper module?
class SubmissionHelper():
    """
    Helper class for the submission task, used to deal with pyUSIrest: it
    reads the user token, gets (recovers or creates) a USI submission
    document and adds to it the animals/samples tracked by a
    biosample.models.Submission object.
    """

    # define my class attributes
    def __init__(self, submission_id):
        """Load the biosample.models.Submission with pk ``submission_id``.

        Propagates whatever ``USISubmission.objects.get`` raises when the
        object cannot be found.
        """

        # ok those are my default class attributes
        self.submission_id = submission_id
        self.submission_obj = None
        self.token = None

        # here are pyUSIrest object
        self.auth = None
        self.root = None

        # here I will track a USI submission
        self.usi_submission = None

        # here I will store samples already submitted (alias -> USI sample)
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from the UID Submission related to this
        submission object"""

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover the biosample team name of the submission owner"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get biosample submission id from database"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        """Set biosample submission id and save it into database"""

        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set the ``auth`` and ``root``
        attributes (``root`` is a pyUSIrest.client.Root instance).

        Returns:
            str: the decoded token

        Raises:
            SubmissionError: if no token is stored for this submission
        """

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infere key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'" % self.owner)

        # getting token from redis db: a missing key is returned as None,
        # which would otherwise fail later with an obscure AttributeError
        token = client.get(key)

        if token is None:
            raise SubmissionError(
                "Cannot find a biosample token for '%s' (key '%s')" % (
                    self.owner, key))

        self.token = token.decode("utf8")

        # create a new auth object from the token
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover the USI submission document whose name is stored
        in ``usi_submission_name``.

        Returns:
            bool: False if no name is stored, True if recovered

        Raises:
            SubmissionError: if the recovered submission is not in 'Draft'
            status
        """

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info("Recovering submission %s for team %s" % (
            self.usi_submission_name,
            self.team_name))

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission document for the owner team and
        store its name in database"""

        # getting team
        logger.debug("getting team '%s'" % (self.team_name))
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'" % (team.name))
        self.usi_submission = team.create_submission()

        # track submission_id in table
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read samples in a USI submission document and fill the
        ``submitted_samples`` attribute (alias -> USI sample)"""

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples" % (len(samples)))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. The related name record is then marked as
        SUBMITTED"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch sample
            logger.info("Patching %s" % (alias))

            # get usi sample
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            self.usi_submission.create_sample(
                model.to_biosample())

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample on the READY ones"""

        # iterate over sample data
        for submission_data in self.submission_obj.submission_data\
                .order_by('id'):
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug("Adding %s %s to submission %s" % (
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name))
                self.create_or_update_sample(model)

            else:
                logger.debug("Ignoring %s %s" % (
                    model._meta.verbose_name, model))

    def mark_submission(self, status, message):
        """Set status and message for biosample.models.Submission and save
        it"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a ERROR status for biosample.models.Submission and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a SUBMITTED status for biosample.models.Submission and a
        message"""

        self.mark_submission(SUBMITTED, message)


class SubmitTask(MyTask):
    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit one biosample.models.Submission batch through USI.

        Exceptions are not propagated: they are recorded on the submission
        object (READY status for connection/token problems, ERROR with the
        traceback otherwise). The task always returns
        ``("success", usi_submission_id)``.
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        except ConnectionError as exc:
            self._mark_ready(
                helper,
                exc,
                "Errors in EBI API endpoints. Please try again later")

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            self._mark_ready(
                helper,
                exc,
                "Your token is expired: please submit again to resume "
                "submission")

        except Exception as exc:
            logger.error("Unmanaged error: %s" % exc)

            # store the traceback of the exception being handled
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id

    @staticmethod
    def _mark_ready(helper, exc, message):
        """Log ``exc`` and ``message``, then put the submission back in
        READY status with ``message``."""

        logger.error("Error in biosample submission: %s" % exc)
        logger.error(message)

        # track message in submission object
        helper.mark_submission(READY, message)


# HINT: move into helper module?
class SplitSubmissionHelper():
    """Split the READY animals and samples of an image_app Submission into
    biosample.models.Submission batches.

    ``submission_ids`` collects the pks of the batches to be submitted:
    the newly created ones and the already opened ones found again.
    """

    def __init__(self, uid_submission):
        # number of records added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission

        # current batch (biosample.models.Submission), created lazily
        self.usi_submission = None

        # pks of the batches to be submitted
        self.submission_ids = []

    def process_data(self):
        """Add animal and its samples to a submission"""

        for animal in Animal.objects.filter(
                name__submission=self.uid_submission):

            # add animal if not yet submitted, or patch it
            if animal.name.status == READY:
                self.add_to_submission_data(animal)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s" % (animal))

            # Add their specimen
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status == READY:
                    self.add_to_submission_data(sample)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s" % (sample))

            # end of cycle for animal.

    def create_submission(self):
        """Create a new database object, track its pk and reset counter"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s" % (self.usi_submission))

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """Check if model is already in an opened (not COMPLETED) submission.

        If found, that submission is set back to READY and its pk tracked in
        ``submission_ids`` (once).

        Returns:
            bool: True if model is already in an opened submission

        Raises:
            SubmissionError: if model is in more than one opened submission
        """

        logger.debug("Searching %s %s in submissions" % (
            model._meta.verbose_name,
            model))

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # define a queryset
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)

        # ignore COMPLETED submissions: only opened ones are considered
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # evaluate the count only once: each count() is a database query
        n_submissions = data_qs.count()

        if n_submissions == 1:
            usi_submission = data_qs.first().submission

            logger.debug("Found %s %s in %s" % (
                model._meta.verbose_name,
                model,
                usi_submission))

            # mark this batch to be called like it was created
            if usi_submission.id not in self.submission_ids:
                self.submission_ids.append(usi_submission.id)

                logger.debug(
                    "Reset status for submission %s" % (usi_submission))
                usi_submission.status = READY
                usi_submission.save()

            return True

        elif n_submissions > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model._meta.verbose_name,
                    model))

        else:
            # no sample in data. I could append model into submission
            logger.debug("No %s %s in submission data" % (
                model._meta.verbose_name,
                model))
            return False

    def add_to_submission_data(self, model):
        """Append model (animal or sample) to the current batch, creating
        a new batch when needed. Models already in an opened submission are
        skipped."""

        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.info("Ignoring %s %s: already in a submission" % (
                model_type,
                model))
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task. Do it only on animals, to prevent
        # to put samples in a different submission
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info("Appending %s %s to %s" % (
            model_type,
            model,
            self.usi_submission))

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1


class SplitSubmissionTask(TaskFailureMixin, MyTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""

    def run(self, submission_id):
        """Split the image_app Submission ``submission_id`` into batches and
        launch a chord: one SubmitTask per batch, followed by a
        SubmissionCompleteTask callback. Returns "success"."""

        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = Submission.objects.get(pk=submission_id)

        # collect READY animals and samples into database batches
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # chord body, receiving the results of every header task
        callback = SubmissionCompleteTask().s(
            uid_submission_id=submission_id)

        # one signature per batch
        submit_task = SubmitTask()
        header = [submit_task.s(pk) for pk in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks" % len(header))

        # the callback runs only after all header tasks are completed
        chord_result = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission, chord_result.task_id))

        logger.info("%s completed" % self.name)

        return "success"


class SubmissionCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status.

        Used as chord callback: ``args[0]`` is the list of values returned
        by the SubmitTask header tasks, ``kwargs['uid_submission_id']`` is
        the pk of the image_app Submission to update.

        Returns:
            str: always "success"
        """

        submission_statuses = args[0]

        # submission_statuses will be an array like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # annotate biosample submission by statuses
        statuses = {}

        for res in submission_qs.values('status').annotate(
                count=Count('status')):
            statuses[res['status']] = res['count']

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.__update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        elif READY in statuses:
            # temporary failure: batches were put back in READY status
            logger.info("Temporary error for %s" % uid_submission)

            self.__update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Temporary error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "try again\n\n"
                 "%s" % uid_submission.message),
            )

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success" % uid_submission)

            self.__update_message(uid_submission, submission_qs, SUBMITTED)

        # send async message
        send_message(uid_submission)

        return "success"

    def __update_message(self, uid_submission, submission_qs, status):
        """Set ``status`` on the image_app Submission and set its message to
        the distinct, non-empty messages of the biosample.models.Submission
        objects having that same status (one per line)."""

        # dict keys keep the first-seen order of distinct messages (a set
        # made the resulting text order arbitrary); empty/None messages are
        # skipped since they would break str.join
        messages = {}

        for submission in submission_qs.filter(status=status).order_by('id'):
            if submission.message:
                messages[submission.message] = None

        uid_submission.status = status
        uid_submission.message = "\n".join(messages)
        uid_submission.save()


# tasks are registered explicitly, see:
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (SubmitTask, SplitSubmissionTask, SubmissionCompleteTask):
    celery_app.tasks.register(_task_class)

# do not leak the loop variable into the module namespace
del _task_class