Completed
Pull Request — master (#44)
by Paolo
05:54
created

biosample.tasks.submission   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 485
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 42
eloc 236
dl 0
loc 485
rs 9.0399
c 0
b 0
f 0

21 Methods

Rating   Name   Duplication   Size   Complexity  
A SplitSubmissionTask.create_submission() 0 13 1
A SplitSubmissionTask.add_to_submission_data() 0 29 4
A SubmissionHelper.read_token() 0 28 1
A SubmissionHelper.usi_submission_name() 0 5 1
A SubmissionHelper.__init__() 0 20 1
A SubmissionHelper.mark_submission() 0 4 1
A SubmissionHelper.recover_submission() 0 27 3
A SubmissionHelper.mark_fail() 0 4 1
A SplitSubmissionTask.__init__() 0 5 1
A SubmitTask.run() 0 40 4
A SubmissionHelper.create_submission() 0 20 1
A SubmissionHelper.create_or_update_sample() 0 24 2
A SubmissionHelper.team_name() 0 5 1
A SubmissionHelper.read_samples() 0 13 2
A SubmissionHelper.start_submission() 0 11 2
B SplitSubmissionTask.run() 0 52 5
A SubmissionHelper.mark_success() 0 4 1
A SplitSubmissionTask.model_in_submission() 0 21 2
A SubmissionHelper.add_samples() 0 19 3
A TaskFailureMixin.on_failure() 0 20 1
A SubmissionHelper.owner() 0 6 1

1 Function

Rating   Name   Duplication   Size   Complexity  
A submissioncomplete() 0 8 2

How to fix   Complexity   

Complexity

Complex classes like biosample.tasks.submission often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.utils import timezone
19
20
from common.constants import (
21
    ERROR, READY, SUBMITTED, COMPLETED)
22
from image.celery import app as celery_app, MyTask
23
from image_app.models import Submission, Animal
24
from submissions.helpers import send_message
25
26
from ..helpers import get_auth
27
from ..models import (
28
    Submission as USISubmission, SubmissionData as USISubmissionData)
29
30
# module-level logger shared by every task defined in this module
logger = get_task_logger(__name__)

# upper bound of SubmissionData (animals + samples) placed in a single
# biosample.models.Submission when a UID submission is split in chunks
MAX_SAMPLES = 100
35
36
37
# TODO: promote this to become a common mixin for tasks
class TaskFailureMixin():
    """Mixin candidate to become the common behaviour of task failure: mark
    the submission with ERROR status and notify its owner"""

    # Override default on_failure method
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle a task failure.

        The first positional task argument (args[0]) is used as the primary
        key of an image_app Submission. That submission is set to ERROR with
        a message describing exc. An async message is sent, and the owner
        receives an email containing the failure info (einfo).
        """

        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

        failed_submission = Submission.objects.get(pk=args[0])

        # track the error in the submission object
        failed_submission.status = ERROR
        failed_submission.message = (
            "Error in biosample submission: %s" % str(exc))
        failed_submission.save()

        # send async message
        send_message(failed_submission)

        # send a mail to the user with the stacktrace (einfo)
        subject = "Error in biosample submission %s" % (failed_submission.id)
        body = (
            "Something goes wrong with biosample submission. Please report "
            "this to InjectTool team\n\n %s" % str(einfo))
        failed_submission.owner.email_user(subject, body)

        # TODO: submit mail to admin
67
68
# HINT: move into helper module?
class SubmissionHelper():
    """
    An helper class for submission task, used to deal with pyUSIrest.

    It wraps a biosample.models.Submission object (read by primary key) and
    the pyUSIrest objects needed to create or recover a USI submission
    document and to add animals/samples to it.
    """

    # define my class attributes
    def __init__(self, submission_id):
        """Read the biosample.models.Submission having pk=submission_id.

        Args:
            submission_id: primary key of a biosample.models.Submission
        """

        # ok those are my default class attributes
        self.submission_id = submission_id
        self.submission_obj = None
        self.token = None

        # here are pyUSIrest object
        self.auth = None
        self.root = None

        # here I will track a USI submission
        self.usi_submission = None

        # here I will store samples already submitted (alias -> sample)
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission"""

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover the biosample team name of the submission owner"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get biosample submission id from database"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        """Set biosample submission id and save it into database"""

        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    def read_token(self):
        """Read the biosample token from REDIS and set pyUSIrest objects.

        Sets the ``token``, ``auth`` and ``root`` attributes. ``root`` is a
        ``pyUSIrest.client.Root`` instance built with ``auth``.

        Returns:
            str: the token read from REDIS

        Raises:
            RuntimeError: if no token is stored for this submission and
                owner, for example because the REDIS key has expired
        """

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'" % self.owner)

        # redis returns None for a missing (or expired) key. Fail with an
        # explicit error instead of an AttributeError on None.decode().
        # RuntimeError is what SubmitTask.run handles as an expired token.
        raw_token = client.get(key)

        if raw_token is None:
            raise RuntimeError(
                "Cannot find a biosample token for '%s' (key '%s')" % (
                    self.owner, key))

        # getting token from redis db and set submission data
        self.token = raw_token.decode("utf8")

        # get a root object with auth
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover the USI submission document tracked in database.

        Returns:
            bool: False if no usi_submission_name is defined, True if the
            submission was recovered (submitted samples are read too)

        Raises:
            Exception: if the recovered submission is not in "Draft" status
        """

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info("Recovering submission %s for team %s" % (
            self.usi_submission_name,
            self.team_name))

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise Exception(
                "Cannot recover submission '%s': current status is '%s'" % (
                    self.usi_submission_name,
                    self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission document for the owner's team.

        Sets the ``usi_submission`` attribute and stores the new submission
        name in database.

        Returns:
            the created USI submission document
        """

        # getting team
        logger.debug("getting team '%s'" % (self.team_name))
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'" % (team.name))
        self.usi_submission = team.create_submission()

        # track submission_id in table
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read samples in the USI submission document and index them by
        alias in the submitted_samples attribute, which is returned"""

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples" % (len(samples)))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. The related name object is then marked as
        SUBMITTED with the current time as last_submitted"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch sample
            logger.info("Patching %s" % (alias))

            # get usi sample
            sample = self.submitted_samples[alias]
            sample.patch(model.to_biosample())

        else:
            self.usi_submission.create_sample(
                model.to_biosample())

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample on those in READY status"""

        # iterate over sample data
        for submission_data in self.submission_obj.submission_data.all():
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug("Adding %s %s to submission %s" % (
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name))
                self.create_or_update_sample(model)

            else:
                logger.debug("Ignoring %s %s" % (
                    model._meta.verbose_name, model))

    def mark_submission(self, status, message):
        """Set status and message for biosample.models.Submission and save
        it"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a ERROR status for biosample.models.Submission and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Submitted to biosample"):
        """Set a SUBMITTED status for biosample.models.Submission and a
        message"""

        self.mark_submission(SUBMITTED, message)
288
289
class SubmitTask(MyTask):
    """Submit a biosample.models.Submission to biosample through USI"""

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    @staticmethod
    def _resume_later(helper, exc, message):
        """Log exc and message, then put the submission back in READY
        state with message, so that the user can submit it again"""

        logger.error("Error in biosample submission: %s" % exc)
        logger.error(message)

        # track message in submission object
        helper.mark_submission(READY, message)

    def run(self, usi_submission_id):
        """Submit the biosample.models.Submission usi_submission_id.

        Connection errors and RuntimeError (treated as an expired token)
        set the submission back to READY with a message. Any other error
        sets it to ERROR with the formatted traceback as message.

        Returns:
            tuple: ("success", usi_submission_id), whatever the outcome
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        except ConnectionError as exc:
            self._resume_later(
                helper,
                exc,
                "Errors in EBI API endpoints. Please try again later")

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            self._resume_later(
                helper,
                exc,
                "Your token is expired: please submit again to resume "
                "submission")

        except Exception as exc:
            logger.error("Unmanaged error: %s" % exc)

            # track traceback in message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id
333
334
335
class SplitSubmissionTask(TaskFailureMixin, MyTask):
    """Split a submission data in chunks in order to submit data through
    multiple tasks/processes and with smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""

    # HINT: move into helper module?
    class Helper():
        """Create biosample.models.Submission objects, each one holding at
        most MAX_SAMPLES SubmissionData, for a single UID submission"""

        def __init__(self, uid_submission):
            # SubmissionData added to the current USI submission
            self.counter = 0
            self.uid_submission = uid_submission

            # the biosample.models.Submission currently being filled
            self.usi_submission = None

            # primary keys of every biosample.models.Submission created
            self.created_ids = []

        def create_submission(self):
            """Create a new database object and reset counter"""

            self.usi_submission = USISubmission.objects.create(
                uid_submission=self.uid_submission,
                status=READY)

            # track object pks
            self.usi_submission.refresh_from_db()
            self.created_ids.append(self.usi_submission.id)

            # reset counter object
            self.counter = 0

        def model_in_submission(self, model):
            """Check if model is already in a submission not yet COMPLETED.

            Returns:
                bool: True if a SubmissionData referencing model belongs to
                a submission whose status is not COMPLETED
            """

            # get content type
            ct = ContentType.objects.get_for_model(model)

            # define a queryset
            data_qs = USISubmissionData.objects.filter(
                content_type=ct,
                object_id=model.id)

            # exclude completed submissions. QuerySet.exclude returns a NEW
            # queryset: the result must be assigned, otherwise the filter
            # is lost and completed submissions are counted too
            data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

            # TODO: mark this batch to be called
            return data_qs.exists()

        def add_to_submission_data(self, model):
            """Add model to the current submission (creating a new one when
            none exists or when MAX_SAMPLES is reached), unless model is
            already in a not completed submission"""

            # check if model is already in an opened submission
            if self.model_in_submission(model):
                logger.info("Ignoring %s %s: already in a submission" % (
                    model._meta.verbose_name,
                    model))
                return

            # Create a new submission if necessary
            if self.usi_submission is None:
                self.create_submission()

            # TODO: every time I split data in chunks I need to call the
            # submission task. I should call this in a chord process, in
            # order to value if submission was completed or not
            if self.counter >= MAX_SAMPLES:
                self.create_submission()

            logger.info("Appending %s %s to %s" % (
                model._meta.verbose_name,
                model,
                self.usi_submission))

            # add object to submission data and updating counter
            USISubmissionData.objects.create(
                submission=self.usi_submission,
                content_object=model)

            self.counter += 1

    @staticmethod
    def _add_if_ready(data_helper, model, label):
        """Add model to submission data if its name status is READY,
        otherwise log that model is ignored (label is "animal"/"sample")"""

        if model.name.status == READY:
            data_helper.add_to_submission_data(model)

        else:
            # already submitted, so could be ignored
            logger.debug("Ignoring %s %s" % (label, model))

    def run(self, submission_id):
        """This function is called when delay is called.

        Split READY animals and samples of the UID submission submission_id
        into biosample.models.Submission objects, then launch one SubmitTask
        per created object in a celery chord.

        Returns:
            str: "success"
        """

        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = Submission.objects.get(
            pk=submission_id)

        # call an helper class to create database objects
        submission_data_helper = self.Helper(uid_submission)

        for animal in Animal.objects.filter(
                name__submission=uid_submission):

            # add animal if not yet submitted, or patch it
            self._add_if_ready(submission_data_helper, animal, "animal")

            # Add their specimen
            for sample in animal.sample_set.all():
                self._add_if_ready(submission_data_helper, sample, "sample")

            # end of cycle for animal.

        # nothing was split: don't launch a chord with an empty header
        if not submission_data_helper.created_ids:
            logger.info(
                "No data to submit for %s: skipping submission chord" % (
                    uid_submission))
            logger.info("%s completed" % self.name)
            return "success"

        # prepare to launch chord tasks
        callback = submissioncomplete.s()
        submit = SubmitTask()
        header = [submit.s(pk) for pk in submission_data_helper.created_ids]

        # call chord task. Chord will be called only after all tasks
        res = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission,
                res.task_id))

        logger.info("%s completed" % self.name)

        # return a status
        return "success"
468
469
470
# TODO: customize and fix this task placeholder
@celery_app.task
def submissioncomplete(submission_statuses):
    """A placeholder for complete submission object.

    Used as chord callback in SplitSubmissionTask.run. Each item of
    submission_statuses is logged at debug level (presumably the values
    returned by SubmitTask.run). Always returns "success".
    """

    for status_item in submission_statuses:
        logger.debug(status_item)

    return "success"
479
480
481
# register explicitly class-based tasks, since they are not registered
# automatically (see the celery issue below)
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (SubmitTask, SplitSubmissionTask):
    celery_app.tasks.register(_task_class)

# don't leave the loop variable in module namespace
del _task_class
485