1
|
|
|
#!/usr/bin/env python3 |
2
|
|
|
# -*- coding: utf-8 -*- |
3
|
|
|
""" |
4
|
|
|
Created on Thu Oct 25 11:27:52 2018 |
5
|
|
|
|
6
|
|
|
@author: Paolo Cozzi <[email protected]> |
7
|
|
|
""" |
8
|
|
|
|
9
|
|
|
import time |
10
|
|
|
|
11
|
|
|
from celery import group |
12
|
|
|
from celery.result import allow_join_result |
13
|
|
|
from celery.utils.log import get_task_logger |
14
|
|
|
|
15
|
|
|
from common.tasks import redis_lock |
16
|
|
|
from image.celery import app as celery_app, MyTask |
17
|
|
|
from image_app.models import ( |
18
|
|
|
DictCountry, DictBreed, DictSpecie, DictUberon, DictDevelStage, |
19
|
|
|
DictPhysioStage) |
20
|
|
|
|
21
|
|
|
from .helpers import ( |
22
|
|
|
annotate_country, annotate_breed, annotate_specie, annotate_uberon, |
23
|
|
|
annotate_dictdevelstage, annotate_dictphysiostage) |
24
|
|
|
|
25
|
|
|
# Module-level logger obtained from celery's get_task_logger; every task
# defined in this file logs through it
logger = get_task_logger(__name__)
27
|
|
|
|
28
|
|
|
|
29
|
|
|
class AnnotateTaskMixin():
    """Shared ``run`` implementation for the dictionary annotation tasks.

    Concrete tasks combine this mixin with a task base class and override
    the class attributes below.

    Attributes:
        name (str): task name; it is lower-cased in the log messages.
        description (str): human readable description of the task.
        model: class whose ``objects.filter(term__isnull=True)`` provides
            the records to annotate.
        annotate_func (callable): function called with one record at a
            time. Subclasses in this module wrap it with ``staticmethod``
            so that it is not bound to the task instance.
    """

    name = None
    description = None
    model = None
    annotate_func = None

    def run(self):
        """Annotate every ``model`` record that has no term yet.

        Returns:
            str: ``"success"`` once all the records have been processed.
        """

        task_label = self.name.lower()

        logger.debug("Starting %s", task_label)

        # get all the records without a term
        for record in self.model.objects.filter(term__isnull=True):
            self.annotate_func(record)

        logger.debug("%s completed", task_label)

        return "success"
47
|
|
|
|
48
|
|
|
|
49
|
|
|
class AnnotateCountries(AnnotateTaskMixin, MyTask):
    """Task applying ``annotate_country`` to ``DictCountry`` records.

    The ``run`` logic comes from ``AnnotateTaskMixin``.
    """

    # records to process and the function applied to each of them
    model = DictCountry
    annotate_func = staticmethod(annotate_country)

    # task metadata
    name = "Annotate Countries"
    description = "Annotate countries with ontologies using Zooma tools"
54
|
|
|
|
55
|
|
|
|
56
|
|
|
class AnnotateBreeds(MyTask):
    """Task applying ``annotate_breed`` to the unmapped ``DictBreed`` records.

    Breeds are selected through ``mapped_breed_term`` (not ``term``), which
    is why this task defines its own ``run``.
    """

    name = "Annotate Breeds"
    description = "Annotate breeds with ontologies using Zooma tools"

    def run(self):
        """Annotate every breed whose ``mapped_breed_term`` is null.

        Returns:
            str: ``"success"`` once all the breeds have been processed.
        """

        task_label = self.name.lower()

        logger.debug("Starting %s", task_label)

        # only breeds without a mapped term need to be annotated
        unmapped_breeds = DictBreed.objects.filter(
            mapped_breed_term__isnull=True)

        for unmapped in unmapped_breeds:
            annotate_breed(unmapped)

        logger.debug("%s completed", task_label)

        return "success"
72
|
|
|
|
73
|
|
|
|
74
|
|
|
class AnnotateSpecies(AnnotateTaskMixin, MyTask):
    """Task applying ``annotate_specie`` to ``DictSpecie`` records.

    The ``run`` logic comes from ``AnnotateTaskMixin``.
    """

    # records to process and the function applied to each of them
    model = DictSpecie
    annotate_func = staticmethod(annotate_specie)

    # task metadata
    name = "Annotate Species"
    description = "Annotate species with ontologies using Zooma tools"
