ACE Proactor模式的一个问题
2008-03-21 16:41
393 查看
我现在想使用ACE里的Proactor来做一个通信的程序框架.看书上说就是使用的完成端口的技术.可以提供最为优化的通信.
但是在使用的时候会有这样的问题.我使用了VC写了一个程序.连接一个socket.然后开启一个线程发送数据包.每sleep 10 毫秒发一个大小为50byte的数据包.这样我开了40个之后.就会出显问题了.服务器就会出错误了.我的源代码是这样写的:
///接收器类
// Accepte.h: interface for the Accepte class.
//
//////////////////////////////////////////////////////////////////////
#if !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
#define AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include <ace/Asynch_Acceptor.h>
#include "Receive.h"
class Accepte : public ACE_Asynch_Acceptor<Receive>
{
public:
Receive* make_handler (void);
Accepte();
virtual ~Accepte();
virtual int validate_connection (const ACE_Asynch_Accept::Result& result,
const ACE_INET_Addr &remote,
const ACE_INET_Addr& local);
};
#endif // !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
#include "Accepte.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
Accepte::Accepte()
{
}
Accepte::~Accepte()
{
}
Receive* Accepte::make_handler(void)
{
return new Receive();
}
int Accepte::validate_connection (const ACE_Asynch_Accept::Result& result,
const ACE_INET_Addr &remote,
const ACE_INET_Addr& local)
{
return 0;
}
///客户端类
// Receive.h: interface for the Receive class.
//
//////////////////////////////////////////////////////////////////////
#if !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
#define AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include <ace/Asynch_io.h>
#include <ace/Message_Block.h>
#include <ace/Log_Msg.h>
#include <ace/OS_Memory.h>
class Receive : public ACE_Service_Handler
{
public:
Receive();
virtual ~Receive();
virtual void open(ACE_HANDLE h,ACE_Message_Block& );
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
private:
void pro_read();
ACE_Asynch_Write_Stream write_;
ACE_Asynch_Read_Stream reader_;
ACE_Message_Block* mb_;
};
#endif // !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
///
// Receive.cpp: implementation of the Receive class.
//
//////////////////////////////////////////////////////////////////////
#include "Receive.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
Receive::Receive()
{
mb_=0;
ACE_DEBUG((LM_INFO,ACE_TEXT("有一个连接./n")));
}
Receive::~Receive()
{
{
ACE_DEBUG((LM_INFO,ACE_TEXT("连接退出./n")));
if (this->handle() != ACE_INVALID_HANDLE )
{
closesocket(SOCKET(this->handle()));
}
}
}
void Receive::pro_read()
{
if (mb_==0)
{
mb_ = new ACE_Message_Block(1024);
}
else
{
mb_->reset();
}
if ( this->reader_.read(*mb_,mb_->space()) != 0)
{
ACE_ERROR((LM_ERROR,ACE_TEXT(" (%t) error information %p.")));
mb_->release();
delete this;
return;
}
}
void Receive::open(ACE_HANDLE h,ACE_Message_Block& )
{
this->handle(h);
if (this->write_.open(*this)!=0 ||
this->reader_.open(*this) != 0 )
{
delete this;
return ;
}
pro_read();
}
void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb_ = result.message_block();
if ( !result.success() || result.bytes_transferred() == 0)
{
mb_.release();
delete this;
}
else
{
printf("接收到数据包/n");
//printf("内容:%s/n",(char*)mb_.base());
printf("数据包的大小为:%d/n",result.bytes_transferred());
pro_read();
}
return ;
}
void Receive::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
//result.message_block().release();
return ;
}
////////////////////////////////////////////////////////////////////////////////////////////////////
//main.cpp
#ifdef _DEBUG
#pragma comment(lib,"aced")
#else
#pragma comment(lib,"ace")
#endif
#include <ace/ace.h>
#include "Accepte.h"
#include "Proactor_Task.h"
int ACE_TMAIN(int ,char*[])
{
Accepte accepte;
accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,0,1);
int nExit=0;
while (nExit==0)
{
ACE_Proactor::instance()->run_event_loop();
}
return 0;
}
请教做过ACE前摄式的朋友来看一下这个代码吧.谢谢啦. 问题点数:200、回复次数:5Top
init_readstream和init_writestream的时候加一,handle_read和handle_write的时候减一
因为你关闭的时候还有外出的I/O在操作
但是在使用的时候会有这样的问题.我使用了VC写了一个程序.连接一个socket.然后开启一个线程发送数据包.每sleep 10 毫秒发一个大小为50byte的数据包.这样我开了40个之后.就会出显问题了.服务器就会出错误了.我的源代码是这样写的:
///接收器类
// Accepte.h: interface for the Accepte class.
//
//////////////////////////////////////////////////////////////////////
#if !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
#define AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include <ace/Asynch_Acceptor.h>
#include "Receive.h"
class Accepte : public ACE_Asynch_Acceptor<Receive>
{
public:
Receive* make_handler (void);
Accepte();
virtual ~Accepte();
virtual int validate_connection (const ACE_Asynch_Accept::Result& result,
const ACE_INET_Addr &remote,
const ACE_INET_Addr& local);
};
#endif // !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
#include "Accepte.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
Accepte::Accepte()
{
}
Accepte::~Accepte()
{
}
Receive* Accepte::make_handler(void)
{
return new Receive();
}
int Accepte::validate_connection (const ACE_Asynch_Accept::Result& result,
const ACE_INET_Addr &remote,
const ACE_INET_Addr& local)
{
return 0;
}
///客户端类
// Receive.h: interface for the Receive class.
//
//////////////////////////////////////////////////////////////////////
#if !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
#define AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include <ace/Asynch_io.h>
#include <ace/Message_Block.h>
#include <ace/Log_Msg.h>
#include <ace/OS_Memory.h>
class Receive : public ACE_Service_Handler
{
public:
Receive();
virtual ~Receive();
virtual void open(ACE_HANDLE h,ACE_Message_Block& );
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
private:
void pro_read();
ACE_Asynch_Write_Stream write_;
ACE_Asynch_Read_Stream reader_;
ACE_Message_Block* mb_;
};
#endif // !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
///
// Receive.cpp: implementation of the Receive class.
//
//////////////////////////////////////////////////////////////////////
#include "Receive.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
Receive::Receive()
{
mb_=0;
ACE_DEBUG((LM_INFO,ACE_TEXT("有一个连接./n")));
}
Receive::~Receive()
{
{
ACE_DEBUG((LM_INFO,ACE_TEXT("连接退出./n")));
if (this->handle() != ACE_INVALID_HANDLE )
{
closesocket(SOCKET(this->handle()));
}
}
}
void Receive::pro_read()
{
if (mb_==0)
{
mb_ = new ACE_Message_Block(1024);
}
else
{
mb_->reset();
}
if ( this->reader_.read(*mb_,mb_->space()) != 0)
{
ACE_ERROR((LM_ERROR,ACE_TEXT(" (%t) error information %p.")));
mb_->release();
delete this;
return;
}
}
void Receive::open(ACE_HANDLE h,ACE_Message_Block& )
{
this->handle(h);
if (this->write_.open(*this)!=0 ||
this->reader_.open(*this) != 0 )
{
delete this;
return ;
}
pro_read();
}
void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb_ = result.message_block();
if ( !result.success() || result.bytes_transferred() == 0)
{
mb_.release();
delete this;
}
else
{
printf("接收到数据包/n");
//printf("内容:%s/n",(char*)mb_.base());
printf("数据包的大小为:%d/n",result.bytes_transferred());
pro_read();
}
return ;
}
void Receive::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
//result.message_block().release();
return ;
}
////////////////////////////////////////////////////////////////////////////////////////////////////
//main.cpp
#ifdef _DEBUG
#pragma comment(lib,"aced")
#else
#pragma comment(lib,"ace")
#endif
#include <ace/ace.h>
#include "Accepte.h"
#include "Proactor_Task.h"
int ACE_TMAIN(int ,char*[])
{
Accepte accepte;
accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,0,1);
int nExit=0;
while (nExit==0)
{
ACE_Proactor::instance()->run_event_loop();
}
return 0;
}
请教做过ACE前摄式的朋友来看一下这个代码吧.谢谢啦. 问题点数:200、回复次数:5Top
1 楼huangxiaoke2000(小小)回复于 2006-04-14 18:51:10 得分 0
你在类Receive中添加一个变量为int io_count=0;init_readstream和init_writestream的时候加一,handle_read和handle_write的时候减一
因为你关闭的时候还有外出的I/O在操作
相关文章推荐
- iOS数据、界面分开设计模式遇到的一个问题
- 【原创】基于ACE Proactor框架下高并发、大容量吞吐程序设计既最近的一个产品开发总结
- ACE前摄器Proactor模式
- ACE中的两种I/O多路复用模式 Reactor 和 Proactor 的比较
- 职责链模式和工厂模式混合求解一个简单的解密问题
- 用ACE的Reactor模式实现网络通讯时,ACE内部用WSAEventSelect函数把网络事件与一个事件对象关联起来,目的是为了后面用WaitForMultipleObjects函数统一处理。
- 设计模式精解-第三章-一个急需灵活代码的问题
- 只有一个公网IP也可以使用LVS的DR模式!(外带php session粘滞问题解决)
- 组合还是继承,这是一个问题?——由模式谈面向对象的原则之多用组合、少用继承
- ACE前摄器Proactor模式
- ACE在LINUX下环境搭建的一个小问题及处理
- 组合还是继承,这是一个问题?——由模式谈面向对象的原则之多用组合、少用继承
- ACE前摄器Proactor模式 handle_read_stream ACE_Asynch_Write_Stream
- ACE Proactor前摄器模式的服务器端代码
- 思考一个模式识别与机器学习相关的问题
- 【U3D日记-2016年9月2日】设计模式解决工作问题的一个实例
- 自己写的一个pull模式的source filter,播放mpeg1 2可以,而播放wmv不行的问题
- 关于CS模式下,控制一个容器内控件的值问题
- 每天一个设计模式之Decorator模式解决类间组合爆炸问题
- JAVA策略模式(3)之解决具体遇到的一个问题