您的位置:首页 > 编程语言 > Python开发

python多进程并发中,解决数据共享问题Value+Array

2018-02-24 21:50 666 查看
参考文章:http://www.jb51.net/article/57666.htm之前多线程运行的时候,全部用的全局变量,代码如下:
#!/usr/bin/env python
#encoding: utf-8

import requestSender as AB
import random
import threading, time
import os
TOTAL = 0
SUCC = 0
FAIL = 0
EXCEPT = 0
MAXTIME = 0
MINTIME = 100
GT3 = 0
LT3 = 0
TOTAL_TIME=0
lock=threading.Lock()
class RequestThread(threading.Thread):
def __init__(self, thread_name):
threading.Thread.__init__(self)
self.test_count = 0
self.name=thread_name

def run(self):
global lock
if lock.acquire():
print self.name
self.test_performace()
lock.release()

def test_performace(self):
global TOTAL
global SUCC
global FAIL
global EXCEPT
global GT3
global LT3
global TOTAL_TIME
try:
st = time.time()
#AB.ingress_test1_0()
if AB.ingress_test1_0()== "OK":
TOTAL += 1
SUCC += 1
else:
TOTAL += 1
FAIL += 1
time_span = time.time() - st
TOTAL_TIME+=time_span
self.maxtime(time_span)
self.mintime(time_span)
if time_span > 3:
GT3 += 1
else:
LT3 += 1
except Exception as e:
print('Error:',e)
TOTAL += 1
EXCEPT += 1

def maxtime(self, ts):
global MAXTIME
if ts > MAXTIME:
MAXTIME = ts

def mintime(self, ts):
global MINTIME
if ts < MINTIME:
MINTIME = ts

def main(thread_count):
print('===========task start===========')
start_time = time.ctime()
thread_count = thread_count
threads = []
for i in range(thread_count):
t = RequestThread("thread" + str(i))
threads.append(t)
t.start()
for t in threads:
t.join()
print('===========task end===========')
print(start_time)
print("total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT))
print('response total_time:', TOTAL_TIME)
print('response average_time:', TOTAL_TIME /TOTAL)
print('response maxtime:', MAXTIME)
print('response mintime', MINTIME)
print('great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL))
print('less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))
time.sleep(60 * random.randint(1, 5))
AB.ingress_test1_1()
time.sleep(60 * random.randint(1, 5))
for x in xrange(10):
main(10)
if not os.path.isdir("zxtest"):
os.mkdir("zxtest")
AB.myLog('zxtest/test1_0.log',"test1_0:","total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT),'response total_time:', TOTAL_TIME,
'response maxtime:', MAXTIME,'response mintime', MINTIME,'average time',TOTAL_TIME/TOTAL,'great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL),
'less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))

为了更好利用多核资源,改成多进程+携程,但是python的多进程默认无法共享全局变量,所以问题来了。解决方案就是利用Value+Array,在多进程间共享全局变量,代码贴出来如下;
# !/user/bin/env python
# encoding:utf-8
# code by Iris
from multiprocessing import Process, Value, Array
import requestSender as AB
from gevent import monkey; monkey.patch_all()
import gevent
import random
import time
import os
def test_performace(func,n,a):
'''
TOTAL,SUCC,FAIL,EXCEPT,MAXTIME,MINTIME,GT3,LT3,TOTAL_TIME=a
:param func:
:return:
'''
n.value = n.value + 1
try:
st = time.time()
if func() == "OK":
a[0]+= 1
a[1] += 1
else:
a[0] += 1
a[2] += 1
time_span = time.time() - st
a[8] += time_span
if time_span>a[4]:
a[4]=time_span
else:
pass
if time_span<a[5]:
a[5]=time_span
else:
pass
if time_span > 3:
a[6] += 1
else:
a[7] += 1
except Exception as e:
print('Error:', e)
a[0] += 1
a[3] += 1
def process_start(Coroutine_num,func,n,a):
tasks = []
for i in xrange(Coroutine_num):
tasks.append(gevent.spawn(test_performace,func,n,a))
gevent.joinall(tasks)  # 使用协程来执行
def task_start(progress_num,Coroutine_num,func,num, arr):
for i in xrange(progress_num):
p = Process(target=process_start, args=(Coroutine_num,func,num, arr))
print('===========task start===========')
start_time = time.ctime()
print(start_time)
p.start()
p.join()
print('===========task end===========')
if __name__=="__main__":
num = Value('f', 0)
arr = Array('f', [0]*9)
arr[5]=100
for x in xrange(1):
task_start(2, 3, AB.ingress_test1_0,num,arr)
TOTAL,SUCC,FAIL,EXCEPT,MAXTIME,MINTIME,GT3,LT3,TOTAL_TIME=arr
print TOTAL,SUCC,FAIL,EXCEPT,MAXTIME,MINTIME,GT3,LT3,TOTAL_TIME
print("total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT))
print('response total_time:', TOTAL_TIME)
print('response average_time:', TOTAL_TIME / TOTAL)
print('response maxtime:', MAXTIME)
print('response mintime', MINTIME)
print('great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL))
print('less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))
if not os.path.isdir("zxtest"):
os.mkdir("zxtest")
AB.myLog('zxtest/test1_0.log', "test1_0:", "total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT),
'response total_time:', TOTAL_TIME,
'response maxtime:', MAXTIME, 'response mintime', MINTIME, 'average time', TOTAL_TIME / TOTAL,
'great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL),
'less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