#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 2 16:07:58 2018

@author: Paolo Cozzi <[email protected]>
"""
8
|
|
|
|
9
|
|
|
import os |
10
|
|
|
import redis |
11
|
|
|
import traceback |
12
|
|
|
|
13
|
|
|
from decouple import AutoConfig |
14
|
|
|
from celery.utils.log import get_task_logger |
15
|
|
|
from celery import chord |
16
|
|
|
|
17
|
|
|
import pyUSIrest.client |
18
|
|
|
|
19
|
|
|
from django.conf import settings |
20
|
|
|
from django.utils import timezone |
21
|
|
|
from django.contrib.contenttypes.models import ContentType |
22
|
|
|
|
23
|
|
|
from image.celery import app as celery_app, MyTask |
24
|
|
|
from image_app.models import Submission, Animal |
25
|
|
|
from common.constants import ( |
26
|
|
|
ERROR, READY, SUBMITTED, COMPLETED) |
27
|
|
|
from submissions.helpers import send_message |
28
|
|
|
|
29
|
|
|
from ..helpers import get_auth |
30
|
|
|
from ..models import ( |
31
|
|
|
Submission as USISubmission, SubmissionData as USISubmissionData) |
32
|
|
|
|
# Task-aware logger shared by everything defined in this module
logger = get_task_logger(__name__)

# decouple configuration object: settings are searched starting from the
# 'image' directory located under the django BASE_DIR
settings_dir = os.path.join(settings.BASE_DIR, 'image')
config = AutoConfig(search_path=settings_dir)

# number of days after which a task is considered a very long one
MAX_DAYS = 5

# number of samples for each submission
MAX_SAMPLES = 100

# A record whose status is in this list must not be sent again: it was
# already submitted (by this submission or by a previous one), and sending
# the same data twice is useless
DONT_SUBMIT_STATUSES = [SUBMITTED, COMPLETED]
50
|
|
|
|
51
|
|
|
|
class SubmissionData(object):
    """
    A helper class for the submission task, useful to pass parameters like
    submission data between tasks.

    Instance attributes (all initialised by ``__init__``):

    * ``submission_id``: the primary key of the submission, or ``None``
    * ``submission_obj``: the ``Submission`` instance fetched using
      ``submission_id``, or ``None`` when no (truthy) id was provided
    * ``token``: the token used to authenticate, ``None`` until set
    * ``submitted_samples``: a dict ``alias -> sample`` for the samples
      already submitted
    * ``usi_submission`` / ``usi_root``: the objects used to track a USI
      submission, ``None`` until set
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise attributes and, when possible, load the submission.

        The submission id is read from the ``submission_id`` keyword
        argument; if this is not provided, the first positional argument
        is used, but only when it is an integer (booleans are ignored).
        The ``Submission`` object is fetched only for a truthy id.
        """

        # ok those are my default instance attributes
        self.submission_id = None
        self.submission_obj = None
        self.token = None

        # here I will store samples already submitted
        self.submitted_samples = {}

        # here I will track a USI submission
        self.usi_submission = None
        self.usi_root = None

        if 'submission_id' in kwargs:
            self.submission_id = kwargs['submission_id']

        # isinstance() accepts int subclasses too; bool is an int subclass
        # but it is not a valid id, so it is explicitly rejected
        elif (args and isinstance(args[0], int) and
                not isinstance(args[0], bool)):
            self.submission_id = args[0]

        if self.submission_id:
            # get submission object
            self.submission_obj = Submission.objects.get(
                pk=self.submission_id)

    @property
    def owner(self):
        """Recover owner from a submission object"""

        return self.submission_obj.owner

    @property
    def team_name(self):
        """Recover team_name from a submission object"""

        return self.owner.biosample_account.team.name

    @property
    def biosample_submission_id(self):
        """Get biosample submission id from database"""

        return self.submission_obj.biosample_submission_id
100
|
|
|
|
101
|
|
|
|
class SubmitTask(SubmissionTaskMixin, MyTask):
    """
    Celery task which sends the animals and the samples of a submission
    to BioSamples using the USI API (through ``pyUSIrest``).

    NOTE(review): ``SubmissionTaskMixin`` is not among the names imported
    by this module -- confirm where it should be imported from.
    """

    name = "Submit to Biosample"
    description = """Submit to Biosample using USI"""

    # http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation
    # A task is not instantiated for every request, but is registered in
    # the task registry as a global instance. This means that the __init__
    # constructor will only be called once per process, and that the
    # task class is semantically closer to an Actor. if you have a task and
    # you route every request to the same process, then it will keep state
    # between requests. This can also be useful to cache resources, For
    # example, a base Task class that caches a database connection

    def run(self, submission_id):
        """This function is called when delay is called

        Args:
            submission_id: the primary key of the submission to send

        Returns:
            the value returned by :meth:`submit`
        """

        # create an instance to store submission data from a submission_id
        submission_data = SubmissionData(submission_id=submission_id)

        # call inner method and return results
        return self.submit(submission_data)

    # a function to submit data into biosample
    def submit(self, submission_data):
        """Read the user token from redis and submit data to biosample

        Args:
            submission_data (SubmissionData): the data of the submission

        Returns:
            str: ``"success"``, both when data are submitted and when a
            handled error (token not found in redis, ``ConnectionError``,
            ``RuntimeError``) puts the submission back to ``READY``.

        Any other exception raised while submitting triggers a task retry.
        """

        logger.info(
            "Starting submission for user %s",
            submission_data.owner.biosample_account)

        # read biosample token from redis database
        client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB)

        # infer key from submission data
        key = "token:submission:{submission_id}:{user}".format(
            submission_id=submission_data.submission_id,
            user=submission_data.owner)

        logger.debug("Reading token for '%s'", submission_data.owner)

        # getting token from redis db
        raw_token = client.get(key)

        # redis returns None when the key doesn't exist: calling .decode()
        # on it would raise an AttributeError outside any handler, leaving
        # the submission in its current status.
        # NOTE(review): a missing key is treated like an expired token
        # (the user has to submit again) -- confirm this is the desired
        # behaviour
        if raw_token is None:
            logger.error("No token found in redis for key '%s'", key)
            self.__token_expired(submission_data)

            return "success"

        # set submission data
        submission_data.token = raw_token.decode("utf8")

        # call a method to submit data to biosample
        try:
            self.submit_biosample(submission_data)

        except ConnectionError as exc:
            logger.error("Error in biosample submission: %s", exc)

            # get exception info while the exception is being handled
            einfo = traceback.format_exc()

            # the mail sent to the user contains the stacktrace (einfo)
            self.__reset_to_ready(
                submission_data,
                "Errors in EBI API endpoints. Please try again later",
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n %s" % str(einfo)),
            )

            return "success"

        # TODO: should I rename this exception with a more informative name
        # when token expires during a submission?
        except RuntimeError as exc:
            logger.error("Error in biosample submission: %s", exc)
            self.__token_expired(submission_data)

            return "success"

        # retry a task under errors
        # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
        except Exception as exc:
            raise self.retry(exc=exc)

        logger.info("database updated and task finished")

        # return a status
        return "success"

    def __token_expired(self, submission_data):
        """Reset the submission to READY asking for a new token"""

        self.__reset_to_ready(
            submission_data,
            ("Your token is expired: please submit again to resume "
             "submission"),
            ("Your token is expired during submission. Click on submit "
             "button to generate a new token and resume your submission"),
        )

    def __reset_to_ready(self, submission_data, message, mail_body):
        """Put the submission back to READY and notify its owner

        Args:
            submission_data (SubmissionData): the data of the submission
            message (str): the message logged and stored in the submission
            mail_body (str): the body of the mail sent to the owner
        """

        logger.error(message)

        # add message to submission. Change status to READY
        submission_data.submission_obj.status = READY
        submission_data.submission_obj.message = message
        submission_data.submission_obj.save()

        # send async message
        send_message(submission_data.submission_obj)

        # send a mail to the user
        submission_data.owner.email_user(
            "Error in biosample submission %s" % (
                submission_data.submission_id),
            mail_body,
        )

    def submit_biosample(self, submission_data):
        """Send every animal and sample not yet submitted to USI

        A USI submission is recovered when the submission object already
        has a biosample submission id, otherwise a new one is created.
        When all records are processed, the submission status is set to
        ``SUBMITTED`` and an async message is sent.

        Args:
            submission_data (SubmissionData): the data of the submission;
                its ``token`` attribute needs to be already set
        """

        # reading token in auth
        auth = get_auth(token=submission_data.token)

        logger.debug("getting biosample root")
        submission_data.usi_root = pyUSIrest.client.Root(auth=auth)

        biosample_id = submission_data.biosample_submission_id

        # if I'm recovering a submission, get the same submission id
        if biosample_id is not None and biosample_id != '':
            usi_submission_name = self.__recover_submission(submission_data)

        else:
            # get a new USI submission
            usi_submission_name = self.__create_submission(submission_data)

        logger.info(
            "Fetching data and add to submission %s", usi_submission_name)

        # HINT: what happen if a token expire while submitting?
        for animal in Animal.objects.filter(
                name__submission=submission_data.submission_obj):

            # add animal if not yet submitted, or patch it
            if animal.name.status not in DONT_SUBMIT_STATUSES:
                logger.info("Appending animal %s", animal)

                # check if animal is already submitted, otherwise patch
                self.__create_or_update(animal, submission_data)

            else:
                # already submitted, so could be ignored
                logger.debug("Ignoring animal %s", animal)

            # Add their specimen
            for sample in animal.sample_set.all():
                # add sample if not yet submitted
                if sample.name.status not in DONT_SUBMIT_STATUSES:
                    logger.info("Appending sample %s", sample)

                    # check if sample is already submitted, otherwise patch
                    self.__create_or_update(sample, submission_data)

                else:
                    # already submitted, so could be ignored
                    logger.debug("Ignoring sample %s", sample)

        logger.info("submission completed")

        # Update submission status: a completed but not yet finalized
        # submission
        submission_data.submission_obj.status = SUBMITTED
        submission_data.submission_obj.message = (
            "Waiting for biosample validation")
        submission_data.submission_obj.save()

        # send async message
        send_message(submission_data.submission_obj)

    # helper function to create or update a biosample record
    def __create_or_update(self, sample_obj, submission_data):
        """Create or update a sample (or an animal) in USI

        The record is patched when its alias is found among the samples
        already in the USI submission, otherwise it is created. In both
        cases the status of the record name is then set to ``SUBMITTED``.

        Args:
            sample_obj: the animal or sample object to send
            submission_data (SubmissionData): the data of the submission
        """

        # alias is used to reference the same objects
        alias = sample_obj.biosample_alias

        # check in my submitted samples
        if alias in submission_data.submitted_samples:
            # patch sample
            logger.info("Patching %s", alias)

            # get usi sample
            sample = submission_data.submitted_samples[alias]
            sample.patch(sample_obj.to_biosample())

        else:
            submission_data.usi_submission.create_sample(
                sample_obj.to_biosample())

        # update sample status
        sample_obj.name.status = SUBMITTED
        sample_obj.name.last_submitted = timezone.now()
        sample_obj.name.save()

    def __recover_submission(self, submission_data):
        """Get the USI submission already tracked by the submission object

        When the USI submission is not a ``Draft`` anymore it can't be
        modified, so a new submission is created instead. Otherwise the
        samples it contains are stored by alias in
        ``submission_data.submitted_samples``.

        Args:
            submission_data (SubmissionData): the data of the submission

        Returns:
            the name of the USI submission which will be used
        """

        logger.info(
            "Recovering submission %s for team %s",
            submission_data.biosample_submission_id,
            submission_data.team_name)

        # get the same submission object
        usi_submission_name = submission_data.biosample_submission_id

        submission_data.usi_submission = \
            submission_data.usi_root.get_submission_by_name(
                submission_name=usi_submission_name)

        # check that a submission is still editable
        if submission_data.usi_submission.status != "Draft":
            logger.warning(
                "Cannot recover submission '%s': current status is '%s'",
                usi_submission_name,
                submission_data.usi_submission.status)

            # I can't modify this submission so:
            return self.__create_submission(submission_data)

        # read already submitted samples
        logger.debug("Getting info on samples...")
        samples = submission_data.usi_submission.get_samples()
        logger.debug("Got %s samples", len(samples))

        for sample in samples:
            submission_data.submitted_samples[sample.alias] = sample

        # return usi biosample id
        return usi_submission_name

    def __create_submission(self, submission_data):
        """Create a new USI submission for the team of the owner

        The name of the new USI submission is saved in the submission
        object as ``biosample_submission_id``.

        Args:
            submission_data (SubmissionData): the data of the submission

        Returns:
            the name of the new USI submission
        """

        # getting team
        logger.debug("getting team '%s'", submission_data.team_name)
        team = submission_data.usi_root.get_team_by_name(
            submission_data.team_name)

        # create a new submission
        logger.info("Creating a new submission for '%s'", team.name)
        submission_data.usi_submission = team.create_submission()

        # track submission_id in table
        usi_submission_name = submission_data.usi_submission.name

        submission_data.submission_obj.biosample_submission_id = \
            usi_submission_name
        submission_data.submission_obj.save()

        # return usi biosample id
        return usi_submission_name
353
|
|
|
|
354
|
|
|
|
# the task class needs to be explicitly added to the celery app registry
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
celery_app.tasks.register(SubmitTask)
358
|
|
|
|