您的位置:首页 > 编程语言 > Go语言

django使用celery做异步执行过程

2017-11-15 14:41 579 查看

执行过程持续返回

简要

1. 挑用插件:
django-celery  jenkins
2. 基于Python版本:
Python2.7
3. 基于django版本:
django1.8.18


主要问题:

需要把jenkins代码发布的执行过程持续输出,后台执行完成后消息数据返回到前台。


解决方案:

使用django-celery的异步执行操作,把异步执行的数据返回到rabbitmq,从rabbitmq读取数据写入数据库,页面中返回数据库的信息,页面使用定时器,当返回结果不为失败或者成功时,5秒刷新页面,读取最新的操作内容。


具体代码如下:

1. django setting.py配置:

djcelery.setup_loader()
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
BROKER_POOL_LIMIT = 0
#CELERY_TASK_SERIALIZER = 'json'                    (由于设置Json以后输入也必须要使用Json格式,暂时未启用)
#CELERY_RESULT_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_RESULT_BACKEND= 'amqp'

INSTALLED_APPS = (
'djcelery',
'app',
)

2. django celery.py配置:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from app.backend.tasks import *
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CMDB.settings')

app = Celery('CMDB')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

3. app view配置:

def job_release(request):
if request.method == 'GET':
try:
global job_enable                                 (执行开关,防止多次执行)
id = request.GET.get('id')                        (获取指定的ID)
name =  request.GET.get('name')                    (执行会带name,返回到执行页面不会带name,防止返回被执行)
all_result = jenkins_return.objects.filter(id=id) (获取当前执行的项目信息)
job_name = all_result[0]                          (执行的项目名称)
full_name = all_result.values()[0]['full_name']   (执行的项目全称,jenkins需要指定到子目录才能build)
except:
all_result = job_release_return.objects.filter(id=id)    (返回,或者刷新,还是获取到当前的内容)
return render_to_response("job_release.html",locals())
else:
if job_enable == 1:                                     (执行开关,为1的时候开启执行功能)
res = goto_html.delay(full_name,job_name)           (异步执行第一步,准备执行)
import_goto_html.delay(res.get())                   (导入到数据库)
ret_in = get_job_runinng_in.delay(full_name,res.get()) (异步执行第二步,获取执行中状态内容)
ret_end =get_job_runinng_end.delay(full_name,res.get()) (异步执行第三部,取得执行结果)
job_enable = 0                                          (执行完成后功能取消,当返回到执行页面时复位)
time.sleep(0.5)                                             (返回内容)
all_result = job_release_return.objects.all()
return render_to_response("job_release.html",locals())

4. js配置(很简单,不是执行成功或者执行失败时刷新页面):

<script language="JavaScript">
function myrefresh(){
window.location.reload();
}
function show(){
var mytable = document.getElementById("dataTables-example").rows[1].cells[7].innerHTML;
if(mytable == "执行成功" || mytable == "执行失败"){
return true;}
else{
myrefresh();}
}
setTimeout('show()',5000);
</script>

5. app task.py配置:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from celery import task
from app.backend.fb_jenkins import *
import urllib
import jenkins
import time
from datetime import datetime
import sys

jenkins_server_url=""                         (jenkins地址)
host=""                                       (主机IP)
user_id = ""                                  (jenkins用户)
api_token = ""                                (jenkins token)
#server=jenkins.Jenkins(jenkins_server_url, username=user_id, password=api_token)   (这里两种方式,一直使用oken,另一种使用passwd)
server=jenkins.Jenkins(jenkins_server_url, username=user_id, password="*****")
joblist=[]

#(jenkins的方法,获取项目构建过程)
@task
def get_job_output(jobName):
last = server.get_job_info(jobName)['lastBuild']['number']
return server.get_build_console_output(jobName,last)

# (进入执行页面,并执行job任务)
@task
def goto_html(full_name,jobName):
result = get_job_output(full_name)
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
result_log = " {0} {1} 程序开始构建".format(start_time, full_name)
parameters = {}
parameters['project0001'] = 'test001'
server.build_job(full_name,parameters)
results = []
workspace = result.split('Building in workspace')[1].strip().split('\n')[0]
next_build_number = server.get_job_info(full_name)['nextBuildNumber']
results.extend([jobName,host,workspace,start_time,"end_time",next_build_number,next_build_number,"执行开始","result",result_log])
return results

# (写入数据库)
@task
def import_goto_html(ret):
import_release_sql(ret)
return True

# (执行中状态)
@task
def get_job_runinng_in(full_name,ret):
result = get_job_output(full_name)
results = []
jobName = ret[0]
start_time = ret[3]
workspace = ret[2]
next_build_number = ret[5]
time_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
for i in range(20):
builds = server.get_running_builds()
if len(builds)!=0:break
time.sleep(0.2)
result_log = ret[9] + "\n {0} {1} 程序构建中".format(time_now, full_name)
results.extend(
[jobName, host, workspace, start_time, "end_time", next_build_number, next_build_number, "执行中", "result",
result_log])
import_goto_html(results)
return results

# (执行完成状态)
@task
def get_job_runinng_end(full_name,ret):
results = []
jobName = ret[0]
start_time = ret[3]
workspace = ret[2]
next_build_number = ret[5]
time_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
time.sleep(3)
while True:
try:
build_info = server.get_build_info(full_name, next_build_number)
except:
continue
else:
if build_info[u'building'] == False:
break
time.sleep(1)

end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
result = get_job_output(full_name)
result_log = ret[9] + "\n {0} {1} 程序构建中".format(time_now, full_name) + "\n {0} {1} 程序构建完成".format(end_time,full_name)
results.extend(
[jobName, host, workspace, start_time, end_time, next_build_number, next_build_number, "执行成功", result,
result_log])
import_goto_html(results)
return results
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: