Passed
Pull Request — master (#44)
by Paolo
05:38
created

SubmissionHelper.create_or_update_sample()   A

Complexity

Conditions 2

Size

Total Lines 28
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 28
rs 9.85
c 0
b 0
f 0
cc 2
nop 2
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Jul 18 14:14:06 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
import pyUSIrest.client
12
13
from celery import chord
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.contrib.contenttypes.models import ContentType
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from common.constants import ERROR, READY, SUBMITTED, COMPLETED
22
from image.celery import app as celery_app, MyTask
23
from image_app.models import Submission, Animal
24
from submissions.helpers import send_message
25
26
from ..helpers import get_auth
27
from ..models import (
28
    Submission as USISubmission, SubmissionData as USISubmissionData)
29
30
# Get an instance of a logger
31
logger = get_task_logger(__name__)
32
33
# how many samples a submission batch can hold before a new one is started
34
MAX_SAMPLES = 100
35
36
37
class SubmissionError(Exception):
    """Error raised by this module when a submission can't be processed"""
41
42
43
# TODO: promote this to become a common mixin for tasks
44
class TaskFailureMixin():
    """Mixin candidate to become the common behaviour of task failure which
    mark submission with ERROR status and send message to owner"""

    @staticmethod
    def _get_submission_id(args, kwargs):
        """Find the submission primary key among the task arguments

        Keyword arguments are inspected first, since a chord callback
        receives the header results as first positional argument and the
        submission key as a keyword.

        Args:
            args (tuple): positional arguments the task was called with
            kwargs (dict): keyword arguments the task was called with

        Returns:
            the submission primary key

        Raises:
            IndexError: if no keyword matches and ``args`` is empty
        """

        kwargs = kwargs or {}

        for keyword in ('uid_submission_id', 'submission_id'):
            if kwargs.get(keyword) is not None:
                return kwargs[keyword]

        return args[0]

    # Override default on failure method
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Mark submission with error status, send async message and a
        mail to the owner of such submission

        Args:
            exc (Exception): the exception raised by the task
            task_id (str): the id of the failed task
            args (tuple): positional arguments of the failed task
            kwargs (dict): keyword arguments of the failed task
            einfo: the exception information (stacktrace), sent by mail
        """

        logger.error('%r failed: %r', task_id, exc)

        # get submission object
        submission_obj = Submission.objects.get(
            pk=self._get_submission_id(args, kwargs))

        submission_obj.status = ERROR
        submission_obj.message = (
            "Error in biosample submission: %s" % str(exc))
        submission_obj.save()

        # send async message
        send_message(submission_obj)

        # send a mail to the user with the stacktrace (einfo)
        submission_obj.owner.email_user(
            "Error in biosample submission %s" % (
                submission_obj.id),
            ("Something goes wrong with biosample submission. Please report "
             "this to InjectTool team\n\n %s" % str(einfo)),
        )

        # TODO: submit mail to admin
75
76
77
# HINT: move into helper module?
78
class SubmissionHelper():
    """
    An helper class for submission task, used to deal with pyUSIrest

    Attributes:
        submission_id: the primary key of the biosample submission
        submission_obj: the biosample submission object read from database
        token (str): the token read by :py:meth:`read_token`
        auth: the auth object built from the token
        root: the :py:class:`pyUSIrest.client.Root` instance
        usi_submission: the USI submission document in use
        submitted_samples (dict): samples already in the USI submission
            document, indexed by their alias
    """

    def __init__(self, submission_id):
        """Read the biosample submission object from database

        Args:
            submission_id: the primary key of the biosample submission
        """

        self.submission_id = submission_id
        self.token = None

        # here are pyUSIrest objects, defined by read_token
        self.auth = None
        self.root = None

        # here I will track a USI submission
        self.usi_submission = None

        # here I will store samples already submitted
        self.submitted_samples = {}

        # get a submission object
        self.submission_obj = USISubmission.objects.get(
            pk=self.submission_id)

        # HINT: should I check my status?

    @property
    def owner(self):
        """Recover owner from a submission object related with a UID
        Submission

        Returns:
            :py:attr:`Submission.owner`: a django
            :py:class:`django.contrib.auth.models.User` object
        """

        return self.submission_obj.uid_submission.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object

        Returns:
            str: the team name"""

        return self.owner.biosample_account.team.name

    @property
    def usi_submission_name(self):
        """Get/set biosample submission id from database. Setting a value
        saves the submission object

        Returns:
            str: the biosample USI submission identifier"""

        return self.submission_obj.usi_submission_name

    @usi_submission_name.setter
    def usi_submission_name(self, name):
        self.submission_obj.usi_submission_name = name
        self.submission_obj.save()

    @staticmethod
    def _decode_token(raw_token, key):
        """Decode the value read from redis, failing with a clear error if
        nothing was stored under key

        Args:
            raw_token (bytes): the value read from redis, or None
            key (str): the key used to read the value

        Returns:
            str: the decoded token

        Raises:
            SubmissionError: if no value was found
        """

        if raw_token is None:
            raise SubmissionError(
                "No token found in redis database for key '%s'" % key)

        return raw_token.decode("utf8")

    def read_token(self):
        """Read token from REDIS database and set root attribute with a
        pyUSIrest.client.Root instance

        Returns:
            str: the read token

        Raises:
            SubmissionError: if there's no token for this submission"""

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=self.submission_obj.uid_submission.id,
            user=self.owner)

        logger.debug("Reading token for '%s'", self.owner)

        # a missing key returns None, which can't be decoded
        self.token = self._decode_token(client.get(key), key)

        # get a root object with auth
        self.auth = get_auth(token=self.token)

        logger.debug("getting biosample root")
        self.root = pyUSIrest.client.Root(auth=self.auth)

        return self.token

    def start_submission(self):
        """
        Get a USI submission document. Recover submission if possible,
        create a new one if not defined. If recovered submission is
        closed, raise an error

        Returns:
            the USI submission document
        """

        if not self.recover_submission():
            self.create_submission()

        return self.usi_submission

    def recover_submission(self):
        """Try to recover a USI submission document or raise Exception. If
        not defined, return false

        Returns:
            bool: True if a submission was recovered, False if there's no
            submission name to search for

        Raises:
            SubmissionError: if the recovered submission is not a Draft"""

        # If no usi_submission_name, return False
        if not self.usi_submission_name:
            return False

        logger.info(
            "Recovering submission %s for team %s",
            self.usi_submission_name,
            self.team_name)

        # get the same submission object
        self.usi_submission = self.root.get_submission_by_name(
            submission_name=self.usi_submission_name)

        # check that a submission is still editable
        if self.usi_submission.status != "Draft":
            raise SubmissionError(
                "Cannot recover submission '%s': current status is '%s'" % (
                    self.usi_submission_name,
                    self.usi_submission.status))

        # read already submitted samples
        self.read_samples()

        return True

    def create_submission(self):
        """Create a new USI submission object and track its name in
        database

        Returns:
            :py:class:`pyUSIrest.client.Submission` a pyUSIrest submission
            object
        """

        # getting team
        logger.debug("getting team '%s'", self.team_name)
        team = self.root.get_team_by_name(self.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'", team.name)
        self.usi_submission = team.create_submission()

        # track submission_id in table (the setter saves the object)
        self.usi_submission_name = self.usi_submission.name

        # return USI submission document
        return self.usi_submission

    def read_samples(self):
        """Read sample in a USI submission document and set submitted_samples
        attribute

        Returns:
            dict: the submitted samples indexed by alias"""

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = self.usi_submission.get_samples()
        logger.debug("Got %s samples", len(samples))

        for sample in samples:
            self.submitted_samples[sample.alias] = sample

        return self.submitted_samples

    def create_or_update_sample(self, model):
        """Add or patch a sample into USI submission document. Can be
        animal or sample. The model name is then marked as SUBMITTED

        Args:
            model (:py:class:`image_app.mixins.BioSampleMixin`): An animal or
                sample object"""

        # alias is used to reference the same objects
        alias = model.biosample_alias

        # check in my submitted samples
        if alias in self.submitted_samples:
            # patch the sample already in the USI submission
            logger.info("Patching %s", alias)
            self.submitted_samples[alias].patch(model.to_biosample())

        else:
            self.usi_submission.create_sample(
                model.to_biosample())

        # update sample status
        model.name.status = SUBMITTED
        model.name.last_submitted = timezone.now()
        model.name.save()

    def add_samples(self):
        """Iterate over sample data (animal/sample) and call
        create_or_update_sample for each object in READY status"""

        # iterate over sample data
        for submission_data in self.submission_obj.submission_data\
                .order_by('id'):
            # get model for simplicity
            model = submission_data.content_object

            if model.name.status == READY:
                logger.debug(
                    "Adding %s %s to submission %s",
                    model._meta.verbose_name,
                    model,
                    self.usi_submission_name)
                self.create_or_update_sample(model)

            else:
                logger.debug(
                    "Ignoring %s %s",
                    model._meta.verbose_name, model)

    def mark_submission(self, status, message):
        """Set status and message for the submission object and save it"""

        self.submission_obj.status = status
        self.submission_obj.message = message
        self.submission_obj.save()

    def mark_fail(self, message):
        """Set a :py:const:`ERROR <common.constants.ERROR>` status for
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(ERROR, message)

    def mark_success(self, message="Waiting for biosample validation"):
        """Set a :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`biosample.models.Submission` and a message"""

        self.mark_submission(SUBMITTED, message)
317
318
319
class SubmitTask(MyTask):
    """Task which submits a single batch of data to biosample"""

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    def run(self, usi_submission_id):
        """Submit a :py:class:`biosample.models.Submission` to biosample.

        A :py:class:`SubmissionHelper` is created for the provided id, then
        the token is read, the USI submission is started (or recovered),
        samples are added and the submission is marked as submitted. Errors
        are tracked in the submission object and never propagated.

        Args:
            usi_submission_id: the biosample submission primary key

        Returns:
            tuple: the "success" string and the received id
        """

        helper = SubmissionHelper(submission_id=usi_submission_id)

        # No retries, we expect always success
        try:
            helper.read_token()
            helper.start_submission()
            helper.add_samples()
            helper.mark_success()

        # TODO: should I rename RuntimeError with a more informative name
        # when token expires during a submission?
        except (ConnectionError, RuntimeError) as error:
            logger.error("Error in biosample submission: %s" % error)

            if isinstance(error, ConnectionError):
                reason = "Errors in EBI API endpoints. Please try again later"

            else:
                reason = (
                    "Your token is expired: please submit again to resume "
                    "submission")

            logger.error(reason)

            # the submission returns READY, the reason is tracked in message
            helper.mark_submission(READY, reason)

        except Exception as error:
            logger.error("Unmanaged error: %s" % error)

            # track the whole traceback in submission message
            helper.mark_fail(traceback.format_exc())

        return "success", usi_submission_id
369
370
371
# HINT: move into helper module?
372
class SplitSubmissionHelper():
    """
    helper class to split :py:class:`image_app.models.Submission` data in
    batches limited in sizes

    Attributes:
        counter (int): how many objects were added to the current batch
        uid_submission: the submission whose data are split
        usi_submission: the batch currently filled (None until the first
            object is added)
        submission_ids (list): primary keys of the batches to submit
    """

    def __init__(self, uid_submission):
        """
        Args:
            uid_submission: the submission object whose data will be split
        """

        self.counter = 0
        self.uid_submission = uid_submission
        self.usi_submission = None
        self.submission_ids = []

    def process_data(self):
        """Add animal and its samples to a submission"""

        for animal in Animal.objects.filter(
                name__submission=self.uid_submission):

            # add animal if not yet submitted, or patch it
            if animal.name.status == READY:
                self.add_to_submission_data(animal)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s", animal)

            # Add their specimen
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status == READY:
                    self.add_to_submission_data(sample)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s", sample)

    def create_submission(self):
        """
        Create a new :py:class:`biosample.models.Submission` object and
        set sample counter to 0"""

        self.usi_submission = USISubmission.objects.create(
            uid_submission=self.uid_submission,
            status=READY)

        # track object pks
        self.usi_submission.refresh_from_db()
        self.submission_ids.append(self.usi_submission.id)

        logger.debug("Created submission %s", self.usi_submission)

        # reset counter object
        self.counter = 0

    def model_in_submission(self, model):
        """
        Check if :py:class:`image_app.mixins.BioSampleMixin` is already in an
        opened (not COMPLETED) submission. When found, the submission is
        tracked in submission_ids and its status is reset to READY

        Args:
            model: an animal or sample object

        Returns:
            bool: True if model belongs to one opened submission

        Raises:
            SubmissionError: if model is in more than one opened submission
        """

        logger.debug(
            "Searching %s %s in submissions",
            model._meta.verbose_name,
            model)

        # get content type
        ct = ContentType.objects.get_for_model(model)

        # define a queryset
        data_qs = USISubmissionData.objects.filter(
            content_type=ct,
            object_id=model.id)

        # ignore completed submissions: only opened ones matter
        data_qs = data_qs.exclude(submission__status__in=[COMPLETED])

        # count once, to take every decision on the same value
        found = data_qs.count()

        if found > 1:
            raise SubmissionError(
                "More than one submission opened for %s %s" % (
                    model._meta.verbose_name,
                    model))

        if found == 0:
            # no sample in data. I could append model into submission
            logger.debug(
                "No %s %s in submission data",
                model._meta.verbose_name,
                model)
            return False

        usi_submission = data_qs.first().submission

        logger.debug(
            "Found %s %s in %s",
            model._meta.verbose_name,
            model,
            usi_submission)

        # mark this batch to be called like it was created
        if usi_submission.id not in self.submission_ids:
            self.submission_ids.append(usi_submission.id)

            logger.debug(
                "Reset status for submission %s", usi_submission)
            usi_submission.status = READY
            usi_submission.save()

        return True

    def add_to_submission_data(self, model):
        """Add a :py:class:`image_app.mixins.BioSampleMixin` to a
        :py:class:`biosample.models.Submission` object, or create a new
        one if there are more samples than required

        Args:
            model: an animal or sample object"""

        # get model type (animal or sample)
        model_type = model._meta.verbose_name

        # check if model is already in an opened submission
        if self.model_in_submission(model):
            logger.info(
                "Ignoring %s %s: already in a submission",
                model_type,
                model)
            return

        # Create a new submission if necessary
        if self.usi_submission is None:
            self.create_submission()

        # every time I split data in chunks I need to call the
        # submission task. Do it only on animals, to prevent
        # to put samples in a different submission
        if model_type == 'animal' and self.counter >= MAX_SAMPLES:
            self.create_submission()

        logger.info(
            "Appending %s %s to %s",
            model_type,
            model,
            self.usi_submission)

        # add object to submission data and updating counter
        USISubmissionData.objects.create(
            submission=self.usi_submission,
            content_object=model)

        self.counter += 1
515
516
517
class SplitSubmissionTask(TaskFailureMixin, MyTask):
    """Task which splits submission data in batches, in order to submit
    them with multiple tasks/processes and smaller submissions"""

    name = "Split submission data"
    description = """Split submission data in chunks"""

    def run(self, submission_id):
        """Split :py:class:`image_app.models.Submission` data relying on
        :py:class:`SplitSubmissionHelper`, then start a chord with a
        :py:class:`SubmitTask` for each batch of data and
        :py:class:`SubmissionCompleteTask` as callback

        Args:
            submission_id: the submission primary key

        Returns:
            str: the "success" string"""

        logger.info("Starting %s for submission %s" % (
            self.name, submission_id))

        uid_submission = Submission.objects.get(pk=submission_id)

        # create database objects for each batch of data
        splitter = SplitSubmissionHelper(uid_submission)
        splitter.process_data()

        # the chord callback receives the submission id as a keyword
        complete_task = SubmissionCompleteTask()
        callback = complete_task.s(uid_submission_id=submission_id)

        # one signature for each batch
        submit_task = SubmitTask()
        header = [
            submit_task.s(batch_id) for batch_id in splitter.submission_ids]

        logger.debug("Preparing chord for %s tasks" % len(header))

        # callback will be called only after all header tasks
        result = chord(header)(callback)

        logger.info(
            "Start submission chord process for %s with task %s" % (
                uid_submission,
                result.task_id))

        logger.info("%s completed" % self.name)

        return "success"
566
567
568
class SubmissionCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after batch submission"""

    name = "Complete Submission Process"
    description = """Check submission status and update stuff"""

    def run(self, *args, **kwargs):
        """Fetch submission data and then update
        :py:class:`image_app.models.Submission` status

        Args:
            args: the first item is the list of results of the submit
                tasks, like ``[("success", 1), ("success", 2)]``
            kwargs: ``uid_submission_id`` is the primary key of the
                submission to update

        Returns:
            str: the "success" string"""

        submission_statuses = args[0]

        # the second item of each result is a biosample submission id
        usi_submission_ids = [status[1] for status in submission_statuses]

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            pk__in=usi_submission_ids)

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # count biosample submissions by status
        statuses = {
            res['status']: res['count']
            for res in submission_qs.values('status').annotate(
                count=Count('status'))}

        # check for errors in submission. Those are statuses set by
        # SubmitTask
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            self.__update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        elif READY in statuses:
            # temporary failure: the submission can be submitted again
            logger.info("Temporary error for %s", uid_submission)

            self.__update_message(uid_submission, submission_qs, READY)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Temporary error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "try again\n\n"
                 "%s" % uid_submission.message),
            )

        else:
            # Update submission status: a completed but not yet finalized
            # submission
            logger.info("Submission %s success", uid_submission)

            self.__update_message(uid_submission, submission_qs, SUBMITTED)

        # send async message
        send_message(uid_submission)

        return "success"

    def __update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission message and set
        image_app.models.Submission message

        Args:
            uid_submission: the submission object to update
            submission_qs: a queryset of biosample submissions
            status: the status to set, used also to select the biosample
                submissions from which messages are read"""

        # collect distinct messages in a stable order, skipping empty ones
        # (a None message can't be joined)
        messages = []

        for submission in submission_qs.filter(
                status=status).order_by('id'):
            if submission.message and submission.message not in messages:
                messages.append(submission.message)

        uid_submission.status = status
        uid_submission.message = "\n".join(messages)
        uid_submission.save()
656
657
658
# register explicitly tasks
659
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
660
# class based tasks need to be registered one by one, in this order
for _task_class in (SubmitTask, SplitSubmissionTask, SubmissionCompleteTask):
    celery_app.tasks.register(_task_class)
663