# for testing and deploying your application
# for finding and fixing issues
# for empowering human code reviews
from celery import Celery
from api.app import create_app
from api.tasks import sample
def create_celery(app):
    """Build a Celery instance whose tasks execute inside ``app``'s context.

    The instance is named after ``app.import_name``; its result backend and
    broker are read from ``app.config['CELERY_RESULT_BACKEND']`` and
    ``app.config['BROKER_URL']``. The whole of ``app.config`` is then copied
    into the Celery configuration.

    Args:
        app: Application object exposing ``import_name``, a ``config``
            mapping and an ``app_context()`` context manager (presumably a
            Flask application -- confirm against ``api.app.create_app``).

    Returns:
        The configured Celery instance, with its ``Task`` attribute replaced
        by a subclass that enters ``app.app_context()`` around every call.
    """
    instance_name = app.import_name
    settings = app.config
    result_backend = settings['CELERY_RESULT_BACKEND']
    broker_url = settings['BROKER_URL']

    worker = Celery(instance_name, backend=result_backend, broker=broker_url)
    worker.conf.update(settings)

    # Keep a handle on the original base class: it is both the parent of the
    # wrapper below and the implementation the wrapper delegates to.
    base_task = worker.Task

    class ContextTask(base_task):
        """Task base that runs the task body inside the application context."""

        # NOTE(review): presumably keeps Celery from registering this base
        # class as a task of its own -- confirm against the Celery version used.
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    worker.Task = ContextTask
    return worker
# Module-level instances, created once when this module is imported.
# `flask_app` is whatever api.app.create_app() returns; `celery` is the Celery
# instance built from it by create_celery() and is the object the
# on_after_configure handler below is connected to.
flask_app = create_app()
celery = create_celery(flask_app)
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the recurring ``sample`` task on the signalling Celery app.

    Connected to ``celery.on_after_configure``; ``sender`` is the object the
    signal passes in, and any extra signal keyword arguments are ignored.
    """
    # Run `sample` once every minute (the interval is given in seconds).
    interval_seconds = 60.0
    sender.add_periodic_task(
        interval_seconds,
        sample,
        name='sample every 1 min',
    )