Completed
Pull Request — master (#44)
by Paolo
05:54
created

biosample.tasks.submission   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 563
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 45
eloc 282
dl 0
loc 563
rs 8.8
c 0
b 0
f 0

23 Methods

Rating   Name   Duplication   Size   Complexity  
A SubmissionHelper.read_token() 0 28 1
A SubmissionHelper.usi_submission_name() 0 5 1
A SubmissionHelper.__init__() 0 20 1
A SplitSubmissionHelper.__init__() 0 5 1
A SubmissionHelper.mark_submission() 0 4 1
A SubmissionHelper.recover_submission() 0 27 3
A SubmissionHelper.mark_fail() 0 4 1
A SubmitTask.run() 0 40 4
A SubmissionHelper.create_submission() 0 16 1
A SplitSubmissionHelper.model_in_submission() 0 49 4
A SplitSubmissionHelper.create_submission() 0 13 1
A SubmissionHelper.create_or_update_sample() 0 24 2
A SubmissionHelper.team_name() 0 5 1
A SplitSubmissionHelper.add_to_submission_data() 0 28 4
A SubmissionHelper.read_samples() 0 13 2
A SubmissionHelper.start_submission() 0 11 2
A SplitSubmissionTask.run() 0 38 1
A SubmissionHelper.mark_success() 0 4 1
A SubmissionHelper.add_samples() 0 19 3
A TaskFailureMixin.on_failure() 0 20 1
A SubmissionCompleteTask.run() 0 40 2
A SplitSubmissionHelper.process_data() 0 21 5
A SubmissionHelper.owner() 0 6 1

How to fix   Complexity   

Complexity

Complex classes like biosample.tasks.submission often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.utils import timezone
19
20
from common.constants import (
21
    ERROR, READY, SUBMITTED, COMPLETED)
22
from image.celery import app as celery_app, MyTask
23
from image_app.models import Submission, Animal
24
from submissions.helpers import send_message
25
26
from ..helpers import get_auth
27
from ..models import (
28
    Submission as USISubmission, SubmissionData as USISubmissionData)
29
30
# Get a celery task logger for this module
31
logger = get_task_logger(__name__)
32
33
# maximum number of records (animals/samples) added to a single USI
# submission batch before SplitSubmissionHelper starts a new one
34
MAX_SAMPLES = 100
35
36
37
class SubmissionError(Exception):
    """Raised when something goes wrong while handling a submission"""
41
42
43
# TODO: promote this to become a common mixin for tasks
44
class TaskFailureMixin():
    """Mixin candidate to become the common behaviour of task failure which
    mark submission with ERROR status and send message to owner"""

    @staticmethod
    def _get_submission_id(args, kwargs):
        """Return the UID submission primary key from the task arguments.

        The key is searched in the ``uid_submission_id`` keyword argument
        first (this is how ``SubmissionCompleteTask`` receives it: being a
        chord callback, its first positional argument is the list of the
        header results, not a primary key), then in the ``submission_id``
        keyword argument and finally in the first positional argument.

        Raises:
            IndexError: if no keyword is defined and ``args`` is empty
        """

        kwargs = kwargs or {}

        for key in ('uid_submission_id', 'submission_id'):
            if key in kwargs:
                return kwargs[key]

        return args[0]

    # Override default on failure method
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Mark the UID submission with ERROR status and notify its owner.

        Args:
            exc: the exception raised by the task
            task_id: the id of the failed task
            args: positional arguments the task was called with
            kwargs: keyword arguments the task was called with
            einfo: exception information (included in the mail body)
        """

        logger.error('%r failed: %r', task_id, exc)

        # get submission object
        submission_obj = Submission.objects.get(
            pk=self._get_submission_id(args, kwargs))

        submission_obj.status = ERROR
        submission_obj.message = (
            "Error in biosample submission: %s" % str(exc))
        submission_obj.save()

        # send async message
        send_message(submission_obj)

        # send a mail to the user with the stacktrace (einfo)
        submission_obj.owner.email_user(
            "Error in biosample submission %s" % (
                submission_obj.id),
            ("Something goes wrong with biosample submission. Please report "
             "this to InjectTool team\n\n %s" % str(einfo)),
        )

        # TODO: submit mail to admin
72
73
74
# HINT: move into helper module?
75
class SubmissionHelper():
    """
    An helper class for submission task, used to deal with pyUSIrest
    """

    def __init__(self, submission_id):
        """Fetch the biosample submission object with the given primary key.

        Args:
            submission_id: primary key of a ``biosample.models.Submission``
                (imported in this module as ``USISubmission``)
        """

        self.submission_id = submission_id

        # the token read from redis (see read_token)
        self.token = None

        # here are pyUSIrest objects (set by read_token)
        self.auth = None
        self.root = None

        # here I will track a USI submission document
        self.usi_submission = None

        # here I will store samples already submitted, indexed by alias
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission"""

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get biosample submission id from database"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        """Set biosample submission id and save it into database"""

        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read token from REDIS database and set root attribute with a
        pyUSIrest.client.Root instance

        Returns:
            the token as a string

        Raises:
            SubmissionError: if no token is stored under the expected key
        """

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # redis returns None when the key doesn't exist: fail with a
        # meaningful message instead of an AttributeError on decode
        raw_token = client.get(key)

        if raw_token is None:
            raise SubmissionError(
                "Cannot read token for '%s': no such key in redis "
                "database" % (self.owner))

        # set submission data
        self.token = raw_token.decode("utf8")

        # get a root object with auth
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover a USI submission document or raise Exception. If
        not defined, return false

        Raises:
            SubmissionError: if the recovered submission is not in "Draft"
                status
        """

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                   self.usi_submission_name,
                   self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object and track its name in the
        database"""

        # getting team
        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission_id in table
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read sample in a USI submission document and set submitted_samples
        attribute"""

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples", len(samples))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch sample
            logger.info("Patching %s", alias)

            # get usi sample
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            self.usi_submission.create_sample(
                model.to_biosample())

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample"""

        # iterate over sample data
        for submission_data in self.submission_obj.submission_data.all():
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s",
                    model._meta.verbose_name,
                    model)

    def mark_submission(self, status, message):
        """Set status and message for biosample.models.Submission and save
        it"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a ERROR status for biosample.models.Submission and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Submitted to biosample"):
        """Set a SUBMITTED status for biosample.models.Submission and a
        message"""

        self.mark_submission(SUBMITTED, message)
289
290
291
class SubmitTask(MyTask):
    """Send a single batch of data (a biosample Submission) to BioSamples
    through USI"""

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit data and track the outcome in the submission object.

        Errors are never propagated: they are tracked in the submission
        status/message, and a ("success", usi_submission_id) tuple is always
        returned.
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        # TODO: should I rename RuntimeError with a more informative name
        # when token expires during a submission?
        except (ConnectionError, RuntimeError) as error:
            logger.error("Error in biosample submission: %s" % error)

            if isinstance(error, ConnectionError):
                reason = "Errors in EBI API endpoints. Please try again later"

            else:
                reason = (
                    "Your token is expired: please submit again to resume "
                    "submission")

            logger.error(reason)

            # recoverable error: submission goes back to READY with a message
            helper.mark_submission(READY, reason)

        except Exception as error:
            logger.error("Unmanaged error: %s" % error)

            # track the whole traceback in submission message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id
335
336
337
# HINT: move into helper module?
338
class SplitSubmissionHelper():
    """Split the READY animals and samples of a UID submission into
    biosample submission objects (batches) holding at most MAX_SAMPLES
    records each. The primary keys of the batches to be submitted are
    collected in the ``submission_ids`` attribute"""

    def __init__(self, uid_submission):
        # how many records were added to the current batch
        self.counter = 0
        self.uid_submission = uid_submission

        # the batch records are currently appended to
        self.usi_submission = None

        # primary keys of the batches that need to be submitted
        self.submission_ids = []

    def process_data(self):
        """Iterate over the animals of the UID submission (and their
        samples) and add the READY ones to submission data"""

        for animal in Animal.objects.filter(
                name__submission=self.uid_submission):

            # add animal if not yet submitted, or patch it
            if animal.name.status == READY:
                self.add_to_submission_data(animal)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s", animal)

            # Add their specimen
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status == READY:
                    self.add_to_submission_data(sample)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s", sample)

    def create_submission(self):
        """Create a new database object and reset counter"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """check if model is already in an opened submission

        Returns:
            True if model belongs to exactly one not COMPLETED submission
            (which is marked as READY and tracked in ``submission_ids``),
            False if it belongs to none

        Raises:
            SubmissionError: if model is in more than one not COMPLETED
                submission
        """

        logger.debug(
            "Searching %s %s in submissions",
            model._meta.verbose_name,
            model)

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # search model in submission data, ignoring completed submissions
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # count once: every count() call is a database query
        n_found = data_qs.count()

        if n_found > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model._meta.verbose_name,
                    model))

        if n_found == 0:
            # no sample in data. I could append model into submission
            logger.debug(
                "No %s %s in submission data",
                model._meta.verbose_name,
                model)
            return False

        usi_submission = data_qs.first().submission

        logger.debug(
            "Found %s %s in %s",
            model._meta.verbose_name,
            model,
            usi_submission)

        # mark this batch to be called like it was created
        if usi_submission.id not in self.submission_ids:
            self.submission_ids.append(usi_submission.id)

            logger.debug("Reset status for submission %s", usi_submission)
            usi_submission.status = READY
            usi_submission.save()

        return True

    def add_to_submission_data(self, model):
        """Append model to the current batch, unless it is already in an
        opened submission. A new batch is created when there is none or when
        the current one already holds MAX_SAMPLES records"""

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.info(
                "Ignoring %s %s: already in a submission",
                model._meta.verbose_name,
                model)
            return

        # Create a new submission if necessary: the first time, and every
        # time the current chunk is full
        if self.usi_submission is None or self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info(
            "Appending %s %s to %s",
            model._meta.verbose_name,
            model,
            self.usi_submission)

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1
461
462
463
class SplitSubmissionTask(TaskFailureMixin, MyTask):
    """Split submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""

    def run(self, submission_id):
        """Split data of the UID submission in batches, then launch a chord
        with one SubmitTask for each batch and SubmissionCompleteTask as
        callback. Return "success"."""

        logger.info(
            "Starting %s for submission %s", self.name, submission_id)

        uid_submission = Submission.objects.get(pk=submission_id)

        # create database objects iterating over animals and samples
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # the chord callback receives the UID submission pk as keyword
        on_complete = SubmissionCompleteTask().s(
            uid_submission_id=submission_id)

        # one task signature for each batch to submit
        submit_task = SubmitTask()
        signatures = [
            submit_task.s(usi_submission_id)
            for usi_submission_id in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks", len(signatures))

        # the callback will be called only after all header tasks
        result = chord(signatures)(on_complete)

        logger.info(
            "Start submission chord process for %s with task %s",
            uid_submission,
            result.task_id)

        logger.info("%s completed", self.name)

        # return a status
        return "success"
508
509
510
class SubmissionCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after batch submission"""

    name = "Complete Submission"
    description = """Check submission status and update stuff"""

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status

        Args:
            args: the first item is the list of values returned by the
                submission tasks, like [("success", 1), ("success", 2)]
            kwargs: the UID submission primary key is read from the
                ``uid_submission_id`` key

        Returns:
            the "success" string
        """

        submission_statuses = args[0]

        logger.debug(args)
        logger.debug(kwargs)
        logger.debug(submission_statuses)

        # submission_statuses will be an array like this
        # [("success", 1), ("success", 2)]
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # check for errors in submission: no need to count records, knowing
        # that at least one exists is enough
        if submission_qs.filter(status=ERROR).exists():
            # submission failed: UID submission status is left untouched
            logger.info("Submission %s failed", uid_submission)

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success", uid_submission)

            uid_submission.status = SUBMITTED
            uid_submission.message = (
                "Waiting for biosample validation")
            uid_submission.save()

        # send async message
        send_message(uid_submission)

        return "success"
556
557
558
# explicitly register the class-based tasks defined in this module with the
# celery application
559
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
560
celery_app.tasks.register(SubmitTask)
561
celery_app.tasks.register(SplitSubmissionTask)
562
celery_app.tasks.register(SubmissionCompleteTask)
563