您的位置:首页 > 编程语言 > Go语言

django-celery提供给顾客使用实例

2014-12-29 17:03 447 查看
导入数据库

from
djcelery
import
models as celery_models


更新日期

def create(self, request, *args, **kwargs):
enabled = request.POST.get('enabled', None)
if enabled not in [self.ENABLED_POST_VALUE, self.DISABLED_POST_VALUE]:
return self.operate_fail('无效参数')
if enabled == self.DISABLED_POST_VALUE:
self.disable_task(self.TASK_NAME)
return self.operate_success()
else:
try:
day_of_month = int(request.POST.get('day_of_month', ''))
if day_of_month > 28 or day_of_month < 1:
return self.operate_fail('日期必须在1-28日之间')
task, created = celery_models.PeriodicTask.objects.get_or_create(name="monthly_reading",
task="mrs_app.my_celery.tasks.monthly_reading_task")
if created:
crontab = celery_models.CrontabSchedule.objects.create(day_of_month=day_of_month,
hour=0,
minute=0)
crontab.save()
task.crontab = crontab
task.enabled = True
task.save()
else:
task.crontab.day_of_month = day_of_month
task.crontab.save()
task.enabled = True
task.save()
return self.operate_success()
except ValueError:
return self.operate_fail('抄表日不能为空')


关闭定时

def disable_task(self, name):
try:
task = celery_models.PeriodicTask.objects.get(name=name)
task.enabled = False
task.save()
return True
except celery_models.PeriodicTask.DoesNotExist:
return True


定义任务的两种格式

类定义:一个继承了celery.app.task的类并实现了run方法

函数定义:@task装饰的函数   

from celery import task

#第一种,函数方式
@task(name='monthly_reading')
def monthly_reading_task():
task_obj = MonthlyReading(debug=False)
task_obj.start()

#第二种,类定义
class MonthlyReadingTask(Task):
name='monthly_reading'
def run(*args, **kwargs):
task_obj = MonthlyReading(debug=False)
task_obj.start()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