Passed
Pull Request — master (#44)
by Paolo
07:04
created

biosample.tasks.submission   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 612
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 50
eloc 303
dl 0
loc 612
rs 8.4
c 0
b 0
f 0

24 Methods

Rating   Name   Duplication   Size   Complexity  
A SubmissionHelper.read_token() 0 28 1
A SubmissionHelper.usi_submission_name() 0 5 1
A SubmissionHelper.__init__() 0 20 1
A SplitSubmissionHelper.__init__() 0 5 1
A SubmissionHelper.mark_submission() 0 4 1
A SubmissionHelper.recover_submission() 0 27 3
A SubmissionHelper.mark_fail() 0 4 1
A SubmitTask.run() 0 40 4
A SubmissionHelper.create_submission() 0 16 1
A SubmissionHelper.create_or_update_sample() 0 24 2
A SubmissionHelper.team_name() 0 5 1
A SubmissionHelper.read_samples() 0 13 2
A SubmissionHelper.start_submission() 0 11 2
A SubmissionHelper.mark_success() 0 4 1
A SubmissionHelper.add_samples() 0 19 3
A TaskFailureMixin.on_failure() 0 20 1
A SubmissionHelper.owner() 0 6 1
A SplitSubmissionHelper.model_in_submission() 0 49 4
A SplitSubmissionHelper.create_submission() 0 15 1
B SplitSubmissionHelper.add_to_submission_data() 0 32 5
A SplitSubmissionTask.run() 0 38 1
A SubmissionCompleteTask.run() 0 67 4
A SplitSubmissionHelper.process_data() 0 23 5
A SubmissionCompleteTask.__update_message() 0 13 2

How to fix   Complexity   

Complexity

Complex modules such as biosample.tasks.submission often do many different things. To break such a module down, identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from common.constants import ERROR, READY, SUBMITTED, COMPLETED
22
from image.celery import app as celery_app, MyTask
23
from image_app.models import Submission, Animal
24
from submissions.helpers import send_message
25
26
from ..helpers import get_auth
27
from ..models import (
28
    Submission as USISubmission, SubmissionData as USISubmissionData)
29
30
# module-level celery task logger, shared by every task and helper below
logger = get_task_logger(__name__)

# soft limit of records (animals + samples) in a single USI submission:
# a new submission is started only when an animal is added and the
# current one already holds this many records
MAX_SAMPLES = 100
35
36
37
class SubmissionError(Exception):
    """Raised when a biosample (USI) submission can't be processed, e.g.
    when a USI submission can't be recovered or when submission data is
    inconsistent"""
41
42
43
# TODO: promote this to become a common mixin for tasks
class TaskFailureMixin():
    """Common failure behaviour for submission tasks.

    On failure, the UID submission (image_app.models.Submission) whose
    primary key is the first positional task argument is marked with the
    ERROR status, an async message is sent and its owner receives an email
    with the stacktrace.
    """

    # Override the default task on_failure handler
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Track a task failure in the UID submission and notify the owner

        Args:
            exc (Exception): the exception raised by the task
            task_id (str): id of the failed task
            args (list): task positional arguments; args[0] must be the
                UID submission primary key
            kwargs (dict): task keyword arguments (unused here)
            einfo: exception info, sent to the owner by email
        """

        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

        # the UID submission pk is expected as the first positional argument
        uid_submission = Submission.objects.get(pk=args[0])

        # track the error in database
        uid_submission.status = ERROR
        uid_submission.message = (
            "Error in biosample submission: %s" % str(exc))
        uid_submission.save()

        # notify status change asynchronously
        send_message(uid_submission)

        # email the owner, attaching the stacktrace (einfo)
        subject = "Error in biosample submission %s" % (uid_submission.id)
        body = (
            "Something goes wrong with biosample submission. Please report "
            "this to InjectTool team\n\n %s" % str(einfo))
        uid_submission.owner.email_user(subject, body)

        # TODO: submit mail to admin
72
73
74
# HINT: move into helper module?
class SubmissionHelper():
    """
    An helper class for the submission task, used to deal with pyUSIrest.

    It wraps a biosample.models.Submission object (a batch of animals and
    samples of a UID submission) and uploads it into a USI submission
    document: read the token from REDIS, recover or create the USI
    submission, add samples and track the outcome in database.
    """

    def __init__(self, submission_id):
        """Load the biosample.models.Submission object to be submitted

        Args:
            submission_id (int): primary key of the
                biosample.models.Submission object
        """

        self.submission_id = submission_id

        # the token read from REDIS (set by read_token)
        self.token = None

        # here are pyUSIrest objects (set by read_token)
        self.auth = None
        self.root = None

        # here I will track a USI submission document
        self.usi_submission = None

        # samples already in the USI submission document, indexed by alias
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from the UID submission related with this
        biosample submission"""

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover the biosample team name of the submission owner"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get the USI submission name stored in database"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        """Set the USI submission name and save it in database"""

        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set up pyUSIrest objects

        Sets the ``token``, ``auth`` and ``root`` attributes, where ``root``
        is a pyUSIrest.client.Root instance using the token.

        Returns:
            str: the token read from REDIS

        Raises:
            SubmissionError: if no token is stored for this submission
        """

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # redis returns None for a missing key: fail with a clear message
        # instead of an AttributeError on None.decode()
        token = client.get(key)

        if token is None:
            raise SubmissionError(
                "Cannot find a token for key '%s'" % key)

        self.token = token.decode("utf8")

        # get a root object with auth
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If the recovered submission is
        not in Draft status, SubmissionError is raised by
        recover_submission

        Returns:
            the USI submission document
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover the USI submission document and its samples

        Returns:
            bool: False if no USI submission name is stored in database,
            True if the submission was recovered

        Raises:
            SubmissionError: if the recovered submission is not in Draft
            status (so it can't be edited anymore)
        """

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission document for the owner team and
        store its name in database

        Returns:
            the new USI submission document
        """

        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission name in table (the setter saves the object)
        self.usi_submission_name = self.usi_submission.name

        return self.usi_submission

    def read_samples(self):
        """Read samples in the USI submission document and index them by
        alias in the ``submitted_samples`` attribute

        Returns:
            dict: the ``submitted_samples`` attribute
        """

        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples", len(samples))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample

        The model name status is then set to SUBMITTED and saved.
        """

        # alias is used to reference the same objects
        alias = model.biosample_alias

        if alias in self.submitted_samples:
            # already in USI submission: patch sample
            logger.info("Patching %s", alias)

            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            self.usi_submission.create_sample(
                model.to_biosample())

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample on the READY ones"""

        for submission_data in self.submission_obj.submission_data.all():
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s", model._meta.verbose_name, model)

    def mark_submission(self, status, message):
        """Set status and message of biosample.models.Submission and save
        it"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a ERROR status for biosample.models.Submission and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a SUBMITTED status for biosample.models.Submission and a
        message"""

        self.mark_submission(SUBMITTED, message)
289
290
291
class SubmitTask(MyTask):
    """Submit a single biosample.models.Submission batch through USI

    Errors raised while submitting are not propagated: they are tracked in
    the biosample submission object (status and message), so that the
    chord callback can read them from database.
    """

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit the biosample submission with pk ``usi_submission_id``

        Returns:
            tuple: ("success", usi_submission_id)
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        except ConnectionError as exc:
            self._track_temporary_error(
                helper,
                exc,
                "Errors in EBI API endpoints. Please try again later")

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            self._track_temporary_error(
                helper,
                exc,
                "Your token is expired: please submit again to resume "
                "submission")

        except Exception as exc:
            logger.error("Unmanaged error: %s" % exc)

            # the whole traceback becomes the submission message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id

    @staticmethod
    def _track_temporary_error(helper, exc, message):
        """Log ``exc`` and ``message``, then set the submission back to
        READY status with ``message``"""

        logger.error("Error in biosample submission: %s" % exc)
        logger.error(message)
        helper.mark_submission(READY, message)
335
336
337
# HINT: move into helper module?
class SplitSubmissionHelper():
    """Split the READY animals and samples of a UID submission in batches

    Each batch is a biosample.models.Submission object whose records are
    biosample.models.SubmissionData objects. A new batch is started only
    when an animal is added and the current batch already holds
    MAX_SAMPLES records, to avoid splitting an animal from the samples
    added right after it.
    """

    def __init__(self, uid_submission):
        """
        Args:
            uid_submission (image_app.models.Submission): the UID
                submission whose data will be split
        """

        # number of records added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission

        # the current batch (a biosample.models.Submission object)
        self.usi_submission = None

        # pks of the batches to be submitted (created or reopened)
        self.submission_ids = []

    def process_data(self):
        """Add READY animals and their READY samples to batches"""

        for animal in Animal.objects.filter(
                name__submission=self.uid_submission):

            # add animal if not yet submitted
            if animal.name.status == READY:
                self.add_to_submission_data(animal)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s", animal)

            # Add their specimen
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status == READY:
                    self.add_to_submission_data(sample)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s", sample)

    def create_submission(self):
        """Create a new batch in database and reset counter"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s", self.usi_submission)

        # reset counter
        self.counter = 0

    def model_in_submission(self, model):
        """Check if model is already in an opened (not COMPLETED) batch

        If found, that batch is reset to READY status and tracked in
        ``submission_ids`` (once), so it will be submitted again.

        Args:
            model: an animal or sample object

        Returns:
            bool: True if model is already in an opened batch

        Raises:
            SubmissionError: if model is in more than one opened batch
        """

        model_type = model._meta.verbose_name

        logger.debug("Searching %s %s in submissions", model_type, model)

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # define a queryset
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)

        # keep only opened submissions (exclude the COMPLETED ones)
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # count() is a database query: evaluate it only once
        n_submissions = data_qs.count()

        if n_submissions == 0:
            # no sample in data. I could append model into submission
            logger.debug("No %s %s in submission data", model_type, model)
            return False

        if n_submissions > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model_type,
                    model))

        usi_submission = data_qs.first().submission

        logger.debug(
            "Found %s %s in %s", model_type, model, usi_submission)

        # mark this batch to be called like it was created
        if usi_submission.id not in self.submission_ids:
            self.submission_ids.append(usi_submission.id)

            logger.debug(
                "Reset status for submission %s", usi_submission)
            usi_submission.status = READY
            usi_submission.save()

        return True

    def add_to_submission_data(self, model):
        """Add model (animal or sample) to the current batch

        Models already in an opened batch are ignored. A new batch is
        created if there is none yet, or if model is an animal and the
        current batch already holds MAX_SAMPLES records.
        """

        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.info(
                "Ignoring %s %s: already in a submission", model_type, model)
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task. Do it only on animals, to avoid putting
        # samples in a different submission
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info(
            "Appending %s %s to %s", model_type, model, self.usi_submission)

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1
469
470
471
class SplitSubmissionTask(TaskFailureMixin, MyTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""

    def run(self, submission_id):
        """Split a UID submission in batches and submit them in a chord

        One SubmitTask is run for each batch; SubmissionCompleteTask is the
        chord callback which updates the UID submission status.

        Args:
            submission_id (int): pk of an image_app.models.Submission object

        Returns:
            str: "success"
        """

        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = Submission.objects.get(pk=submission_id)

        # create batches (database objects) for animals and samples
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # the callback receives the UID submission pk as keyword argument
        callback = SubmissionCompleteTask().s(
            uid_submission_id=submission_id)

        # one SubmitTask signature for each batch
        submit_task = SubmitTask()
        header = [submit_task.s(pk) for pk in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks" % len(header))

        # the callback will be called only after all header tasks end
        result = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission,
                result.task_id))

        logger.info("%s completed" % self.name)

        return "success"
516
517
518
class SubmissionCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""

    def run(self, *args, **kwargs):
        """Fetch batch statuses and then update UID submission status

        Called as a chord callback: ``args[0]`` is the list of SubmitTask
        results, like ``[("success", 1), ("success", 2)]``, where the second
        item is a biosample.models.Submission pk. The UID submission pk is
        passed with the ``uid_submission_id`` keyword argument.

        The UID submission is set to ERROR if any batch is in ERROR, to
        READY if any batch is READY (temporary error), to SUBMITTED
        otherwise.

        Returns:
            str: "success"
        """

        submission_statuses = args[0]

        # collect biosample.models.Submission pks from SubmitTask results
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # count biosample submissions by status
        statuses = {}

        for res in submission_qs.values('status').annotate(
                count=Count('status')):
            statuses[res['status']] = res['count']

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed" % uid_submission)

            self.__update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        elif READY in statuses:
            # temporary error (ex. API endpoints or token): can be resumed
            logger.info("Temporary error for %s" % uid_submission)

            self.__update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Temporary error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "try again\n\n"
                 "%s" % uid_submission.message),
            )

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success" % uid_submission)

            self.__update_message(uid_submission, submission_qs, SUBMITTED)

        # send async message
        send_message(uid_submission)

        return "success"

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Mark the UID submission with ERROR status when this task fails

        TaskFailureMixin.on_failure reads the UID submission pk from
        ``args[0]``, but as a chord callback this task receives the list of
        SubmitTask results there; the UID submission pk is passed with the
        ``uid_submission_id`` keyword argument instead.
        """

        if kwargs and 'uid_submission_id' in kwargs:
            args = [kwargs['uid_submission_id']]

        super().on_failure(exc, task_id, args, kwargs, einfo)

    def __update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission messages and set
        image_app.models.Submission status and message

        The message joins the distinct, non-empty messages of the batches
        in ``status``, in the order they are read from database.
        """

        messages = submission_qs.filter(status=status).values_list(
            'message', flat=True)

        # dict.fromkeys removes duplicates keeping order (a set doesn't);
        # empty or missing messages are skipped (None can't be joined)
        unique_messages = dict.fromkeys(
            message for message in messages if message)

        uid_submission.status = status
        uid_submission.message = "\n".join(unique_messages)
        uid_submission.save()
605
606
607
# tasks are registered explicitly, see:
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (SubmitTask, SplitSubmissionTask, SubmissionCompleteTask):
    celery_app.tasks.register(_task_class)
612