您的位置:首页 > Web前端 > React

一个windows下基于select多路分离器的Reactor模型

2010-12-26 14:30 531 查看
1.封装了windows下同步变量,包括实现condition,monitor.本来想封装线程,想着还要封装类似boost::bind或者signal/solt的,放弃了.

2.该Reactor模型不排除在同一个socket上投递多个eventhandler.一个eventhandler只针对一个socket事件处理(可读,可写.当然事件可以用掩码,本例没有采用这种做法.)

3.采用VS2005编译.工程携带了一个测试server和一个只用于发送数据的client,一个同时收发的client

4.代码摘录:

Eventhandler

class sCEventhandler
{
public:
sCEventhandler(void);
virtual ~sCEventhandler(void);

virtual int Handler_Read(){return 1;}
virtual int Handler_Write(){return 1;}
virtual int Handler_Expet(){return 1;}
virtual void Handler_Closed(){delete this;}
};

enum HandlerType
{
HANDLE_CONNECT,
HANDLE_ACCEPT,
HANDLE_WRITE,
HANDLE_READ,
HANDLE_EXPT
};

class sCSocketHandler : public sCEventhandler
{
public:
sCSocketHandler(){};
sCSocketHandler(sCSocket s,HandlerType e);
void SetSocket(sCSocket s,HandlerType e){m_sk = s;m_e = e;}
sCSocket GetSocket();
HandlerType GetType();

protected:
sCSocket  m_sk;
HandlerType m_e;
};


同步变量:

class MySyncObject
{
public:
MySyncObject(void);
virtual ~MySyncObject(void);
virtual BOOL lock(DWORD time = INFINITE);
virtual BOOL unlock() = 0;
virtual BOOL unlock(long count,long* preCount = NULL){return TRUE;}
operator HANDLE(){ return m_handle;}
protected:
HANDLE m_handle;
};

class MyMutex : public MySyncObject
{
public:
MyMutex( LPSECURITY_ATTRIBUTES lpas = NULL,BOOL bOwner = TRUE, LPCTSTR name = NULL)
{
m_handle = ::CreateMutex(lpas,bOwner,name);
}
BOOL unlock()
{
return ::ReleaseMutex(m_handle);
}

virtual ~MyMutex()
{
}
};

class MySemphore : public MySyncObject
{
public:
MySemphore( long lInital = 1,long lMax = 1,LPSECURITY_ATTRIBUTES lpas = NULL,LPCTSTR name = NULL)
{
m_handle = ::CreateSemaphore(lpas,lInital,lMax,name);
}
BOOL unlock()
{
return unlock(1);
}
BOOL unlock(long count,long* preCount = NULL)
{
return ::ReleaseSemaphore(m_handle,count,preCount);
}
};

class MyCondition
{
public:
MyCondition() : m_Semhpore(0,1),m_count(0){}
void wait( MyMutex& mutex,DWORD time = INFINITE);
void noticeOne();
void noticeAll();
private:
MySemphore m_Semhpore;
long m_count;
};

class MyMonitor : public MySyncObject
{
public:
MyMonitor():m_mutex(NULL,FALSE){}
BOOL lock(DWORD time  = INFINITE );
BOOL unlock();
void wait(DWORD time = INFINITE);
void noticeOne();
void noticeAll();
private:
MyMutex m_mutex;
MyCondition m_condition;
};


Reactor主执行函数:

void sCReactor::Run()
{
//没有listen或connect的阻塞在此
if ( m_FdRead.fd_count == 0 && m_FdWrite.fd_count==0 && m_FdExept.fd_count==0)
{
m_mutex.lock();
m_condition.wait(m_mutex);
m_mutex.unlock();
}
while(true)
{
fd_set fd_read,fd_write,fd_exept;
memcpy(&fd_read,&m_FdRead,sizeof(m_FdRead));
memcpy(&fd_write,&m_FdWrite,sizeof(m_FdWrite));
memcpy(&fd_exept,&m_FdExept,sizeof(m_FdExept));
int ret = select(0,&fd_read,&fd_write,&fd_exept,NULL);
if ( ret ==SOCKET_ERROR)
{

}
else if(ret > 0)
{
m_mutex.lock();
while( ret-->0)
{
//在一个socket上能同时发生可读/可写
// 注意: 一个socket上对应多个handler,可能导致不应该处理的handler调用了,而应该调用的handler没有被调用.
//注意二:handler的处理函数中,如果有register或者removehandler的操作,导致iterator自增操作出现问题
for (std::set<sCSocketHandler*>::iterator it = m_handlers.begin();it!=m_handlers.end();++it)
{
sCSocketHandler* handler = *it;
int result = 0;
bool bProcess = false;
HandlerType e = handler->GetType();
if ( FD_ISSET(handler->GetSocket(),&fd_read))
{
if ( e == HANDLE_ACCEPT || e == HANDLE_READ)
{
result = handler->Handler_Read();
bProcess = true;
FD_CLR(handler->GetSocket(),&fd_read);
}
}
else if ( FD_ISSET(handler->GetSocket(),&fd_write))
{
if ( e == HANDLE_CONNECT || e == HANDLE_WRITE)
{
result = handler->Handler_Write();
bProcess = true;
FD_CLR(handler->GetSocket(),&fd_write);
}
}
else if( FD_ISSET(handler->GetSocket(),&fd_exept))
{
result = handler->Handler_Expet();
bProcess = true;
FD_CLR(handler->GetSocket(),&fd_exept);
}
if ( bProcess)
{
if ( result < 0)
{
if ( result == -2)
{
//-2定义为socket关闭,相关的handler都要清掉
sCSocket earseSocket = handler->GetSocket();
for ( std::set<sCSocketHandler*>::iterator it = m_handlers.begin();it!=m_handlers.end();)
{
sCSocketHandler* earseHandler = *it;
if ( earseHandler->GetSocket() == earseSocket)
{
RemoveHandler(earseHandler);
it = m_handlers.begin();
earseHandler->Handler_Closed();
}
else
{
++it;
}

}
}
else
{
RemoveHandler(handler);
}
}
break;
}
}
}
m_mutex.unlock();
}
}
}


测试服务端代码:

void main(int argc,char** argv)
{
SocketGuard guard;
sCReactor reactor;
CServerAppector server(&reactor);
server.Open(sCSocketAddress("127.0.0.1",7000),&reactor);
while( true)
{
Sleep(100000);
}
}


完整工程例子下载链接:http://download.csdn.net/source/2938873
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: