《Python3程序开发指南(第二版)》例题之多线程文件查找关键词
2020-02-17 04:24
344 查看
今天继续《Python3程序开发指南(第二版)》的多线程例题讲解.
上一次我写到 例题之多进程文件查找单词,然后这篇博文来讲解如何用多线程文件查找关键词
自己的尝试
在了解到多进程解决多文件的查找关键字的解决思路后,我尝试自己写了一下多线程的解决方法(这个方法并没提高多少效率,想看原书的解决方请直接跳到后面)
大概思路
- 查找函数
grep_word()
进行参数的处理,获取查找单词,查找文件列表 grep_word()
生成线程对象(threading.Thread的子类)列表- 给每个线程分配固定数量的文件,使用块方法进行单词查找
- 便于测试,使用Thread.join()使得主程序比子线程晚结束
代码没什么复杂,就是将多进程解决方法中的进程改成了线程,直接贴代码,同时也补充了一些注释
但事实上和多进程没有什么区别,因为多进程也是多线程的一种,只不过多进程不能使用共有变量而已.
补充一点,这样子好像会造成死锁,很幸运,我遇到了o(╯□╰)o,具体情况我还需要研究一下,所以看书上用的方法吧
""" 作者: 子狼 日期: 2019/8/8 17:38 项目名称: programming_in_python3 文件名称: grepword """ import os import sys import time import optparse from threading import Thread # Thread子类,相比直接使用Thread类可以更好的扩展 class GrepWordThread(Thread): def __init__(self, number, word='default'): Thread.__init__(self) self.number = number # 用于调试 self.BLOCK_SIZE = 8000 # 分块读取的大小 self.filenames = [] # 需要处理的文件列表 self.word = word # 需要查找的单词 def add_filename(self, filename): self.filenames.append(filename) def set_word(self, word): self.word = word def run(self): for filename in self.filenames: try: # 以二进制模式进行读操作 with open(filename, 'rb') as fh: # print("begin find file {0}{1}:".format(self.number, filename)) previous = fh.read(0).decode('utf-8', 'ignore') while True: # 如果文件很大,一次性读取这些文件会出现问题,所以我们以块的形式读入每个文件, # 这个方法的另一个好处是如果提前出现了可以不用往下找,因为我们目的只是判断是否存在 current = fh.read(self.BLOCK_SIZE) # 文件读取完毕 if not current: break # 假定所有文件都是用utf-8编码 current = current.decode('utf-8', 'ignore') if (self.word in current) or (self.word in previous[-len(self.word):] + current[:len(self.word)]): print("{0}{1}".format(self.number, filename)) break if len(current) != self.BLOCK_SIZE: break previous = current # print("end find file {0}{1}:".format(self.number, filename)) except EnvironmentError as err: print("ERROR: {0}{1}{2}".format(self.number, filename, err)) def parse_option(args_list=None): args_list = args_list if args_list else sys.argv[1:] parse = optparse.OptionParser() parse.add_option("-r", "--recurse", dest="recurse", default="True", help="subdirectory recursion or not" ) parse.add_option("-n", "--numprocess", type=int, dest="numprocess", default=7, help="number of process") # parse.add_option("-f", "--filename", dest="filename", help="Filename that need to be searched") 文件直接在所有参数后面输入 parse.add_option("-w", "--word", dest="word", help="Searched words") parse.add_option("-d", "--debug", dest="debug", default="debug", help="if now debug") options, args = parse.parse_args(args_list) # print("123") # 返回 相应的opts, 查询词, 未匹配的词 return options, options.word, args def get_files(args, recurse): file_list = [] # 假设目录一定以/结尾且不存在小数点 for filename in args: # isfile, isdir... if os.path.isfile(filename): file_list.append(filename) # recurse if "true" in recurse.lower() and os.path.isdir(filename): for root, dirs, files in os.walk(filename): for file in files: file_list.append(str(os.path.join(root, file))) # for item in file_list: # print(item) return file_list def grep_word(): # 手动输入参数 args_list = [item for item in input().split()] print(time.clock()) # 获取用户指定的命令行选项, 待搜索单词, 待搜索文件列表 opts, word, args = parse_option(args_list) # 带读取的文件列表(文件列表, 是否递归搜索) file_list = get_files(args, opts.recurse) # 每个进程被分配的文件数量 files_per_process = len(file_list) // opts.numprocess # 分片(多余的文件给第一个进程) start, end = 0, (files_per_process + (len(file_list) % opts.numprocess)) number = 1 threads = [] while start < len(file_list): thread = GrepWordThread(number) threads.append(thread) thread.set_word(word) for filename in file_list[start:end]: thread.add_filename(filename) thread.start() # ???会死锁??? number += 1 start, end = end, end + files_per_process for thread in threads: thread.join() if __name__ == '__main__': grep_word() print(time.clock())
书上的方法
大概思路
使用Queue保存文件列表, 进程不断从queue中取文件, 比如: 进程A开始读取queue.get(),同时queue.pop(),由于A涉及到文件读写,被阻塞, 进程B读取queue.get(),也被阻塞,在某个进程n被阻塞的时候,A进程执行完之前读取的文件,并使用queue.task_done()通知此任务已被完成, 再次读取一个新的queue.get(),n+1进程继续读取被阻塞, 之前被阻塞的进程由于处理文件的大小不一样, 执行结束时间也不一样, 但是都是执行完本身的任务立刻执行下一个任务. 此方法的好处是, queue自带锁机制, 而且如果有个文件特别大, 可以一个进程从头执行到尾,其他进程执行其他任务,相比任务分配的多进程方法,这个方法更加灵活.
""" 作者: 王海霞 日期: 2019/8/8 21:54 项目名称: programming_in_python3 文件名称: grepword_t """ import sys from queue import Queue from threading import Thread import optparse import os def parse_option(args_list=None): args_list = args_list if args_list else sys.argv[1:] parse = optparse.OptionParser() parse.add_option("-r", "--recurse", dest="recurse", default="True", help="subdirectory recursion or not" ) parse.add_option("-n", "--numprocess", type=int, dest="numprocess", default=7, help="number of process") # parse.add_option("-f", "--filename", dest="filename", help="Filename that need to be searched") 文件直接在所有参数后面输入 parse.add_option("-w", "--word", dest="word", help="Searched words") parse.add_option("-d", "--debug", dest="debug", default="false", help="if now debug") options, args = parse.parse_args(args_list) # print("123") # 返回 相应的opts, 查询词, 未匹配的词 return options, options.word, args def get_files(args, recurse): file_list = [] # 假设目录一定以/结尾且不存在小数点 for filename in args: # isfile, isdir... if os.path.isfile(filename): file_list.append(filename) # recurse if "true" in recurse.lower() and os.path.isdir(filename): for root, dirs, files in os.walk(filename): for file in files: file_list.append(str(os.path.join(root, file))) # for item in file_list: # print(item) return file_list class Worker(Thread): def __init__(self, work_queue, word, number): Thread.__init__(self) self.work_queue = work_queue self.word = word self.number = number self.BLOCK_SIZE = 8000 def run(self): while True: try: filename = self.work_queue.get() self.process(filename) except EnvironmentError as err: print("{0}{1}".format(self.number, err)) finally: # 表示以前排队的任务已完成。 # 由队列使用者线程使用。 # 对于用于获取任务的每个get(),对task_done()的后续调用会告知队列该任务的处理已完成。 # 如果join()当前正在阻塞,则它将在所有项目都已处理后恢复(这意味着已为每个已放入队列的项目收到task_done()调用)。 # 如果调用的次数超过队列中放置的项目,则引发ValueError。 '''Indicate that a formerly enqueued task is complete. Used by Queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. ''' self.work_queue.task_done() def process(self, filename): try: # 以二进制模式进行读操作 with open(filename, 'rb') as fh: # print("begin find file {0}{1}:".format(self.number, filename)) previous = fh.read(0).decode('utf-8', 'ignore') while True: # 如果文件很大,一次性读取这些文件会出现问题,所以我们以块的形式读入每个文件, # 这个方法的另一个好处是如果提前出现了可以不用往下找,因为我们目的只是判断是否存在 current = fh.read(self.BLOCK_SIZE) # 文件读取完毕 if not current: break # 假定所有文件都是用utf-8编码 current = current.decode('utf-8', 'ignore') if (self.word in current) or (self.word in previous[-len(self.word):] + current[:len(self.word)]): print("{0}{1}".format(self.number, filename)) break if len(current) != self.BLOCK_SIZE: break previous = current # print("end find file {0}{1}:".format(self.number, filename)) except EnvironmentError as err: print("ERROR: {0}{1}{2}".format(self.number, filename, err)) def grepword(): args_list = [item for item in input().split()] opts, word, args = parse_option(args_list) file_list = get_files(args, opts.recurse) work_queue = Queue() for i in range(opts.numprocess): number = "{0}:".format(i+1) if "true" in opts.debug.lower() else "" worker = Worker(work_queue, word, number) # 守护进程 # 主程序在没有非守护进程的时候会退出 # 在start()前设置 # The entire Python program exits when no alive non-daemon threads are left. worker.daemon = True worker.start() for filename in file_list: work_queue.put(filename) work_queue.join() # 与Queue.task_done配合使用 if __name__ == '__main__': grepword()
- 点赞 2
- 收藏
- 分享
- 文章举报
相关文章推荐
- python导入模块
- python实现用户好友推荐
- python基于item-item filtering实现话题推荐
- 用python爬取高考网历年高考分数线将数据放入MySQL并绘制图表
- 软件测试基础+测试开发+python+测试工具免费领取
- python自学中的我
- ZZULI 1015: 计算时间间隔 Python
- python起步——可变对象和不可变对象
- Gvim —— win 7 下 vim 环境配置 及python开发常用设置
- 使用python脚本提取OC中写死的字符串方便国际化
- python3利用pandas将csv文件中的数据导入mysql数据库
- 【Python基础操作】1.数据读取(csv,Excel,MySQL)
- Python3.6.2 图形界面模块Tk (Day1)
- 用python+tkinter写个校验和工具
- python selenium+pywin32 实现网页另存为
- python 输出所有大小写字母和0~9数字
- pycharm导入python包
- python神器pycharm的安装
- python字符串前加 f 的含义
- Python中 IOError 和 FileNotFoundError