基于ACE的网络服务端通讯编程
2008-03-27 11:22
176 查看
完成了基本的通讯功能
测试报告如下:
昨天准备11台 只有7台机子可以运行 每个开了10个
大部分 都好的 有1台开10个的时候出错 还有1台早上看的时候10个都出错
根据上面的图显示结果来看是由于服务端先关闭,客户端发送数据失败导致。
源代码如下:
class ClientAcceptor : public ACE_Event_Handler
{
public:
ClientAcceptor(void);
virtual ~ClientAcceptor(void);
//
int open(const ACE_INET_Addr &listen_addr);
//得到当前句柄的I/O句柄
virtual ACE_HANDLE get_handle(void) const
{
return this->acceptor_.get_handle();
}
//当一个连接准备接受的时候调用这个函数
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
//当这个句柄从ACE_Reactor中移除的时候调用这个函数
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
protected:
ACE_SOCK_Acceptor acceptor_;
};
#i nclude "StdAfx.h"
#i nclude "./lientacceptor.h"
ClientAcceptor::ClientAcceptor(void)
{
}
ClientAcceptor::~ClientAcceptor(void)
{
this->handle_close(ACE_INVALID_HANDLE,0);
}
int ClientAcceptor::open(const ACE_INET_Addr &listen_addr)
{
if(this->acceptor_.open(listen_addr,1) == -1)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);
}
return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
}
int ClientAcceptor::handle_input(ACE_HANDLE)
{
ClientService *client;
ACE_NEW_RETURN(client,ClientService,-1);
auto_ptr<ClientService> p (client);
if(this->acceptor_.accept(client->peer()) == -1)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("Failed to accept"),ACE_TEXT("client connection")),-1);
}
p.release();
client->reactor(this->reactor());
if(client->open() == -1)
{
client->handle_close(ACE_INVALID_HANDLE,0);
}
return 0;
}
int ClientAcceptor::handle_close(ACE_HANDLE,ACE_Reactor_Mask)
{
if(this->acceptor_.get_handle() != ACE_INVALID_HANDLE)
{
ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
this->reactor()->remove_handler(this,m);
this->acceptor_.close();
}
return 0;
}
class ClientService : public ACE_Event_Handler
{
public:
ClientService(void);
virtual ~ClientService(void);
ACE_SOCK_Stream &peer(void) {return this->sock_;}
int open(void);
//得到当前句柄的I/O句柄
virtual ACE_HANDLE get_handle(void) const
{
return this->sock_.get_handle();
}
//当客户端出现一个input
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
//当发生一个output的时候
virtual int handle_output(ACE_HANDLE fd = ACE_INVALID_HANDLE);
//当从ACE_Reactor移除一个句柄的时候
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
protected:
ACE_SOCK_Stream sock_;
ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;
};
#i nclude "StdAfx.h"
#i nclude "./lientservice.h"
ClientService::ClientService(void)
{
}
ClientService::~ClientService(void)
{
}
int ClientService::handle_input(ACE_HANDLE)
{
const size_t INPUT_SIZE = 4096;
char buffer[INPUT_SIZE];
ssize_t recv_cnt,send_cnt;
if((recv_cnt = this->sock_.recv(buffer,sizeof(buffer))) <= 0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection closed/n")));
return -1;
}
/*
* 在这里加入数据包处理函数
*/
send_cnt = this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));
if(send_cnt == recv_cnt)
{
return 0;
}
if(send_cnt == -1 && ACE_OS::last_error() != EWOULDBLOCK)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("send")),0);
}
if(send_cnt == -1)
send_cnt = 0;
ACE_Message_Block *mb;
size_t remaining = ACE_static_cast(size_t,(recv_cnt - send_cnt));
ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
int output_off = this->output_queue_.is_empty();
ACE_Time_Value nowait(ACE_OS::gettimeofday());
if(this->output_queue_.enqueue_tail(mb,&nowait) == -1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p; discarding data/n"),ACE_TEXT("enqueue failed")));
mb->release();
return 0;
}
if(output_off)
{
return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
}
return 0;
}
int ClientService::handle_output(ACE_HANDLE)
{
ACE_Message_Block *mb;
ACE_Time_Value nowait(ACE_OS::gettimeofday());
while( 0 == this->output_queue_.dequeue_head(mb,&nowait))
{
ssize_t send_cnt = this->sock_.send(mb->rd_ptr(),mb->length());
if(send_cnt == -1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("send")));
}
else
{
mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
}
if(mb->length() > 0)
{
this->output_queue_.enqueue_head(mb);
break;
}
mb->release();
}
return(this->output_queue_.is_empty())? -1 : 0;
}
int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)
{
if(mask == ACE_Event_Handler::WRITE_MASK)
return 0;
mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
this->reactor()->remove_handler(this,mask);
this->sock_.close();
this->output_queue_.flush();
delete this;
return 0;
}
int ClientService::open(void)
{
ACE_TCHAR peer_name[MAXHOSTNAMELEN];
ACE_INET_Addr peer_addr;
if(this->sock_.get_remote_addr(peer_addr) == 0 && peer_addr.addr_to_string(peer_name,MAXHOSTNAMELEN) == 0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection from %s/n"),peer_name));
}
return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);
}
测试报告如下:
昨天准备11台 只有7台机子可以运行 每个开了10个
大部分 都好的 有1台开10个的时候出错 还有1台早上看的时候10个都出错
根据上面的图显示结果来看是由于服务端先关闭,客户端发送数据失败导致。
源代码如下:
class ClientAcceptor : public ACE_Event_Handler
{
public:
ClientAcceptor(void);
virtual ~ClientAcceptor(void);
//
int open(const ACE_INET_Addr &listen_addr);
//得到当前句柄的I/O句柄
virtual ACE_HANDLE get_handle(void) const
{
return this->acceptor_.get_handle();
}
//当一个连接准备接受的时候调用这个函数
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
//当这个句柄从ACE_Reactor中移除的时候调用这个函数
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
protected:
ACE_SOCK_Acceptor acceptor_;
};
#i nclude "StdAfx.h"
#i nclude "./lientacceptor.h"
ClientAcceptor::ClientAcceptor(void)
{
}
ClientAcceptor::~ClientAcceptor(void)
{
this->handle_close(ACE_INVALID_HANDLE,0);
}
int ClientAcceptor::open(const ACE_INET_Addr &listen_addr)
{
if(this->acceptor_.open(listen_addr,1) == -1)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);
}
return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
}
int ClientAcceptor::handle_input(ACE_HANDLE)
{
ClientService *client;
ACE_NEW_RETURN(client,ClientService,-1);
auto_ptr<ClientService> p (client);
if(this->acceptor_.accept(client->peer()) == -1)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("Failed to accept"),ACE_TEXT("client connection")),-1);
}
p.release();
client->reactor(this->reactor());
if(client->open() == -1)
{
client->handle_close(ACE_INVALID_HANDLE,0);
}
return 0;
}
int ClientAcceptor::handle_close(ACE_HANDLE,ACE_Reactor_Mask)
{
if(this->acceptor_.get_handle() != ACE_INVALID_HANDLE)
{
ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
this->reactor()->remove_handler(this,m);
this->acceptor_.close();
}
return 0;
}
class ClientService : public ACE_Event_Handler
{
public:
ClientService(void);
virtual ~ClientService(void);
ACE_SOCK_Stream &peer(void) {return this->sock_;}
int open(void);
//得到当前句柄的I/O句柄
virtual ACE_HANDLE get_handle(void) const
{
return this->sock_.get_handle();
}
//当客户端出现一个input
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
//当发生一个output的时候
virtual int handle_output(ACE_HANDLE fd = ACE_INVALID_HANDLE);
//当从ACE_Reactor移除一个句柄的时候
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
protected:
ACE_SOCK_Stream sock_;
ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;
};
#i nclude "StdAfx.h"
#i nclude "./lientservice.h"
ClientService::ClientService(void)
{
}
ClientService::~ClientService(void)
{
}
int ClientService::handle_input(ACE_HANDLE)
{
const size_t INPUT_SIZE = 4096;
char buffer[INPUT_SIZE];
ssize_t recv_cnt,send_cnt;
if((recv_cnt = this->sock_.recv(buffer,sizeof(buffer))) <= 0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection closed/n")));
return -1;
}
/*
* 在这里加入数据包处理函数
*/
send_cnt = this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));
if(send_cnt == recv_cnt)
{
return 0;
}
if(send_cnt == -1 && ACE_OS::last_error() != EWOULDBLOCK)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("send")),0);
}
if(send_cnt == -1)
send_cnt = 0;
ACE_Message_Block *mb;
size_t remaining = ACE_static_cast(size_t,(recv_cnt - send_cnt));
ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
int output_off = this->output_queue_.is_empty();
ACE_Time_Value nowait(ACE_OS::gettimeofday());
if(this->output_queue_.enqueue_tail(mb,&nowait) == -1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p; discarding data/n"),ACE_TEXT("enqueue failed")));
mb->release();
return 0;
}
if(output_off)
{
return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
}
return 0;
}
int ClientService::handle_output(ACE_HANDLE)
{
ACE_Message_Block *mb;
ACE_Time_Value nowait(ACE_OS::gettimeofday());
while( 0 == this->output_queue_.dequeue_head(mb,&nowait))
{
ssize_t send_cnt = this->sock_.send(mb->rd_ptr(),mb->length());
if(send_cnt == -1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("send")));
}
else
{
mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
}
if(mb->length() > 0)
{
this->output_queue_.enqueue_head(mb);
break;
}
mb->release();
}
return(this->output_queue_.is_empty())? -1 : 0;
}
int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)
{
if(mask == ACE_Event_Handler::WRITE_MASK)
return 0;
mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
this->reactor()->remove_handler(this,mask);
this->sock_.close();
this->output_queue_.flush();
delete this;
return 0;
}
int ClientService::open(void)
{
ACE_TCHAR peer_name[MAXHOSTNAMELEN];
ACE_INET_Addr peer_addr;
if(this->sock_.get_remote_addr(peer_addr) == 0 && peer_addr.addr_to_string(peer_name,MAXHOSTNAMELEN) == 0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection from %s/n"),peer_name));
}
return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);
}
相关文章推荐
- Openssl:构建CA的过程并实现web服务基于https访问的网络架构
- JAVA网络编程实现基于TCP协议的时间服务(向服务器查询当前时间)
- NSS,全称Network Socialing Services,即基于网络的社会化服务
- OpenStack-M版(Mitaka)搭建基于(Centos7.2)+++六、Openstack网络服务(neutron)下
- ACE框架简介以及一个基于ACE的C/S服务程序实例
- 基于网络抓包实现kubernetes中微服务的应用级监控
- SafeNet为亚马逊网络服务客户提供基于云的数据保护技术
- 基于反向代理的CDN网络加速服务——CloudFlare
- 基于网络抓包实现kubernetes中微服务的应用级监控
- 基于。NET的模块化网络服务应用程序服务器开发 第一章
- 黑马程序员:基于TCP协议的网络服务:ServerSocket、Socket
- 基于Comet推送的网络拍卖服务 北京地区诚征商业合作伙伴
- 一种基于网络服务的客户端自动升级框架及其应用
- 没有网络,也能上网-基于USSD技术的信息服务
- socket 网络编程高速入门(一)教你编写基于UDP/TCP的服务(client)通信
- (网络层: 二 ) 网络层提供的服务(基于无连接的服务)
- (网络层: 二 ) 网络层提供的服务(基于无连接的服务)
- 基于网络抓包实现kubernetes中微服务的应用级监控
- 基于微线程的网络服务框架