您的位置:首页 > 编程语言 > Go语言

Celery-RabbitMQ-Django-Cron

2017-01-01 11:40 417 查看
摘要: Celery-RabbitMQ-Django-Cron

celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。

0. 安装

0.1 install

1. celery 基础命令

1.1. 启动 celery

1.2. 后台进程启动celery worker

1.3. 重启 celery worker 后台进程

1.4. 停止 celery worker 后台进程

1.5. 等待 停止 celery worker 后台进程

1.6. 指定 pidfile & logfile

1.7. 启动 多个 worker 并且指定不同参数

1.8. 手动杀死所有worker进程

1.9. 较完整的celery的启动命令

2. 管理相关命令

2.1. 启动 flower

2.2. 需要 目录

2.3. 使用 librabbitmq

0.1 install

sudo apt-get install build-essential python-dev
sudo pip install celery
sudo pip install librabbitmq
sudo pip install flower
sudo apt-get install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
sudo service rabbitmq-server restart
sudo rabbitmq-plugins disable rabbitmq_management

访问 http://host:15672 即可进入管理界面。

默认用户名,密码都是 guest

1.1. 启动 celery

celery -A task worker -l info

启动时指定要使用的 queue

celery -A task worker -l info -Q brand_queue,new_queue -E


1.2. 后台进程启动celery worker

celery multi start w1 -A task -l info -Q brand_queue,new_queue -E

可通过命令查看后台启动的进程:

ps -aux | grep celery
[celeryd: w1@x :MainProcess] -active- (worker -E -A task -l info -Q brand_queue,new_queue --logfile=w1.log --pidfile=w1.pid --hostname=w1@x)

可以看到默认添加了几个参数:

--logfile=w1.log    默认在当前文件夹新建 w1.log 文件
--pidfile=w1.pid    默认在当前文件夹新建 w1.pid 文件
--hostname=w1@x     默认实例名 woker_name/机器名

1.3. 重启 celery worker 后台进程

celery multi restart w1 -A task -l info -Q brand_queue,new_queue -E

1.4. 停止 celery worker 后台进程

celery multi stop w1 -A task -l info -Q brand_queue,new_queue -E

stop 命令是异步的,worker 会立即停止,即使当时仍然有任务在执行,

并且不会写停止worker相关的日志

1.5. 等待 停止 celery worker 后台进程

celery multi stopwait w1 -A task -l info -Q brand_queue,new_queue -E

这个停止命令,会等待正在运行的所有任务都完成再停止。

1.6. 指定 pidfile & logfile

celery multi start w1 -A task -l info -Q brand_queue,new_queue -E --pidfile=/var/www/api/space/run/%n.pid --logfile=/var/www/api/space/logs/%n%I.log

1.7. 启动 多个 worker 并且指定不同参数

celery multi start 10 -A task -l info -E -Q:1-3 images,video -Q:4,5 data -Q default -L:4,5 debug

启动了10个worker:

worker 1,2,3 使用了队列 images, video

worker 4,5 使用了队列 data

worker 其他 使用了队列 default

-L 是什么参数?

1.8. 手动杀死所有worker进程

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

1.9. 较完整的celery的启动命令

celery multi start w1 -A task -l info -Q brand_queue,new_queue,time_queue,cron_queue -E -B -s /var/www/api/space/run/celerybeat-schedule --pidfile=/var/www/api/space/run/%n.pid --logfile=/var/www/api/space/logs/%n%I.log


2.1 启动 flower

celery flower --broker=amqp://tiger:tiger@192.168.0.6:5672/vr_tiger --address=192.168.0.4 --port=5555 --broker_api=http://tiger:tiger@192.168.0.6:15672/api/ --basic_auth=tiger:tiger
rabbitmq 主机地址: 192.168.0.6
本机地址         : 192.168.0.4
本地监听端口      : 5555

2.2 需要目录

run/
log/

2.3 使用 librabbitmq

If you’re using RabbitMQ (AMQP) as the broker then you can install the librabbitmq module to use an optimized client written in C:
$ pip install librabbitmq

注意

1. 后台运行 celery 的是否,worker 信息不会保存。所以,每次对 worker 操作时都需要加上相同的参数。特别是 pidfile 和 logfile 需要相同。

