您的位置:首页 > 运维架构 > Docker

django+celery+docker搭建记录(2)-Django中使用Celery

2018-03-19 18:58 357 查看
官方文档:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

1. 创建Django项目

如果没安装Django先安装

pip install django

创建一个新的django项目proj

django-admin startproject proj

新建出的目录结构

2. 加入Celery

创建文件proj/proj/celery_app.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

设置celery默认使用Django的settings文件作为配置文件

namespace='CELERY'声明celery相关的配置都应该以CELERY_开头

app.autodiscover_tasks()表示从Django的所有app中获取task

修改proj/proj/__init__.py

from celery_app import app as celery_app

__all__ = ['celery_app']

修改后确保Django启动时app被加载

3. 增加app

在proj项目中添加一个app,命名为app1

django-admin startapp app1

在app1目录下创建tasks.py

from celery import shared_task
import time

@shared_task(bind=True)
def add(self, x, y)
for i in range(10):
self.update_state(state='PROGRESS', meta={'current': i, 'total': 10})
time.sleep(1)
print 'add', x, y
return x + y

这里的task还是上篇文章中的task,注意这里使用的是shared_task

修改proj/proj/settings.py文件

CELERY_BROKER_URL = 'amqp://vanya:12345@localhost:5672/vanya'
CELERY_RESULT_BACKEND = 'redis://:vanya@localhost:6380/0'

ALLOWED_HOSTS = ['*']

# Application definition

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app1',
]

添加app1到INSTALLED_APPS中,设置ALLOWED_HOSTS为所有主机,加入Celery的broker和backend配置

修改proj/app1/views.py,加入请求处理函数

from django.http import HttpResponse, JsonResponse
from tasks import add
import time

def async_add_func(request):
if request.method != 'GET':
return HttpResponse('method error !')
x = request.GET.get('x')
y = request.GET.get('y')
if x and y:
task = add.delay(x, y)
return HttpResponse('task_id is {0}'.format(task.task_id))
return HttpResponse("param error ! ")

def add_func(request):
if request.method != 'GET':
return HttpResponse('method error !')
x = request.GET.get('x')
y = request.GET.get('y')
if x and y:
task = add.delay(x, y)
while not task.ready():
time.sleep(1)
return HttpResponse('result is {0}'.format(task.get()))
return HttpResponse("param error ! ")

def query_func(request):
if request.method != 'GET':
return HttpResponse('method error !')
q = request.GET.get('q')
if not q:
return HttpResponse("Hello world ! ")
task = add.AsyncResult(q)  # 输入id获取任务实例
status = task.state
progress = 0
result = ''
    if status == u'SUCCESS':
        progress = '100'
        result = task.get()
    elif status == u'FAILURE':
        progress = '0'
    elif status == 'PROGRESS':
        progress = '%d / %d' % (task.info['current'], task.info['total'])
    return JsonResponse({'status': status, 'progress': progress, 'result': result})

这里加入三个函数:

add_func:同步获取任务结果,结果直接显示

async_add_func:异步调用任务,页面返回任务id

query_func:查询异步任务状态及结果

修改proj/proj/urls.py,加入路由

from django.conf.urls import url
from django.contrib import admin
from app1.views import add_func, async_add_func, query_func

urlpatterns = [
url(r'^admin/', admin.site.urls),
url(r'^add$', add_func),
url(r'^async_add$', async_add_func),
url(r'^query$', query_func),
]

分别加入三个路由对应三个处理函数

4. 启动Celery Worker及Django

在最外层proj目录下执行celery worker启动命令

celery -A proj worker -l info

创建Django数据库

python manage.py makemigrations
python manage.py migrate

启动Django服务

python manage.py runserver 0.0.0.0:8888

直接浏览器请求,提示Page not found,因为url路由未配置空路径,但说明Django服务已经成功启动

5. 同步请求

浏览器请求

http://yourip:8888/add?x=2&y=5

浏览器会在等待10秒左右后返回结果(这里因为浏览器输入的参数为字符串,所以task按照字符串相加得到'25')

后台worker输出

6. 异步请求

浏览器请求

http://yourip:8888/async_add?x=2&y=5


浏览器返回任务id,使用id请求状态

http://yourip:8888/query?q=20f58fed-d4c1-4a40-8b09-3bf6e13a6fb2


十秒后再次请求
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Django Celery Python