SplitSubmissionTask.run()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 41
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 41
rs 9.5
c 0
b 0
f 0
cc 1
nop 2
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.usi
12
import pyUSIrest.exceptions
13
14
from celery import chord
15
from celery.utils.log import get_task_logger
16
17
from django.conf import settings
18
from django.contrib.contenttypes.models import ContentType
19
from django.db.models import Count, F
20
from django.db.models.functions import Least
21
from django.utils import timezone
22
from django.template.defaultfilters import truncatechars
23
24
from common.constants import (
25
    ERROR, READY, SUBMITTED, COMPLETED, EMAIL_MAX_BODY_SIZE)
26
from common.tasks import BaseTask, NotifyAdminTaskMixin
27
from image.celery import app as celery_app
28
from submissions.tasks import SubmissionTaskMixin
29
30
from ..helpers import get_auth
31
from ..models import (
32
    Submission as USISubmission, SubmissionData as USISubmissionData)
33
34
# Module-level celery task logger, named after this module
logger = get_task_logger(__name__)

# How many records go into a single biosample submission batch. A new batch
# is opened only before an animal once this many records were added, so a
# batch may exceed this value to keep samples with their animal
MAX_SAMPLES = 100
39
40
41
class SubmissionError(Exception):
    """Raised when a biosample (USI) submission cannot be carried out"""
45
46
47
# HINT: move into helper module?
48
class SubmissionHelper:
    """
    A helper class for submission task, used to deal with pyUSIrest.

    It loads a :py:class:`biosample.models.Submission` object by primary key
    and uses pyUSIrest objects to create (or recover) a USI submission
    document and to add samples to it.
    """

    def __init__(self, submission_id):
        """
        Args:
            submission_id: primary key of the
                :py:class:`biosample.models.Submission` to process

        Raises:
            USISubmission.DoesNotExist: if no object has such primary key
        """

        # ok those are my default class attributes
        self.submission_id = submission_id
        self.submission_obj = None
        self.token = None

        # here are pyUSIrest objects (set by read_token)
        self.auth = None
        self.root = None

        # here I will track a USI submission
        self.usi_submission = None

        # here I will store samples already submitted, keyed by alias
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission

        Returns:
            :py:attr:`Submission.owner`: a django
            :py:class:`django.contrib.auth.models.User` object
        """

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object

        Returns:
            str: the team name"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get/set biosample submission id from database. Setting a value
        also saves the :py:class:`biosample.models.Submission` object

        Returns:
            str: the biosample USI submission identifier"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set auth and root attributes
        (root is a pyUSIrest.usi.Root instance)

        Returns:
            str: the read token

        Raises:
            SubmissionError: if no token is stored for this submission and
                owner"""

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # getting token from redis db. A missing key returns None: raise an
        # explicit error instead of an AttributeError on decode
        raw_token = client.get(key)

        if raw_token is None:
            raise SubmissionError(
                "Cannot find a token for '%s' in submission %s" % (
                    self.owner,
                    self.submission_obj.uid_submission.id))

        self.token = raw_token.decode("utf8")

        # create a new auth object
        self.auth = get_auth(token=self.token)

        # get a root object with auth
        logger.debug("getting biosample root")
        self.root = pyUSIrest.usi.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined.

        Returns:
            :py:class:`pyUSIrest.usi.Submission`: the USI submission document

        Raises:
            SubmissionError: if the recovered submission is not in "Draft"
                status
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover a USI submission document (and its samples).

        Returns:
            bool: False if no USI submission name is tracked, True if the
            submission was recovered

        Raises:
            SubmissionError: if the recovered submission is not in "Draft"
                status"""

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object and track its name in the
        :py:class:`biosample.models.Submission` object

        Returns:
            :py:class:`pyUSIrest.usi.Submission` a pyUSIrest submission
            object
        """

        # getting team
        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission_id in table (the setter saves the object)
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read samples in a USI submission document and set
        submitted_samples attribute

        Returns:
            dict: the submitted_samples attribute (alias -> sample)"""

        # read already submitted samples
        logger.debug("Getting info on samples...")

        # count samples explicitly: the loop body may never run when the
        # recovered submission has no samples yet
        n_samples = 0

        for sample in self.usi_submission.get_samples():
            self.submitted_samples[sample.alias] = sample
            n_samples += 1

        logger.debug("Got %s samples", n_samples)

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. Then set model status to SUBMITTED

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): An animal or
                sample object"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch sample
            logger.info("Patching %s", alias)

            # get usi sample
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            sample = self.usi_submission.create_sample(
                model.to_biosample())

            self.submitted_samples[alias] = sample

        # update sample status
        model.status = SUBMITTED
        model.last_submitted = timezone.now()
        model.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample (if model is in READY state)"""

        # iterate over sample data
        for submission_data in self.submission_obj.submission_data\
                .order_by('id'):
            # get model for simplicity
            model = submission_data.content_object

            if model.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s: current status is %s",
                    model._meta.verbose_name,
                    model,
                    model.get_status_display())

    def mark_submission(self, status, message):
        """Set status and message of the
        :py:class:`biosample.models.Submission` object and save it

        Args:
            status: a status constant from :py:mod:`common.constants`
            message (str): the message to track in the submission object"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a :py:const:`ERROR <common.constants.ERROR>` status for
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(SUBMITTED, message)
291
292
293
class SubmitTask(NotifyAdminTaskMixin, BaseTask):
    """Submit a single :py:class:`biosample.models.Submission` batch to
    Biosample using USI"""

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit one batch: read the token, start (or recover) a USI
        submission, add READY samples to it and mark it as SUBMITTED.

        Connection errors and expired tokens leave the batch in READY status
        with an explanatory message; any other error marks it as ERROR and
        stores the traceback as its message.

        Args:
            usi_submission_id: the :py:class:`biosample.models.Submission`
                primary key

        Returns:
            tuple: ``("success", usi_submission_id)``, whatever the outcome
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            for step in (helper.read_token,
                         helper.start_submission,
                         helper.add_samples,
                         helper.mark_success):
                step()

        # TODO: should I rename TokenExpiredError with a more informative
        # name when token expires during a submission?
        except (pyUSIrest.exceptions.USIConnectionError,
                pyUSIrest.exceptions.TokenExpiredError) as exc:
            logger.error("Error in biosample submission: %s", exc)

            # connection errors are checked first
            if isinstance(exc, pyUSIrest.exceptions.USIConnectionError):
                message = "Errors in EBI API endpoints. Please try again later"
            else:
                message = (
                    "Your token is expired: please submit again to resume "
                    "submission")

            logger.error(message)

            # a temporary error: keep the batch READY and track the message
            helper.mark_submission(READY, message)

        except Exception as exc:
            logger.error("Unmanaged error: %s", exc)

            # track the whole traceback in the submission message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id
343
344
345
# HINT: move into helper module?
346
class SplitSubmissionHelper:
    """
    Helper class to split :py:class:`uid.models.Submission` data in
    batches limited in size.

    Every batch is a :py:class:`biosample.models.Submission` object; the
    primary keys of the batches to be submitted (newly created or re-opened)
    are collected in ``submission_ids``."""

    def __init__(self, uid_submission):
        # number of records added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission

        # the batch (biosample.models.Submission) currently being filled
        self.usi_submission = None

        # primary keys of the batches to be submitted
        self.submission_ids = []

    def process_data(self):
        """Add animals and their samples to submission batches.

        Animals without parents are processed first, then animals with the
        lowest parent foreign key, supposing that when uploading a child its
        parents need to be defined and so they have a lower id. Each animal
        is followed by its samples. Finally all samples of the UID submission
        are processed (a submission may contain only samples); samples
        already added to an opened batch are skipped by
        :py:meth:`add_to_submission_data`."""

        # the postgres LEAST function returns the lowest value among the
        # columns (NULL only if both are NULL); then we have to order
        # explicitly with F in order to apply the NULLS FIRST condition
        # (animals without parents)
        animals = self.uid_submission.animal_set.annotate(
            least=Least('father_id', 'mother_id')).order_by(
                F('least').asc(nulls_first=True), F('id'))

        for animal in animals:
            # ignore not READY models
            self.process_model(animal)

            # Add their specimen
            for sample in animal.sample_set.all():
                # ignore not READY models
                self.process_model(sample)

        # are there orphaned samples (a submission with only samples)?
        for sample in self.uid_submission.sample_set.all():
            # ignore not READY models
            self.process_model(sample)

    def process_model(self, model):
        """Add a model to a batch if its status is READY, otherwise ignore
        it

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): an animal or
                sample object"""

        if model.status == READY:
            self.add_to_submission_data(model)

        else:
            # already submitted, so could be ignored
            logger.debug("Ignoring %s %s", model._meta.verbose_name, model)

    def create_submission(self):
        """
        Create a new :py:class:`biosample.models.Submission` object (the new
        current batch), track its primary key and set sample counter to 0"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # reload object from database, then track its pk
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s", self.usi_submission)

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """
        Check if :py:class:`uid.mixins.BioSampleMixin` is already in an
        opened (not COMPLETED) submission. If so, that submission is tracked
        in ``submission_ids`` (if not already there) and its status is reset
        to READY.

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): an animal or
                sample object

        Returns:
            bool: True if *model* is in an opened submission

        Raises:
            SubmissionError: if *model* is in more than one opened
                submission"""

        model_type = model._meta.verbose_name

        logger.debug("Searching %s %s in submissions", model_type, model)

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # search submission data pointing to this object
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)

        # exclude COMPLETED submissions: keep only the opened ones
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # fetch at most two records with a single query: enough to tell
        # "none", "exactly one" and "more than one" apart
        found = list(data_qs[:2])

        if len(found) > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model_type,
                    model))

        if not found:
            # no sample in data. I could append model into submission
            logger.debug("No %s %s in submission data", model_type, model)
            return False

        usi_submission = found[0].submission

        logger.debug("Found %s %s in %s", model_type, model, usi_submission)

        # mark this batch to be called like it was created
        if usi_submission.id not in self.submission_ids:
            self.submission_ids.append(usi_submission.id)

            logger.debug("Reset status for submission %s", usi_submission)
            usi_submission.status = READY
            usi_submission.save()

        return True

    def add_to_submission_data(self, model):
        """Add a :py:class:`uid.mixins.BioSampleMixin` to a
        :py:class:`biosample.models.Submission` object, or create a new
        one if there are more samples than required. Models already in an
        opened submission are ignored

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): an animal or
                sample object"""

        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.debug(
                "Ignoring %s %s: already in a submission", model_type, model)
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the submission
        # task. Split only before animals, to prevent putting samples in a
        # different submission than their animal
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info(
            "Appending %s %s to %s", model_type, model, self.usi_submission)

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1

        # Raise counter in database with an F() expression. NOTE: after
        # save() the instance attribute still holds the expression, not a
        # number: refresh the object before reading samples_count
        self.usi_submission.samples_count = F('samples_count') + 1
        self.usi_submission.save()
506
507
508
class SplitSubmissionTask(SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""
    action = "biosample submission"

    def run(self, submission_id):
        """Split the data of a :py:class:`uid.models.Submission` into
        batches with :py:class:`SplitSubmissionHelper`, then launch a chord:
        one :py:class:`SubmitTask` per batch as header and
        :py:class:`SubmissionCompleteTask` as callback, called after all
        batches were submitted.

        Args:
            submission_id: the :py:class:`uid.models.Submission` primary key

        Returns:
            str: "success"
        """

        logger.info("Starting %s for submission %s", self.name, submission_id)

        uid_submission = self.get_uid_submission(submission_id)

        # create batches in database, iterating over animals and samples
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # chord callback, with the UID submission id as keyword argument
        callback = SubmissionCompleteTask().s(uid_submission_id=submission_id)

        # chord header: a SubmitTask signature for every batch
        submitter = SubmitTask()
        header = [
            submitter.s(batch_pk)
            for batch_pk in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks", len(header))

        # the callback is called only after all header tasks completed
        chord_result = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s",
            uid_submission,
            chord_result.task_id)

        logger.info("%s completed", self.name)

        # return a status
        return "success"
557
558
559
class SubmissionCompleteTask(
        SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""
    action = "biosample submission"

    def run(self, *args, **kwargs):
        """Fetch submission data and then update
        :py:class:`uid.models.Submission` status.

        ``args[0]`` holds the results of the chord header: one
        ``("success", <biosample.models.Submission pk>)`` tuple for each
        :py:class:`SubmitTask`. ``kwargs['uid_submission_id']`` is the
        :py:class:`uid.models.Submission` primary key.

        The UID submission becomes ERROR if any batch is in ERROR (or if
        there are no batches at all), READY if any batch is READY
        (a temporary error), SUBMITTED otherwise. The owner is notified by
        email in the ERROR and READY cases.

        Returns:
            str: "success"
        """

        # those are the output of SubmitTask, as a tuple of
        # "success" and biosample.model.Submission.pk
        submission_statuses = args[0]

        # get UID submission
        uid_submission = self.get_uid_submission(kwargs['uid_submission_id'])

        # an empty list, for example when submitting a uid submission with
        # no data, is reported as an error
        if not submission_statuses:
            message = "Submission %s is empty!" % uid_submission
            logger.warning(message)

            # update submission status. No more queries on this
            self.update_submission_status(
                uid_submission, ERROR, message)

            return "success"

        # submission_statuses will be a list like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # collect the statuses of the biosample submissions (only their
        # presence is checked below)
        statuses = {}

        for res in submission_qs.values('status').annotate(
                count=Count('status')):
            statuses[res['status']] = res['count']

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            self.update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            self._notify_owner(
                uid_submission,
                "Error in biosample submission",
                "Something goes wrong with biosample submission. Please "
                "report this to InjectTool team")

        elif READY in statuses:
            # temporary error: those batches can be submitted again
            logger.info("Temporary error for %s", uid_submission)

            self.update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            self._notify_owner(
                uid_submission,
                "Temporary error in biosample submission",
                "Something goes wrong with biosample submission. Please "
                "try again")

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success", uid_submission)

            self.update_message(uid_submission, submission_qs, SUBMITTED)

        return "success"

    def _notify_owner(self, uid_submission, subject, intro):
        """Email the owner of *uid_submission*: the subject is followed by
        the submission id, the body is *intro* followed by the (truncated)
        submission message"""

        uid_submission.owner.email_user(
            "%s %s" % (subject, uid_submission.id),
            "%s\n\n%s" % (
                intro,
                truncatechars(uid_submission.message, EMAIL_MAX_BODY_SIZE)),
        )

    def update_message(self, uid_submission, submission_qs, status):
        """Read :py:class:`biosample.models.Submission` messages and set
        :py:class:`uid.models.Submission` status and message.

        Messages of the submissions in *submission_qs* having *status* are
        de-duplicated (keeping the first occurrence in primary key order),
        empty messages are skipped and the others are joined by newlines."""

        messages = submission_qs.filter(status=status).order_by(
            'pk').values_list('message', flat=True)

        # dict.fromkeys de-duplicates while keeping a deterministic order,
        # unlike set() whose iteration order is arbitrary
        unique_messages = dict.fromkeys(
            message for message in messages if message)

        self.update_submission_status(
            uid_submission, status, "\n".join(unique_messages))
660
661
662
# Celery tasks defined as classes must be registered explicitly, see
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (SubmitTask, SplitSubmissionTask, SubmissionCompleteTask):
    celery_app.tasks.register(_task_class)

# do not leak the loop variable into the module namespace
del _task_class
667