使用multiprocesss模块进程通信采用队列方式,子进程run执行完一直不能退出的问题
2017-09-30 13:32
381 查看
进程间通信常用的方法有信号量、共享内存、消息队列,python的
在项目中使用
项目背景:
由于在线程对象里不能创建 logger 对象,子进程产生的日志如果需要保存,一种不太推荐的方法是把日志放到消息队列里,子进程结束后主进程把日志从信息队列中读出来。 以下是重现项目中出现问题的代码:
运行结果如下:
程序中创建了8个进程的进程池,往进程池里放入15个任务,每个任务往消息队列
修改 run() 方法里的
从上面的结果不难看出:
虽然说构造 Queue( )如果不指定队列
通过消息队列在进程间传递日志是一种不好的做法,日志内容量大时容易出现各种各样的问题
multiprocesss模块提供了与平台无关的进程相关的API
在项目中使用
multiprocesss多进程编程时遇到奇怪的问题是: 使用消息队列在不同进程间通信,子进程run方法执行后进程一直不能退出,主进程里调用join方法等待子进程结束,子进程一直不能退出从而导致主进程不能退出
项目背景:
由于在线程对象里不能创建 logger 对象,子进程产生的日志如果需要保存,一种不太推荐的方法是把日志放到消息队列里,子进程结束后主进程把日志从信息队列中读出来。 以下是重现项目中出现问题的代码:
import ctypes import re from multiprocessing import Queue from multiprocessing import Pool import random import time from multiprocessing import Value from multiprocessing import Process from src.spider.logmanage import loadLogger from src.util.mongodbhelper import MongoDBCRUD class Counsumer(Process): def __init__(self, queue: Queue, name: str, logqueue: Queue): super().__init__() self.queue = queue self.name = name self.flag = Value(ctypes.c_bool, False) self.logqueue = logqueue def run(self): print("{} process stared!".format(self.name)) arraylist = [] while True: # self.flag 为共享内存里的boolean变量, # 主进程把该变量设置为true时退出循环 if self.flag.value: print("counsumer.flag = {}".format(self.flag.value)) # 往日志队列里写入 2432B 字符 for j in range(1): self.logqueue.put("*" * (2048+256+128)) # 从MongoDb里查询集合中文档数量 count = MongoDBCRUD.query_collection_count("person") print("collection conference has {} documents".format(count)) try: # 往MongoDb中插入 50 条数据 for i in range(50): MongoDBCRUD.insert({"name": "Sam{}".format(i), "age": 29+i}, "person") except Exception as e: print("操作数据库时发生异常:\n{}\n".format(e)) self.logqueue.put("操作数据库时发生异常:\n{}\n".format(e)) while not MongoDBCRUD.execresultqueue.empty(): self.logqueue.put(MongoDBCRUD.execresultqueue.get()) break while not self.queue.empty(): arraylist.append(self.queue.get()) if len(arraylist) >= 6: print("*" * 50) print(arraylist) arraylist.clear() print("comsumer process exit!") def doMatch(s: str, queue: Queue): index = random.randint(0, len(s)-4) queue.put(s[index:index+2]) time.sleep(1) if __name__ == '__main__': processpool = Pool(processes=8) logger = loadLogger("../testunit/applogconfig.ini") queue = Queue(10) logqueue = Queue() counsumer = Counsumer(queue, "consumer_process", logqueue) # 启动 counsumer 进程 counsumer.start() param = "0123456789abcdefghijklmnopqrstuvwxyz" for i in range(15): processpool.apply_async(doMatch(param, queue)) processpool.close() processpool.join() # 进程池里的任务全部结束,把共享内存里的flag标志位置为True counsumer.flag.value = True # 等待 counsumer 进程结束 counsumer.join() # 读取子进程里产生的日志队列数据 while not logqueue.empty(): logger.debug(logqueue.get()) print("main process done!")
运行结果如下:
程序中创建了8个进程的进程池,往进程池里放入15个任务,每个任务往消息队列
queue里放一个运算完成的结果,在子进程counsumer 里每当检测到
queue里消息数量大于等于6就把消息读取出来处理,当进程池里的所有任务处理完,把
counsumer对象的共享内存变量 flag 置为 true ,让子进程退出循环,退出循环前往日志队列里写入2.4k字节的数据; 退出循环后可以看到在控制台里打印了run() 方法最后一句,但程序没有完全退出,使用任务管理器查看可以看到还有两个进程一直没有退出,由于进程池结束了,所以这两个进程应该是 主进程 和 counsumer进程
修改 run() 方法里的
self.logqueue.put("*" * (2048+256+128))这句, 把这句改为
self.logqueue.put("*" * (2048+256))后再运行的结果如下:
从上面的结果不难看出:
虽然说构造 Queue( )如果不指定队列
maxsize默认是队列是无大小限制,但从上面奇怪的问题来看,如果往队列里放入大对象 将会导致进程无法正常退出,目前还是不清楚进程为什么一直处于阻塞状态
通过消息队列在进程间传递日志是一种不好的做法,日志内容量大时容易出现各种各样的问题
相关文章推荐
- web集群--代码问题--判断定时任务退出的变量不能采用本地变量!
- Android中application 全局变量 && 使用TAB页不能退出的问题
- 金牌信通V6无法打开,报错,出现问题,不能使用,更新失败,请求失败等问题处理方式
- 采用自执行的匿名函数解决for循环使用闭包的问题
- 使用crontab不能正常执行的问题
- 使用crontab不能正常执行的问题
- CGYWIN 编译的可执行程序在WINDOWS平台中运行时,解决system函数不能使用的问题
- U盘或者移动银盘退出时一直显示占用中问题解决--最粗暴解决方式
- 使用ODBC和OLEDB两种方式执行同一个存储过程,结果不同的问题!
- teamcity执行jmeter脚本使用Executable with parameters方式不能正确运行解决思路
- 使用VHD方式安装Win7不能启动的问题
- 解决使用快捷方式执行的程序需要依赖特定环境变量的问题
- css使用text-align: justify不能实现两段对其的问题解决方式
- 解决VC++在WIN7下使用ADO方式连接ACCESS数据库到XP不能运行的问题
- CGYWIN 编译的可执行程序在WINDOWS平台中运行时,解决system函数不能使用的问题
- centos源码方式安装ipython2.7,setuptools、pip并解决ipython不能不能使用方向键的问题
- Fragment 中使用 SwipeRefreshLayout 导致的不能退出问题
- 解决linux系统不能使用方向键执行命令的问题
- 使用HttpURLConnection采用get方式请求数据-----乱码问题
- CentOS6.3采用server方式安装后网卡不能加载或启动时不能连接网卡的问题