Completed
Push — master ( ff7896...f95e2c )
by Paolo
15s queued 13s
created

biosample.tasks.submission   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 665
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 50
eloc 305
dl 0
loc 665
rs 8.4
c 0
b 0
f 0

24 Methods

Rating   Name   Duplication   Size   Complexity  
A SubmissionHelper.read_token() 0 31 1
A SubmissionHelper.__init__() 0 20 1
A SubmissionHelper.usi_submission_name() 0 8 1
A SubmissionHelper.recover_submission() 0 27 3
A SubmissionHelper.create_submission() 0 21 1
A SubmissionHelper.team_name() 0 8 1
A SubmissionHelper.read_samples() 0 13 2
A SubmissionHelper.start_submission() 0 11 2
A TaskFailureMixin.on_failure() 0 23 1
A SubmissionHelper.owner() 0 11 1
A SplitSubmissionHelper.__init__() 0 5 1
A SubmissionHelper.mark_submission() 0 4 1
A SubmissionHelper.mark_fail() 0 5 1
A SubmitTask.run() 0 46 4
A SubmissionCompleteTask.update_message() 0 13 2
A SplitSubmissionHelper.model_in_submission() 0 51 4
A SplitSubmissionHelper.create_submission() 0 17 1
A SubmissionHelper.create_or_update_sample() 0 30 2
B SplitSubmissionHelper.add_to_submission_data() 0 36 5
A SubmissionHelper.mark_success() 0 5 1
A SplitSubmissionTask.run() 0 42 1
A SubmissionHelper.add_samples() 0 20 3
A SubmissionCompleteTask.run() 0 68 4
A SplitSubmissionHelper.process_data() 0 23 5

How to fix   Complexity   

Complexity

Complex classes like biosample.tasks.submission often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from common.constants import ERROR, READY, SUBMITTED, COMPLETED
22
from image.celery import app as celery_app, MyTask
23
from image_app.models import Submission, Animal
24
from submissions.helpers import send_message
25
26
from ..helpers import get_auth
27
from ..models import (
28
    Submission as USISubmission, SubmissionData as USISubmissionData)
29
30
# Soft limit on how many records (animals and samples) go into a single
# biosample submission: a new batch is opened only when an animal is added
# after this many records, so that samples stay with their animal
MAX_SAMPLES = 100

# module-level celery task logger
logger = get_task_logger(__name__)
35
36
37
class SubmissionError(Exception):
    """Raised when a biosample (USI) submission cannot be handled, e.g. a
    recovered USI submission is no longer in ``Draft`` status or the same
    object is found in more than one opened submission."""
41
42
43
# TODO: promote this to become a common mixin for tasks
class TaskFailureMixin():
    """Mixin candidate to become the common behaviour of task failure which
    marks the submission with ERROR status and sends a message to its owner.

    The :py:class:`image_app.models.Submission` primary key is searched in
    the task keyword arguments first (``uid_submission_id`` or
    ``submission_id``) and only then taken from the first positional
    argument."""

    # keyword arguments which may carry the image_app Submission pk, in
    # order of preference. SubmissionCompleteTask receives the chord results
    # as its first positional argument and the pk as ``uid_submission_id``,
    # so args[0] cannot be trusted blindly
    _submission_id_kwargs = ("uid_submission_id", "submission_id")

    @classmethod
    def _get_submission_id(cls, args, kwargs):
        """Return the :py:class:`image_app.models.Submission` pk the failed
        task was working on

        Args:
            args (tuple): positional arguments of the failed task
            kwargs (dict): keyword arguments of the failed task (may be None)

        Returns:
            the submission primary key

        Raises:
            IndexError: if no known keyword is present and ``args`` is empty
        """

        kwargs = kwargs or {}

        for key in cls._submission_id_kwargs:
            if key in kwargs:
                return kwargs[key]

        # fallback: the submission pk is the first positional argument
        return args[0]

    # Override default on failure method
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Mark submission with error status, send async message and a
        mail to the owner of such submission

        Args:
            exc (Exception): the exception raised by the task
            task_id (str): id of the failed task
            args (tuple): positional arguments of the failed task
            kwargs (dict): keyword arguments of the failed task
            einfo: exception info, reported in the mail to the owner
        """

        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

        # get submission object
        submission_obj = Submission.objects.get(
            pk=self._get_submission_id(args, kwargs))

        submission_obj.status = ERROR
        submission_obj.message = (
            "Error in biosample submission: %s" % str(exc))
        submission_obj.save()

        # send async message
        send_message(submission_obj)

        # send a mail to the user with the stacktrace (einfo)
        submission_obj.owner.email_user(
            "Error in biosample submission %s" % (
                submission_obj.id),
            ("Something goes wrong with biosample submission. Please report "
             "this to InjectTool team\n\n %s" % str(einfo)),
        )

        # TODO: submit mail to admin
75
76
77
# HINT: move into helper module?
class SubmissionHelper():
    """
    An helper class for submission task, used to deal with pyUSIrest

    It works on a single :py:class:`biosample.models.Submission`: it reads
    the user token from REDIS, recovers (or creates) the related USI
    submission document and adds or patches the samples listed in the
    submission data.
    """

    def __init__(self, submission_id):
        """
        Args:
            submission_id (int): primary key of the
                :py:class:`biosample.models.Submission` to work on
        """

        self.submission_id = submission_id
        self.token = None

        # here are pyUSIrest object
        self.auth = None
        self.root = None

        # here I will track a USI submission
        self.usi_submission = None

        # here I will store samples already submitted, indexed by alias
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission

        Returns:
            :py:attr:`Submission.owner`: a django
            :py:class:`django.contrib.auth.models.User` object
        """

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object

        Returns:
            str: the team name"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get/set biosample submission id from database

        Returns:
            str: the biosample USI submission identifier"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        """Store the USI submission identifier and save the submission
        object"""

        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set root attribute with a
        pyUSIrest.client.Root instance

        Returns:
            str: the read token

        Raises:
            SubmissionError: if no token is stored in REDIS for this
                submission and owner"""

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # getting token from redis db: redis returns None for a missing key
        # (presumably an expired or never stored token)
        raw_token = client.get(key)

        if raw_token is None:
            # fail explicitly instead of with an AttributeError on decode
            raise SubmissionError(
                "Cannot find a biosample token for user '%s': please submit "
                "again to resume submission" % (self.owner))

        self.token = raw_token.decode("utf8")

        # get a root object with auth
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error

        Returns:
            :py:class:`pyUSIrest.client.Submission`: the USI submission
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover a USI submission document or raise Exception. If
        not defined, return false

        Returns:
            bool: True if a submission was recovered, False if no USI
            submission name is tracked in database

        Raises:
            SubmissionError: if the recovered submission is not in
                ``Draft`` status"""

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object

        Returns:
            :py:class:`pyUSIrest.client.Submission` a pyUSIrest submission
            object
        """

        # getting team
        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission_id in table (the setter saves the object)
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read sample in a USI submission document and set submitted_samples
        attribute

        Returns:
            dict: the submitted samples, indexed by alias"""

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples", len(samples))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample

        Args:
            model (:py:class:`image_app.mixins.BioSampleMixin`): An animal or
                sample object"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch sample
            logger.info("Patching %s", alias)

            # get usi sample
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            sample = self.usi_submission.create_sample(
                model.to_biosample())

            self.submitted_samples[alias] = sample

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample (if model is in READY state)"""

        # iterate over sample data
        for submission_data in self.submission_obj.submission_data\
                .order_by('id'):
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s", model._meta.verbose_name, model)

    def mark_submission(self, status, message):
        """Set status and message of the
        :py:class:`biosample.models.Submission` and save it

        Args:
            status: the new submission status
            message (str): the new submission message"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a :py:const:`ERROR <common.constants.ERROR>` status for
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(SUBMITTED, message)
319
320
321
class SubmitTask(MyTask):
    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit a single :py:class:`biosample.models.Submission` batch.

        A :py:class:`SubmissionHelper` reads the token, starts (or recovers)
        the USI submission, adds the samples and marks the batch as
        SUBMITTED. A ``ConnectionError`` or ``RuntimeError`` puts the batch
        back to READY with an explanatory message; any other error marks it
        with ERROR and stores the traceback as message.

        Args:
            usi_submission_id (int): the
                :py:class:`biosample.models.Submission` primary key

        Returns:
            tuple: ``("success", usi_submission_id)`` in every case; the
            outcome is the status stored in the database
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        def back_to_ready(exc, message):
            # log cause and user message, then track the message and let the
            # user retry by putting the batch back to READY
            logger.error("Error in biosample submission: %s" % exc)
            logger.error(message)
            helper.mark_submission(READY, message)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        except ConnectionError as exc:
            back_to_ready(
                exc, "Errors in EBI API endpoints. Please try again later")

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            back_to_ready(
                exc,
                "Your token is expired: please submit again to resume "
                "submission")

        except Exception as exc:
            logger.error("Unmanaged error: %s" % exc)

            # keep the whole traceback as submission message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id
371
372
373
# HINT: move into helper module?
class SplitSubmissionHelper():
    """
    helper class to split :py:class:`image_app.models.Submission` data in
    batches limited in sizes (see ``MAX_SAMPLES``)"""

    def __init__(self, uid_submission):
        """
        Args:
            uid_submission (:py:class:`image_app.models.Submission`): the
                submission whose data need to be split
        """

        # number of records added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission

        # current biosample.models.Submission batch (None until created)
        self.usi_submission = None

        # pks of the batches which need to be submitted
        self.submission_ids = []

    def process_data(self):
        """Add animals and their samples in READY status to submission data.

        Each animal is processed immediately before its own samples, so
        samples are appended to the same batch as their animal."""

        for animal in Animal.objects.filter(
                name__submission=self.uid_submission):

            # add animal if not yet submitted, or patch it
            if animal.name.status == READY:
                self.add_to_submission_data(animal)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s" % (animal))

            # Add their specimen
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status == READY:
                    self.add_to_submission_data(sample)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s" % (sample))

            # end of cycle for animal.

    def create_submission(self):
        """
        Create a new :py:class:`biosample.models.Submission` object, track
        its pk and set sample counter to 0"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s" % (self.usi_submission))

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """
        Check if :py:class:`image_app.mixins.BioSampleMixin` is already in an
        opened (not COMPLETED) submission. If so, that submission is tracked
        in ``submission_ids`` (and reset to READY) to be submitted again.

        Args:
            model (:py:class:`image_app.mixins.BioSampleMixin`): an animal
                or sample object

        Returns:
            bool: True if model is already in an opened submission

        Raises:
            SubmissionError: if model is in more than one opened submission
        """

        logger.debug("Searching %s %s in submissions" % (
            model._meta.verbose_name,
            model))

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # define a queryset
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)

        # keep only data in opened submissions (exclude COMPLETED ones)
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # evaluate count once: a single query instead of two
        n_data = data_qs.count()

        if n_data > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model._meta.verbose_name,
                    model))

        if n_data == 0:
            # no sample in data. I could append model into submission
            logger.debug("No %s %s in submission data" % (
                model._meta.verbose_name,
                model))
            return False

        usi_submission = data_qs.first().submission

        logger.debug("Found %s %s in %s" % (
            model._meta.verbose_name,
            model,
            usi_submission))

        # mark this batch to be called like it was created
        if usi_submission.id not in self.submission_ids:
            self.submission_ids.append(usi_submission.id)

            logger.debug(
                "Reset status for submission %s" % (usi_submission))
            usi_submission.status = READY
            usi_submission.save()

        return True

    def add_to_submission_data(self, model):
        """Add a :py:class:`image_app.mixins.BioSampleMixin` to a
        :py:class:`biosample.models.Submission` object, or create a new
        one if there are more samples than required

        Args:
            model (:py:class:`image_app.mixins.BioSampleMixin`): an animal
                or sample object"""

        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.info("Ignoring %s %s: already in a submission" % (
                model_type,
                model))
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task. Do it only on animals, to prevent
        # to put samples in a different submission
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info("Appending %s %s to %s" % (
            model_type,
            model,
            self.usi_submission))

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1
517
518
519
class SplitSubmissionTask(TaskFailureMixin, MyTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""

    def run(self, submission_id):
        """Split :py:class:`image_app.models.Submission` data into batches
        using :py:class:`SplitSubmissionHelper`, then launch a celery chord
        with one :py:class:`SubmitTask` per batch as header and
        :py:class:`SubmissionCompleteTask` as callback.

        Args:
            submission_id (int): the image_app Submission primary key

        Returns:
            str: ``"success"`` once the chord has been launched
        """

        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = Submission.objects.get(pk=submission_id)

        # create biosample submission objects and fill their data
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # the callback gets the header results as first positional
        # argument and the image_app submission pk as keyword argument
        complete_task = SubmissionCompleteTask()
        callback = complete_task.s(uid_submission_id=submission_id)

        # one SubmitTask signature for each batch
        submit_task = SubmitTask()
        header = [
            submit_task.s(batch_id) for batch_id in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks" % len(header))

        # callback will be called only after all header tasks
        chord_result = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission,
                chord_result.task_id))

        logger.info("%s completed" % self.name)

        return "success"
568
569
570
class SubmissionCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""

    def run(self, *args, **kwargs):
        """Fetch submission data and then update
        :py:class:`image_app.models.Submission` status

        ERROR in any batch wins over READY, which wins over SUBMITTED.

        Args:
            args: ``args[0]`` is the list of :py:class:`SubmitTask`
                results, i.e. ``("success", usi_submission_id)`` pairs
            kwargs: must contain ``uid_submission_id``, the
                :py:class:`image_app.models.Submission` primary key

        Returns:
            str: always ``"success"``
        """

        submission_statuses = args[0]

        # submission_statuses will be a list like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # annotate biosample submission by statuses
        statuses = {}

        for res in submission_qs.values('status').annotate(
                count=Count('status')):
            statuses[res['status']] = res['count']

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        elif READY in statuses:
            # temporary failure: SubmitTask put at least one batch back to
            # READY, so the user can retry
            logger.info("Temporary error for %s" % uid_submission)

            self.update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Temporary error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "try again\n\n"
                 "%s" % uid_submission.message),
            )

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success" % uid_submission)

            self.update_message(uid_submission, submission_qs, SUBMITTED)

        # send async message
        send_message(uid_submission)

        return "success"

    def update_message(self, uid_submission, submission_qs, status):
        """Read :py:class:`biosample.models.Submission` messages and set
        :py:class:`image_app.models.Submission` status and message

        Args:
            uid_submission (:py:class:`image_app.models.Submission`): the
                object to update and save
            submission_qs (QuerySet): :py:class:`biosample.models.Submission`
                objects of this submission process
            status: only messages of batches with this status are collected;
                it also becomes the new ``uid_submission`` status
        """

        # collect distinct messages keeping first-seen order (a plain set
        # would join them in an arbitrary, hash-dependent order). Missing
        # messages are skipped since None would break str.join
        messages = dict.fromkeys(
            submission.message
            for submission in submission_qs.filter(
                status=status).order_by('id')
            if submission.message is not None)

        uid_submission.status = status
        uid_submission.message = "\n".join(messages)
        uid_submission.save()
658
659
660
# Register the tasks explicitly with the celery app, see:
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task in (SubmitTask, SplitSubmissionTask, SubmissionCompleteTask):
    celery_app.tasks.register(_task)

# don't leave the loop variable in the module namespace
del _task
665