您的位置:首页 > 理论基础 > 计算机网络

基于ACE的网络服务端通讯编程

2008-03-27 11:22 176 查看
完成了基本的通讯功能

测试报告如下:

昨天准备11台 只有7台机子可以运行 每个开了10个
大部分 都好的 有1台开10个的时候出错 还有1台早上看的时候10个都出错



根据上面的图显示结果来看是由于服务端先关闭,客户端发送数据失败导致。

源代码如下:

class ClientAcceptor : public ACE_Event_Handler
{
public:
ClientAcceptor(void);
virtual ~ClientAcceptor(void);
//
int open(const ACE_INET_Addr &listen_addr);
//得到当前句柄的I/O句柄
virtual ACE_HANDLE get_handle(void) const
{
return this->acceptor_.get_handle();
}
//当一个连接准备接受的时候调用这个函数
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);


//当这个句柄从ACE_Reactor中移除的时候调用这个函数
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);




protected:
ACE_SOCK_Acceptor acceptor_;


};


#i nclude "StdAfx.h"
#i nclude "./lientacceptor.h"


ClientAcceptor::ClientAcceptor(void)
{
}


ClientAcceptor::~ClientAcceptor(void)
{
this->handle_close(ACE_INVALID_HANDLE,0);
}


int ClientAcceptor::open(const ACE_INET_Addr &listen_addr)
{
if(this->acceptor_.open(listen_addr,1) == -1)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);


}
return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);

}


int ClientAcceptor::handle_input(ACE_HANDLE)
{
ClientService *client;
ACE_NEW_RETURN(client,ClientService,-1);
auto_ptr<ClientService> p (client);


if(this->acceptor_.accept(client->peer()) == -1)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("Failed to accept"),ACE_TEXT("client connection")),-1);


}

p.release();
client->reactor(this->reactor());
if(client->open() == -1)
{
client->handle_close(ACE_INVALID_HANDLE,0);


}
return 0;


}

int ClientAcceptor::handle_close(ACE_HANDLE,ACE_Reactor_Mask)
{
if(this->acceptor_.get_handle() != ACE_INVALID_HANDLE)
{
ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
this->reactor()->remove_handler(this,m);
this->acceptor_.close();
}
return 0;
}


class ClientService : public ACE_Event_Handler
{
public:
ClientService(void);
virtual ~ClientService(void);

ACE_SOCK_Stream &peer(void) {return this->sock_;}

int open(void);

//得到当前句柄的I/O句柄
virtual ACE_HANDLE get_handle(void) const
{
return this->sock_.get_handle();
}
//当客户端出现一个input
virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);

//当发生一个output的时候
virtual int handle_output(ACE_HANDLE fd = ACE_INVALID_HANDLE);

//当从ACE_Reactor移除一个句柄的时候
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);

protected:
ACE_SOCK_Stream sock_;
ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;
};

#i nclude "StdAfx.h"
#i nclude "./lientservice.h"


ClientService::ClientService(void)
{
}


ClientService::~ClientService(void)
{
}




int ClientService::handle_input(ACE_HANDLE)
{
const size_t INPUT_SIZE = 4096;
char buffer[INPUT_SIZE];
ssize_t recv_cnt,send_cnt;

if((recv_cnt = this->sock_.recv(buffer,sizeof(buffer))) <= 0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection closed/n")));
return -1;
}

/*
* 在这里加入数据包处理函数

*/
send_cnt = this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));

if(send_cnt == recv_cnt)
{
return 0;
}

if(send_cnt == -1 && ACE_OS::last_error() != EWOULDBLOCK)
{
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("send")),0);

}

if(send_cnt == -1)
send_cnt = 0;

ACE_Message_Block *mb;
size_t remaining = ACE_static_cast(size_t,(recv_cnt - send_cnt));
ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
int output_off = this->output_queue_.is_empty();
ACE_Time_Value nowait(ACE_OS::gettimeofday());
if(this->output_queue_.enqueue_tail(mb,&nowait) == -1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p; discarding data/n"),ACE_TEXT("enqueue failed")));
mb->release();
return 0;
}

if(output_off)
{
return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);

}
return 0;
}

int ClientService::handle_output(ACE_HANDLE)
{
ACE_Message_Block *mb;
ACE_Time_Value nowait(ACE_OS::gettimeofday());
while( 0 == this->output_queue_.dequeue_head(mb,&nowait))
{
ssize_t send_cnt = this->sock_.send(mb->rd_ptr(),mb->length());
if(send_cnt == -1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) %p/n"),ACE_TEXT("send")));
}
else
{
mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
}
if(mb->length() > 0)
{
this->output_queue_.enqueue_head(mb);
break;
}
mb->release();
}
return(this->output_queue_.is_empty())? -1 : 0;

}

int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)
{
if(mask == ACE_Event_Handler::WRITE_MASK)
return 0;
mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;

this->reactor()->remove_handler(this,mask);
this->sock_.close();
this->output_queue_.flush();
delete this;
return 0;
}

int ClientService::open(void)
{
ACE_TCHAR peer_name[MAXHOSTNAMELEN];
ACE_INET_Addr peer_addr;
if(this->sock_.get_remote_addr(peer_addr) == 0 && peer_addr.addr_to_string(peer_name,MAXHOSTNAMELEN) == 0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) Connection from %s/n"),peer_name));

}
return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);
}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: