Reactor和Proactor模式下带有自动重连机制的客户端实现
2011-06-30 09:53
204 查看
最近终于有点时间,趁着重写服务器重连部分,重新写了一个支持链路重连机制的客户端模块。先说Reactor的,Reactor自动重连比较简单,因为是同步的。
为了实现以上功能,首先要添加一个管理类(CClientReConnectManager),管理所有已经连接和还没有连接的对象。然后添加一个单元类(CReactorClientInfo),这个单元类负责管理指定链接对象的信息。
CClientReConnectManager包含了以下方法
bool Init(ACE_Reactor* pReactor);
bool Connect(int nServerID, const char* pIP, int nPort, IClientMessage* pClientMessage);
bool Close(int nServerID); //关闭连接
bool ConnectErrorClose(int nServerID); //由内部错误引起的失败,由ProConnectClient调用
bool SendData(int nServerID, ACE_Message_Block* pmblk); //发送数据
bool SetHandler(int nServerID, CConnectClient* pConnectClient); //将指定的CProConnectClient*绑定给nServerID
IClientMessage* GetClientMessage(int nServerID); //获得ClientMessage对象
bool StartConnectTask(int nIntervalTime = CONNECT_LIMIT_RETRY); //设置自动重连的定时器
void CancelConnectTask(); //关闭重连定时器
void Close();
virtual int handle_timeout (const ACE_Time_Value ¤t_time, const void *act = 0); //定时器执行
复制代码
Init是初始化管理类,需要指定一个Reactor反应器,反应器可以自己创建一个,也可以用默认的。Connect()函数是创建一个新的链接,需要ServerID(这个你可以自己去定义一下,只要能区分出各个链接不同就可以了,不可重复),IP和端口,这是必备的,呵呵,就不多说了,关键是IClientMessage这个对象,这是我定义的一个消息处理类,你可以在外面继承这个类,这个类提供了两种方法
class IClientMessage
{
public:
virtual bool RecvData(CClientParse* pClientParse) = 0; //接收数据的函数
virtual bool ConnectError(int nError) = 0; //当出错的时候,调用此接口返回错误信息
};
复制代码
RecvData()是接受完成数据包后,会自动调用这个接口,由继承的类去实现内部的数据处理,同样,当数据链接出错的话,系统会回调ConnectError()方法并告诉继承的类是什么错误导致的失败。
每当用户调用发送接口的时候,我会先检查链接是否健康,如果健康,则正常发送,如果不健康或者已经断开,就会自动重连。为了保持数据链接的最大稳定性,我添加了一个定时器,会定时检测所有注册的链接是否正常,如果不正常的话会自动重连。
定时器自动重连的方法是
int CClientReConnectManager::handle_timeout(const ACE_Time_Value &tv, const void *arg)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_ThreadWritrLock);
mapReactorConnectInfo::iterator b = m_mapConnectInfo.begin();
mapReactorConnectInfo::iterator e = m_mapConnectInfo.end();
for(b; b!= e; b++)
{
CReactorClientInfo* pClientInfo = (CReactorClientInfo* )b->second;
if(NULL == pClientInfo->GetConnectClient())
{
//如果连接不存在,则重新建立连接
pClientInfo->Run();
}
}
return 0;
}
复制代码
好了,管理类差不多就是这样了。至于ConnectHander的实现,Proactor和Reactor是不同的,于是我实现了两个工程。代码分别在里面加了注释,其实并不复杂。倒是Proactor的客户端链接,这个应该注意一下,Connect成功了并不代表链接就已经建立,只是代表连接已经准备好建立,至于是否成功,需要在ProConnectHander的Open方法下获得,异步就是这点需要注意一下。
呵呵,上传一下测试过的代码,以下代码在VS2005下编译通过,测试通过,proceXP测试内存稳定。(我的开发机器上ACE的版本是5.7.4,如果用更高版本的ACE,比如6.0.0编译会报错,提示#include "ace/os.h"不存在,你只要把这句话替换成,#include "ace/OS_main.h"即可)
这些代码里有调用例子,有兴趣的朋友可以看看。
ACEReactorClient.rar (27.89 KB)
ACEConnectClient.rar (37.38 KB)
为了实现以上功能,首先要添加一个管理类(CClientReConnectManager),管理所有已经连接和还没有连接的对象。然后添加一个单元类(CReactorClientInfo),这个单元类负责管理指定链接对象的信息。
CClientReConnectManager包含了以下方法
bool Init(ACE_Reactor* pReactor);
bool Connect(int nServerID, const char* pIP, int nPort, IClientMessage* pClientMessage);
bool Close(int nServerID); //关闭连接
bool ConnectErrorClose(int nServerID); //由内部错误引起的失败,由ProConnectClient调用
bool SendData(int nServerID, ACE_Message_Block* pmblk); //发送数据
bool SetHandler(int nServerID, CConnectClient* pConnectClient); //将指定的CProConnectClient*绑定给nServerID
IClientMessage* GetClientMessage(int nServerID); //获得ClientMessage对象
bool StartConnectTask(int nIntervalTime = CONNECT_LIMIT_RETRY); //设置自动重连的定时器
void CancelConnectTask(); //关闭重连定时器
void Close();
virtual int handle_timeout (const ACE_Time_Value ¤t_time, const void *act = 0); //定时器执行
复制代码
Init是初始化管理类,需要指定一个Reactor反应器,反应器可以自己创建一个,也可以用默认的。Connect()函数是创建一个新的链接,需要ServerID(这个你可以自己去定义一下,只要能区分出各个链接不同就可以了,不可重复),IP和端口,这是必备的,呵呵,就不多说了,关键是IClientMessage这个对象,这是我定义的一个消息处理类,你可以在外面继承这个类,这个类提供了两种方法
class IClientMessage
{
public:
virtual bool RecvData(CClientParse* pClientParse) = 0; //接收数据的函数
virtual bool ConnectError(int nError) = 0; //当出错的时候,调用此接口返回错误信息
};
复制代码
RecvData()是接受完成数据包后,会自动调用这个接口,由继承的类去实现内部的数据处理,同样,当数据链接出错的话,系统会回调ConnectError()方法并告诉继承的类是什么错误导致的失败。
每当用户调用发送接口的时候,我会先检查链接是否健康,如果健康,则正常发送,如果不健康或者已经断开,就会自动重连。为了保持数据链接的最大稳定性,我添加了一个定时器,会定时检测所有注册的链接是否正常,如果不正常的话会自动重连。
定时器自动重连的方法是
int CClientReConnectManager::handle_timeout(const ACE_Time_Value &tv, const void *arg)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_ThreadWritrLock);
mapReactorConnectInfo::iterator b = m_mapConnectInfo.begin();
mapReactorConnectInfo::iterator e = m_mapConnectInfo.end();
for(b; b!= e; b++)
{
CReactorClientInfo* pClientInfo = (CReactorClientInfo* )b->second;
if(NULL == pClientInfo->GetConnectClient())
{
//如果连接不存在,则重新建立连接
pClientInfo->Run();
}
}
return 0;
}
复制代码
好了,管理类差不多就是这样了。至于ConnectHander的实现,Proactor和Reactor是不同的,于是我实现了两个工程。代码分别在里面加了注释,其实并不复杂。倒是Proactor的客户端链接,这个应该注意一下,Connect成功了并不代表链接就已经建立,只是代表连接已经准备好建立,至于是否成功,需要在ProConnectHander的Open方法下获得,异步就是这点需要注意一下。
呵呵,上传一下测试过的代码,以下代码在VS2005下编译通过,测试通过,proceXP测试内存稳定。(我的开发机器上ACE的版本是5.7.4,如果用更高版本的ACE,比如6.0.0编译会报错,提示#include "ace/os.h"不存在,你只要把这句话替换成,#include "ace/OS_main.h"即可)
这些代码里有调用例子,有兴趣的朋友可以看看。
ACEReactorClient.rar (27.89 KB)
ACEConnectClient.rar (37.38 KB)
相关文章推荐
- 【基础】利用thrift实现一个非阻塞带有回调机制的客户端
- I/O子系统:select,poll,epoll,kqueue, iocp(Windows)及各种I/O复用机制 模式Reactor Proactor
- I/O子系统:select,poll,epoll,kqueue, iocp(Windows)及各种I/O复用机制 模式Reactor Proactor
- I/O子系统:select,poll,epoll,kqueue, iocp(Windows)及各种I/O复用机制 模式Reactor Proactor
- JDBC实现Mysql自动重连机制的方法详解
- [心跳] 使用心跳机制实现CS架构下多客户端的在线状态实时更新以及掉线自动重连
- 利用thrift实现一个非阻塞带有回调机制的客户端
- JDBC如何实现Mysql自动重连的机制
- JDBC如何实现Mysql自动重连的机制
- 高性能IO设计的Reactor和Proactor模式
- Reactor, Proactor并发模式与高性能视频服务器
- 使用spring注解 自动装配以及自动扫描机制 实现零xml配置的前提
- C++ Socket C/S ,实现客户端,服务器端断开重连
- Spring+Rmi中的客户端自动重连配置
- Proactor和Reactor模式
- 对比高性能I/O设计模式-Reactor/Proactor
- JavaScript实现HTML5游戏断线自动重连的方法
- QTP实现从outlook客户端自动发送邮件
- Android实现来电自动挂断实现机制
- 实现带有数据绑定的客户端脚本控制的二级联动菜单[zz]