|
1
|
|
|
#!/usr/bin/env python3 |
|
2
|
|
|
# -*- coding: utf-8 -*- |
|
3
|
|
|
""" |
|
4
|
|
|
Created on Thu Oct 25 11:27:52 2018 |
|
5
|
|
|
|
|
6
|
|
|
@author: Paolo Cozzi <[email protected]> |
|
7
|
|
|
""" |
|
8
|
|
|
|
|
9
|
|
|
import time |
|
10
|
|
|
|
|
11
|
|
|
from celery import group |
|
12
|
|
|
from celery.result import allow_join_result |
|
13
|
|
|
from celery.utils.log import get_task_logger |
|
14
|
|
|
|
|
15
|
|
|
from common.tasks import redis_lock |
|
16
|
|
|
from image.celery import app as celery_app, MyTask |
|
17
|
|
|
from image_app.models import ( |
|
18
|
|
|
DictCountry, DictBreed, DictSpecie, DictUberon, DictDevelStage, |
|
19
|
|
|
DictPhysioStage) |
|
20
|
|
|
|
|
21
|
|
|
from .helpers import ( |
|
22
|
|
|
annotate_country, annotate_breed, annotate_specie, annotate_uberon, |
|
23
|
|
|
annotate_dictdevelstage, annotate_dictphysiostage) |
|
24
|
|
|
|
|
25
|
|
|
# Module-level logger obtained from celery's get_task_logger; shared by all
# the tasks defined in this module
logger = get_task_logger(__name__)
|
27
|
|
|
|
|
28
|
|
|
|
|
29
|
|
|
class AnnotateTaskMixin():
    """Provide a common ``run`` method for the dictionary annotation tasks.

    Subclasses are expected to override the class attributes defined here.
    """

    # task name; it is lower-cased in the messages logged by run()
    name = None

    # human readable description of the task
    description = None

    # class whose ``objects`` manager is filtered by run()
    model = None

    # callable invoked once for every record selected by run()
    annotate_func = None

    def run(self):
        """Annotate every ``model`` record which has no term.

        This function is called when delay is called.

        Returns:
            str: "success" after all the selected records were processed
        """

        task_name = self.name.lower()

        logger.debug("Starting %s", task_name)

        # consider only the records whose term is still null
        for record in self.model.objects.filter(term__isnull=True):
            self.annotate_func(record)

        logger.debug("%s completed", task_name)

        return "success"
|
47
|
|
|
|
|
48
|
|
|
|
|
49
|
|
|
class AnnotateCountries(AnnotateTaskMixin, MyTask):
    """Annotate DictCountry records by calling annotate_country."""

    # records to process and the helper applied to each of them
    model = DictCountry
    annotate_func = staticmethod(annotate_country)

    # task identity
    name = "Annotate Countries"
    description = "Annotate countries with ontologies using Zooma tools"
|
54
|
|
|
|
|
55
|
|
|
|
|
56
|
|
|
class AnnotateBreeds(AnnotateTaskMixin, MyTask):
    """Annotate DictBreed records by calling annotate_breed."""

    # records to process and the helper applied to each of them
    model = DictBreed
    annotate_func = staticmethod(annotate_breed)

    # task identity
    name = "Annotate Breeds"
    description = "Annotate breeds with ontologies using Zooma tools"
|
61
|
|
|
|
|
62
|
|
|
|
|
63
|
|
|
class AnnotateSpecies(AnnotateTaskMixin, MyTask):
    """Annotate DictSpecie records by calling annotate_specie."""

    # records to process and the helper applied to each of them
    model = DictSpecie
    annotate_func = staticmethod(annotate_specie)

    # task identity
    name = "Annotate Species"
    description = "Annotate species with ontologies using Zooma tools"
|
68
|
|
|
|
|
69
|
|
|
|
|
70
|
|
|
class AnnotateUberon(AnnotateTaskMixin, MyTask):
    """Annotate DictUberon records by calling annotate_uberon."""

    # records to process and the helper applied to each of them
    model = DictUberon
    annotate_func = staticmethod(annotate_uberon)

    # task identity
    name = "Annotate Uberon"
    description = (
        "Annotate organism parts with ontologies using Zooma tools")
|
75
|
|
|
|
|
76
|
|
|
|
|
77
|
|
|
class AnnotateDictDevelStage(AnnotateTaskMixin, MyTask):
    """Annotate DictDevelStage records by calling annotate_dictdevelstage."""

    # records to process and the helper applied to each of them
    model = DictDevelStage
    annotate_func = staticmethod(annotate_dictdevelstage)

    # task identity
    name = "Annotate DictDevelStage"
    description = (
        "Annotate developmental stages with ontologies "
        "using Zooma tools")
|
83
|
|
|
|
|
84
|
|
|
|
|
85
|
|
|
class AnnotateDictPhysioStage(AnnotateTaskMixin, MyTask):
    """Annotate DictPhysioStage records by calling annotate_dictphysiostage."""

    # records to process and the helper applied to each of them
    model = DictPhysioStage
    annotate_func = staticmethod(annotate_dictphysiostage)

    # task identity
    name = "Annotate DictPhysioStage"
    description = (
        "Annotate physiological stages with ontologies "
        "using Zooma tools")
|
91
|
|
|
|
|
92
|
|
|
|
|
93
|
|
|
class AnnotateAll(MyTask):
    """Run all the dictionary annotation tasks together as a celery group."""

    name = "Annotate All"
    description = "Annotate all dict tables using Zooma"
    lock_id = "AnnotateAll"

    def run(self):
        """
        This function is called when delay is called. It tries to get the
        ``lock_id`` lock through redis_lock without blocking, and starts the
        annotation tasks only when the lock is acquired

        Returns:
            str: the value returned by call_zooma when the lock is acquired,
            otherwise a message stating that the task is already running"""

        # debugging instance
        self.debug_task()

        # non blocking attempt: either we hold the lock or we leave the with
        # statement without doing anything
        with redis_lock(
                self.lock_id, blocking=False, expire=False) as got_lock:
            if got_lock:
                return self.call_zooma()

        # the lock was not acquired
        already_running = "%s already running!" % (self.name)
        logger.warning(already_running)

        return already_running

    def call_zooma(self):
        """Start all the annotation tasks in a group and wait for a reply

        Returns:
            str: "success" after the results of the group were collected"""

        subtasks = [
            AnnotateCountries(),
            AnnotateBreeds(),
            AnnotateSpecies(),
            AnnotateUberon(),
            AnnotateDictDevelStage(),
            AnnotateDictPhysioStage(),
        ]

        # one signature for each task, collected in a group
        signatures = [subtask.s() for subtask in subtasks]
        annotate_group = group(signatures)

        logger.debug("Starting task %s", annotate_group)

        # start the group task - shortcut for apply_async
        group_result = annotate_group.delay()

        logger.debug(group_result)

        # in order to avoid: Never call result.get() within a task!
        # https://stackoverflow.com/a/39975099/4385116
        with allow_join_result():
            # poll every 10 seconds until no subtask is waiting
            while group_result.waiting():
                logger.debug("Waiting for zooma tasks to complete")
                time.sleep(10)

            # collect the return value of each subtask
            outcomes = group_result.join()

        for subtask, outcome in zip(subtasks, outcomes):
            logger.debug("%s returned %s", subtask.name, outcome)

        return "success"
|
156
|
|
|
|
|
157
|
|
|
|
|
158
|
|
|
# --- task registering |
|
159
|
|
|
|
|
160
|
|
|
|
|
161
|
|
|
# class based tasks are registered explicitly, one after the other:
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
for _task_class in (
        AnnotateCountries,
        AnnotateBreeds,
        AnnotateSpecies,
        AnnotateUberon,
        AnnotateDictDevelStage,
        AnnotateDictPhysioStage,
        AnnotateAll):
    celery_app.tasks.register(_task_class)
|
170
|
|
|
|