79
|
|
|
|
80
|
|
|
|
81
|
|
|
class AnnotateUberon(AnnotateTaskMixin, MyTask):
    """Task applying ``annotate_uberon`` to ``DictUberon`` records.

    The ``run`` logic comes from ``AnnotateTaskMixin``.
    """

    # records to process and the function applied to each of them
    model = DictUberon
    annotate_func = staticmethod(annotate_uberon)

    # task metadata
    name = "Annotate Uberon"
    description = (
        "Annotate organism parts with ontologies using Zooma tools")
86
|
|
|
|
87
|
|
|
|
88
|
|
|
class AnnotateDictDevelStage(AnnotateTaskMixin, MyTask):
    """Task applying ``annotate_dictdevelstage`` to ``DictDevelStage`` records.

    The ``run`` logic comes from ``AnnotateTaskMixin``.
    """

    # records to process and the function applied to each of them
    model = DictDevelStage
    annotate_func = staticmethod(annotate_dictdevelstage)

    # task metadata
    name = "Annotate DictDevelStage"
    description = (
        "Annotate developmental stages with ontologies "
        "using Zooma tools")
94
|
|
|
|
95
|
|
|
|
96
|
|
|
class AnnotateDictPhysioStage(AnnotateTaskMixin, MyTask):
    """Task applying ``annotate_dictphysiostage`` to ``DictPhysioStage`` records.

    The ``run`` logic comes from ``AnnotateTaskMixin``.
    """

    # records to process and the function applied to each of them
    model = DictPhysioStage
    annotate_func = staticmethod(annotate_dictphysiostage)

    # task metadata
    name = "Annotate DictPhysioStage"
    description = (
        "Annotate physiological stages with ontologies "
        "using Zooma tools")
102
|
|
|
|
103
|
|
|
|
104
|
|
|
class AnnotateAll(MyTask):
    """Task that launches all the annotation tasks as one celery group.

    A redis lock identified by ``lock_id`` is requested before starting, so
    a second invocation returns immediately while the first holds the lock.
    """

    name = "Annotate All"
    description = "Annotate all dict tables using Zooma"
    lock_id = "AnnotateAll"

    def run(self):
        """Start the annotation group unless the lock is already taken.

        The lock is requested in non-blocking mode: when it can't be
        acquired the method logs a warning and returns without waiting.

        Returns:
            str: the value returned by ``call_zooma`` when the lock has
            been acquired, otherwise a "<name> already running!" message.
        """

        # debugging instance
        self.debug_task()

        # non blocking request: the ``with`` body decides what to do
        # depending on whether the lock was obtained
        lock = redis_lock(self.lock_id, blocking=False, expire=False)

        with lock as acquired:
            if acquired:
                # returning from here leaves the ``with`` block
                return self.call_zooma()

        # reached only when the lock wasn't acquired
        message = "%s already running!" % self.name

        logger.warning(message)

        return message

    def call_zooma(self):
        """Run every annotation task in a group and wait for completion.

        Returns:
            str: ``"success"`` after the results of the group have been
            collected and logged.
        """

        tasks = [
            AnnotateCountries(),
            AnnotateBreeds(),
            AnnotateSpecies(),
            AnnotateUberon(),
            AnnotateDictDevelStage(),
            AnnotateDictPhysioStage(),
        ]

        # one signature per task, collected in a single group
        signatures = [task.s() for task in tasks]
        annotate_task = group(signatures)

        logger.debug("Starting task %s", annotate_task)

        # start the group task - shortcut for apply_async
        result = annotate_task.delay()

        logger.debug(result)

        # in order to avoid: Never call result.get() within a task!
        # https://stackoverflow.com/a/39975099/4385116
        with allow_join_result():
            # poll every 10 seconds while the group result reports waiting
            while result.waiting():
                logger.debug("Waiting for zooma tasks to complete")
                time.sleep(10)

            # collect the return values of the group
            results = result.join()

        # results are paired with the tasks in the order they were listed
        for task, outcome in zip(tasks, results):
            logger.debug("%s returned %s", task.name, outcome)

        return "success"
167
|
|
|
|
168
|
|
|
|
169
|
|
|
# --- task registering

# class based tasks are registered explicitly, see
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (
        AnnotateCountries,
        AnnotateBreeds,
        AnnotateSpecies,
        AnnotateUberon,
        AnnotateDictDevelStage,
        AnnotateDictPhysioStage,
        AnnotateAll):
    celery_app.tasks.register(_task_class)
181
|
|
|
|