Completed
Pull Request — master (#44)
by Paolo
05:54
created

biosample.tasks.submit   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 358
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 26
eloc 172
dl 0
loc 358
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A SubmissionData.team_name() 0 5 1
A SubmissionData.owner() 0 5 1
A SubmitTask.run() 0 8 1
A SubmitTask.__create_or_update() 0 23 2
B SubmitTask.submit() 0 90 4
A SubmitTask.__create_submission() 0 19 1
A SubmissionData.__init__() 0 24 5
A SubmissionData.biosample_submission_id() 0 5 1
A SubmitTask.__recover_submission() 0 32 3
B SubmitTask.submit_biosample() 0 59 7
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Oct  2 16:07:58 2018
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import os
10
import redis
11
import traceback
12
13
from decouple import AutoConfig
14
from celery.utils.log import get_task_logger
15
from celery import chord
16
17
import pyUSIrest.client
18
19
from django.conf import settings
20
from django.utils import timezone
21
from django.contrib.contenttypes.models import ContentType
22
23
from image.celery import app as celery_app, MyTask
24
from image_app.models import Submission, Animal
25
from common.constants import (
26
    ERROR, READY, SUBMITTED, COMPLETED)
27
from submissions.helpers import send_message
28
29
from ..helpers import get_auth
30
from ..models import (
31
    Submission as USISubmission, SubmissionData as USISubmissionData)
32
33
# Get a celery task logger named after this module
34
logger = get_task_logger(__name__)
35
36
# define a decouple config object, looking for settings inside the 'image'
# directory under settings.BASE_DIR
37
settings_dir = os.path.join(settings.BASE_DIR, 'image')
38
config = AutoConfig(search_path=settings_dir)
39
40
# threshold (in days) used to decide that a task is running for a very long
# time. NOTE(review): not referenced in the visible part of this module
41
MAX_DAYS = 5
42
43
# how many samples for each submission.
# NOTE(review): not referenced in the visible part of this module
44
MAX_SAMPLES = 100
45
46
# When a record status is in this list the record is not submitted again,
47
# since it was already submitted by this submission or by a previous one
48
# and there is no need to send the same data when it is not necessary
49
DONT_SUBMIT_STATUSES = [SUBMITTED, COMPLETED]
50
51
52
class SubmissionData(object):
    """
    A helper class for the submission task, useful to pass parameters like
    submission data between tasks.

    The submission id is read from the ``submission_id`` keyword argument
    or, when that keyword is missing, from the first positional argument if
    it is an integer. When a truthy id is found, the matching ``Submission``
    object is loaded from the database and stored in ``submission_obj``.
    """

    def __init__(self, *args, **kwargs):
        # default values for every instance attribute
        self.submission_id = None
        self.submission_obj = None
        self.token = None

        # samples already submitted are stored here
        self.submitted_samples = {}

        # track a USI submission and the USI root object
        self.usi_submission = None
        self.usi_root = None

        if 'submission_id' in kwargs:
            self.submission_id = kwargs['submission_id']

        # isinstance accepts int subclasses too; bool is excluded explicitly
        # since it is an int subclass but never a meaningful primary key
        elif (args and isinstance(args[0], int) and
                not isinstance(args[0], bool)):
            self.submission_id = args[0]

        if self.submission_id:
            # get submission object
            self.submission_obj = Submission.objects.get(
                pk=self.submission_id)

    @property
    def owner(self):
        """Recover owner from the submission object"""

        return self.submission_obj.owner

    @property
    def team_name(self):
        """Recover team name from the owner's biosample account"""

        return self.owner.biosample_account.team.name

    @property
    def biosample_submission_id(self):
        """Get the biosample submission id stored in the submission object"""

        return self.submission_obj.biosample_submission_id
100
101
102
class SubmitTask(SubmissionTaskMixin, MyTask):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable SubmissionTaskMixin does not seem to be defined.
Loading history...
103
    name = "Submit to Biosample"
104
    description = """Submit to Biosample using USI"""
105
106
    # http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation
107
    # A task is not instantiated for every request, but is registered in
108
    # the task registry as a global instance. This means that the __init__
109
    # constructor will only be called once per process, and that the
110
    # task class is semantically closer to an Actor. if you have a task and
111
    # you route every request to the same process, then it will keep state
112
    # between requests. This can also be useful to cache resources, For
113
    # example, a base Task class that caches a database connection
114
115
    def run(self, submission_id):
        """Task entry point, executed when the task is called (e.g. through
        ``delay``).

        Wraps ``submission_id`` in a SubmissionData instance and returns
        the result of :meth:`submit`.
        """

        # build the helper object from the submission id and hand it to the
        # inner method, returning its result
        return self.submit(SubmissionData(submission_id=submission_id))
123
124
    # a function to submit data into biosample
125
    def submit(self, submission_data):
        """Submit the data referenced by ``submission_data`` to biosample.

        The user token is read from the redis database, stored in
        ``submission_data.token`` and then :meth:`submit_biosample` is
        called. Handled failures are:

        * no token stored in redis for this submission/user, or a
          ``RuntimeError`` raised while submitting (treated as an expired
          token): the submission goes back to READY and the user is asked
          to submit again;
        * ``ConnectionError``: the submission goes back to READY and the
          stacktrace is mailed to the user;
        * any other exception: the task is retried.

        Returns the string "success" both when the submission is completed
        and when a failure was handled.
        """

        logger.info(
            "Starting submission for user %s",
            submission_data.owner.biosample_account)

        # messages shared by the two "token is not valid anymore" paths
        token_message = (
            "Your token is expired: please submit again to resume "
            "submission")
        token_mail_body = (
            "Your token is expired during submission. Click on submit "
            "button to generate a new token and resume your submission")

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=submission_data.submission_id,
            user=submission_data.owner)

        logger.debug("Reading token for '%s'", submission_data.owner)

        # redis returns None when the key does not exist: without this
        # check, decode() would fail with an AttributeError and the
        # submission would never leave its current status
        raw_token = client.get(key)

        if raw_token is None:
            logger.error("No token found in redis for key '%s'", key)

            self.__notify_failure(
                submission_data, token_message, token_mail_body)

            return "success"

        # set token in submission data
        submission_data.token = raw_token.decode("utf8")

        # call a method to submit data to biosample
        try:
            self.submit_biosample(submission_data)

        except ConnectionError as exc:
            logger.error("Error in biosample submission: %s", exc)

            # the stacktrace is available only while handling the exception
            einfo = traceback.format_exc()

            # the mail sent to the user carries the stacktrace (einfo)
            self.__notify_failure(
                submission_data,
                "Errors in EBI API endpoints. Please try again later",
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n %s" % str(einfo)))

            return "success"

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            logger.error("Error in biosample submission: %s", exc)

            # no stacktrace here: the user only needs to submit again
            self.__notify_failure(
                submission_data, token_message, token_mail_body)

            return "success"

        # retry a task under errors
        # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
        except Exception as exc:
            raise self.retry(exc=exc)

        logger.info("database updated and task finished")

        # return a status
        return "success"

    def __notify_failure(self, submission_data, message, mail_body):
        """Put the submission back to READY and inform the user.

        ``message`` is logged and saved in the submission object, which is
        then sent with ``send_message``; ``mail_body`` is mailed to the
        submission owner.
        """

        logger.error(message)

        # add message to submission. Change status to READY
        submission_obj = submission_data.submission_obj
        submission_obj.status = READY
        submission_obj.message = message
        submission_obj.save()

        # send async message
        send_message(submission_obj)

        # send a mail to the user
        submission_data.owner.email_user(
            "Error in biosample submission %s" % (
                submission_data.submission_id),
            mail_body,
            )
215
216
    def submit_biosample(self, submission_data):
        """Send animals and samples of a submission to biosample using USI.

        A USI root object is created from the token in ``submission_data``
        and a USI submission is recovered (when a biosample submission id
        is already stored) or created. Every animal of the submission, and
        every sample of each animal, is then created or patched unless its
        status is in DONT_SUBMIT_STATUSES. Finally the submission status is
        set to SUBMITTED and a message is sent.
        """

        # build the auth object from the token
        auth = get_auth(token=submission_data.token)

        logger.debug("getting biosample root")
        submission_data.usi_root = pyUSIrest.client.Root(auth=auth)

        stored_id = submission_data.biosample_submission_id

        if stored_id is None or stored_id == '':
            # nothing to recover: get a new USI submission
            usi_submission_name = self.__create_submission(submission_data)

        else:
            # recovering a submission: get the same submission id
            usi_submission_name = self.__recover_submission(submission_data)

        logger.info("Fetching data and add to submission %s" % (
            usi_submission_name))

        animals = Animal.objects.filter(
            name__submission=submission_data.submission_obj)

        # HINT: what happens if a token expires while submitting?
        for animal in animals:
            if animal.name.status in DONT_SUBMIT_STATUSES:
                # already submitted, so it can be ignored
                logger.debug("Ignoring animal %s" % (animal))

            else:
                logger.info("Appending animal %s" % (animal))

                # create the animal, or patch it if already in USI
                self.__create_or_update(animal, submission_data)

            # then deal with the samples of this animal
            for sample in animal.sample_set.all():
                if sample.name.status in DONT_SUBMIT_STATUSES:
                    # already submitted, so it can be ignored
                    logger.debug("Ignoring sample %s" % (sample))

                else:
                    logger.info("Appending sample %s" % (sample))

                    # create the sample, or patch it if already in USI
                    self.__create_or_update(sample, submission_data)

        logger.info("submission completed")

        # the submission is completed but not yet finalized: update status
        submission_obj = submission_data.submission_obj
        submission_obj.status = SUBMITTED
        submission_obj.message = "Waiting for biosample validation"
        submission_obj.save()

        # send async message
        send_message(submission_obj)
275
276
    # helper function to create or update a biosample record
277
    def __create_or_update(self, sample_obj, submission_data):
        """Create a sample (or an animal) in USI, or patch it when its
        alias is among the samples already submitted. The name object of
        the record is then marked as SUBMITTED."""

        # the alias is what identifies the same object on both sides
        alias = sample_obj.biosample_alias
        known_samples = submission_data.submitted_samples

        if alias not in known_samples:
            # never seen in this USI submission: create a new record
            submission_data.usi_submission.create_sample(
                sample_obj.to_biosample())

        else:
            logger.info("Patching %s" % (alias))

            # patch the USI sample tracked for this alias
            known_samples[alias].patch(sample_obj.to_biosample())

        # update status and submission time on the name object
        name = sample_obj.name
        name.status = SUBMITTED
        name.last_submitted = timezone.now()
        name.save()
300
301
    def __recover_submission(self, submission_data):
        """Resume the USI submission whose name is stored in the database.

        When the USI submission is no longer in "Draft" status a new one is
        created instead. Otherwise its samples are stored by alias in
        ``submission_data.submitted_samples``. Returns the name of the USI
        submission in use.
        """

        usi_submission_name = submission_data.biosample_submission_id

        logger.info("Recovering submission %s for team %s" % (
            usi_submission_name,
            submission_data.team_name))

        # get the same submission object from USI
        usi_submission = submission_data.usi_root.get_submission_by_name(
            submission_name=usi_submission_name)
        submission_data.usi_submission = usi_submission

        # only a "Draft" submission is still editable
        if usi_submission.status != "Draft":
            logger.warning(
                "Cannot recover submission '%s': current status is '%s'" % (
                    usi_submission_name,
                    usi_submission.status))

            # this submission can't be modified, so start a new one
            return self.__create_submission(submission_data)

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = usi_submission.get_samples()
        logger.debug("Got %s samples" % (len(samples)))

        # track them by alias
        submission_data.submitted_samples.update(
            (sample.alias, sample) for sample in samples)

        # return usi biosample id
        return usi_submission_name
333
334
    def __create_submission(self, submission_data):
        """Create a new USI submission for the team of the owner.

        The new submission is stored in ``submission_data.usi_submission``
        and its name is saved as ``biosample_submission_id`` in the
        submission object. Returns the USI submission name.
        """

        team_name = submission_data.team_name

        # get the team from USI
        logger.debug("getting team '%s'" % (team_name))
        team = submission_data.usi_root.get_team_by_name(team_name)

        # create a new submission for this team
        logger.info("Creating a new submission for '%s'" % (team.name))
        usi_submission = team.create_submission()
        submission_data.usi_submission = usi_submission

        # track the USI submission name in the database
        usi_submission_name = usi_submission.name

        submission_obj = submission_data.submission_obj
        submission_obj.biosample_submission_id = usi_submission_name
        submission_obj.save()

        # return usi biosample id
        return usi_submission_name
353
354
355
# explicitly register the task class in the celery app task registry
356
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
357
celery_app.tasks.register(SubmitTask)
358