celery_worker.setup_periodic_tasks()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
from celery import Celery
2
3
from api.app import create_app
4
from api.tasks import sample
5
6
7
def create_celery(app):
8
    celery = Celery(app.import_name,
9
                    backend=app.config['CELERY_RESULT_BACKEND'],
10
                    broker=app.config['BROKER_URL'])
11
    celery.conf.update(app.config)
12
    TaskBase = celery.Task
13
14
    class ContextTask(TaskBase):
15
        abstract = True
16
17
        def __call__(self, *args, **kwargs):
18
            with app.app_context():
19
                return TaskBase.__call__(self, *args, **kwargs)
20
    celery.Task = ContextTask
21
    return celery
22
23
24
flask_app = create_app()
25
celery = create_celery(flask_app)
26
27
28
@celery.on_after_configure.connect
29
def setup_periodic_tasks(sender, **kwargs):
30
    # Calls sample every 1 minutes
31
    sender.add_periodic_task(60.0, sample, name='sample every 1 min')
32