您的位置:首页 > 编程语言

ACE Proactor前摄器模式的服务器端代码

2008-06-05 08:14 375 查看
// sqServer.cpp : Defines the entry point for the console application.
//

#include "ace/OS.h"
#include "ace/Arg_Shifter.h"
#include "ace/Svc_Handler.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Proactor.h"

#include <sstream>
#include <iostream>
using namespace std;

/**
 * Logging task: one worker thread drains the task's message queue and
 * reports every received message block according to the configured log type.
 *
 * Log types handled by svc():
 *   1 - print the running message sequence number to stdout
 *   2 - append a "[h:m:s]ID:<seq>" line to the log file
 *   3 - print the sequence number followed by the message payload to stdout
 *   anything else - the message is consumed silently
 *
 * Every dequeued block is released by this task, so callers hand over
 * ownership when they putq() a block.
 */
class My_Task:public ACE_Task<ACE_MT_SYNCH>
{
public:
    /// Process-wide instance, created on first use through ACE_Singleton.
    static My_Task* instance(void)
    {
        return ACE_Singleton<My_Task, ACE_SYNCH_MUTEX>::instance ();
    }

    My_Task():m_logtype(0),m_logfile("sqServer.log")
    {
    }

    /**
     * Starts the single worker thread that runs svc().
     * @return 0 on success, -1 if the thread could not be started.
     */
    int open(void* =0)
    {
        return activate(THR_NEW_LWP,1)==-1 ? -1 : 0;
    }

    /// Selects the log type (see the class comment). Call before open().
    void logtype(size_t lt)
    {
        m_logtype=lt;
    }

    /// Sets the path of the file used by log type 2. Call before open().
    void logfile(const string& lf)
    {
        m_logfile=lf;
    }

private:
    /**
     * Worker loop: dequeue a block, log it, release it.
     * The loop ends when getq() fails or yields no block.
     * @return always 0.
     */
    int svc(void)
    {
        int seq=0;
        FILE* fp=0;

        for(;;)
        {
            // Start every iteration with a null pointer so that a failed
            // getq() can never leave us holding an already released block.
            ACE_Message_Block *mb=0;
            if(getq(mb)==-1||!mb)
            {
                break;
            }

            // Zero-terminate the payload so it can be streamed as a C string.
            // When the buffer is completely full the last payload byte is
            // overwritten; an empty zero-capacity block is printed as "".
            const char *text="";
            if(mb->space()>0)
            {
                *mb->wr_ptr()=0;
                text=mb->rd_ptr();
            }
            else if(mb->length()>0)
            {
                *(mb->wr_ptr()-1)=0;
                text=mb->rd_ptr();
            }

            ++seq;
            switch(m_logtype)
            {
            case 1:
                std::cout<<"ID:"<<seq<<'\n';
                break;
            case 2:
                {
                    // The file is opened lazily and reopened after a write error.
                    if(!fp)
                    {
                        fp=fopen(m_logfile.c_str(),"a");
                        if(!fp)
                        {
                            std::cout<<"Error:Open log file fail,exit now!"<<endl;
                            exit(1);
                        }
                    }

                    ostringstream ostr;
                    const time_t curtime=time(0);
                    const tm *ltime=localtime(&curtime);
                    if(ltime)
                    {
                        ostr<<'['<<ltime->tm_hour<<':'<<ltime->tm_min<<':'<<ltime->tm_sec<<']';
                    }
                    ostr<<"ID:"<<seq<<'\n';

                    if(fputs(ostr.str().c_str(),fp)<0)
                    {
                        fclose(fp);
                        fp=0;
                    }
                    else
                    {
                        fflush(fp);
                    }
                }
                break;
            case 3:
                std::cout<<"ID:"<<seq<<",Data=\n"<<text<<'\n';
                break;
            default:
                break;
            }
            mb->release();
        }

        // fp is only non-null when log type 2 actually opened the file.
        if(fp)
        {
            fclose(fp);
        }
        return 0;
    }

private:
    size_t m_logtype;
    string m_logfile;
};

class My_Service_Handler:public ACE_Service_Handler
{
public:
My_Service_Handler()
{
}
My_Service_Handler(const string& respmsg):m_respMsg(respmsg),m_isfirst(true)
{
}
~My_Service_Handler()
{
My_Task::instance()->putq(m_recvmb);
if(this->handle()!=ACE_INVALID_HANDLE)
{
ACE_OS::closesocket(this->handle());
//this->handle(ACE_INVALID_HANDLE);
}
}
virtual void open(ACE_HANDLE h,ACE_Message_Block&)
{
this->handle(h);
if(this->reader_.open(*this)!=0||this->writer_.open(*this)!=0)
{
delete this;
return;
}
ACE_NEW_NORETURN(m_recvmb,ACE_Message_Block(10240));
if(this->reader_.read(*m_recvmb,m_recvmb->space())!=0)
{
delete this;
return;
}
return;
}
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb=result.message_block();
if(!result.success()||result.bytes_transferred()==0)
{
//mb.release();
delete this;
}
else
{
if(m_isfirst)
{
ACE_Message_Block *sendmb;
ACE_NEW_NORETURN(sendmb,ACE_Message_Block(1024));
sendmb->copy(m_respMsg.c_str(),m_respMsg.size());
if(this->writer_.write(*sendmb,sendmb->length())!=0)
{
sendmb->release();
}
m_isfirst=false;
}

if(this->reader_.read(mb,mb.space())!=0)
{
//mb.release();
delete this;
return;
}
}
}
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
result.message_block().release();
//ACE_OS::closesocket(result.handle());
return;
}
private:
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
string m_respMsg;
ACE_Message_Block *m_recvmb;
bool m_isfirst;
};
//typedef ACE_Asynch_Acceptor<My_Service_Handler> My_Asynch_Acceptor;
/**
 * Acceptor that builds one fixed HTTP response at construction time and
 * passes it to every My_Service_Handler it creates.
 *
 * The response is an HTTP/1.1 200 reply whose XML body wraps the given
 * response code in an <int> element; Content-Length is the body size.
 */
