boost::asio 连接管理9
2013-01-16 21:07
351 查看
这节先重构下代码,业务逻辑代码和通信基础设施代码需要解耦。业务逻辑代码处理通信过程中的协议,以及背后需要的应用逻辑。而通信基础设施代码专注于并发,TCP连接等特性。
首先把前面代码中Connection的一些成员函数改为纯虚函数,业务类可以继承它,并根据需要重写这些虚函数。
Server类变成模板类,接受业务类作为模板参数,只要该类提供几个必须的公有成员函数即可。
因此目录结构调整如下:
├── CMakeLists.txt
├── include
│ ├── business
│ └── core
└── src
├── business
├── CMakeLists.txt
├── core
└── main.cc
server类由于是模板类,删除.cc文件,只保留.h文件,并且放在core目录下。
connection类也放在core目录下,也变成模板类。core下的这两个类就是通信基础设施类。
新建一个business目录,里面都放应用逻辑相关的代码。现在这里有一个Client类。
对应的src/CMakeLists.txt文件适当修改:
# Build script for the `service` executable (src/CMakeLists.txt).
# Require CMake 2.8 or newer.
cmake_minimum_required(VERSION 2.8)
# Use the Debug build configuration.
set(CMAKE_BUILD_TYPE Debug)
# Project headers are looked up under ../include, relative to this directory.
set(PROJECT_INCLUDE_DIR ../include)
# Locate Boost with the system, filesystem and thread components;
# REQUIRED makes configuration fail when they cannot be found.
find_package(Boost COMPONENTS system filesystem thread REQUIRED)
# Put both the Boost headers and the project headers on the include path.
include_directories(${Boost_INCLUDE_DIR} ${PROJECT_INCLUDE_DIR})
# Collect the source files found in src/, src/core/ and src/business/.
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src CPP_LIST1)
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src/core CPP_LIST2)
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src/business CPP_LIST3)
# Build the executable from all collected sources and link it against Boost.
add_executable(service ${CPP_LIST1} ${CPP_LIST2} ${CPP_LIST3})
target_link_libraries(service ${Boost_LIBRARIES})
# Pass -Wall on the compiler command line.
add_definitions(-Wall)
好。现在看一下main.cc的代码:
#include <iostream>
#include "core/server.h"
#include "business/client.h"
using namespace std;
// Entry point: builds a Server<Client> listening on TCP port 8888 (IPv4) with
// a pool of 10 worker threads and blocks in Run() until the server stops.
// Returns 0 on a clean shutdown and 1 when start-up or Run() threw.
int main(int argc,char ** argv) {
  try {
    io_service iosev;
    tcp::endpoint listen_endpoint(tcp::v4(), 8888);
    Server<Client> server(iosev, listen_endpoint, 10);
    server.Run();
  } catch(std::exception const& ex) {
    // Terminate the message with a newline so it is not glued to whatever
    // is printed next, and report the failure through the exit status.
    std::cout << "Exception: " << ex.what() << "\n";
    return 1;
  }
  return 0;
}
改动在于include client.h文件,并创建Server<Client>对象。
看一下core/server.h文件:
#ifndef CORE_SERVER_H_
#define CORE_SERVER_H_
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <vector>
using namespace std;
using namespace boost;
using boost::system::error_code;
using namespace boost::asio;
using ip::tcp;
// Crate a thread pool for io_service.
// Run the io_service to accept new incoming TCP connection and handle the I/O events
// You should provide your class as template argument here
// Your class must inherit from Connection class.
template<class T>
class Server {
public:
typedef T ClientType;
Server(io_service& s, tcp::endpoint const& listen_endpoint, size_t threads_number)
: io_(s),
signals_(s),
acceptor_(io_, listen_endpoint),
thread_pool_size_(threads_number) {
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif
signals_.async_wait(bind(&Server::Stop, this));
shared_ptr<ClientType> c(new ClientType(io_));
acceptor_.async_accept(c->socket, bind(&Server::AfterAccept, this, c, _1));
}
void AfterAccept(shared_ptr<ClientType>& c, error_code const& ec) {
// Check whether the server was stopped by a signal before this completion
// handler had a chance to run.
if (!acceptor_.is_open()) {
cout << "acceptor is closed" << endl;
return;
}
if (!ec) {
c->StartJob();
shared_ptr<ClientType> c2(new ClientType(io_));
acceptor_.async_accept(c2->socket, bind(&Server::AfterAccept, this, c2, _1));
}
}
// Create a thread pool for io_service
// Launch io_service
void Run() {
// Create a pool of threads to run all of the io_services.
vector<shared_ptr<thread> > threads;
for (size_t i = 0; i < thread_pool_size_; ++i) {
shared_ptr<thread> t(new thread(bind(&io_service::run, &io_)));
threads.push_back(t);
}
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i) {
threads[i]->join();
}
}
private:
void Stop() {
cout << "stopping" << endl;
acceptor_.close();
io_.stop();
}
private:
io_service& io_;
boost::asio::signal_set signals_;
tcp::acceptor acceptor_;
size_t thread_pool_size_;
};
#endif
再看一下connection.h文件,改动有两处:
1.加了关键字virtual ... =0
2. enable_shared_from_this的模板参数是T而不再是Connection类。
现在看看business/client.h,这个类实现了具体服务器和一个客户端之间通信的逻辑。
#ifndef BUSINESS_CLIENT_H_
#define BUSINESS_CLIENT_H_
#include "core/connection.h"
#include <vector>
using namespace std;
// Business-layer connection type: implements the protocol spoken between the
// server and one connected client. Derives from Connection<Client>, which
// supplies the socket, the strand and shared_from_this().
class Client: public Connection<Client> {
public:
// Passes s on to the Connection base and sizes the read buffer to one byte.
Client(io_service& s);
// Starts the first asynchronous one-byte read on the socket.
void StartJob();
// Closes the connection by delegating to Connection::CloseSocket().
void CloseSocket();
// Completion handler for each read; ec carries the outcome of that read.
void AfterReadChar(error_code const& ec);
private:
// Receive buffer handed to async_read.
vector<char> read_buffer_;
};
#endif
client.cc文件代码:
#include "business/client.h"
#include <boost/bind.hpp>
using namespace boost;
// Builds the connection on io_service s with a one-byte, zero-filled receive
// buffer.
Client::Client(io_service& s)
    : Connection<Client>(s),
      read_buffer_(1, '\0') {
}
void Client::StartJob() {
async_read(socket, buffer(read_buffer_),
strand_.wrap(bind(&Client::AfterReadChar, shared_from_this(), _1)));
}
void Client::CloseSocket() {
Connection::CloseSocket();
}
// Completion handler for the one-byte read.
//   * read failed      -> print the error text and stop reading;
//   * byte is not 'a'  -> print its numeric value and close the socket;
//   * byte is 'a'      -> acknowledge it and queue the next one-byte read.
void Client::AfterReadChar(error_code const& ec) {
  if (ec) {
    std::cout << ec.message() << std::endl;
    return;
  }

  char const received = read_buffer_[0];
  if (received != 'a') {
    std::cout << "wrong data received, char is:" << static_cast<int>(received)
              << std::endl;
    CloseSocket();
    return;
  }

  std::cout << "correct data received" << std::endl;
  // Re-arm the read. Qualified boost:: names avoid the ambiguity between
  // boost::bind and std::bind on C++11 and later compilers.
  boost::asio::async_read(
      socket, boost::asio::buffer(read_buffer_),
      strand_.wrap(boost::bind(&Client::AfterReadChar, shared_from_this(), _1)));
}
好了,现在这个解耦够用了。下一节进行测试。
首先把前面代码中Connection的一些成员函数改为纯虚函数,业务类可以继承它,并根据需要重写这些虚函数。
Server类变成模板类,接受业务类作为模板参数,只要该类提供几个必须的公有成员函数即可。
因此目录结构调整如下:
├── CMakeLists.txt
├── include
│ ├── business
│ └── core
└── src
├── business
├── CMakeLists.txt
├── core
└── main.cc
server类由于是模板类,删除.cc文件,只保留.h文件,并且放在core目录下。
connection类也放在core目录下,也变成模板类。core下的这两个类就是通信基础设施类。
新建一个business目录,里面都放应用逻辑相关的代码。现在这里有一个Client类。
对应的src/CMakeLists.txt文件适当修改:
# Build script for the `service` executable (src/CMakeLists.txt).
# Require CMake 2.8 or newer.
cmake_minimum_required(VERSION 2.8)
# Use the Debug build configuration.
set(CMAKE_BUILD_TYPE Debug)
# Project headers are looked up under ../include, relative to this directory.
set(PROJECT_INCLUDE_DIR ../include)
# Locate Boost with the system, filesystem and thread components;
# REQUIRED makes configuration fail when they cannot be found.
find_package(Boost COMPONENTS system filesystem thread REQUIRED)
# Put both the Boost headers and the project headers on the include path.
include_directories(${Boost_INCLUDE_DIR} ${PROJECT_INCLUDE_DIR})
# Collect the source files found in src/, src/core/ and src/business/.
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src CPP_LIST1)
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src/core CPP_LIST2)
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src/business CPP_LIST3)
# Build the executable from all collected sources and link it against Boost.
add_executable(service ${CPP_LIST1} ${CPP_LIST2} ${CPP_LIST3})
target_link_libraries(service ${Boost_LIBRARIES})
# Pass -Wall on the compiler command line.
add_definitions(-Wall)
好。现在看一下main.cc的代码:
#include <iostream>
#include "core/server.h"
#include "business/client.h"
using namespace std;
// Entry point: builds a Server<Client> listening on TCP port 8888 (IPv4) with
// a pool of 10 worker threads and blocks in Run() until the server stops.
// Returns 0 on a clean shutdown and 1 when start-up or Run() threw.
int main(int argc,char ** argv) {
  try {
    io_service iosev;
    tcp::endpoint listen_endpoint(tcp::v4(), 8888);
    Server<Client> server(iosev, listen_endpoint, 10);
    server.Run();
  } catch(std::exception const& ex) {
    // Terminate the message with a newline so it is not glued to whatever
    // is printed next, and report the failure through the exit status.
    std::cout << "Exception: " << ex.what() << "\n";
    return 1;
  }
  return 0;
}
改动在于include client.h文件,并创建Server<Client>对象。
看一下core/server.h文件:
#ifndef CORE_SERVER_H_
#define CORE_SERVER_H_
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <vector>
using namespace std;
using namespace boost;
using boost::system::error_code;
using namespace boost::asio;
using ip::tcp;
// Crate a thread pool for io_service.
// Run the io_service to accept new incoming TCP connection and handle the I/O events
// You should provide your class as template argument here
// Your class must inherit from Connection class.
template<class T>
class Server {
public:
typedef T ClientType;
Server(io_service& s, tcp::endpoint const& listen_endpoint, size_t threads_number)
: io_(s),
signals_(s),
acceptor_(io_, listen_endpoint),
thread_pool_size_(threads_number) {
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif
signals_.async_wait(bind(&Server::Stop, this));
shared_ptr<ClientType> c(new ClientType(io_));
acceptor_.async_accept(c->socket, bind(&Server::AfterAccept, this, c, _1));
}
void AfterAccept(shared_ptr<ClientType>& c, error_code const& ec) {
// Check whether the server was stopped by a signal before this completion
// handler had a chance to run.
if (!acceptor_.is_open()) {
cout << "acceptor is closed" << endl;
return;
}
if (!ec) {
c->StartJob();
shared_ptr<ClientType> c2(new ClientType(io_));
acceptor_.async_accept(c2->socket, bind(&Server::AfterAccept, this, c2, _1));
}
}
// Create a thread pool for io_service
// Launch io_service
void Run() {
// Create a pool of threads to run all of the io_services.
vector<shared_ptr<thread> > threads;
for (size_t i = 0; i < thread_pool_size_; ++i) {
shared_ptr<thread> t(new thread(bind(&io_service::run, &io_)));
threads.push_back(t);
}
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i) {
threads[i]->join();
}
}
private:
void Stop() {
cout << "stopping" << endl;
acceptor_.close();
io_.stop();
}
private:
io_service& io_;
boost::asio::signal_set signals_;
tcp::acceptor acceptor_;
size_t thread_pool_size_;
};
#endif
再看一下connection.h文件:
#ifndef CORE_CONNECTION_H_ #define CORE_CONNECTION_H_ #include <boost/asio.hpp> #include <boost/enable_shared_from_this.hpp> using namespace boost::asio; using ip::tcp; using boost::system::error_code; using namespace boost; using namespace std; template<class T> class Connection: public boost::enable_shared_from_this<T> { public: Connection(io_service& s) : socket(s), strand_(s) { } ~Connection() { } // You must override it yourself // Default implementation closes the socket using shutdonw&cloes methods // You could override it if want change it // Or resue it with Connection::CloseSocket() format void CloseSocket() { socket.shutdown(tcp::socket::shutdown_both); socket.close(); } // You must override it yourself virtual void StartJob() = 0; tcp::socket socket; // Strand to ensure the connection's handlers are not called concurrently. boost::asio::io_service::strand strand_; }; #endif改动有两处:
1.加了关键字virtual ... =0
2. enable_shared_from_this的模板参数是T而不再是Connection类。
现在看看business/client.h,这个类实现了具体服务器和一个客户端之间通信的逻辑。
#ifndef BUSINESS_CLIENT_H_
#define BUSINESS_CLIENT_H_
#include "core/connection.h"
#include <vector>
using namespace std;
// Business-layer connection type: implements the protocol spoken between the
// server and one connected client. Derives from Connection<Client>, which
// supplies the socket, the strand and shared_from_this().
class Client: public Connection<Client> {
public:
// Passes s on to the Connection base and sizes the read buffer to one byte.
Client(io_service& s);
// Starts the first asynchronous one-byte read on the socket.
void StartJob();
// Closes the connection by delegating to Connection::CloseSocket().
void CloseSocket();
// Completion handler for each read; ec carries the outcome of that read.
void AfterReadChar(error_code const& ec);
private:
// Receive buffer handed to async_read.
vector<char> read_buffer_;
};
#endif
client.cc文件代码:
#include "business/client.h"
#include <boost/bind.hpp>
using namespace boost;
// Builds the connection on io_service s with a one-byte, zero-filled receive
// buffer.
Client::Client(io_service& s)
    : Connection<Client>(s),
      read_buffer_(1, '\0') {
}
void Client::StartJob() {
async_read(socket, buffer(read_buffer_),
strand_.wrap(bind(&Client::AfterReadChar, shared_from_this(), _1)));
}
void Client::CloseSocket() {
Connection::CloseSocket();
}
// Completion handler for the one-byte read.
//   * read failed      -> print the error text and stop reading;
//   * byte is not 'a'  -> print its numeric value and close the socket;
//   * byte is 'a'      -> acknowledge it and queue the next one-byte read.
void Client::AfterReadChar(error_code const& ec) {
  if (ec) {
    std::cout << ec.message() << std::endl;
    return;
  }

  char const received = read_buffer_[0];
  if (received != 'a') {
    std::cout << "wrong data received, char is:" << static_cast<int>(received)
              << std::endl;
    CloseSocket();
    return;
  }

  std::cout << "correct data received" << std::endl;
  // Re-arm the read. Qualified boost:: names avoid the ambiguity between
  // boost::bind and std::bind on C++11 and later compilers.
  boost::asio::async_read(
      socket, boost::asio::buffer(read_buffer_),
      strand_.wrap(boost::bind(&Client::AfterReadChar, shared_from_this(), _1)));
}
好了,现在这个解耦够用了。下一节进行测试。
相关文章推荐
- boost::asio 连接管理5
- boost::asio 连接管理10
- boost::asio 连接管理11 如何关闭连接
- boost::asio 连接管理6
- boost::asio 连接管理8
- boost::asio 连接管理1
- boost::asio 连接管理3
- boost::asio 连接管理2
- boost::asio 连接管理4
- boost::asio 连接管理7
- boost::asio 连接管理11 如何关闭连接
- boost::asio 连接管理8
- boost用asio设计TCP服务器中单连接的设计
- boost::asio 无法接受新连接的处理方法
- boost::asio设置同步连接超时
- 使用 boost::signals2 的智能连接管理
- Boos::asio服务器开发之连接管理
- boost asio 应用方法学——对象生命期管理
- boost::asio::ip::tcp::socket is connected?(如何知道socket已经连接?)
- boost::asio设置同步连接超时