您的位置:首页 > 其它

Boost.Asio学习之实现广播ChatRoom

2017-01-10 20:58 399 查看
见:http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/examples/cpp11_examples.html

或者:https://github.com/NearXdu/AsioLearn

1.服务端

对于一个广播聊天室来说。

服务端能够接受连接,并且将收到的消息转发到所有已连接的客户端。

因此,在设计服务端代码时,需要考虑如何保存Tcp连接实例。

在Boost.Asio中给出的例子中一共设计了3个类(实际上是4个,其中一个作为虚基类)。

1. Server

2. Room

3. Session

Server类对象异步的接收连接,它包含一个Room类对象引用,将管理所有的连接(Session对象的shared_ptr)当Server收到消息之后,将向所有的Session(连接)转发消息。

#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <utility>
#include <boost/asio.hpp>
#include "chat_message.hpp"
using boost::asio::ip::tcp;

typedef std::deque<chat_message> chat_message_queue;

class chat_participant
{
public:
virtual ~chat_participant() {}
virtual void deliver(const chat_message& msg) = 0; //participant should deliver message
};

typedef std::shared_ptr<chat_participant> chat_participant_ptr;//shared ptr

class chat_room
{
public:
void join(chat_participant_ptr participant)
{
participants_.insert(participant);//add a client
//将之前的消息写一遍给最新的连接者
for (auto msg: recent_msgs_)
participant->deliver(msg);
}

void leave(chat_participant_ptr participant)
{
participants_.erase(participant);//remove a client
}

void deliver(const chat_message& msg)
{
//先将发送的消息放入待发送列表。
recent_msgs_.push_back(msg);
while (recent_msgs_.size() > max_recent_msgs)
recent_msgs_.pop_front();//delete expired message

for (auto participant: participants_)
participant->deliver(msg);//将消息向所有的对象转发一遍
}

private:
std::set<chat_participant_ptr> participants_;//all client
enum { max_recent_msgs = 100 };
chat_message_queue recent_msgs_;
};//chat room

class chat_session:
public chat_participant,
public std::enable_shared_from_this<chat_session>
{
public:
chat_session(tcp::socket socket, chat_room& room)
: socket_(std::move(socket)),
room_(room)
{
}
void start()
{
//收到连接之后首先将客户加入到room中
//
room_.join(shared_from_this());//add a participant

do_read_header();//读消息
}
void deliver(const chat_message& msg)
{
//
bool write_in_progress = !write_msgs_.empty();//空
write_msgs_.push_back(msg);//会将消息先放到write_msgs_中
if (!write_in_progress)
{
//write message
do_write();
}
}
private:
void do_read_header()
{
auto self(shared_from_this());
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.data(), chat_message::header_length),//data,length
[this, self](boost::system::error_code ec, std::size_t /*length*/)//lambada
{
if (!ec && read_msg_.decode_header())
{
do_read_body();// read body
}
else
{
auto ep_=socket_.remote_endpoint();
std::cout<<"client : "<<ep_.port()<<" leave this room"<<std::endl;

room_.leave(shared_from_this());//read a eof
}
});
}

void do_read_body()
{
auto self(shared_from_this());
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
room_.deliver(read_msg_);//deliver将发送消息,刚连接进来的客户将收到历史消息
do_read_header();//
}
else
{
room_.leave(shared_from_this());
}
});
}

void do_write()
{
auto self(shared_from_this());
boost::asio::async_write(socket_,//当前session的socket
boost::asio::buffer(write_msgs_.front().data(),
write_msgs_.front().length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
std::string msg(write_msgs_.front().data(),write_msgs_.front().length());
std::cout<<msg<<std::endl;
write_msgs_.pop_front();
if (!write_msgs_.empty())
{
do_write();
}
}
else
{
room_.leave(shared_from_this());
}
});
}

tcp::socket socket_;
chat_room& room_;
chat_message read_msg_;
chat_message_queue write_msgs_;
};//seesion