2. 我没有使用 celery 提供的任务结果存储。我在业务中自己处理 过程及结果。

代码示例

配置文件:

# -*- coding=utf-8 -*-
# File Name: task_conf.py

from __future__ import absolute_import

from celery   import Celery
from datetime import timedelta
from celery.schedules import crontab

'''
task_random: 是任务的名称
broker: 通过 amqp://用户名:密码@ip/虚拟主机连接 amqp
include: 任务程序
'''

# 消息队列配置
mq_host = '192.168.0.6'
mq_name = 'tiger'
mq_pass = 'tiger'
mq_vr   = 'vr_tiger'

broker  = 'amqp://%s:%s@%s/%s' % (mq_name, mq_pass, mq_host, mq_vr)

# 初始化 app
app = Celery('name_wash', broker=broker, include=['task'])

# 指定任务存储队列
app.conf.update(

CELERY_ROUTES = {
'task.exe_task':{'queue':'brand_queue'},
'task.task_sms_send':{'queue':'new_queue'},
'task.task_sec':{'queue':'time_queue'},
'task.task_cron':{'queue':'cron_queue'}
},

CELERYBEAT_SCHEDULE = {
'exe-every-10-seconds': {
'task': 'task.task_sec',
'schedule': timedelta(seconds=30),
'args': [1],
},

'add-every-monday-morning': {
'task': 'task.task_cron',
'schedule': crontab(hour=15, minute=47, day_of_week=5),
'args': (15232897835,),
},
},

#CELERY_TASK_SERIALIZER     = 'json',
#CELERY_ACCEPT_CONTENT      = ['json'],  # Ignore other content
#CELERY_RESULT_SERIALIZER   = 'json',
CELERY_EVENT_QUEUE_TTL      = 5,
CELERY_TIMEZONE             = 'Asia/Shanghai',
CELERY_ENABLE_UTC           = True,
CELERY_DISABLE_RATE_LIMITS  = True,
CELERY_IGNORE_RESULT        = True

)

if __name__ == '__main__':
app.start()


任务文件:

# -*- coding=utf-8 -*-
# File Name: task.py

'''
task
'''

from __future__ import absolute_import

import time
import traceback

from job.task_conf import app

@app.task(ignore_result=True)
def exe_task(task_id, number):
''' 根据参数执行任务 '''

try:
print 'exe task: ', task_id
time.sleep(number)
except:
traceback.print_exc()
return (task_id, number, -1)

return 'true :)'

@app.task
def task_sms_send(mobile, content):
''' 任务-发送短信 '''

try:
print 'send sms: mobile-> %s , content-> %s' % (mobile, content)
except:
traceback.print_exc()
return 'Fail :('

return 'Success :)'

@app.task
def task_sec(mobile):
''' 测试 任务 时间 定制 '''

try:
print 'send sms: mobile-> %s.' % mobile
except:
traceback.print_exc()
return 'F'

return 'S'

@app.task
def task_cron(mobile):
''' 测试 任务 时间 定制 Cron '''

try:
print 'send sms: mobile-> %s.' % mobile
except:
traceback.print_exc()
return 'F - cron'

return 'S - cron'

def main():
res = exe_task(2, 2)
print 'res: ', res

if __name__ == '__main__':
main()


添加任务代码:

# -*- coding=utf-8 -*-
# File Name: add_task.py

import time
import traceback
import random

from task import exe_task, task_sms_send

def action():
tries = 0

while 1:
try:
tries += 1
if tries >= 20:
break

task_id = tries
number = random.randint(1, 5)

exe_task.apply_async(args=[task_id, number], queue='brand_queue')
print 'added one task'
time.sleep(1)
except:
traceback.print_exc()
pass

print 'add task done'

def add_task_by_django(task_id, number):
''' 测试 从 django 添加 任务 '''

exe_task.apply_async(args=[task_id, number], queue='brand_queue')

def add_task_sms(mobile, content):
''' 添加 发送 短信 任务 '''

# 列表参数或者字典参数
# task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

task_sms_send.apply_async(args=[mobile, content], queue='new_queue')
print 'task added: sms'

def main():

action()

if __name__ == '__main__':

main()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息