您的位置:首页 > 其它

celery beat实战

2015-08-14 16:44 232 查看
测试CELERYBEAT的例子:

celery_test

|proj

|__init__.py

|celery.py

|email_task.py

|calcu_tasks.py

start_server.sh

|proj_v1

|xxx_task.py

__init__.py为空文件。

celery.py,

[code]#-*-coding=utf-8-*-
from__future__importabsolute_import
fromceleryimportCelery
fromcelery.schedulesimportcrontab
fromkombuimportQueue
app=Celery("proj",
broker="redis://10.121.84.90:16379/6",
include=['proj.email_task','proj.calcu_tasks']#!!!!!
)
app.conf.update(
CELERY_DEFAULT_QUEUE='default',
CELERY_QUEUES=(Queue('hipri'),),
#CELERY_ROUTES={
#"proj.email_task.do_email":{'queue':'hipri'},
#},
CELERYBEAT_SCHEDULE={
"do_email":{
"task":"proj.email_task.do_email",
"schedule":crontab(minute="*/1"),
"args":(),
"options":{'queue':'default'}
},
"do_email_new":{
"task":"proj.email_task.do_email_new",
"schedule":crontab(minute="*/1"),
"args":(),
"options":{'queue':'hipri'}
},
"add":{
"task":"proj.calcu_tasks.add",
"schedule":crontab(minute="*/1"),
"args":(3,4),
"options":{'queue':'hipri'}
},
},
)
if__name__=='__main__':
pass
#app.start()
[/code]

email_task.py,

[code]from__future__importabsolute_import
importsys
importos
importhashlib
importtime
fromproj.celeryimportapp
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append(os.path.join(os.path.dirname(__file__),"./"))
@app.task()
defdo_email():
print'begintoemail'
time.sleep(5)
print'emailcomplete'
@app.task()
defdo_email_new():
print'begintoemailnew'
time.sleep(5)
print'emailnewcomplete'
[/code]

calcu_tasks.py,

[code]from__future__importabsolute_import
importsys
importos
importhashlib
importtime
fromproj.celeryimportapp
@app.task
defadd(x,y):
print'%d+%d=%d'%(x,y,x+y)
[/code]

start_server.sh,

[code]#!/bin/bash
#nohupcelery-Aprojworker-ndefault_worker-c2-B-Qdefault-ldebug&
#nohupcelery-Aprojworker-nhipri_worker-c2-B-Qhipri-ldebug&
#celerymultistartdefault_workerhipri_worker-Aproj-c2-B-Q:default_workerdefault-Q:hipri_workerhipri-ldebug
nohupcelery-Aprojworker-ndefault_worker-c2-Qdefault-ldebug&
nohupcelery-Aprojworker-nhipri_worker-c2-Qhipri-ldebug&
nohupcelery-Aprojbeat&
[/code]

xxx_task.py为随便定义的类似email_task.py的一个celery服务程序。

1.经验证,不同目录共享同一个app=Celery()对象是不行的,比如上面例子中proj目录下celery.py中的app不能共享给proj_v1目录下的xxx_task.py使用。

2.

nohupcelery-Aprojworker-ndefault_worker-c2-B-Qdefault-ldebug&
nohupcelery-Aprojworker-nhipri_worker-c2-B-Qhipri-ldebug&

上面用-B选项开启多个worker的做法,会导致一个任务被执行两次,因为celerybeat实例开启了两个。正确的做法,

nohupcelery-Aprojworker-ndefault_worker-c2-Qdefault-ldebug&
nohupcelery-Aprojworker-nhipri_worker-c2-Qhipri-ldebug&
nohupcelery-Aprojbeat&

也就是说,celerybeat实例只应该开启一个。

3.CELERYBEAT_SCHEDULE中实现routing似乎比较简单。

4.添加celery定时任务,除了手动配置CELERYBEAT_SCHEDULE,也可以通过函数add_periodic_task(...)来完成,详见http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: