Completed
Pull Request — master (#51)
by Paolo
06:42
created

zooma.tasks   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 181
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 11
eloc 99
dl 0
loc 181
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A AnnotateTaskMixin.run() 0 12 2
A AnnotateBreeds.run() 0 12 2
A AnnotateAll.call_zooma() 0 33 4
A AnnotateAll.run() 0 24 3
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Oct 25 11:27:52 2018
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import time
10
11
from celery import group
12
from celery.result import allow_join_result
13
from celery.utils.log import get_task_logger
14
15
from common.tasks import redis_lock
16
from image.celery import app as celery_app, MyTask
17
from image_app.models import (
18
    DictCountry, DictBreed, DictSpecie, DictUberon, DictDevelStage,
19
    DictPhysioStage)
20
21
from .helpers import (
22
    annotate_country, annotate_breed, annotate_specie, annotate_uberon,
23
    annotate_dictdevelstage, annotate_dictphysiostage)
24
25
# Get an instance of a logger
26
logger = get_task_logger(__name__)
27
28
29
class AnnotateTaskMixin():
30
    name = None
31
    descripttion = None
32
    model = None
33
    annotate_func = None
34
35
    def run(self):
36
        """This function is called when delay is called"""
37
38
        logger.debug("Starting %s" % self.name.lower())
39
40
        # get all countries without a term
41
        for term in self.model.objects.filter(term__isnull=True):
42
            self.annotate_func(term)
43
44
        logger.debug("%s completed" % self.name.lower())
45
46
        return "success"
47
48
49
class AnnotateCountries(AnnotateTaskMixin, MyTask):
50
    name = "Annotate Countries"
51
    description = """Annotate countries with ontologies using Zooma tools"""
52
    model = DictCountry
53
    annotate_func = staticmethod(annotate_country)
54
55
56
class AnnotateBreeds(MyTask):
57
    name = "Annotate Breeds"
58
    description = """Annotate breeds with ontologies using Zooma tools"""
59
60
    def run(self):
61
        """This function is called when delay is called"""
62
63
        logger.debug("Starting %s" % self.name.lower())
64
65
        # get all breeds without a term
66
        for breed in DictBreed.objects.filter(mapped_breed_term__isnull=True):
67
            annotate_breed(breed)
68
69
        logger.debug("%s completed" % self.name.lower())
70
71
        return "success"
72
73
74
class AnnotateSpecies(AnnotateTaskMixin, MyTask):
75
    name = "Annotate Species"
76
    description = """Annotate species with ontologies using Zooma tools"""
77
    model = DictSpecie
78
    annotate_func = staticmethod(annotate_specie)
79
80
81
class AnnotateUberon(AnnotateTaskMixin, MyTask):
82
    name = "Annotate Uberon"
83
    description = "Annotate organism parts with ontologies using Zooma tools"
84
    model = DictUberon
85
    annotate_func = staticmethod(annotate_uberon)
86
87
88
class AnnotateDictDevelStage(AnnotateTaskMixin, MyTask):
89
    name = "Annotate DictDevelStage"
90
    description = (
91
        "Annotate developmental stages with ontologies using Zooma tools")
92
    model = DictDevelStage
93
    annotate_func = staticmethod(annotate_dictdevelstage)
94
95
96
class AnnotateDictPhysioStage(AnnotateTaskMixin, MyTask):
97
    name = "Annotate DictPhysioStage"
98
    description = (
99
        "Annotate physiological stages with ontologies using Zooma tools")
100
    model = DictPhysioStage
101
    annotate_func = staticmethod(annotate_dictphysiostage)
102
103
104
class AnnotateAll(MyTask):
105
    name = "Annotate All"
106
    description = """Annotate all dict tables using Zooma"""
107
    lock_id = "AnnotateAll"
108
109
    def run(self):
110
        """
111
        This function is called when delay is called. It will acquire a lock
112
        in redis, so those tasks are mutually exclusive
113
114
        Returns:
115
            str: success if everything is ok. Different messages if task is
116
            already running or exception is caught"""
117
118
        # debugging instance
119
        self.debug_task()
120
121
        # blocking condition: get a lock or exit with statement
122
        with redis_lock(
123
                self.lock_id, blocking=False, expire=False) as acquired:
124
            if acquired:
125
                # do stuff and return something
126
                return self.call_zooma()
127
128
        message = "%s already running!" % (self.name)
129
130
        logger.warning(message)
131
132
        return message
133
134
    def call_zooma(self):
135
        """Start all task in a group and wait for a reply"""
136
137
        tasks = [
138
            AnnotateCountries(), AnnotateBreeds(), AnnotateSpecies(),
139
            AnnotateUberon(), AnnotateDictDevelStage(),
140
            AnnotateDictPhysioStage()
141
        ]
142
143
        # instantiate the group
144
        annotate_task = group([task.s() for task in tasks])
145
146
        logger.debug("Starting task %s" % (annotate_task))
147
148
        # start the group task - shortcut for apply_asyinc
149
        result = annotate_task.delay()
150
151
        logger.debug(result)
152
153
        # in order to avoid: Never call result.get() within a task!
154
        # https://stackoverflow.com/a/39975099/4385116
155
        with allow_join_result():
156
            while result.waiting() is True:
157
                logger.debug("Waiting for zooma tasks to complete")
158
                time.sleep(10)
159
160
            # get results
161
            results = result.join()
162
163
        for i, task in enumerate(tasks):
164
            logger.debug("%s returned %s" % (task.name, results[i]))
165
166
        return "success"
167
168
169
# --- task registering
170
171
172
# register explicitly tasks
173
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
174
celery_app.tasks.register(AnnotateCountries)
175
celery_app.tasks.register(AnnotateBreeds)
176
celery_app.tasks.register(AnnotateSpecies)
177
celery_app.tasks.register(AnnotateUberon)
178
celery_app.tasks.register(AnnotateDictDevelStage)
179
celery_app.tasks.register(AnnotateDictPhysioStage)
180
celery_app.tasks.register(AnnotateAll)
181