您的位置:首页 > Web前端 > React

ACE Reactor for Windows模型源码研究

2013-09-03 14:41 357 查看
最近研究了下ACE的Reactor模型的源码。相比之前自己写的ACE Select模型,复杂了不少。ACE的Reactor框架,用户通过继承ACE_Event_Handler事件处理类。关联ACE_Reactor反应器,将无阻塞的IO隐蔽在ACE_Reactor对象的底层实现,这样减少了开发的事件和风险,提高了效率。

照例,首先叙述顶层的例子。这里,我首先定义一个ACE_Event_Handler的派生类HandleAccept(故名思议也知道是干什么的),负责输入描述符的处理:

class HandleAccept : public ACE_Event_Handler
{
private:
ACE_SOCK_Acceptor acceptor_;
ACE_INET_Addr inet_address_;
ACE_Reactor_Mask mask_;

u_short mPort;

HandleData *handle_data_;

public:
HandleAccept( ACE_Reactor *reactor ) : ACE_Event_Handler(reactor) {}

int open(u_short nPort);
virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE){}
virtual int handle_close( ACE_HANDLE handle = ACE_INVALID_HANDLE,
ACE_Reactor_Mask mask_ = 0);
virtual ACE_HANDLE get_handle (void) const;
};

再定义一个ACE_Event_Handler的派生类HandleData,负责数据的处理:

class HandleData : public ACE_Event_Handler
{
private:

ACE_SOCK_Stream peer_;
ACE_Message_Block *head_;
ACE_Message_Block *data_;

public:
HandleData(ACE_Reactor *reactor) : ACE_Event_Handler(reactor) {}
int open();
virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE){}
virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE , ACE_Reactor_Mask mask_ = 0);
virtual ACE_HANDLE get_handle(void) const;

ACE_SOCK_Stream &peer() {return peer_;}
int recv_data(ACE_SOCK_Stream strem);
};


这里着重介绍几个重要的接口,首先是open,在open中需要先初始化一个acceptor的socket,并且注册相关事件的mask到ACE_Reactor的反应器中(其实这里主要是ACCEPT_MASK)。

int HandleAccept::open(u_short nPort)
{
inet_address_.set(nPort);
if ( acceptor_.open(inet_address_ , 1) < 0 ) return -1;

ACE_SET_BITS(mask_ ,
ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::ACCEPT_MASK);

std::cout<<"HandleAccept::open()"<<std::endl;
return reactor()->register_handler(this , mask_);
}
当有输入事件的时候,handle_input回调将会被调用,在这里我们先new一个HandleData,这也是一个ACE_Event_Handler的派生类,负责数据的处理。然后调用ACE_SOCK_Acceptor的accept方法,等待输入的描述符。

int HandleAccept::handle_input(ACE_HANDLE handle)
{
handle_data_ = new HandleData(reactor());
ACE_INET_Addr remote_addr_;
if ( acceptor_.accept(handle_data_->peer() , &remote_addr_) < 0 )
{
std::cout<<"*ERROR* Fail to Accept connection"<<std::endl;
return -1;
}
std::cout<<"connect from "<<remote_addr_.get_host_addr()<<std::endl;

handle_data_->open();

return 0;
}


随后,在main的主程序中:

int main(int argc , char *argv[])
{
HandleAccept yankee(ACE_Reactor::instance());

yankee.open(1234);

ACE_Reactor::instance()->run_event_loop();

return 0;
}

如果使用传统的网络模型来实现,开发者将不得不面对以下问题:

设置和清楚fd_sets
检测事件,并对信号中断做出响应

管理内部锁
将事件多路分离给相关联的事件处理器
分派对I/O,信号和定时器事件的处理函数

ACE_Reactor框架解决了这一切,开发者只需关心上层的内容即可。首先从HandleAccep::open进入,层层转进到ACE_WFMO_Reactor::register_handler_i(至于为什么走到ACE_WFMO_Reactor,稍后就会提到):

int
ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
ACE_HANDLE io_handle,
ACE_Event_Handler *event_handler,
ACE_Reactor_Mask new_masks=READ_MASK)
{
// .......................
int found = this->handler_rep_.modify_network_events_i (io_handle,
new_masks,
old_masks,
new_network_events,
event_handle,
delete_event,
ACE_Reactor::ADD_MASK);
// .......................
int result = ::WSAEventSelect ((SOCKET) io_handle,
event_handle,
new_network_events);
if (found)
return result;
else if (result != SOCKET_ERROR &&
this->handler_rep_.bind_i (1,
event_handler,
new_network_events,
io_handle,
event_handle,
delete_event) != -1)
// .......................
}

首先调用 this->handler_rep_.modify_network_events_i(),在这里old_masks将获取event_handler感兴趣的事件集所对应的MASK,并增加new_masks对应的事件集FD,modify_network_events_i返回是否在active handles,suspended handles或者records to be added中找到io_handle,如果找到event_handle将指向这个io_handle。WSAEventSelect
将io_handle和event_handle绑定在一起。如果modify_network_events_i没有找到io_handle在记录中,则随后调用this->handler_rep_.bind_i负责插入一个新的Event_Handler入口到to_be_added_info_[]中。随后,to_be_added_info_[]的数据会被加入到current_handles_[]中,这一步将在ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos()中完成。

从ACE_Reactor::run_event_loop()进入,逐步分析。run_event_loop()走到ACE_Reactor::run_reactor_event_loop。程序进入一个while循环:

int
ACE_Reactor::run_reactor_event_loop (REACTOR_EVENT_HOOK eh)//typedef int (*REACTOR_EVENT_HOOK)(ACE_Reactor *);
{
// .............................
while (1)
{
int const result = this->implementation_->handle_events ();

// .............................
}


this->implementation_是在ACE_Reactor::ACE_Reactor()中分配的,这是一种代理模式的用法,为的是确保windows和linux之间的移植性,具体的实现根据不同的平台而异,具体底层的各种操作,都是由这个this->implementation_做的。在windows下,this->implementation_由ACE_WFMO_Reactor在ACE_Reactor的构造中实现:

ACE_Reactor::ACE_Reactor (ACE_Reactor_Impl *impl,
bool delete_implementation)
{
// .....................
ACE_NEW (impl,
ACE_WFMO_Reactor);
//.........................

this->implementation (impl);
// .........................
}
}
接着前面的this->implementation_->handle_events ()调用,由这里进入,调用到ACE_WFMO_Reactor::event_handling,这里会调用wait_for_multiple_events和safe_dispatch,前者负责监听输入,后者负责分发事件。首先分析wait_for_multiple_events:

DWORD
ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
int alertable)
{
// ................................
#else
return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
this->handler_rep_.handles (),
FALSE,
timeout,
alertable);
#endif /* ACE_HAS_PHARLAP */  //noblock
}
这里,调用WaitForMultipleObjectsEx非阻塞地去等待相关事件,第一个参数是等待对象的数组的size(对象句柄的最大数量是64。此参数不能是零),第二个参数制定一个等待处理的对象数组。this->handler_rep_是ACE_WFMO_Reactor_Handler_Repository对象,ACE_Reactor将所有要管理的handle都存储在这个对象中this->handler_rep_.handles()将返回ACE_WFMO_Reactor_Handler_Repository::current_handles_,这保存了当前记录的相关句柄。这里将会向上层返回一个有输入的对象数组下标。

再来分析数据分发的safe_dispatch,这个方法经过层层调用,执行到ACE_WFMO_Reactor::complex_dispatch_handler,这里通过之前获取的数组下标,获取相关handle:

int
ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
ACE_HANDLE event_handle)
{
// This dispatch is used for I/O entires.

ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
this->handler_rep_.current_info ()[slot];
// .......................................
然后调用upcall:

int
ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
ACE_HANDLE event_handle)
{
// ..............
problems |= this->upcall (current_info.event_handler_,
current_info.io_handle_,
events);
// ..............
}
在upcall中,通过匹配事件描述符,决定执行的顶层HandleAccept::handle_input()方法,假设此时有个外部链接connect进来,则会造成一个FD_ACCEPT事件:

ACE_Reactor_Mask
ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
ACE_HANDLE io_handle,
WSANETWORKEVENTS &events)
{
// .................
if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
{
action = event_handler->handle_input (io_handle);    //这里执行最顶层HandleAccept::handle_input方法
if (action <= 0)
{
ACE_CLR_BITS (actual_events, FD_ACCEPT);
if (action == -1)
ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
}
}
// ..................
}

如果HandleAccept::handle_input()方法返回错误值-1,那么程序会将ACCEPT_MASK加入problems,这个值将导致后面执行handle_close()

int
ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
ACE_Reactor_Mask to_be_removed_masks)
{
// ......................
else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
{
ACE_HANDLE handle = this->current_info_[slot].io_handle_;
this->current_info_[slot].event_handler_->handle_close (handle,
to_be_removed_masks);
}

return 0;
}


总之,ACE的Reactor框架充分利用了C++的封装性以及多态性,并通过代理,实现了跨平台底层的透明,对快速开发应用有重要的意义。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