您的位置:首页 > 其它

ACE Proactor模式的一个问题

2008-03-21 16:41 393 查看
我现在想使用ACE里的Proactor来做一个通信的程序框架.看书上说就是使用的完成端口的技术.可以提供最为优化的通信.
但是在使用的时候会有这样的问题.我使用了VC写了一个程序.连接一个socket.然后开启一个线程发送数据包.每sleep 10 毫秒发一个大小为50byte的数据包.这样我开了40个之后.就会出显问题了.服务器就会出错误了.我的源代码是这样写的:

///接收器类

// Accepte.h: interface for the Accepte class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
#define AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include <ace/Asynch_Acceptor.h>
#include "Receive.h"
class Accepte : public ACE_Asynch_Acceptor<Receive>
{
public:
Receive* make_handler (void);
Accepte();
virtual ~Accepte();
virtual int validate_connection (const ACE_Asynch_Accept::Result& result,
const ACE_INET_Addr &remote,
const ACE_INET_Addr& local);
};

#endif // !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)

#include "Accepte.h"

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

Accepte::Accepte()
{

}

Accepte::~Accepte()
{

}

Receive* Accepte::make_handler(void)
{
return new Receive();
}

int Accepte::validate_connection (const ACE_Asynch_Accept::Result& result,
const ACE_INET_Addr &remote,
const ACE_INET_Addr& local)
{
return 0;
}

///客户端类
// Receive.h: interface for the Receive class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
#define AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include <ace/Asynch_io.h>
#include <ace/Message_Block.h>
#include <ace/Log_Msg.h>
#include <ace/OS_Memory.h>
class Receive : public ACE_Service_Handler
{
public:
Receive();
virtual ~Receive();
virtual void open(ACE_HANDLE h,ACE_Message_Block& );
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
private:
void pro_read();
ACE_Asynch_Write_Stream write_;
ACE_Asynch_Read_Stream reader_;
ACE_Message_Block* mb_;

};

#endif // !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)

///
// Receive.cpp: implementation of the Receive class.
//
//////////////////////////////////////////////////////////////////////

#include "Receive.h"

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

Receive::Receive()
{
mb_=0;
ACE_DEBUG((LM_INFO,ACE_TEXT("有一个连接./n")));
}
Receive::~Receive()
{
{
ACE_DEBUG((LM_INFO,ACE_TEXT("连接退出./n")));
if (this->handle() != ACE_INVALID_HANDLE )
{
closesocket(SOCKET(this->handle()));
}
}
}
void Receive::pro_read()
{
if (mb_==0)
{
mb_ = new ACE_Message_Block(1024);
}
else
{
mb_->reset();
}
if ( this->reader_.read(*mb_,mb_->space()) != 0)
{
ACE_ERROR((LM_ERROR,ACE_TEXT(" (%t) error information %p.")));
mb_->release();
delete this;
return;
}
}
void Receive::open(ACE_HANDLE h,ACE_Message_Block& )
{
this->handle(h);
if (this->write_.open(*this)!=0 ||
this->reader_.open(*this) != 0 )
{
delete this;
return ;
}
pro_read();
}
void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb_ = result.message_block();
if ( !result.success() || result.bytes_transferred() == 0)
{
mb_.release();
delete this;
}
else
{
printf("接收到数据包/n");
//printf("内容:%s/n",(char*)mb_.base());
printf("数据包的大小为:%d/n",result.bytes_transferred());
pro_read();
}
return ;
}

void Receive::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
//result.message_block().release();
return ;
}

////////////////////////////////////////////////////////////////////////////////////////////////////
//main.cpp

#ifdef _DEBUG
#pragma comment(lib,"aced")
#else
#pragma comment(lib,"ace")
#endif
#include <ace/ace.h>
#include "Accepte.h"
#include "Proactor_Task.h"

int ACE_TMAIN(int ,char*[])
{
Accepte accepte;
accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,0,1);
int nExit=0;
while (nExit==0)
{
ACE_Proactor::instance()->run_event_loop();
}
return 0;
}

请教做过ACE前摄式的朋友来看一下这个代码吧.谢谢啦. 问题点数:200、回复次数:5Top

1 楼huangxiaoke2000(小小)回复于 2006-04-14 18:51:10 得分 0

你在类Receive中添加一个变量为int io_count=0;
init_readstream和init_writestream的时候加一,handle_read和handle_write的时候减一
因为你关闭的时候还有外出的I/O在操作
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