一个人的战争(3) : 我眼中的异步与非阻塞
2006-09-06 21:49
225 查看
最近在尝试封装个网络工具包,从Socket开始,希望全部自己动手并且能提供一个跨平台的接口,哈哈,新手的通病就是求大求全,不过有一点,就是学习一些现代操作系统共有的特性的时候,我认为站在多个差不多的平台的角度来看待同一个问题要比单一的观察一个OS提供的API要好的多.
闲话少说, 当我进行到WIN32的OVERLAPPED I/O和IOCP的时候突然发现我无法继续抽象了,众所周知,Berkeley socket并没有提供一个异步操作的网络I/O函数,也就是无法提供一个相同的界面,这时候我反问自己,到底什么是异步呢?返回头看我封装的一部分
class IOCP : private NonCopyable
...{
private:
HANDLE m_handle;
size_t m_max_worker_thread;
size_t m_curr_worker_thread;
bool m_is_running;
std::vector<ThreadSpace::ThreadX*> m_thread_hdl;
public:
IOCP(size_t max_worker_thread = 2);
virtual ~IOCP();
bool create();//size_t worker_thread_num);
void close();
int increase_worker_thread(size_t n); //增加N个workerthread;
int decrease_worker_thread(size_t n); //减少n个workerthread;
public:
bool associate_handle_with_iocp(HANDLE hdl, void* key);
bool post_a_status(t_u_long num, void *key, void *async_info);
public:
t_u_int32 worker_thread(void *arg);
public:
virtual bool error_handle(t_u_long num, void *key, void *pdata);// = 0;
virtual void real_worker(t_u_long num, void *key, void *pdata) = 0;
};
IOCP::IOCP(size_t max_worker_thread) :
m_handle(INVALID_HANDLE_VALUE),
m_max_worker_thread(max_worker_thread),
m_curr_worker_thread(0),
m_is_running(false)
...{
}
IOCP::~IOCP()
...{
try...{
close();
}catch(...)
...{
}
}
bool IOCP::associate_handle_with_iocp(HANDLE hdl, void *key)
...{
if(!m_is_running) return false;
return (::CreateIoCompletionPort(hdl, m_handle, (ULONG_PTR)key, 0) != NULL);
}
bool IOCP::post_a_status(t_u_long num, void *key, void *async_info)
...{
if(!m_is_running) return false;
return (::PostQueuedCompletionStatus(m_handle, num, (ULONG_PTR)key, (OVERLAPPED*)async_info) == TRUE);
}
int IOCP::increase_worker_thread(size_t n)
...{
if(!m_is_running) return -1;
using namespace ThreadSpace;
ThreadAttr attr;
attr.create_flags = 0;
attr.stack_size = 0;
size_t i;
for( i = 0; i < n; ++i)...{
if(m_curr_worker_thread == m_max_worker_thread) break;
void *p = 0;
ThreadX* pthread = MFExecutor(this, &IOCP::worker_thread, p, attr);
m_thread_hdl.push_back(pthread);
++m_curr_worker_thread;
}
return i;
}
int IOCP::decrease_worker_thread(size_t n)
...{
if(!m_is_running) return -1;
size_t i;
for( i = 0; i < n; ++i)...{
if(m_curr_worker_thread == 0) break;
post_a_status(OP_SHUT_DOWN, (void*)OP_SHUT_DOWN, (void*)OP_SHUT_DOWN);
--m_curr_worker_thread;
}
for(size_t j = 0; j < i; ++j)...{ //将所有结束的线程句病剔除掉
t_u_long idx = wait_multi_thread(&m_thread_hdl[0], m_thread_hdl.size(), false);
free_thread(m_thread_hdl[idx - WAIT_OBJECT_0]);
m_thread_hdl.erase(m_thread_hdl.begin() + idx - WAIT_OBJECT_0);
}
return i;
}
bool IOCP::create()
...{
if(m_is_running) return false;
m_handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
assert_ex((m_handle != NULL), IOModelExpt(::GetLastError()));
m_is_running = true;
increase_worker_thread(m_max_worker_thread);
return true;
}
void IOCP::close()
...{
if(!m_is_running) return;
if(m_curr_worker_thread != 0)...{
decrease_worker_thread(m_curr_worker_thread);
}
CloseHandle(m_handle);
m_is_running = false;
}
t_u_int32 IOCP::worker_thread(void *arg)
...{
while(true)...{
t_u_long num = 0;
void *pdata;
void *pkey = 0;
bool state = (GetQueuedCompletionStatus(m_handle,
&num,
(DWORD*)&pkey,
(LPOVERLAPPED*)&pdata,
INFINITE
) == TRUE);
if(!state)
...{
if(error_handle(num, pkey, pdata))
...{
continue;
}else
...{
break;
}
}else
...{
if( num == OP_SHUT_DOWN && (t_u_long)pkey == OP_SHUT_DOWN && (t_u_long)pdata == OP_SHUT_DOWN)
break;
real_worker(num, pkey, pdata);
}
}
return 0;
}
bool IOCP::error_handle(t_u_long num, void *key, void *pdata)
...{
t_u_long err_code = ::GetLastError();
if(err_code == WAIT_TIMEOUT)
...{
return true;
}else
...{
return false;
}
}
}
请注意
u_int32 IOCP::worker_thread(void *arg)
这个函数是启动后当做N个线程来运行的,
bool state = (GetQueuedCompletionStatus(m_handle,
&num,
(DWORD*)&pkey,
(LPOVERLAPPED*)&pdata,
INFINITE
) == TRUE);
函数执行到这里是等待有无I/O操作,这时候我发现,所谓的“异步”这个词汇的语义是“协作”,这个“协作”的语义无所谓线程或者进程等等,这时候那个所谓的异步IO函数类似WSARecv等,的语义已经不是接受数据了,而应该是发起一个接受数据的信号,告诉正在等待执行实际IO操作的地方执行动作,所谓的IOCP的handle, key和overlapped结构等都应该被归为同步点信息,如果这样理解的话,我用select和非阻塞recv照样能封装出和IOCP相同界面的类,所以我定义了一个
class AsyncInfo
...{
public:
::WSAOVERLAPPED overlapped;
char buf[MAXIMUMPACKAGESIZE];
size_t len;
public:
AsyncInfo();
~AsyncInfo();
void clear();
};
这个结构当然是win32下的,给WSARecv等函数调用,说白了就是发送一个同步点信息,我并没有确切的用select或者linux下的epoll实做一个和iocp相同接口的类,但是我认为经过繁杂的处理后还是可以完成的。
下面说下我认为的非阻塞与异步的区别,这也是让我头疼了很久的东西
如上所述,所谓异步,就是协作,异步函数的语义已经退化为向某个地方发起一个执行指令了。而阻塞与非阻塞是针对一个函数说的,所谓阻塞的意义是当这个函数未完成则无法继续向下执行,而非阻塞是一种试探性的语义,类似非阻塞的send就是可以写么?如果可以则写入,否则返回错误,这和异步完全是两个东西,但是总容易让人搞混,以上是一点心得,请高人指定.
闲话少说, 当我进行到WIN32的OVERLAPPED I/O和IOCP的时候突然发现我无法继续抽象了,众所周知,Berkeley socket并没有提供一个异步操作的网络I/O函数,也就是无法提供一个相同的界面,这时候我反问自己,到底什么是异步呢?返回头看我封装的一部分
class IOCP : private NonCopyable
...{
private:
HANDLE m_handle;
size_t m_max_worker_thread;
size_t m_curr_worker_thread;
bool m_is_running;
std::vector<ThreadSpace::ThreadX*> m_thread_hdl;
public:
IOCP(size_t max_worker_thread = 2);
virtual ~IOCP();
bool create();//size_t worker_thread_num);
void close();
int increase_worker_thread(size_t n); //增加N个workerthread;
int decrease_worker_thread(size_t n); //减少n个workerthread;
public:
bool associate_handle_with_iocp(HANDLE hdl, void* key);
bool post_a_status(t_u_long num, void *key, void *async_info);
public:
t_u_int32 worker_thread(void *arg);
public:
virtual bool error_handle(t_u_long num, void *key, void *pdata);// = 0;
virtual void real_worker(t_u_long num, void *key, void *pdata) = 0;
};
IOCP::IOCP(size_t max_worker_thread) :
m_handle(INVALID_HANDLE_VALUE),
m_max_worker_thread(max_worker_thread),
m_curr_worker_thread(0),
m_is_running(false)
...{
}
IOCP::~IOCP()
...{
try...{
close();
}catch(...)
...{
}
}
bool IOCP::associate_handle_with_iocp(HANDLE hdl, void *key)
...{
if(!m_is_running) return false;
return (::CreateIoCompletionPort(hdl, m_handle, (ULONG_PTR)key, 0) != NULL);
}
bool IOCP::post_a_status(t_u_long num, void *key, void *async_info)
...{
if(!m_is_running) return false;
return (::PostQueuedCompletionStatus(m_handle, num, (ULONG_PTR)key, (OVERLAPPED*)async_info) == TRUE);
}
int IOCP::increase_worker_thread(size_t n)
...{
if(!m_is_running) return -1;
using namespace ThreadSpace;
ThreadAttr attr;
attr.create_flags = 0;
attr.stack_size = 0;
size_t i;
for( i = 0; i < n; ++i)...{
if(m_curr_worker_thread == m_max_worker_thread) break;
void *p = 0;
ThreadX* pthread = MFExecutor(this, &IOCP::worker_thread, p, attr);
m_thread_hdl.push_back(pthread);
++m_curr_worker_thread;
}
return i;
}
int IOCP::decrease_worker_thread(size_t n)
...{
if(!m_is_running) return -1;
size_t i;
for( i = 0; i < n; ++i)...{
if(m_curr_worker_thread == 0) break;
post_a_status(OP_SHUT_DOWN, (void*)OP_SHUT_DOWN, (void*)OP_SHUT_DOWN);
--m_curr_worker_thread;
}
for(size_t j = 0; j < i; ++j)...{ //将所有结束的线程句病剔除掉
t_u_long idx = wait_multi_thread(&m_thread_hdl[0], m_thread_hdl.size(), false);
free_thread(m_thread_hdl[idx - WAIT_OBJECT_0]);
m_thread_hdl.erase(m_thread_hdl.begin() + idx - WAIT_OBJECT_0);
}
return i;
}
bool IOCP::create()
...{
if(m_is_running) return false;
m_handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
assert_ex((m_handle != NULL), IOModelExpt(::GetLastError()));
m_is_running = true;
increase_worker_thread(m_max_worker_thread);
return true;
}
void IOCP::close()
...{
if(!m_is_running) return;
if(m_curr_worker_thread != 0)...{
decrease_worker_thread(m_curr_worker_thread);
}
CloseHandle(m_handle);
m_is_running = false;
}
t_u_int32 IOCP::worker_thread(void *arg)
...{
while(true)...{
t_u_long num = 0;
void *pdata;
void *pkey = 0;
bool state = (GetQueuedCompletionStatus(m_handle,
&num,
(DWORD*)&pkey,
(LPOVERLAPPED*)&pdata,
INFINITE
) == TRUE);
if(!state)
...{
if(error_handle(num, pkey, pdata))
...{
continue;
}else
...{
break;
}
}else
...{
if( num == OP_SHUT_DOWN && (t_u_long)pkey == OP_SHUT_DOWN && (t_u_long)pdata == OP_SHUT_DOWN)
break;
real_worker(num, pkey, pdata);
}
}
return 0;
}
bool IOCP::error_handle(t_u_long num, void *key, void *pdata)
...{
t_u_long err_code = ::GetLastError();
if(err_code == WAIT_TIMEOUT)
...{
return true;
}else
...{
return false;
}
}
}
请注意
u_int32 IOCP::worker_thread(void *arg)
这个函数是启动后当做N个线程来运行的,
bool state = (GetQueuedCompletionStatus(m_handle,
&num,
(DWORD*)&pkey,
(LPOVERLAPPED*)&pdata,
INFINITE
) == TRUE);
函数执行到这里是等待有无I/O操作,这时候我发现,所谓的“异步”这个词汇的语义是“协作”,这个“协作”的语义无所谓线程或者进程等等,这时候那个所谓的异步IO函数类似WSARecv等,的语义已经不是接受数据了,而应该是发起一个接受数据的信号,告诉正在等待执行实际IO操作的地方执行动作,所谓的IOCP的handle, key和overlapped结构等都应该被归为同步点信息,如果这样理解的话,我用select和非阻塞recv照样能封装出和IOCP相同界面的类,所以我定义了一个
class AsyncInfo
...{
public:
::WSAOVERLAPPED overlapped;
char buf[MAXIMUMPACKAGESIZE];
size_t len;
public:
AsyncInfo();
~AsyncInfo();
void clear();
};
这个结构当然是win32下的,给WSARecv等函数调用,说白了就是发送一个同步点信息,我并没有确切的用select或者linux下的epoll实做一个和iocp相同接口的类,但是我认为经过繁杂的处理后还是可以完成的。
下面说下我认为的非阻塞与异步的区别,这也是让我头疼了很久的东西
如上所述,所谓异步,就是协作,异步函数的语义已经退化为向某个地方发起一个执行指令了。而阻塞与非阻塞是针对一个函数说的,所谓阻塞的意义是当这个函数未完成则无法继续向下执行,而非阻塞是一种试探性的语义,类似非阻塞的send就是可以写么?如果可以则写入,否则返回错误,这和异步完全是两个东西,但是总容易让人搞混,以上是一点心得,请高人指定.
相关文章推荐
- 网络编程之同步,阻塞,异步,非阻塞
- 互联网我来了 -- 2. js中"异步/阻塞"等概念的简析
- 同步与异步、阻塞与非阻塞
- IO - 同步,异步,阻塞,非阻塞
- 同步异步阻塞非阻塞杂记
- 阻塞、非阻塞或者同步、异步概念
- 网络编程 同步,阻塞,异步,非阻塞
- 阻塞 非阻塞与同步 异步
- socket阻塞与非阻塞,同步与异步、I/O模型
- 同步、异步、阻塞、非阻塞
- 处理大并发之一 对异步非阻塞的理解
- 聊聊iOS下block + GCD (Grand Central Dispatch)实现异步非阻塞
- 同步异步阻塞非阻塞Reactor模式和Proactor模式 (目前JAVA的NIO就属于同步非阻塞IO)
- 什么叫阻塞,非阻塞,异步,同步?
- 同步,异步,阻塞,非阻塞
- 阻塞 非阻塞 同步 异步
- 关于同步、异步、阻塞、非阻塞的几点理解
- 并发,同步,异步,阻塞,非阻塞,线程
- 同步、异步、与阻塞、非阻塞的区别
- 深入理解并发/并行,阻塞/非阻塞,同步/异步