Celery-RabbitMQ-Django-Cron
2017-01-01 11:40
417 查看
摘要: Celery-RabbitMQ-Django-Cron
celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。
0. 安装
0.1 install
1. celery 基础命令
1.1. 启动 celery
1.2. 后台进程启动celery worker
1.3. 重启 celery worker 后台进程
1.4. 停止 celery worker 后台进程
1.5. 等待 停止 celery worker 后台进程
1.6. 指定 pidfile & logfile
1.7. 启动 多个 worker 并且指定不同参数
1.8. 手动杀死所有worker进程
1.9. 较完整的celery的启动命令
2. 管理相关命令
2.1. 启动 flower
2.2. 需要 目录
2.3. 使用 librabbitmq
0.1 install
访问 http://host:15672 即可进入管理界面。
默认用户名,密码都是 guest
1.1. 启动 celery
启动时指定要使用的 queue
1.2. 后台进程启动celery worker
可通过命令查看后台启动的进程:
可以看到默认添加了几个参数:
1.3. 重启 celery worker 后台进程
1.4. 停止 celery worker 后台进程
stop 命令是异步的,worker 会立即停止,即使当时仍然有任务在执行,
并且不会写停止worker相关的日志
1.5. 等待 停止 celery worker 后台进程
这个停止命令,会等待正在运行的所有任务都完成再停止。
1.6. 指定 pidfile & logfile
1.7. 启动 多个 worker 并且指定不同参数
启动了10个worker:
worker 1,2,3 使用了队列 images, video
worker 4,5 使用了队列 data
worker 其他 使用了队列 default
-L 是什么参数?
1.8. 手动杀死所有worker进程
1.9. 较完整的celery的启动命令
2.1 启动 flower
2.2 需要目录
2.3 使用 librabbitmq
注意
1. 后台运行 celery 的是否,worker 信息不会保存。所以,每次对 worker 操作时都需要加上相同的参数。特别是 pidfile 和 logfile 需要相同。
2. 我没有使用 celery 提供的任务结果存储。我在业务中自己处理 过程及结果。
代码示例
配置文件:
任务文件:
添加任务代码:
celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。
0. 安装
0.1 install
1. celery 基础命令
1.1. 启动 celery
1.2. 后台进程启动celery worker
1.3. 重启 celery worker 后台进程
1.4. 停止 celery worker 后台进程
1.5. 等待 停止 celery worker 后台进程
1.6. 指定 pidfile & logfile
1.7. 启动 多个 worker 并且指定不同参数
1.8. 手动杀死所有worker进程
1.9. 较完整的celery的启动命令
2. 管理相关命令
2.1. 启动 flower
2.2. 需要 目录
2.3. 使用 librabbitmq
0.1 install
sudo apt-get install build-essential python-dev sudo pip install celery sudo pip install librabbitmq sudo pip install flower sudo apt-get install rabbitmq-server sudo rabbitmq-plugins enable rabbitmq_management sudo service rabbitmq-server restart sudo rabbitmq-plugins disable rabbitmq_management
访问 http://host:15672 即可进入管理界面。
默认用户名,密码都是 guest
1.1. 启动 celery
celery -A task worker -l info
启动时指定要使用的 queue
celery -A task worker -l info -Q brand_queue,new_queue -E
1.2. 后台进程启动celery worker
celery multi start w1 -A task -l info -Q brand_queue,new_queue -E
可通过命令查看后台启动的进程:
ps -aux | grep celery [celeryd: w1@x :MainProcess] -active- (worker -E -A task -l info -Q brand_queue,new_queue --logfile=w1.log --pidfile=w1.pid --hostname=w1@x)
可以看到默认添加了几个参数:
--logfile=w1.log 默认在当前文件夹新建 w1.log 文件 --pidfile=w1.pid 默认在当前文件夹新建 w1.pid 文件 --hostname=w1@x 默认实例名 woker_name/机器名
1.3. 重启 celery worker 后台进程
celery multi restart w1 -A task -l info -Q brand_queue,new_queue -E
1.4. 停止 celery worker 后台进程
celery multi stop w1 -A task -l info -Q brand_queue,new_queue -E
stop 命令是异步的,worker 会立即停止,即使当时仍然有任务在执行,
并且不会写停止worker相关的日志
1.5. 等待 停止 celery worker 后台进程
celery multi stopwait w1 -A task -l info -Q brand_queue,new_queue -E
这个停止命令,会等待正在运行的所有任务都完成再停止。
1.6. 指定 pidfile & logfile
celery multi start w1 -A task -l info -Q brand_queue,new_queue -E --pidfile=/var/www/api/space/run/%n.pid --logfile=/var/www/api/space/logs/%n%I.log
1.7. 启动 多个 worker 并且指定不同参数
celery multi start 10 -A task -l info -E -Q:1-3 images,video -Q:4,5 data -Q default -L:4,5 debug
启动了10个worker:
worker 1,2,3 使用了队列 images, video
worker 4,5 使用了队列 data
worker 其他 使用了队列 default
-L 是什么参数?
1.8. 手动杀死所有worker进程
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
1.9. 较完整的celery的启动命令
celery multi start w1 -A task -l info -Q brand_queue,new_queue,time_queue,cron_queue -E -B -s /var/www/api/space/run/celerybeat-schedule --pidfile=/var/www/api/space/run/%n.pid --logfile=/var/www/api/space/logs/%n%I.log
2.1 启动 flower
celery flower --broker=amqp://tiger:tiger@192.168.0.6:5672/vr_tiger --address=192.168.0.4 --port=5555 --broker_api=http://tiger:tiger@192.168.0.6:15672/api/ --basic_auth=tiger:tiger rabbitmq 主机地址: 192.168.0.6 本机地址 : 192.168.0.4 本地监听端口 : 5555
2.2 需要目录
run/ log/
2.3 使用 librabbitmq
If you’re using RabbitMQ (AMQP) as the broker then you can install the librabbitmq module to use an optimized client written in C: $ pip install librabbitmq
注意
1. 后台运行 celery 的是否,worker 信息不会保存。所以,每次对 worker 操作时都需要加上相同的参数。特别是 pidfile 和 logfile 需要相同。
2. 我没有使用 celery 提供的任务结果存储。我在业务中自己处理 过程及结果。
代码示例
配置文件:
# -*- coding=utf-8 -*- # File Name: task_conf.py from __future__ import absolute_import from celery import Celery from datetime import timedelta from celery.schedules import crontab ''' task_random: 是任务的名称 broker: 通过 amqp://用户名:密码@ip/虚拟主机连接 amqp include: 任务程序 ''' # 消息队列配置 mq_host = '192.168.0.6' mq_name = 'tiger' mq_pass = 'tiger' mq_vr = 'vr_tiger' broker = 'amqp://%s:%s@%s/%s' % (mq_name, mq_pass, mq_host, mq_vr) # 初始化 app app = Celery('name_wash', broker=broker, include=['task']) # 指定任务存储队列 app.conf.update( CELERY_ROUTES = { 'task.exe_task':{'queue':'brand_queue'}, 'task.task_sms_send':{'queue':'new_queue'}, 'task.task_sec':{'queue':'time_queue'}, 'task.task_cron':{'queue':'cron_queue'} }, CELERYBEAT_SCHEDULE = { 'exe-every-10-seconds': { 'task': 'task.task_sec', 'schedule': timedelta(seconds=30), 'args': [1], }, 'add-every-monday-morning': { 'task': 'task.task_cron', 'schedule': crontab(hour=15, minute=47, day_of_week=5), 'args': (15232897835,), }, }, #CELERY_TASK_SERIALIZER = 'json', #CELERY_ACCEPT_CONTENT = ['json'], # Ignore other content #CELERY_RESULT_SERIALIZER = 'json', CELERY_EVENT_QUEUE_TTL = 5, CELERY_TIMEZONE = 'Asia/Shanghai', CELERY_ENABLE_UTC = True, CELERY_DISABLE_RATE_LIMITS = True, CELERY_IGNORE_RESULT = True ) if __name__ == '__main__': app.start()
任务文件:
# -*- coding=utf-8 -*- # File Name: task.py ''' task ''' from __future__ import absolute_import import time import traceback from job.task_conf import app @app.task(ignore_result=True) def exe_task(task_id, number): ''' 根据参数执行任务 ''' try: print 'exe task: ', task_id time.sleep(number) except: traceback.print_exc() return (task_id, number, -1) return 'true :)' @app.task def task_sms_send(mobile, content): ''' 任务-发送短信 ''' try: print 'send sms: mobile-> %s , content-> %s' % (mobile, content) except: traceback.print_exc() return 'Fail :(' return 'Success :)' @app.task def task_sec(mobile): ''' 测试 任务 时间 定制 ''' try: print 'send sms: mobile-> %s.' % mobile except: traceback.print_exc() return 'F' return 'S' @app.task def task_cron(mobile): ''' 测试 任务 时间 定制 Cron ''' try: print 'send sms: mobile-> %s.' % mobile except: traceback.print_exc() return 'F - cron' return 'S - cron' def main(): res = exe_task(2, 2) print 'res: ', res if __name__ == '__main__': main()
添加任务代码:
# -*- coding=utf-8 -*- # File Name: add_task.py import time import traceback import random from task import exe_task, task_sms_send def action(): tries = 0 while 1: try: tries += 1 if tries >= 20: break task_id = tries number = random.randint(1, 5) exe_task.apply_async(args=[task_id, number], queue='brand_queue') print 'added one task' time.sleep(1) except: traceback.print_exc() pass print 'add task done' def add_task_by_django(task_id, number): ''' 测试 从 django 添加 任务 ''' exe_task.apply_async(args=[task_id, number], queue='brand_queue') def add_task_sms(mobile, content): ''' 添加 发送 短信 任务 ''' # 列表参数或者字典参数 # task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'}) task_sms_send.apply_async(args=[mobile, content], queue='new_queue') print 'task added: sms' def main(): action() if __name__ == '__main__': main()
相关文章推荐
- 使用django+celery+RabbitMQ实现异步执行
- python安装包制作|Django+Celery+Rabbitmq
- 使用django+celery+RabbitMQ实现异步执行
- 使用 django+celery+RabbitMQ 实现异步执行
- 使用django+celery+RabbitMQ实现异步执行
- Python Django Celery 实现异步任务(二)使用rabbitmq 作为broker
- 原创安装大全啊,安装配置:celery,rabbitMQ,NodeJs,Maven + Nexus,Django,Python,RabbitMQ,TTserver,Easy_install,redis
- 使用django+celery+RabbitMQ实现异步执行
- 使用 django+celery+RabbitMQ 实现异步执行
- 使用django+celery+RabbitMQ实现异步执行
- 关于windows下django使用celery实现异步调用(RabbitMQ)
- django框架如何集成celery进行开发
- python3+django使用celery执行某些任务失败的解决方案
- django-cron
- Celery与RabbitMQ、Redis
- 初探:celery在django下的应用.md
- django+celery+docker搭建记录(3)- 使用django定时执行celery任务
- Django 使用 Celery 实现异步任务
- django中使用celery(二)
- Django集成celery实战小项目