class chat_server
{
public:
//constructor
chat_server(boost::asio::io_service& io_service,
const tcp::endpoint& endpoint)
: acceptor_(io_service, endpoint),//listen fd
socket_(io_service)//conn fd
{
do_accept();
}

private:
void do_accept()
{
//a new connection
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec)
{
auto ep_=socket_.remote_endpoint();
std::cout<<"client : "<<ep_.port()<<" enter this room"<<std::endl;
std::make_shared<chat_session>(std::move(socket_), room_)->start();//session

}

do_accept();
});
}

tcp::acceptor acceptor_;
tcp::socket socket_;
//server should keep all client connected
chat_room room_;//chat_room class//保存所有的client
};

int main(int argc, char* argv[])
{
try
{
#if 0
if (argc < 2)
{
std::cerr << "Usage: chat_server <port> [<port> ...]\n";
return 1;
}
#endif

boost::asio::io_service io_service;

std::list<chat_server> servers;
//  for (int i = 1; i < argc; ++i)
//  {
tcp::endpoint endpoint(tcp::v4(), 1024);//listen 1024 port
servers.emplace_back(io_service, endpoint);//constructor
//  }

io_service.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}

return 0;
}


2.客户端

客户端就比较简单了。

首先它需要两个线程:

1.一个线程用来读取标准输入

2.一个线程执行Proactor(io_service::run)

客户端读取数据后,进行简单的编码(消息格式为固定4个字节的header和body),然后发起异步连接,写入网络套接字。

#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include "chat_message.hpp"

using boost::asio::ip::tcp;

typedef std::deque<chat_message> chat_message_queue;

class chat_client
{
public:
chat_client(boost::asio::io_service& io_service,
tcp::resolver::iterator endpoint_iterator)
: io_service_(io_service),
socket_(io_service)//初始化
{
do_connect(endpoint_iterator);
}

void write(const chat_message& msg)
{
io_service_.post(//发起异步事件
[this, msg]()
{
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if (!write_in_progress)
{
do_write();
}
});
}

void close()
{
io_service_.post([this]() { socket_.close(); });
}

private:
void do_connect(tcp::resolver::iterator endpoint_iterator)
{
//发起连接
boost::asio::async_connect(socket_, endpoint_iterator,
[this](boost::system::error_code ec, tcp::resolver::iterator)
{
if (!ec)
{
do_read_header();//读取消息
}
});
}

void do_read_header()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.data(), chat_message::header_length),
[this](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec && read_msg_.decode_header())
{
do_read_body();
}
else
{
socket_.close();
}
});
}

void do_read_body()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
std::cout.write(read_msg_.body(), read_msg_.body_length());
std::cout << "\n";//打印消息到屏幕
do_read_header();
}
else
{
socket_.close();
}
});
}

void do_write()
{
boost::asio::async_write(socket_,
boost::asio::buffer(write_msgs_.front().data(),
write_msgs_.front().length()),
[this](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
write_msgs_.pop_front();
if (!write_msgs_.empty())
{
do_write();
}
}
else
{
socket_.close();
}
});
}

private:
boost::asio::io_service& io_service_;
tcp::socket socket_;
chat_message read_msg_;
chat_message_queue write_msgs_;
};

int main(int argc, char* argv[])
{
//客户端需要两个线程
//主线程负责读取标准输入
//thread线程负责调度异步事件(io_service.run())
try
{
#if 0
if (argc != 3)
{
std::cerr << "Usage: chat_client <host> <port>\n";
return 1;
}
#endif

boost::asio::io_service io_service;

tcp::resolver resolver(io_service);
auto endpoint_iterator = resolver.resolve({ "localhost", "1024" });
chat_client c(io_service, endpoint_iterator);//客户端

std::thread t([&io_service](){ io_service.run(); });//启动线程执行io_service.run()

char line[chat_message::max_body_length + 1];
while (std::cin.getline(line, chat_message::max_body_length + 1))
{
chat_message msg;
msg.body_length(std::strlen(line));//获取header信息
std::memcpy(msg.body(), line, msg.body_length());
msg.encode_header();//消息编码
c.write(msg);
}

c.close();
t.join();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: