Windows核心编程学习七:用户模式下的线程同步
2013-07-08 14:57
489 查看
注:源码为学习《Windows核心编程》的一些尝试,非原创。若能有助于一二访客,幸甚。
1.基本框架
/* * File: CQueue.cpp * Time: 2013-07-08 * 描述: 学习《Windows核心编程》 */ #include "Queue.h" #include <tchar.h> #include <windowsx.h> #include "resource.h" /************************************************************************/ #define chHANDLE_DLGMSG(hWnd, message, fn) \ case (message): return (SetDlgMsgResult(hWnd, uMsg, \ HANDLE_##message((hWnd), (wParam), (lParam), (fn)))) // Sets the dialog box icons inline void chSETDLGICONS(HWND hWnd, int idi) { SendMessage(hWnd, WM_SETICON, ICON_BIG, (LPARAM) LoadIcon((HINSTANCE) GetWindowLongPtr(hWnd, GWLP_HINSTANCE), MAKEINTRESOURCE(idi))); SendMessage(hWnd, WM_SETICON, ICON_SMALL, (LPARAM) LoadIcon((HINSTANCE) GetWindowLongPtr(hWnd, GWLP_HINSTANCE), MAKEINTRESOURCE(idi))); } /************************************************************************/ BOOL Dlg_OnInitDialog(HWND hWnd, HWND hWndFocus, LPARAM lParam) { chSETDLGICONS(hWnd, IDI_QUEUE); return TRUE; } void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtl, UINT codeNotify) { switch (id) { case IDCANCEL: EndDialog(hWnd, id); break; } } INT_PTR WINAPI Dlg_Proc(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam) { switch (uMsg) { chHANDLE_DLGMSG(hWnd, WM_INITDIALOG, Dlg_OnInitDialog); chHANDLE_DLGMSG(hWnd, WM_COMMAND, Dlg_OnCommand); } return FALSE; } /*************************************************************************/ int WINAPI _tWinMain(HINSTANCE hInstExe, HINSTANCE, PTSTR pszCmdLine, int) { DialogBox(hInstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, Dlg_Proc); return 0; }
2.队列的实现
此处的队列并不具有线程安全性,由客户线程和服务器线程来负责他们访问的全局队列进行同步/* * File: CQueue.h * Time: 2013-07-08 * 描述: 学习《Windows核心编程》 */ #ifndef _CQUEUE_H_ #define _CQUEUE_H_ #include <windows.h> class CQueue { public: struct ELEMENT { int m_nThreadNum; // 线程号 int m_nRequestNum; // 请求号 }; typedef ELEMENT* PELEMENT; private: struct INNER_ELEMENT { int m_nStamp; // 记录插入顺序,添加元素时递增,0表示为空 ELEMENT m_element; }; typedef INNER_ELEMENT* PINNER_ELEMENT; private: PINNER_ELEMENT m_pElements; // 队列元素数组 int m_nMaxElements; // 数组长度 int m_nCurrentStamp; // 记录当前插入的元素,添加元素时递增 private: int GetFreeSlot(); // 返回第一个m_nStamp为0(表示内容已经被读取或者内容为空)的元素索引 int GetNextSlot(int nThreadNum); // 返回需要线程nThreadNum处理的m_nStamp值最小(表示最早添加)且不为0(空闲或已读)的元素索引 public: CQueue(int nMaxElements); ~CQueue(); BOOL IsFull(); // 队列是否已满 BOOL IsEmpty(int nThreadNum); // 队列中是否存在线程号为nThreadNum处理的元素 void AddElement(ELEMENT e); // 添加元素到队列 BOOL GetNewElement(int nThreadNum, ELEMENT& e); // 获取队列中需要线程号为nThreadNum处理的最早插入的元素 }; #endif
CQueue::CQueue(int nMaxElements) { // 为队列分配空间并初始化队列元素 m_pElements = (PINNER_ELEMENT)HeapAlloc(GetProcessHeap(), 0, sizeof(INNER_ELEMENT) * nMaxElements); ZeroMemory(m_pElements, sizeof(INNER_ELEMENT) * nMaxElements); // 初始化元素计数 m_nCurrentStamp = 0; // 保存队列最大元素数 m_nMaxElements = nMaxElements; } CQueue::~CQueue() { // 释放空间 HeapFree(GetProcessHeap(), 0, m_pElements); } // 队列是否已满 BOOL CQueue::IsFull() { return (GetFreeSlot() == -1); } // 队列中是否存在线程号为nThreadNum处理的元素 BOOL CQueue::IsEmpty(int nThreadNum) { return (GetNextSlot(nThreadNum) == -1); } // 返回第一个m_nStamp为0(表示内容已经被读取或者内容为空)的元素索引 int CQueue::GetFreeSlot() { // 寻找stamp为0 的元素索引 for (int current = 0; current < m_nMaxElements; current++) { if (m_pElements[current].m_nStamp == 0) return current; } // 没有找到空余位置 return -1; } // 返回需要线程nThreadNum处理的m_nStamp值最小(表示最早添加)且不为0(空闲或已读)的元素索引 int CQueue::GetNextSlot(int nThreadNum) { // 默认没有需要线程nThreadNum处理的元素 int firstSlot = -1; // m_nCurrentStamp为最后添加的元素的stamp,为最大的stamp int firstStamp = m_nCurrentStamp + 1; for (int current = 0; current < m_nMaxElements; current++) { if ((m_pElements[current].m_nStamp != 0) && // 不是空位置 ((m_pElements[current].m_element.m_nRequestNum % 2) == nThreadNum) && // 需要线程nThreadNum处理 (m_pElements[current].m_nStamp < firstStamp)) // 比当前找到的stamp小 { firstStamp = m_pElements[current].m_nStamp; // 更新 firstSlot = current; } } return firstSlot; } // 添加元素到队列 void CQueue::AddElement(ELEMENT e) { // 如果队列已满,返回 int nFreeSlot = GetFreeSlot(); if (nFreeSlot == -1) return; // 拷贝元素 m_pElements[nFreeSlot].m_element = e; // 记录插入顺序 m_pElements[nFreeSlot].m_nStamp = ++m_nCurrentStamp; } // 获取队列中需要线程号为nThreadNum处理的最早插入的元素 BOOL CQueue::GetNewElement(int nThreadNum, ELEMENT& e) { // 没有找到需要线程nThreadNum处理的元素 int nNewSlot = GetNextSlot(nThreadNum); if (nNewSlot == -1) return FALSE; // 保存找到的元素 e = m_pElements[nNewSlot].m_element; // 标记为已读 m_pElements[nNewSlot].m_nStamp = 0; return TRUE; }
3.添加字符串函数
void AddText(HWND hWndLB, PCTSTR pszFormat, ...) { va_list argList; va_start(argList, pszFormat); TCHAR sz[20 * 1024]; _vstprintf_s(sz, _countof(sz), pszFormat, argList); ListBox_SetCurSel(hWndLB, ListBox_AddString(hWndLB, sz)); va_end(argList); }
void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtl, UINT codeNotify) { switch (id) { case IDCANCEL: EndDialog(hWnd, id); break; case IDC_BTN_STOP: HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENT); AddText(hWndLB, TEXT("测试 AddText() 函数, 结束键被按下了...")); Button_Enable(hWndCtl, FALSE); break; } }
4.客户(写者)线程
CQueue g_q(10); volatile LONG g_fShutdown; HWND g_hWnd; SRWLOCK g_srwLock; CONDITION_VARIABLE g_cvReadyToConsume; // 写者发出信号,表示可以读取 CONDITION_VARIABLE g_cvReadyToProduce; // 读者发出信号,表示可以写入 // Handles to all reader/writer threads HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS]; // Number of reader/writer threads int g_nNumThreads = 0;
DWORD WINAPI WriterThread(PVOID pvParam) { int nThreadNum = PtrToUlong(pvParam); // 线程号 HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENT); int nRequestNum = 1; // 请求号 while (1) { CQueue::ELEMENT e = { nThreadNum, nRequestNum }; // 以独占模式获得SRWLock,若锁已被别的线程占用,无论占用锁的线程是服务器线程还是 // 客户线程,当前线程都会被阻塞在AcquireSRWLockExclusive中,直到锁被释放 AcquireSRWLockExclusive(&g_srwLock); if (g_q.IsFull()) // 队列已满 { AddText(hWndLB, TEXT("线程[%d] 队列已满: 不能添加元素%d"), nThreadNum, nRequestNum); // 睡眠,等待读取者线程读取一个元素,腾出一个位置容纳新元素 SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock, INFINITE, 0); } // 添加元素 g_q.AddElement(e); AddText(hWndLB, TEXT("线程[%d] 添加元素%d"), nThreadNum, nRequestNum); // 释放锁 ReleaseSRWLockExclusive(&g_srwLock); // 通知服务器线程有数据需要处理 WakeAllConditionVariable(&g_cvReadyToConsume); Sleep(1500); nRequestNum++; } AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum); return 0; }
// This macro function calls the C runtime's _beginthreadex function. // The C runtime library doesn't want to have any reliance on Windows' data // types such as HANDLE. This means that a Windows programmer needs to cast // values when using _beginthreadex. Since this is terribly inconvenient, // I created this macro to perform the casting. typedef unsigned (__stdcall *PTHREAD_START) (void *); #define chBEGINTHREADEX(psa, cbStackSize, pfnStartAddr, \ pvParam, dwCreateFlags, pdwThreadId) \ ((HANDLE)_beginthreadex( \ (void *) (psa), \ (unsigned) (cbStackSize), \ (PTHREAD_START) (pfnStartAddr), \ (void *) (pvParam), \ (unsigned) (dwCreateFlags), \ (unsigned *) (pdwThreadId)))
BOOL Dlg_OnInitDialog(HWND hWnd, HWND hWndFocus, LPARAM lParam) { chSETDLGICONS(hWnd, IDI_QUEUE); g_hWnd = hWnd; // 初始化SRWLock InitializeSRWLock(&g_srwLock); // 初始化条件变量 InitializeConditionVariable(&g_cvReadyToConsume); InitializeConditionVariable(&g_cvReadyToProduce); g_fShutdown = FALSE; // 创建写者线程 DWORD dwThreadID; for (int i = 0; i < 4; i++) g_hThreads[g_nNumThreads++] = chBEGINTHREADEX(NULL, 0, WriterThread, (PVOID)(INT_PTR) i, 0, &dwThreadID); return TRUE; }
5.服务器(读者)线程
BOOL ConsumeElement(int nThreadNum, int nRequestNum, HWND hWndLB) { // 以共享模式获得srwLock,如果锁已经被客户端线程以独占模式占用,函数会阻塞 // 如果锁已经被另一个服务器线程以共享模式获得,运行对请求进行处理 AcquireSRWLockShared(&g_srwLock); // 队列不存在该服务器线程处理的元素,则阻塞,等待一个客户线程产生新的元素 // 而触发g_cvReadyToConsume条件变量为止 while (g_q.IsEmpty(nThreadNum)) { AddText(hWndLB, TEXT("服务器线程[%d] 没有元素可处理"), nThreadNum); SleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED); } // 获取新元素 CQueue::ELEMENT e; g_q.GetNewElement(nThreadNum, e); // 释放锁 ReleaseSRWLockShared(&g_srwLock); AddText(hWndLB, TEXT("服务器线程[%d]处理客户线程%d生产的元素%d"), nThreadNum, e.m_nThreadNum, e.m_nRequestNum); // 读取一个元素完成,新增一个空位,通知客户端线程可以继续生产 WakeConditionVariable(&g_cvReadyToProduce); return TRUE; } DWORD WINAPI ReaderThread(PVOID pvParam) { int nThreadNum = PtrToUlong(pvParam); HWND hWndLB = GetDlgItem(g_hWnd, IDC_SERVER); int nRequestNum = 1; while (1) { if (!ConsumeElement(nThreadNum, nRequestNum, hWndLB)) return 0; Sleep(2500); } AddText(hWndLB, TEXT("服务器线程[%d] 结束"), nThreadNum); return 0; }
BOOL Dlg_OnInitDialog(HWND hWnd, HWND hWndFocus, LPARAM lParam) { chSETDLGICONS(hWnd, IDI_QUEUE); g_hWnd = hWnd; // 初始化SRWLock InitializeSRWLock(&g_srwLock); // 初始化条件变量 InitializeConditionVariable(&g_cvReadyToConsume); InitializeConditionVariable(&g_cvReadyToProduce); g_fShutdown = FALSE; // 创建写者线程 DWORD dwThreadID; for (int i = 0; i < 4; i++) g_hThreads[g_nNumThreads++] = chBEGINTHREADEX(NULL, 0, WriterThread, (PVOID)(INT_PTR) i, 0, &dwThreadID); // 创建读者线程 for (int i = 0; i < 2; i++) g_hThreads[g_nNumThreads++] = chBEGINTHREADEX(NULL, 0, ReaderThread, (PVOID)(INT_PTR) i, 0, &dwThreadID); return TRUE; }
6.停止处理
DWORD WINAPI WriterThread(PVOID pvParam) { int nThreadNum = PtrToUlong(pvParam); // 线程号 HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENT); for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) { CQueue::ELEMENT e = { nThreadNum, nRequestNum }; // 以独占模式获得SRWLock,若锁已被别的线程占用,无论占用锁的线程是服务器线程还是 // 客户线程,当前线程都会被阻塞在AcquireSRWLockExclusive中,直到锁被释放 AcquireSRWLockExclusive(&g_srwLock); if (g_q.IsFull()) // 队列已满 { AddText(hWndLB, TEXT("客户线程[%d]队列已满:不能添加元素%d"), nThreadNum, nRequestNum); // 睡眠,等待读取者线程读取一个元素,腾出一个位置容纳新元素 SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock, INFINITE, 0); } if (g_fShutdown) { AddText(hWndLB, TEXT("客户线程[%d] 结束"), nThreadNum); // 释放锁 ReleaseSRWLockExclusive(&g_srwLock); // 告诉所有阻塞的线程结束 WakeAllConditionVariable(&g_cvReadyToProduce); return 0; } else { // 添加元素 g_q.AddElement(e); AddText(hWndLB, TEXT("客户线程[%d]添加元素%d"), nThreadNum, nRequestNum); // 释放锁 ReleaseSRWLockExclusive(&g_srwLock); // 通知服务器线程有数据需要处理 WakeAllConditionVariable(&g_cvReadyToConsume); Sleep(1500); } } AddText(hWndLB, TEXT("客户线程[%d] 结束"), nThreadNum); return 0; } BOOL ConsumeElement(int nThreadNum, int nRequestNum, HWND hWndLB) { // 以共享模式获得srwLock,如果锁已经被客户端线程以独占模式占用,函数会阻塞 // 如果锁已经被另一个服务器线程以共享模式获得,运行对请求进行处理 AcquireSRWLockShared(&g_srwLock); // 队列不存在该服务器线程处理的元素,则阻塞,等待一个客户线程产生新的元素 // 而触发g_cvReadyToConsume条件变量为止 while (g_q.IsEmpty(nThreadNum)) { AddText(hWndLB, TEXT("服务器线程[%d] 没有元素可处理"), nThreadNum); SleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED); } if (g_fShutdown) { AddText(hWndLB, TEXT("服务器线程[%d] 结束"), nThreadNum); // 释放锁 ReleaseSRWLockShared(&g_srwLock); // 通知另一个服务器线程结束 WakeConditionVariable(&g_cvReadyToConsume); return FALSE; } // 获取新元素 CQueue::ELEMENT e; g_q.GetNewElement(nThreadNum, e); // 释放锁 ReleaseSRWLockShared(&g_srwLock); AddText(hWndLB, TEXT("服务器线程[%d]处理客户线程%d生产的元素%d"), nThreadNum, e.m_nThreadNum, e.m_nRequestNum); // 读取一个元素完成,新增一个空位,通知客户端线程可以继续生产 WakeConditionVariable(&g_cvReadyToProduce); return TRUE; } DWORD WINAPI ReaderThread(PVOID pvParam) { int nThreadNum = PtrToUlong(pvParam); HWND hWndLB = GetDlgItem(g_hWnd, IDC_SERVER); for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) { if (!ConsumeElement(nThreadNum, nRequestNum, hWndLB)) return 0; Sleep(2500); } AddText(hWndLB, TEXT("服务器线程[%d] 结束"), nThreadNum); return 0; }
void StopProcessing() { if (!g_fShutdown) { // 告诉所有线程结束 InterlockedExchange(&g_fShutdown, TRUE); // 释放所有在等待条件变量的线程 WakeAllConditionVariable(&g_cvReadyToConsume); WakeAllConditionVariable(&g_cvReadyToProduce); // 等待所有线程关闭,然后清理 WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE); // 清理内核对象 while (g_nNumThreads--) CloseHandle(g_hThreads[g_nNumThreads]); AddText(GetDlgItem(g_hWnd, IDC_SERVER), TEXT("----------------")); AddText(GetDlgItem(g_hWnd, IDC_CLIENT), TEXT("----------------")); } } DWORD WINAPI StoppingThread(PVOID pvParam) { StopProcessing(); return 0; } void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtl, UINT codeNotify) { switch (id) { case IDCANCEL: EndDialog(hWnd, id); break; case IDC_BTN_STOP: // 结束处理不能在UI线程中调用,否则导致死锁 DWORD dwThreadID; CloseHandle(chBEGINTHREADEX(NULL, 0, StoppingThread, NULL, 0, &dwThreadID)); Button_Enable(hWndCtl, FALSE); break; } } INT_PTR WINAPI Dlg_Proc(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam) { switch (uMsg) { chHANDLE_DLGMSG(hWnd, WM_INITDIALOG, Dlg_OnInitDialog); chHANDLE_DLGMSG(hWnd, WM_COMMAND, Dlg_OnCommand); } return FALSE; } /*************************************************************************/ int WINAPI _tWinMain(HINSTANCE hInstExe, HINSTANCE, PTSTR pszCmdLine, int) { DialogBox(hInstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, Dlg_Proc); StopProcessing(); return 0; }
相关文章推荐
- windows核心编程-8.用户模式下的线程同步
- windows 核心编程之8 用户模式下的线程同步
- windows 核心编程(用户模式下的线程同步)
- Windows via C/C++ 学习(16)用户模式下的线程同步(一)
- Windows核心编程学习九:利用内核对象进行线程同步
- 核心编程笔记8——用户模式下的线程同步
- Windows核心编程学习一:使用DialogBoxParam显示模式对话框
- Windows核心编程学习五:进程的环境变量
- windows 核心编程之9 内核对象用于线程同步
- ASP.NET 3.5核心编程学习笔记(9):用户配置文件
- 学习笔记之用户模式下的线程同步
- Windows Via C/C++:用户模式下的线程同步——Cache行
- Windows Via C/C++:用户模式下的线程同步——轻量级读写锁(Slim Reader-Writer Locks)
- Windows核心编程学习笔记 第二部分 完成编程任务 第8章 用户模式下的线程同步
- Windows Via C/C++:用户模式下的线程同步——概述
- Windows Via C/C++:用户模式下的线程同步——Condition Variables 条件变量
- Windows核心编程学习三:利用专有命名空间实现单一实例
- Windows核心编程学习笔记--第15章
- Windows核心编程学习笔记--进程和线程的优先级
- 基于Visual C++之Windows核心编程代码分析(2)实现Windows用户管理