Completed
Push — master ( 4f7ee6...646424 )
by Paolo
08:30 queued 06:53
created

biosample.tasks.submission   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 656
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 51
eloc 300
dl 0
loc 656
rs 7.92
c 0
b 0
f 0

24 Methods

Rating   Name   Duplication   Size   Complexity  
A SubmissionHelper.read_token() 0 31 1
A SubmissionHelper.usi_submission_name() 0 8 1
A SubmissionHelper.__init__() 0 20 1
A SplitSubmissionHelper.__init__() 0 5 1
A SubmissionHelper.mark_submission() 0 4 1
A SubmissionHelper.recover_submission() 0 27 3
A SubmissionHelper.mark_fail() 0 5 1
A SubmitTask.run() 0 46 4
A SubmissionHelper.create_submission() 0 21 1
A SubmissionHelper.create_or_update_sample() 0 30 2
A SubmissionHelper.team_name() 0 8 1
A SubmissionHelper.read_samples() 0 13 2
A SubmissionHelper.start_submission() 0 11 2
A SubmissionHelper.mark_success() 0 5 1
A SubmissionHelper.add_samples() 0 20 3
A SubmissionHelper.owner() 0 11 1
A SubmissionCompleteTask.update_message() 0 12 2
A SplitSubmissionHelper.model_in_submission() 0 51 4
A SplitSubmissionHelper.process_model() 0 10 2
A SplitSubmissionHelper.create_submission() 0 17 1
B SplitSubmissionHelper.add_to_submission_data() 0 36 5
A SplitSubmissionTask.run() 0 41 1
B SubmissionCompleteTask.run() 0 78 5
A SplitSubmissionHelper.process_data() 0 27 4

How to fix   Complexity   

Complexity

Complex classes like biosample.tasks.submission often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.db.models import Count, F
19
from django.db.models.functions import Least
20
from django.utils import timezone
21
22
from common.constants import ERROR, READY, SUBMITTED, COMPLETED
23
from common.tasks import BaseTask, NotifyAdminTaskMixin
24
from image.celery import app as celery_app
25
from submissions.tasks import SubmissionTaskMixin
26
27
from ..helpers import get_auth
28
from ..models import (
29
    Submission as USISubmission, SubmissionData as USISubmissionData)
30
31
# Get an instance of a logger
logger = get_task_logger(__name__)

# threshold of records (animals + samples) per biosample submission batch:
# once reached, a new batch is started with the next animal
MAX_SAMPLES = 100
36
37
38
class SubmissionError(Exception):
    """Exception raised for errors while dealing with submissions"""
42
43
44
# HINT: move into helper module?
45
class SubmissionHelper():
    """
    An helper class for submission task, used to deal with pyUSIrest

    Args:
        submission_id (int): the primary key of the
            :py:class:`biosample.models.Submission` object to work with
    """

    def __init__(self, submission_id):
        self.submission_id = submission_id

        # the token read from redis (set by read_token)
        self.token = None

        # here are pyUSIrest objects (set by read_token)
        self.auth = None
        self.root = None

        # here I will track a USI submission
        self.usi_submission = None

        # here I will store samples already submitted, indexed by alias
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission

        Returns:
            :py:attr:`Submission.owner`: a django
            :py:class:`django.contrib.auth.models.User` object
        """

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object

        Returns:
            str: the team name"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get/set biosample submission id from database. Setting a value
        also saves the submission object

        Returns:
            str: the biosample USI submission identifier"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set root attribute with a
        pyUSIrest.client.Root instance. The ``token`` and ``auth``
        attributes are set as well

        Returns:
            str: the read token"""

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # getting token from redis db and set submission data
        # NOTE(review): redis returns None for a missing key, in which case
        # .decode raises AttributeError -- TODO decide if a missing token
        # deserves a dedicated error
        self.token = client.get(key).decode("utf8")

        # create a new auth object from token
        self.auth = get_auth(token=self.token)

        # get a root object with auth
        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error

        Returns:
            the USI submission document stored in ``usi_submission``
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover a USI submission document or raise Exception. If
        not defined, return false

        Returns:
            bool: True if a submission was recovered, False if no
            usi_submission_name is tracked

        Raises:
            SubmissionError: if the recovered submission status is not
                "Draft"
        """

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object and track its name in the
        database

        Returns:
            :py:class:`pyUSIrest.client.Submission` a pyUSIrest submission
            object
        """

        # getting team
        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission_id in table (the property setter saves the object)
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read sample in a USI submission document and set submitted_samples
        attribute

        Returns:
            dict: the submitted samples indexed by their alias"""

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples", len(samples))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. The model is then saved with a SUBMITTED status

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): An animal or
                sample object"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch sample
            logger.info("Patching %s", alias)

            # get usi sample
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            sample = self.usi_submission.create_sample(
                model.to_biosample())

            # track the new sample like the ones read from the submission
            self.submitted_samples[alias] = sample

        # update sample status
        model.status = SUBMITTED
        model.last_submitted = timezone.now()
        model.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample (if model is in READY state)"""

        # iterate over sample data in insertion (id) order
        submission_data_qs = self.submission_obj.submission_data.order_by(
            'id')

        for submission_data in submission_data_qs:
            # get model for simplicity
            model = submission_data.content_object

            if model.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s", model._meta.verbose_name, model)

    def mark_submission(self, status, message):
        """Set status and message for the tracked submission object and
        save it

        Args:
            status: the status to set
            message (str): the message to set"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a :py:const:`ERROR <common.constants.ERROR>` status for
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a :py:const:`SUBMITTED <common.constants.SUBMITTED>` status
        for :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(SUBMITTED, message)
286
287
288
class SubmitTask(NotifyAdminTaskMixin, BaseTask):
    """Submit a single :py:class:`biosample.models.Submission` batch using
    :py:class:`SubmissionHelper`"""

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Run task. Instantiate a SubmissionHelper with the provided
        :py:class:`biosample.models.Submission` id. Read token from database,
        start or recover a submission, add samples to it and then mark a
        status for it

        Errors raised while submitting are not propagated: they are tracked
        in the submission status and message.

        Args:
            usi_submission_id (int): the
                :py:class:`biosample.models.Submission` primary key

        Returns:
            tuple: the "success" string and the received usi_submission_id
        """

        # get a submission helper object
        submission_helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            submission_helper.read_token()
            submission_helper.start_submission()
            submission_helper.add_samples()
            submission_helper.mark_success()

        except ConnectionError as exc:
            logger.error("Error in biosample submission: %s", exc)
            message = "Errors in EBI API endpoints. Please try again later"
            logger.error(message)

            # track message in submission object. Status is set to READY,
            # so this batch is not counted as an error
            submission_helper.mark_submission(READY, message)

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            logger.error("Error in biosample submission: %s", exc)
            message = (
                "Your token is expired: please submit again to resume "
                "submission")
            logger.error(message)

            # track message in submission object, as for connection errors
            submission_helper.mark_submission(READY, message)

        except Exception as exc:
            logger.error("Unmanaged error: %s", exc)

            # get exception info
            einfo = traceback.format_exc()

            # track traceback in message, with an ERROR status
            submission_helper.mark_fail(einfo)

        return "success", usi_submission_id
