Completed
Pull Request — master (#51)
by Paolo
05:47
created

zooma.tasks   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 170
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 9
eloc 95
dl 0
loc 170
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A AnnotateTaskMixin.run() 0 12 2
A AnnotateAll.call_zooma() 0 33 4
A AnnotateAll.run() 0 24 3
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu Oct 25 11:27:52 2018
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import time
10
11
from celery import group
12
from celery.result import allow_join_result
13
from celery.utils.log import get_task_logger
14
15
from common.tasks import redis_lock
16
from image.celery import app as celery_app, MyTask
17
from image_app.models import (
18
    DictCountry, DictBreed, DictSpecie, DictUberon, DictDevelStage,
19
    DictPhysioStage)
20
21
from .helpers import (
22
    annotate_country, annotate_breed, annotate_specie, annotate_uberon,
23
    annotate_dictdevelstage, annotate_dictphysiostage)
24
25
# Get an instance of a logger
26
# module-level logger, built by celery's get_task_logger from this module's
# name; every task class below logs through it
logger = get_task_logger(__name__)
27
28
29
class AnnotateTaskMixin():
    """Shared ``run`` implementation for the per-table annotation tasks.

    Concrete tasks override the class attributes below; ``run`` then applies
    ``annotate_func`` to every ``model`` record whose ``term`` is null.
    """

    # task name; run() logs it lower-cased
    name = None
    # human readable description of the task
    description = None
    # class queried through ``objects.filter(term__isnull=True)``
    model = None
    # callable invoked with each selected record
    annotate_func = None

    def run(self):
        """This function is called when delay is called

        Returns:
            str: "success" once every selected record has been passed to
            ``annotate_func``
        """

        logger.debug("Starting %s", self.name.lower())

        # select every record of the configured model still lacking a term
        for term in self.model.objects.filter(term__isnull=True):
            self.annotate_func(term)

        logger.debug("%s completed", self.name.lower())

        return "success"
47
48
49
class AnnotateCountries(AnnotateTaskMixin, MyTask):
    """Annotate ``DictCountry`` records using ``annotate_country``."""

    # what to annotate, and the helper applied to each record
    model = DictCountry
    annotate_func = staticmethod(annotate_country)

    # task metadata
    name = "Annotate Countries"
    description = "Annotate countries with ontologies using Zooma tools"
54
55
56
class AnnotateBreeds(AnnotateTaskMixin, MyTask):
    """Annotate ``DictBreed`` records using ``annotate_breed``."""

    # what to annotate, and the helper applied to each record
    model = DictBreed
    annotate_func = staticmethod(annotate_breed)

    # task metadata
    name = "Annotate Breeds"
    description = "Annotate breeds with ontologies using Zooma tools"
61
62
63
class AnnotateSpecies(AnnotateTaskMixin, MyTask):
    """Annotate ``DictSpecie`` records using ``annotate_specie``."""

    # what to annotate, and the helper applied to each record
    model = DictSpecie
    annotate_func = staticmethod(annotate_specie)

    # task metadata
    name = "Annotate Species"
    description = "Annotate species with ontologies using Zooma tools"
68
69
70
class AnnotateUberon(AnnotateTaskMixin, MyTask):
    """Annotate ``DictUberon`` records using ``annotate_uberon``."""

    # what to annotate, and the helper applied to each record
    model = DictUberon
    annotate_func = staticmethod(annotate_uberon)

    # task metadata
    name = "Annotate Uberon"
    description = (
        "Annotate organism parts with ontologies "
        "using Zooma tools")
75
76
77
class AnnotateDictDevelStage(AnnotateTaskMixin, MyTask):
    """Annotate ``DictDevelStage`` records using ``annotate_dictdevelstage``.
    """

    # what to annotate, and the helper applied to each record
    model = DictDevelStage
    annotate_func = staticmethod(annotate_dictdevelstage)

    # task metadata
    name = "Annotate DictDevelStage"
    description = (
        "Annotate developmental stages with ontologies "
        "using Zooma tools")
83
84
85
class AnnotateDictPhysioStage(AnnotateTaskMixin, MyTask):
    """Annotate ``DictPhysioStage`` records using
    ``annotate_dictphysiostage``.
    """

    # what to annotate, and the helper applied to each record
    model = DictPhysioStage
    annotate_func = staticmethod(annotate_dictphysiostage)

    # task metadata
    name = "Annotate DictPhysioStage"
    description = (
        "Annotate physiological stages with ontologies "
        "using Zooma tools")
91
92
93
class AnnotateAll(MyTask):
    """Run all the dictionary annotation tasks as one group, guarded by a
    lock so that only one run is in progress at a time."""

    name = "Annotate All"
    description = """Annotate all dict tables using Zooma"""
    # identifier handed to redis_lock by run()
    lock_id = "AnnotateAll"

    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: "success" when the annotation group completed; a
            "<name> already running!" message when the lock could not be
            acquired"""

        # debugging instance
        self.debug_task()

        # non-blocking attempt: either the lock is acquired here, or the
        # method falls through to the "already running" message below
        with redis_lock(
                self.lock_id, blocking=False, expire=False) as acquired:
            if acquired:
                # do stuff and return something
                return self.call_zooma()

        message = "%s already running!" % (self.name)

        logger.warning(message)

        return message

    def call_zooma(self):
        """Start all task in a group and wait for a reply

        Returns:
            str: "success" after the results of the whole group have been
            collected and logged
        """

        tasks = [
            AnnotateCountries(), AnnotateBreeds(), AnnotateSpecies(),
            AnnotateUberon(), AnnotateDictDevelStage(),
            AnnotateDictPhysioStage()
        ]

        # instantiate the group
        annotate_task = group([task.s() for task in tasks])

        logger.debug("Starting task %s", annotate_task)

        # start the group task - shortcut for apply_async
        result = annotate_task.delay()

        logger.debug(result)

        # in order to avoid: Never call result.get() within a task!
        # https://stackoverflow.com/a/39975099/4385116
        with allow_join_result():
            # poll every 10 seconds until no task of the group is waiting
            while result.waiting():
                logger.debug("Waiting for zooma tasks to complete")
                time.sleep(10)

            # get results
            results = result.join()

        # results are matched to tasks by position, as the original
        # index-based lookup did
        for task, outcome in zip(tasks, results):
            logger.debug("%s returned %s", task.name, outcome)

        return "success"
156
157
158
# --- task registering
159
160
161
# register tasks explicitly
162
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
163
# register every task class on the application registry, one after the other
# in the order they are declared above
for _task_class in (
        AnnotateCountries, AnnotateBreeds, AnnotateSpecies, AnnotateUberon,
        AnnotateDictDevelStage, AnnotateDictPhysioStage, AnnotateAll):
    celery_app.tasks.register(_task_class)

# do not leave the loop variable behind in the module namespace
del _task_class
170