利用PPL实现复杂的多线程模式的生产者-消费者
2014-06-24 23:36
337 查看
[作者]
常用网名: 猪头三
出生日期: 1981.XX.XX
生理特征: 男
婚姻状况: 已婚
个人网站: http://www.x86asm.com
Email: pliceman_110@163.com
QQ交流: 643439947
编程生涯: 2001年~至今[13年]
职业生涯: 11年
开发语言: C/C++、80x86ASM、Object Pascal、C#
开发工具: VC++、VC#、Delphi
技能种类: 逆向 驱动 磁盘 文件
研发领域: Windows应用软件安全/Windows系统内核安全/Windows系统磁盘数据安全
项目经历: 磁盘性能优化/文件系统数据恢复/文件信息采集/敏感文件监测跟踪/网络安全检测
[序言]
至于为什么要研究PPL, 还是因为5年前的问题至今没有解决:如何让文件遍历更加快~~例如我们需要统计某个文件夹的总大小,或者某个分区里所有文件的总大小.这是在做类似系统维护工具, 必须要处理的问题.通过这段时间认真学习和测试PPL的各种理论, 终结如下:
1> 如果任务仅仅是枚举文件而不做任何操作, PPL是无法提升性能以及效率. 运行速度跟非PPL一样.
2> PPL对操作跟磁盘IO没有关系的运算非常快, 比如数值算法
3> PPL在商业软件开发上, 做任务并行处理非常方便. 比如杀毒软件在做全盘扫描, 如果用PPL来做时,可以把扫描, 分析(动态脱壳, 特征码搜索, MD5校验, PE头检查...)等把各种行为按照任务来进行并发处理,
4> 一般来说作为一个普通的软件开发人员在使用PPL时按照任务为单位的并行处理, 可以让你避免各种多线程开发的各种坑爹行为.
[什么是PPL]
PPL是Parallel Patterns Library, 是微软开发工具提供的一个用于并行开发的库. 其实PPL不是重点, 并行理论才是重点, 你只有掌握了并行编程的相关理论才会使用PPL. 如果想了解相关并行编程的资料, 可以去MSDN学习.
Parallel Programming in Visual C++
http://msdn.microsoft.com/en-us/library/hh875062.aspx
[代码演示]
这2天我花费10多个小时来尝试写出类似杀毒软件做全盘扫描文件的并行代码, 此代码仅仅是一种思路, 利用了生产者-消费者的多线程理论. 生产者是枚举全盘文件并把目录和文件分别分发给消费者, 让消费者对目标文件或者目录进行病毒分析, 比如: 动态脱壳, 特征码搜索, MD5校验, PE头检查...此代码仅仅用到的PPL里面的:concurrency::combinable、concurrency::concurrent_queue、concurrency::parallel_for_each这3个东西却实现了复杂的多线程模式的生产者-消费者机制.
常用网名: 猪头三
出生日期: 1981.XX.XX
生理特征: 男
婚姻状况: 已婚
个人网站: http://www.x86asm.com
Email: pliceman_110@163.com
QQ交流: 643439947
编程生涯: 2001年~至今[13年]
职业生涯: 11年
开发语言: C/C++、80x86ASM、Object Pascal、C#
开发工具: VC++、VC#、Delphi
技能种类: 逆向 驱动 磁盘 文件
研发领域: Windows应用软件安全/Windows系统内核安全/Windows系统磁盘数据安全
项目经历: 磁盘性能优化/文件系统数据恢复/文件信息采集/敏感文件监测跟踪/网络安全检测
[序言]
至于为什么要研究PPL, 还是因为5年前的问题至今没有解决:如何让文件遍历更加快~~例如我们需要统计某个文件夹的总大小,或者某个分区里所有文件的总大小.这是在做类似系统维护工具, 必须要处理的问题.通过这段时间认真学习和测试PPL的各种理论, 终结如下:
1> 如果任务仅仅是枚举文件而不做任何操作, PPL是无法提升性能以及效率. 运行速度跟非PPL一样.
2> PPL对操作跟磁盘IO没有关系的运算非常快, 比如数值算法
3> PPL在商业软件开发上, 做任务并行处理非常方便. 比如杀毒软件在做全盘扫描, 如果用PPL来做时,可以把扫描, 分析(动态脱壳, 特征码搜索, MD5校验, PE头检查...)等把各种行为按照任务来进行并发处理,
4> 一般来说作为一个普通的软件开发人员在使用PPL时按照任务为单位的并行处理, 可以让你避免各种多线程开发的各种坑爹行为.
[什么是PPL]
PPL是Parallel Patterns Library, 是微软开发工具提供的一个用于并行开发的库. 其实PPL不是重点, 并行理论才是重点, 你只有掌握了并行编程的相关理论才会使用PPL. 如果想了解相关并行编程的资料, 可以去MSDN学习.
Parallel Programming in Visual C++
http://msdn.microsoft.com/en-us/library/hh875062.aspx
[代码演示]
这2天我花费10多个小时来尝试写出类似杀毒软件做全盘扫描文件的并行代码, 此代码仅仅是一种思路, 利用了生产者-消费者的多线程理论. 生产者是枚举全盘文件并把目录和文件分别分发给消费者, 让消费者对目标文件或者目录进行病毒分析, 比如: 动态脱壳, 特征码搜索, MD5校验, PE头检查...此代码仅仅用到的PPL里面的:concurrency::combinable、concurrency::concurrent_queue、concurrency::parallel_for_each这3个东西却实现了复杂的多线程模式的生产者-消费者机制.
#include "stdafx.h" #include <afx.h> #include <afxinet.h> #include <iostream> #include <vector> #include <ppl.h> #include <concurrent_vector.h> #include <concurrent_queue.h> // 统计目录/文件的数量以及大小总量 concurrency::combinable<__int64> g_int64_Dirs_Count; concurrency::combinable<__int64> g_int64_Files_Count; concurrency::combinable<__int64> g_int64_AllFiles_Size; // 判断全盘枚举文件任务是否退出 concurrency::combinable<int> g_int_FilesFinder_IsExit; // 统计耗时 (单位:毫秒) template <class Function> __int64 time_call(Function&& funpoint_param_Fun) { __int64 int64_begin = GetTickCount(); funpoint_param_Fun(); return GetTickCount() - int64_begin; } // 获取所有盘符 void fun_GetAllLogicalDriveName(std::vector<std::wstring> & vector_DriveName) { DWORD dword_BufferLen = 0; dword_BufferLen = ::GetLogicalDriveStrings(0, NULL); PWCHAR pwchar_DriveName = new WCHAR[dword_BufferLen]; PWCHAR pwchar_DriveName_Next = NULL; if (::GetLogicalDriveStrings(dword_BufferLen, pwchar_DriveName)) { pwchar_DriveName_Next = pwchar_DriveName; while (*pwchar_DriveName != NULL) { switch (GetDriveType(pwchar_DriveName)) { case DRIVE_UNKNOWN: break; case DRIVE_NO_ROOT_DIR: break; case DRIVE_REMOVABLE: break; case DRIVE_FIXED: { std::wstring str_DriveName(pwchar_DriveName); vector_DriveName.push_back(str_DriveName); break; } case DRIVE_REMOTE: break; case DRIVE_CDROM: break; case DRIVE_RAMDISK: break; default: break; } pwchar_DriveName += lstrlenW(pwchar_DriveName) + 1; } } // 久不用C++了,释放空间引起异常,懒得修改 //delete[]pwchar_DriveName; }// End fun_GetAllLogicalDriveName() // 枚举系统目录 void fun_FindDirsAndFiles(CStringW str_param_RootDir, concurrency::concurrent_vector<std::wstring> & cvector_param_DirsAndFiles, concurrency::concurrent_queue<std::wstring> & cqueue_param_Dirs, concurrency::concurrent_queue<std::wstring> & cqueue_param_Files) { CFileFind class_DirAndFileFinder; CStringW cstring_RootPath; CStringW cstr_Path; std::wstring str_Path; cstring_RootPath = str_param_RootDir; if (cstring_RootPath.Right(1) != "\\") { cstring_RootPath += "\\"; } cstring_RootPath += "*.*"; BOOL bool_Finding = class_DirAndFileFinder.FindFile(cstring_RootPath); while (bool_Finding) { bool_Finding = class_DirAndFileFinder.FindNextFile(); if (class_DirAndFileFinder.IsDots()) { continue; } else if (class_DirAndFileFinder.IsDirectory()) { cstr_Path = class_DirAndFileFinder.GetFilePath(); str_Path = std::wstring(cstr_Path.GetBuffer(0)); // 保存目录 cvector_param_DirsAndFiles.push_back(str_Path); // 保存目录(回调使用) cqueue_param_Dirs.push(str_Path); // 统计目录数目 g_int64_Dirs_Count.local() += 1; // 继续查找下一个目录 fun_FindDirsAndFiles(cstr_Path, cvector_param_DirsAndFiles, cqueue_param_Dirs, cqueue_param_Files); } else if (!class_DirAndFileFinder.IsDirectory()) { cstr_Path = class_DirAndFileFinder.GetFilePath(); str_Path = std::wstring(cstr_Path.GetBuffer(0)); // 保存文件 cvector_param_DirsAndFiles.push_back(str_Path); // 保存文件(回调使用) cqueue_param_Files.push(str_Path); // 统计文件数目 g_int64_Files_Count.local() += 1; } } class_DirAndFileFinder.Close(); return; }// End fun_FindDirsAndFiles() // 对目录进行操作 void fun_ppldo_Dirs(concurrency::combinable<int> & int_param_IsExit, concurrency::concurrent_queue<std::wstring> & cqueue_param_Dirs) { while (true) { if (int_param_IsExit.combine(std::plus<int>()) == 1 && cqueue_param_Dirs.empty() == true) { return; } else { if (cqueue_param_Dirs.empty() != true) { // 对目录进行操作 std::wstring str_DirPath = L""; if (cqueue_param_Dirs.try_pop(str_DirPath) == true) { // 这里可以并发处理对目标目录的操作 } } } } }// End fun_ppldo_Dirs() // 对文件进行操作 void fun_ppldo_Files(concurrency::combinable<int> & int_param_IsExit, concurrency::concurrent_queue<std::wstring> & cqueue_param_Files) { while (true) { if (int_param_IsExit.combine(std::plus<int>()) == 1 && cqueue_param_Files.empty() == true) { return; } else { if (cqueue_param_Files.empty() != true) { std::wstring str_FilePath = L""; if (cqueue_param_Files.try_pop(str_FilePath) == true) { // 这里可以并发处理对目标文件的操作 } } } } }// End fun_ppldo_Dirs() int _tmain(int argc, _TCHAR* argv[]) { // 设置输出支持中文 std::wcout.imbue(std::locale("chs")); // 初始化基本变量 __int64 int64_Elapsed; // 保存已经枚举的文件 concurrency::concurrent_vector<std::wstring> cv_DirsAndFiles; concurrency::concurrent_queue<std::wstring> cq_Dirs; concurrency::concurrent_queue<std::wstring> cq_Files; std::wcout << L"开始并行枚举所有分区的文件(枚举的过程获取文件大小)...." << std::endl; // 获取盘符 std::vector<std::wstring> vector_Drive; fun_GetAllLogicalDriveName(vector_Drive); // 开始并行执行枚举 int64_Elapsed = time_call([&] { g_int64_Dirs_Count.local() = 0; g_int64_Files_Count.local() = 0; g_int64_AllFiles_Size.local() = 0; g_int_FilesFinder_IsExit.local() = 0; // 3个任务同时并发并执行生产者与消费者的协同配合 concurrency::parallel_invoke( [&] { // 枚举所有分区的目录和文件l(生产者) concurrency::parallel_for_each(std::begin(vector_Drive), std::end(vector_Drive), [&](std::wstring str_param_DriveName){ fun_FindDirsAndFiles(str_param_DriveName.c_str(), cv_DirsAndFiles, cq_Dirs, cq_Files); }); // 设置结束标志位 g_int_FilesFinder_IsExit.local() += 1; }, [&]{ // 获取目录进行操作(消费者) fun_ppldo_Dirs(g_int_FilesFinder_IsExit, cq_Dirs); }, [&]{ // 获取文件进行操作(消费者) fun_ppldo_Files(g_int_FilesFinder_IsExit, cq_Files); } ); }); std::wcout << L"目录总数: " << g_int64_Dirs_Count.combine(std::plus<__int64>()) << std::endl; std::wcout << L"文件总数: " << g_int64_Files_Count.combine(std::plus<__int64>()) << std::endl; if (int64_Elapsed < 1000) { std::wcout << L"一共耗时: " << int64_Elapsed << L" 毫秒." << std::endl; } else { std::wcout << L"一共耗时: " << int64_Elapsed / 1000 << L" 秒." << std::endl; } // 暂停 std::cin.get(); return 0; }// End _tmain()
相关文章推荐
- 【转载】多线程--C#利用多线程实现消费者和生产者模式
- java 多线程 22 :生产者/消费者模式 进阶 利用await()/signal()实现
- 多线程--C#利用多线程实现消费者和生产者模式
- Java多线程-工具篇-BlockingQueue(实现生产者和消费者模式)
- Java Note: 多线程的同步(互斥锁)的方法对比,信号量锁,读写锁的实现,生产者-消费者模式的实现
- 关于网宿厦门研发中心笔试的一道PV操作题:利用java中的多线程实现生产者与消费者的同步问题
- IOS多线程使用GCD与信号量实现生产者与消费者模式
- Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型
- Java Note: 多线程的同步(互斥锁)的方法对比,信号量锁,读写锁的实现,生产者-消费者模式的实现
- Java利用BlockingQueue实现生产者和消费者模式
- Java Note: 多线程的同步(互斥锁)的方法对比,信号量锁,读写锁的实现,生产者-消费者模式的实现
- Java Note: 多线程的同步(互斥锁)的方法对比,信号量锁,读写锁的实现,生产者-消费者模式的实现
- 利用条件变量实现多线程生产者消费者问题
- 多线程---使用ManualResetEvent来控制线程间的同步(实现了消费者和生产者模式)
- 多线程实现生产者与消费者模式
- Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型
- Java多线程实现消费者/生产者模式
- java利用lock和unlock实现消费者与生产者问题(多线程)
- Linux - 多线程实现生产者与消费者模式
- Java多线程系列-Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型