338
339
340
# HINT: move into helper module?
341
class SplitSubmissionHelper():
    """
    helper class to split :py:class:`uid.models.Submission` data in
    batches limited in sizes

    Args:
        uid_submission: the :py:class:`uid.models.Submission` object to
            split in batches
    """

    def __init__(self, uid_submission):
        # how many records were added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission

        # the current batch (a biosample.models.Submission object)
        self.usi_submission = None

        # primary keys of the batches which need to be submitted
        self.submission_ids = []

    def process_data(self):
        """Add animal and its samples to a submission"""

        # here we try to submit first animal without parents, then animal
        # with parent with lowest foreign keys, supposing that when uploading
        # a child, his parents need to be defined and so they have a lower id
        # the postgres LEAST is a function that will return the column with
        # the lowest value, then we have to order explicitly with F in
        # order to apply the NULLS FIRST condition (animal without parents)
        animals = self.uid_submission.animal_set.annotate(
            least=Least('father_id', 'mother_id')).order_by(
                F('least').asc(nulls_first=True), F('id'))

        for animal in animals:
            # not READY models are ignored by process_model
            self.process_model(animal)

            # Add their specimen
            for sample in animal.sample_set.all():
                self.process_model(sample)

        # are there orphaned samples (a submission with only samples)?
        for sample in self.uid_submission.sample_set.all():
            self.process_model(sample)

    def process_model(self, model):
        """Test for a model in a biosample submission. Ignore a model if
        status is not READY

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): An animal or
                sample object"""

        if model.status == READY:
            self.add_to_submission_data(model)

        else:
            # already submitted, so could be ignored
            logger.debug("Ignoring %s %s", model._meta.verbose_name, model)

    def create_submission(self):
        """
        Create a new :py:class:`biosample.models.Submission` object and
        set sample counter to 0"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s", self.usi_submission)

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """
        Check if :py:class:`uid.mixins.BioSampleMixin` is already in an
        opened submission

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): An animal or
                sample object

        Returns:
            bool: True if model is tracked by exactly one not COMPLETED
            submission, False if it is tracked by none

        Raises:
            SubmissionError: if model is tracked by more than one not
                COMPLETED submission
        """

        model_type = model._meta.verbose_name

        logger.debug("Searching %s %s in submissions", model_type, model)

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # define a queryset, ignoring completed submissions
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id).exclude(
                submission__status__in=[COMPLETED])

        # count once, in order to query database a single time
        n_of_records = data_qs.count()

        if n_of_records > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model_type,
                    model))

        if n_of_records == 0:
            # no sample in data. I could append model into submission
            logger.debug("No %s %s in submission data", model_type, model)
            return False

        usi_submission = data_qs.first().submission

        logger.debug("Found %s %s in %s", model_type, model, usi_submission)

        # mark this batch to be called like it was created
        if usi_submission.id not in self.submission_ids:
            self.submission_ids.append(usi_submission.id)

            logger.debug("Reset status for submission %s", usi_submission)
            usi_submission.status = READY
            usi_submission.save()

        return True

    def add_to_submission_data(self, model):
        """Add a :py:class:`uid.mixins.BioSampleMixin` to a
        :py:class:`biosample.models.Submission` object, or create a new
        one if there are more samples than required

        Args:
            model (:py:class:`uid.mixins.BioSampleMixin`): An animal or
                sample object"""

        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.debug(
                "Ignoring %s %s: already in a submission", model_type, model)
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task. Do it only on animals, to prevent
        # to put samples in a different submission
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info(
            "Appending %s %s to %s", model_type, model, self.usi_submission)

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1
497
498
499
class SplitSubmissionTask(SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""
    action = "biosample submission"

    def run(self, submission_id):
        """Call :py:class:`SplitSubmissionHelper` to split
        :py:class:`uid.models.Submission` data.
        Call :py:class:`SubmitTask` for each
        batch of data and then call :py:class:`SubmissionCompleteTask` after
        all data were submitted

        Args:
            submission_id (int): the :py:class:`uid.models.Submission`
                primary key

        Returns:
            str: the "success" string
        """

        logger.info("Starting %s for submission %s", self.name, submission_id)

        uid_submission = self.get_uid_submission(submission_id)

        # call an helper class to create database objects
        submission_data_helper = SplitSubmissionHelper(uid_submission)

        # iterate over animal and samples
        submission_data_helper.process_data()

        # prepare to launch chord tasks: the callback receives the UID
        # submission id as a keyword argument
        submissioncomplete = SubmissionCompleteTask()
        callback = submissioncomplete.s(uid_submission_id=submission_id)

        # one SubmitTask signature for each batch of data
        submit = SubmitTask()
        header = [submit.s(pk) for pk in submission_data_helper.submission_ids]

        logger.debug("Preparing chord for %s tasks", len(header))

        # call chord task. The callback will be called only after all
        # header tasks
        res = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s",
            uid_submission,
            res.task_id)

        logger.info("%s completed", self.name)

        # return a status
        return "success"
548
549
550
class SubmissionCompleteTask(
        SubmissionTaskMixin, NotifyAdminTaskMixin, BaseTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""
    action = "biosample submission"

    def run(self, *args, **kwargs):
        """Fetch submission data and then update
        :py:class:`uid.models.Submission` status

        Args:
            *args: the first positional argument is the list of
                :py:class:`SubmitTask` results
            **kwargs: ``uid_submission_id`` is the
                :py:class:`uid.models.Submission` primary key

        Returns:
            str: the "success" string
        """

        # those are the output of SubmitTask, as tuples of
        # "success" and biosample.model.Submission.pk
        submission_statuses = args[0]

        # get UID submission
        uid_submission = self.get_uid_submission(kwargs['uid_submission_id'])

        # mark as error if submission_statuses is empty, for example when
        # submitting a uid submission with no data
        if not submission_statuses:
            message = "Submission %s is empty!" % uid_submission
            logger.warning(message)

            # update submission status. No more queries on this
            self.update_submission_status(
                uid_submission, ERROR, message)

            return "success"

        # submission_statuses will be an array like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # count biosample submissions by status
        statuses = {
            res['status']: res['count']
            for res in submission_qs.values('status').annotate(
                count=Count('status'))}

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            self.update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        elif READY in statuses:
            # temporary error: SubmitTask left at least one batch in the
            # READY status
            logger.info("Temporary error for %s", uid_submission)

            self.update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Temporary error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "try again\n\n"
                 "%s" % uid_submission.message),
            )

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success", uid_submission)

            self.update_message(uid_submission, submission_qs, SUBMITTED)

        return "success"

    def update_message(self, uid_submission, submission_qs, status):
        """Read :py:class:`biosample.models.Submission` message and set
        :py:class:`uid.models.Submission` message

        Args:
            uid_submission: the :py:class:`uid.models.Submission` to update
            submission_qs: a queryset of
                :py:class:`biosample.models.Submission` objects
            status: only messages from submissions in this status are
                collected, and this status is set on uid_submission
        """

        # collect distinct messages, keeping the queryset order in order
        # to get the same text for the same data
        seen = set()
        messages = []

        for submission in submission_qs.filter(status=status):
            if submission.message not in seen:
                seen.add(submission.message)
                messages.append(submission.message)

        self.update_submission_status(
            uid_submission, status, "\n".join(messages))
649
650
651
# register explicitly tasks
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(SubmitTask)
celery_app.tasks.register(SplitSubmissionTask)
celery_app.tasks.register(SubmissionCompleteTask)
656