class My_Asynch_Acceptor:public ACE_Asynch_Acceptor<My_Service_Handler>
{
public:
    /// @param respcode text placed inside the <int> element of the body.
    My_Asynch_Acceptor(const string& respcode)
    {
        string strBody="<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n";
        strBody+="<int xmlns=\"http://tempuri.org/\">";
        strBody+=respcode;
        strBody+="</int>\r\n";

        // Format the length through a stream: no fixed-size buffer and no
        // narrowing of the size_t body length.
        ostringstream strHead;
        strHead<<"HTTP/1.1 200 OK\r\n"
               <<"Content-Type: text/xml; charset=utf-8\r\n"
               <<"Content-Length: "<<strBody.size()<<"\r\n\r\n";

        m_respMsg=strHead.str();
        m_respMsg+=strBody;
    }

protected:
    /// Creates the handler for a new connection, or returns 0 if allocation fails.
    virtual My_Service_Handler* make_handler (void)
    {
        My_Service_Handler *handler = 0;
        ACE_NEW_RETURN (handler,
                        My_Service_Handler(m_respMsg),
                        0);
        return handler;
    }

private:
    string m_respMsg;
};

/**
 * Task whose single thread runs the event loop of the process-wide proactor.
 */
class RunProactorLoop:public ACE_Task<ACE_MT_SYNCH>
{
public:
    /// Process-wide instance, created on first use through ACE_Singleton.
    static RunProactorLoop* instance(void)
    {
        return ACE_Singleton<RunProactorLoop, ACE_SYNCH_MUTEX>::instance ();
    }

    /**
     * Starts the thread that runs svc().
     * @return 0 on success, -1 if the thread could not be started.
     */
    int open(void* =0)
    {
        return activate(THR_NEW_LWP,1)==-1 ? -1 : 0;
    }

    /// Thread body: runs the proactor event loop until it returns.
    int svc(void)
    {
        ACE_Proactor::instance()->proactor_run_event_loop();
        return 0;
    }
};

/**
 * Entry point.
 *
 * Recognised options (case-insensitive, each followed by a value):
 *   -localport <port>  TCP port to listen on (default 80, must be 1..65535)
 *   -respcode  <text>  value embedded in the canned response (default "1")
 *   -logtype   <n>     log type passed to My_Task (default 0)
 * Any other argument is skipped.
 *
 * @return 0 after all task threads have finished, -1 on a start-up error.
 */
int main(int argc, char* argv[])
{
    size_t localPort=80;
    string respCode="1";
    ACE_Arg_Shifter arg(argc, argv);
    while(arg.is_anything_left ())
    {
        const char *current_arg = arg.get_current();

        if(ACE_OS::strcasecmp(current_arg,"-localport")==0)
        {
            arg.consume_arg();
            if(arg.is_anything_left ())
            {
                // Reject values that would be silently truncated to u_short.
                const int port = ACE_OS::atoi(arg.get_current());
                if(port<=0||port>65535)
                {
                    std::cerr<<"Error:Invalid -localport value: "<<arg.get_current()<<std::endl;
                    return -1;
                }
                localPort = static_cast<size_t>(port);
            }
        }
        else if(ACE_OS::strcasecmp(current_arg,"-respcode")==0)
        {
            arg.consume_arg();
            if(arg.is_anything_left ())
                respCode = arg.get_current();
        }
        else if(ACE_OS::strcasecmp(current_arg,"-logtype")==0)
        {
            arg.consume_arg();
            if(arg.is_anything_left ())
            {
                size_t logtype= ACE_OS::atoi(arg.get_current());
                My_Task::instance()->logtype(logtype);
            }
        }
        // Consumes the option's value, or the unrecognised argument itself.
        arg.consume_arg();
    }

    My_Asynch_Acceptor acceptor(respCode);
    ACE_INET_Addr lAddr(static_cast<u_short>(localPort));
    if(acceptor.open(lAddr,0,0,50))
    {
        std::cerr<<"Error:Cannot listen on port "<<localPort<<std::endl;
        return -1;
    }

    ACE_OS::printf ("Starting event loop until you types ^C\n");

    if(My_Task::instance()->open()!=0||RunProactorLoop::instance()->open()!=0)
    {
        std::cerr<<"Error:Cannot start worker threads"<<std::endl;
        return -1;
    }
    ACE_Thread_Manager::instance()->wait();

    return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: