Boost.Asio学习之实现广播ChatRoom
2017-01-10 20:58
399 查看
见:http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/examples/cpp11_examples.html
或者:https://github.com/NearXdu/AsioLearn
服务端能够接受连接,并且将收到的消息转发到所有已连接的客户端。
因此,在设计服务端代码时,需要考虑如何保存Tcp连接实例。
在Boost.Asio中给出的例子中一共设计了3个类(实际上是4个,其中一个作为虚基类)。
1. Server
2. Room
3. Session
Server类对象异步的接收连接,它包含一个Room类对象引用,将管理所有的连接(Session对象的shared_ptr)当Server收到消息之后,将向所有的Session(连接)转发消息。
首先它需要两个线程:
1.一个线程用来读取标准输入
2.一个线程执行Proactor(io_service::run)
客户端读取数据后,进行简单的编码(消息格式为固定4个字节的header和body),然后发起异步连接,写入网络套接字。
或者:https://github.com/NearXdu/AsioLearn
1.服务端
对于一个广播聊天室来说。服务端能够接受连接,并且将收到的消息转发到所有已连接的客户端。
因此,在设计服务端代码时,需要考虑如何保存Tcp连接实例。
在Boost.Asio中给出的例子中一共设计了3个类(实际上是4个,其中一个作为虚基类)。
1. Server
2. Room
3. Session
Server类对象异步的接收连接,它包含一个Room类对象引用,将管理所有的连接(Session对象的shared_ptr)当Server收到消息之后,将向所有的Session(连接)转发消息。
#include <cstdlib> #include <deque> #include <iostream> #include <list> #include <memory> #include <set> #include <utility> #include <boost/asio.hpp> #include "chat_message.hpp" using boost::asio::ip::tcp; typedef std::deque<chat_message> chat_message_queue; class chat_participant { public: virtual ~chat_participant() {} virtual void deliver(const chat_message& msg) = 0; //participant should deliver message }; typedef std::shared_ptr<chat_participant> chat_participant_ptr;//shared ptr class chat_room { public: void join(chat_participant_ptr participant) { participants_.insert(participant);//add a client //将之前的消息写一遍给最新的连接者 for (auto msg: recent_msgs_) participant->deliver(msg); } void leave(chat_participant_ptr participant) { participants_.erase(participant);//remove a client } void deliver(const chat_message& msg) { //先将发送的消息放入待发送列表。 recent_msgs_.push_back(msg); while (recent_msgs_.size() > max_recent_msgs) recent_msgs_.pop_front();//delete expired message for (auto participant: participants_) participant->deliver(msg);//将消息向所有的对象转发一遍 } private: std::set<chat_participant_ptr> participants_;//all client enum { max_recent_msgs = 100 }; chat_message_queue recent_msgs_; };//chat room class chat_session: public chat_participant, public std::enable_shared_from_this<chat_session> { public: chat_session(tcp::socket socket, chat_room& room) : socket_(std::move(socket)), room_(room) { } void start() { //收到连接之后首先将客户加入到room中 // room_.join(shared_from_this());//add a participant do_read_header();//读消息 } void deliver(const chat_message& msg) { // bool write_in_progress = !write_msgs_.empty();//空 write_msgs_.push_back(msg);//会将消息先放到write_msgs_中 if (!write_in_progress) { //write message do_write(); } } private: void do_read_header() { auto self(shared_from_this()); boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.data(), chat_message::header_length),//data,length [this, self](boost::system::error_code ec, std::size_t /*length*/)//lambada { if (!ec && read_msg_.decode_header()) { do_read_body();// read body } else { auto ep_=socket_.remote_endpoint(); std::cout<<"client : "<<ep_.port()<<" leave this room"<<std::endl; room_.leave(shared_from_this());//read a eof } }); } void do_read_body() { auto self(shared_from_this()); boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), [this, self](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { room_.deliver(read_msg_);//deliver将发送消息,刚连接进来的客户将收到历史消息 do_read_header();// } else { room_.leave(shared_from_this()); } }); } void do_write() { auto self(shared_from_this()); boost::asio::async_write(socket_,//当前session的socket boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), [this, self](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { std::string msg(write_msgs_.front().data(),write_msgs_.front().length()); std::cout<<msg<<std::endl; write_msgs_.pop_front(); if (!write_msgs_.empty()) { do_write(); } } else { room_.leave(shared_from_this()); } }); } tcp::socket socket_; chat_room& room_; chat_message read_msg_; chat_message_queue write_msgs_; };//seesion class chat_server { public: //constructor chat_server(boost::asio::io_service& io_service, const tcp::endpoint& endpoint) : acceptor_(io_service, endpoint),//listen fd socket_(io_service)//conn fd { do_accept(); } private: void do_accept() { //a new connection acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { if (!ec) { auto ep_=socket_.remote_endpoint(); std::cout<<"client : "<<ep_.port()<<" enter this room"<<std::endl; std::make_shared<chat_session>(std::move(socket_), room_)->start();//session } do_accept(); }); } tcp::acceptor acceptor_; tcp::socket socket_; //server should keep all client connected chat_room room_;//chat_room class//保存所有的client }; int main(int argc, char* argv[]) { try { #if 0 if (argc < 2) { std::cerr << "Usage: chat_server <port> [<port> ...]\n"; return 1; } #endif boost::asio::io_service io_service; std::list<chat_server> servers; // for (int i = 1; i < argc; ++i) // { tcp::endpoint endpoint(tcp::v4(), 1024);//listen 1024 port servers.emplace_back(io_service, endpoint);//constructor // } io_service.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }
2.客户端
客户端就比较简单了。首先它需要两个线程:
1.一个线程用来读取标准输入
2.一个线程执行Proactor(io_service::run)
客户端读取数据后,进行简单的编码(消息格式为固定4个字节的header和body),然后发起异步连接,写入网络套接字。
#include <cstdlib> #include <deque> #include <iostream> #include <thread> #include <boost/asio.hpp> #include "chat_message.hpp" using boost::asio::ip::tcp; typedef std::deque<chat_message> chat_message_queue; class chat_client { public: chat_client(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator) : io_service_(io_service), socket_(io_service)//初始化 { do_connect(endpoint_iterator); } void write(const chat_message& msg) { io_service_.post(//发起异步事件 [this, msg]() { bool write_in_progress = !write_msgs_.empty(); write_msgs_.push_back(msg); if (!write_in_progress) { do_write(); } }); } void close() { io_service_.post([this]() { socket_.close(); }); } private: void do_connect(tcp::resolver::iterator endpoint_iterator) { //发起连接 boost::asio::async_connect(socket_, endpoint_iterator, [this](boost::system::error_code ec, tcp::resolver::iterator) { if (!ec) { do_read_header();//读取消息 } }); } void do_read_header() { boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.data(), chat_message::header_length), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec && read_msg_.decode_header()) { do_read_body(); } else { socket_.close(); } }); } void do_read_body() { boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { std::cout.write(read_msg_.body(), read_msg_.body_length()); std::cout << "\n";//打印消息到屏幕 do_read_header(); } else { socket_.close(); } }); } void do_write() { boost::asio::async_write(socket_, boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { write_msgs_.pop_front(); if (!write_msgs_.empty()) { do_write(); } } else { socket_.close(); } }); } private: boost::asio::io_service& io_service_; tcp::socket socket_; chat_message read_msg_; chat_message_queue write_msgs_; }; int main(int argc, char* argv[]) { //客户端需要两个线程 //主线程负责读取标准输入 //thread线程负责调度异步事件(io_service.run()) try { #if 0 if (argc != 3) { std::cerr << "Usage: chat_client <host> <port>\n"; return 1; } #endif boost::asio::io_service io_service; tcp::resolver resolver(io_service); auto endpoint_iterator = resolver.resolve({ "localhost", "1024" }); chat_client c(io_service, endpoint_iterator);//客户端 std::thread t([&io_service](){ io_service.run(); });//启动线程执行io_service.run() char line[chat_message::max_body_length + 1]; while (std::cin.getline(line, chat_message::max_body_length + 1)) { chat_message msg; msg.body_length(std::strlen(line));//获取header信息 std::memcpy(msg.body(), line, msg.body_length()); msg.encode_header();//消息编码 c.write(msg); } c.close(); t.join(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } return 0; }
相关文章推荐
- boost.asio 学习笔记05——asio的windows实现
- boost.asio 学习笔记05 asio的windows实现
- boost.asio 学习笔记05——asio的windows实现
- boost.asio 学习笔记05——asio的windows实现
- Boost.Asio学习之同步echo服务器实现
- boost学习之asio—chat
- Boost.Asio学习之异步echo服务器实现
- boost::asio::ip::tcp实现网络通信的小例子
- [转]boost::asio::ip::tcp实现网络通信的小例子
- boost asio io_service学习笔记
- boost::asio::ip::tcp实现网络通信的小例子
- 利用golang实现与boost中asio相同的功能
- WCF 学习总结4 -- 用Duplex实现消息广播
- boost asio io_service学习笔记
- boost asio 学习总结之 io_service
- boost.asio的跨平台实现
- boost::asio学习 - HTTP Server性能测试报告
- Boost Asio在Windows下设计及实现解析
- boost::asio学习 - HTTP Server性能测试报告
- boost::asio学习 - 常用方法总结