您的位置:首页 > 其它

四个服务器设计模型

2015-06-14 11:58 381 查看
五个简单TCP协议(MuduoManual.pdf P50)

muduo库网络模型使用示例(sudoku求解服务器MuduoManual.pdf P35 )

reactor(一个IO线程)

reactor + threadpool (一个IO + 多个计算线程)

multiple reactor (多个IO线程)

one loop per thread + thread pool (多个IO线程 + 计算线程池)

网络编程关注4个半事件:

连接建立

连接断开

消息到达

信息发送完毕(对于低流量的服务来说,通常不需要关注该事件)

如何实现server

1 提供一个xxxServer类

2 在该类中包含一个TcpServer对象

注册一些事件

OnConnection

OnMessage

OnWriteComplete

TcpConnection::shutdown () 内部实现,只关闭写入这一半

--------------------------------------

----------------------------------------

下面的程序都是用来解 数独 的,数独的实现如下

sudoku.h

[cpp]
view plaincopyprint?

#ifndef MUDUO_EXAMPLES_SUDOKU_SUDOKU_H
#define MUDUO_EXAMPLES_SUDOKU_SUDOKU_H

#include <muduo/base/Types.h>

// FIXME, use (const char*, len) for saving memory copying.
muduo::string solveSudoku(const muduo::string& puzzle);
const int kCells = 81;

#endif

sudoku.cc

[cpp]
view plaincopyprint?

#include "sudoku.h"

#include <vector>
#include <assert.h>
#include <string.h>

using namespace muduo;

struct Node;
typedef Node Column;
struct Node
{
Node* left;
Node* right;
Node* up;
Node* down;
Column* col;
int name;
int size;
};

const int kMaxNodes = 1 + 81*4 + 9*9*9*4;
const int kMaxColumns = 400;
const int kRow = 100, kCol = 200, kBox = 300;

class SudokuSolver
{
public:
SudokuSolver(int board[kCells])
: inout_(board),
cur_node_(0)
{
stack_.reserve(100);

root_ = new_column();
root_->left = root_->right = root_;
memset(columns_, 0, sizeof(columns_));

bool rows[kCells][10] = { {false} };
bool cols[kCells][10] = { {false} };
bool boxes[kCells][10] = { {false} };

for (int i = 0; i < kCells; ++i) {
int row = i / 9;
int col = i % 9;
int box = row/3*3 + col/3;
int val = inout_[i];
rows[row][val] = true;
cols[col][val] = true;
boxes[box][val] = true;
}

for (int i = 0; i < kCells; ++i) {
if (inout_[i] == 0) {
append_column(i);
}
}

for (int i = 0; i < 9; ++i) {
for (int v = 1; v < 10; ++v) {
if (!rows[i][v])
append_column(get_row_col(i, v));
if (!cols[i][v])
append_column(get_col_col(i, v));
if (!boxes[i][v])
append_column(get_box_col(i, v));
}
}

for (int i = 0; i < kCells; ++i) {
if (inout_[i] == 0) {
int row = i / 9;
int col = i % 9;
int box = row/3*3 + col/3;
//int val = inout[i];
for (int v = 1; v < 10; ++v) {
if (!(rows[row][v] || cols[col][v] || boxes[box][v])) {
Node* n0 = new_row(i);
Node* nr = new_row(get_row_col(row, v));
Node* nc = new_row(get_col_col(col, v));
Node* nb = new_row(get_box_col(box, v));
put_left(n0, nr);
put_left(n0, nc);
put_left(n0, nb);
}
}
}
}
}

bool solve()
{
if (root_->left == root_) {
for (size_t i = 0; i < stack_.size(); ++i) {
Node* n = stack_[i];
int cell = -1;
int val = -1;
while (cell == -1 || val == -1) {
if (n->name < 100)
cell = n->name;
else
val = n->name % 10;
n = n->right;
}

//assert(cell != -1 && val != -1);
inout_[cell] = val;
}
return true;
}

Column* const col = get_min_column();
cover(col);
for (Node* row = col->down; row != col; row = row->down) {
stack_.push_back(row);
for (Node* j = row->right; j != row; j = j->right) {
cover(j->col);
}
if (solve()) {
return true;
}
stack_.pop_back();
for (Node* j = row->left; j != row; j = j->left) {
uncover(j->col);
}
}
uncover(col);
return false;
}

private:

Column* root_;
int* inout_;
Column* columns_[400];
std::vector<Node*> stack_;
Node nodes_[kMaxNodes];
int cur_node_;

Column* new_column(int n = 0)
{
assert(cur_node_ < kMaxNodes);
Column* c = &nodes_[cur_node_++];
memset(c, 0, sizeof(Column));
c->left = c;
c->right = c;
c->up = c;
c->down = c;
c->col = c;
c->name = n;
return c;
}

void append_column(int n)
{
assert(columns_
== NULL);

Column* c = new_column(n);
put_left(root_, c);
columns_
= c;
}

Node* new_row(int col)
{
assert(columns_[col] != NULL);
assert(cur_node_ < kMaxNodes);

Node* r = &nodes_[cur_node_++];

//Node* r = new Node;
memset(r, 0, sizeof(Node));
r->left = r;
r->right = r;
r->up = r;
r->down = r;
r->name = col;
r->col = columns_[col];
put_up(r->col, r);
return r;
}

int get_row_col(int row, int val)
{
return kRow+row*10+val;
}

int get_col_col(int col, int val)
{
return kCol+col*10+val;
}

int get_box_col(int box, int val)
{
return kBox+box*10+val;
}

Column* get_min_column()
{
Column* c = root_->right;
int min_size = c->size;
if (min_size > 1) {
for (Column* cc = c->right; cc != root_; cc = cc->right) {
if (min_size > cc->size) {
c = cc;
min_size = cc->size;
if (min_size <= 1)
break;
}
}
}
return c;
}

void cover(Column* c)
{
c->right->left = c->left;
c->left->right = c->right;
for (Node* row = c->down; row != c; row = row->down) {
for (Node* j = row->right; j != row; j = j->right) {
j->down->up = j->up;
j->up->down = j->down;
j->col->size--;
}
}
}

void uncover(Column* c)
{
for (Node* row = c->up; row != c; row = row->up) {
for (Node* j = row->left; j != row; j = j->left) {
j->col->size++;
j->down->up = j;
j->up->down = j;
}
}
c->right->left = c;
c->left->right = c;
}

void put_left(Column* old, Column* nnew)
{
nnew->left = old->left;
nnew->right = old;
old->left->right = nnew;
old->left = nnew;
}

void put_up(Column* old, Node* nnew)
{
nnew->up = old->up;
nnew->down = old;
old->up->down = nnew;
old->up = nnew;
old->size++;
nnew->col = old;
}
};

string solveSudoku(const string& puzzle)
{
assert(puzzle.size() == implicit_cast<size_t>(kCells));

string result = "NoSolution";

int board[kCells] = { 0 };
bool valid = true;
for (int i = 0; i < kCells; ++i)
{
board[i] = puzzle[i] - '0';
valid = valid && (0 <= board[i] && board[i] <= 9);
}

if (valid)
{
SudokuSolver s(board);
if (s.solve())
{
result.clear();
result.resize(kCells);
for (int i = 0; i < kCells; ++i)
{
result[i] = static_cast<char>(board[i] + '0');
}
}
}
return result;
}

reactor模型

只有一个IO线程:

这个IO线程既负责listenfd也负责connfd

[cpp]
view plaincopyprint?

#include "sudoku.h"

#include <muduo/base/Atomic.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpServer.h>

#include <boost/bind.hpp>

#include <utility>

#include <mcheck.h>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class SudokuServer
{
public:
SudokuServer(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "SudokuServer"),
startTime_(Timestamp::now())
{
server_.setConnectionCallback(
boost::bind(&SudokuServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
}

void start()
{
server_.start();
}

private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
LOG_DEBUG << conn->name();
size_t len = buf->readableBytes();
while (len >= kCells + 2)
{
const char* crlf = buf->findCRLF();
if (crlf)
{
string request(buf->peek(), crlf);
buf->retrieveUntil(crlf + 2);
len = buf->readableBytes();
if (!processRequest(conn, request))
{
conn->send("Bad Request!\r\n");
conn->shutdown();
break;
}
}
else if (len > 100) // id + ":" + kCells + "\r\n"
{
conn->send("Id too long!\r\n");
conn->shutdown();
break;
}
else
{
break;
}
}
}

bool processRequest(const TcpConnectionPtr& conn, const string& request)
{
string id;
string puzzle;
bool goodRequest = true;

string::const_iterator colon = find(request.begin(), request.end(), ':');
if (colon != request.end())
{
id.assign(request.begin(), colon);
puzzle.assign(colon+1, request.end());
}
else
{
puzzle = request;
}

if (puzzle.size() == implicit_cast<size_t>(kCells))
{
LOG_DEBUG << conn->name();
string result = solveSudoku(puzzle);
if (id.empty())
{
conn->send(result+"\r\n");
}
else
{
conn->send(id+":"+result+"\r\n");
}
}
else
{
goodRequest = false;
}
return goodRequest;
}

EventLoop* loop_;
TcpServer server_;
Timestamp startTime_;
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
EventLoop loop;
InetAddress listenAddr(9981);
SudokuServer server(&loop, listenAddr);

server.start();

loop.loop();
}

multiple reactor

IO线程的数目多个

EventLoopThreadPoll IO线程池

直接设置server_.setThreadNum(numThreads)就OK了


main reactor 负责listenfd accept

sub reactor 负责connfd

使用roundbin轮叫策略

来一个连接,就选择下一个EventLoop,这样让多个连接分配给若干个EventLoop来处理,

而每个EventLoop属于一个IO线程,也就意味着,多个连接分配给若干个IO线程来处理。


[cpp]
view plaincopyprint?

include "sudoku.h"

#include <muduo/base/Atomic.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpServer.h>

#include <boost/bind.hpp>

#include <utility>

#include <mcheck.h>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class SudokuServer
{
public:
SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)
: loop_(loop),
server_(loop, listenAddr, "SudokuServer"),
numThreads_(numThreads),
startTime_(Timestamp::now())
{
server_.setConnectionCallback(
boost::bind(&SudokuServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
server_.setThreadNum(numThreads);
}

void start()
{
LOG_INFO << "starting " << numThreads_ << " threads.";
server_.start();
}

private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
LOG_DEBUG << conn->name();
size_t len = buf->readableBytes();
while (len >= kCells + 2)
{
const char* crlf = buf->findCRLF();
if (crlf)
{
string request(buf->peek(), crlf);
buf->retrieveUntil(crlf + 2);
len = buf->readableBytes();
if (!processRequest(conn, request))
{
conn->send("Bad Request!\r\n");
conn->shutdown();
break;
}
}
else if (len > 100) // id + ":" + kCells + "\r\n"
{
conn->send("Id too long!\r\n");
conn->shutdown();
break;
}
else
{
break;
}
}
}

bool processRequest(const TcpConnectionPtr& conn, const string& request)
{
string id;
string puzzle;
bool goodRequest = true;

string::const_iterator colon = find(request.begin(), request.end(), ':');
if (colon != request.end())
{
id.assign(request.begin(), colon);
puzzle.assign(colon+1, request.end());
}
else
{
puzzle = request;
}

if (puzzle.size() == implicit_cast<size_t>(kCells))
{
LOG_DEBUG << conn->name();
string result = solveSudoku(puzzle);
if (id.empty())
{
conn->send(result+"\r\n");
}
else
{
conn->send(id+":"+result+"\r\n");
}
}
else
{
goodRequest = false;
}
return goodRequest;
}

EventLoop* loop_;
TcpServer server_;
int numThreads_;
Timestamp startTime_;
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
int numThreads = 0;
if (argc > 1)
{
numThreads = atoi(argv[1]);
}
EventLoop loop;
InetAddress listenAddr(9981);
SudokuServer server(&loop, listenAddr, numThreads);

server.start();

loop.loop();
}

reactor + threadPool

一个IO线程,多个计算thread的模式

EventLoop + threadpool + numThreads_

[cpp]
view plaincopyprint?

include "sudoku.h"

#include <muduo/base/Atomic.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/base/ThreadPool.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpServer.h>

#include <boost/bind.hpp>

#include <utility>

#include <mcheck.h>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class SudokuServer
{
public:
SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)
: loop_(loop),
server_(loop, listenAddr, "SudokuServer"),
numThreads_(numThreads),
startTime_(Timestamp::now())
{
server_.setConnectionCallback(
boost::bind(&SudokuServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
}

void start()
{
LOG_INFO << "starting " << numThreads_ << " threads.";
threadPool_.start(numThreads_);//线程池的线程个数
server_.start();
}

private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
LOG_DEBUG << conn->name();
size_t len = buf->readableBytes();
while (len >= kCells + 2)
{
const char* crlf = buf->findCRLF();
if (crlf)
{
string request(buf->peek(), crlf);
buf->retrieveUntil(crlf + 2);
len = buf->readableBytes();
if (!processRequest(conn, request))
{
conn->send("Bad Request!\r\n");
conn->shutdown();
break;
}
}
else if (len > 100) // id + ":" + kCells + "\r\n"
{
conn->send("Id too long!\r\n");
conn->shutdown();
break;
}
else
{
break;
}
}
}

bool processRequest(const TcpConnectionPtr& conn, const string& request)
{
string id;
string puzzle;
bool goodRequest = true;

string::const_iterator colon = find(request.begin(), request.end(), ':');
if (colon != request.end())
{
id.assign(request.begin(), colon);
puzzle.assign(colon+1, request.end());
}
else
{
puzzle = request;
}
/*计算线程中的线程进行处理*/
if (puzzle.size() == implicit_cast<size_t>(kCells))
{
threadPool_.run(boost::bind(&solve, conn, puzzle, id));
}
else
{
goodRequest = false;
}
return goodRequest;
}

static void solve(const TcpConnectionPtr& conn,
const string& puzzle,
const string& id)
{
LOG_DEBUG << conn->name();
string result = solveSudoku(puzzle);
/*这里处理完数据后,conn->send() 还是在IO线程中发送,而不是
在计算线程中处理
*/
if (id.empty())
{
conn->send(result+"\r\n");
}
else
{
conn->send(id+":"+result+"\r\n");
}
}

EventLoop* loop_;
TcpServer server_;
ThreadPool threadPool_; //计算线程池
int numThreads_;
Timestamp startTime_;
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
int numThreads = 0;
if (argc > 1)
{
numThreads = atoi(argv[1]);
}
EventLoop loop;
InetAddress listenAddr(9981);
SudokuServer server(&loop, listenAddr, numThreads);

server.start();

loop.loop();
}

4 、multiple reactors+threadpool

EventLoopThreadPoll + threadpool + IOnumThreads_ + ThreadPoolnumThreads_



include "sudoku.h"

#include <muduo/base/Atomic.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/base/ThreadPool.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpServer.h>

#include <boost/bind.hpp>

#include <utility>

#include <mcheck.h>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class SudokuServer
{
public:
SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)
: loop_(loop),
server_(loop, listenAddr, "SudokuServer"),
numThreads_(numThreads),
startTime_(Timestamp::now())
{
server_.setConnectionCallback(
boost::bind(&SudokuServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));
server_.setThreadNum(numThreads);//IO线程池的初始化
}

void start()
{
LOG_INFO << "starting " << numThreads_ << " threads.";
threadPool_.start(numThreads_);
server_.start();
}

private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
LOG_DEBUG << conn->name();
size_t len = buf->readableBytes();
while (len >= kCells + 2)
{
const char* crlf = buf->findCRLF();
if (crlf)
{
string request(buf->peek(), crlf);
buf->retrieveUntil(crlf + 2);
len = buf->readableBytes();
if (!processRequest(conn, request))
{
conn->send("Bad Request!\r\n");
conn->shutdown();
break;
}
}
else if (len > 100) // id + ":" + kCells + "\r\n"
{
conn->send("Id too long!\r\n");
conn->shutdown();
break;
}
else
{
break;
}
}
}

bool processRequest(const TcpConnectionPtr& conn, const string& request)
{
string id;
string puzzle;
bool goodRequest = true;

string::const_iterator colon = find(request.begin(), request.end(), ':');
if (colon != request.end())
{
id.assign(request.begin(), colon);
puzzle.assign(colon+1, request.end());
}
else
{
puzzle = request;
}
/*计算线程中的线程进行处理*/
if (puzzle.size() == implicit_cast<size_t>(kCells))
{
threadPool_.run(boost::bind(&solve, conn, puzzle, id));
}
else
{
goodRequest = false;
}
return goodRequest;
}

static void solve(const TcpConnectionPtr& conn,
const string& puzzle,
const string& id)
{
LOG_DEBUG << conn->name();
string result = solveSudoku(puzzle);
/*这里处理完数据后,conn->send() 还是在IO线程中发送,而不是
在计算线程中处理
*/
if (id.empty())
{
conn->send(result+"\r\n");
}
else
{
conn->send(id+":"+result+"\r\n");
}
}

EventLoop* loop_;
TcpServer server_;
ThreadPool threadPool_;
int numThreads_;
Timestamp startTime_;
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
int numThreads = 0;
if (argc > 1)
{
numThreads = atoi(argv[1]);
}
EventLoop loop;
InetAddress listenAddr(9981);
SudokuServer server(&loop, listenAddr, numThreads);

server.start();

loop.loop();
}

--------------------------------------------------

sudoKu 求解服务器,既是一个IO密集型,又是一个计算密集型的服务

IO线程池 + 计算线程池

计算时间如果比较久,就会使得IO线程阻塞,IO线程很快就用尽,就不处理大量的并发连接

一个IO线程+计算线程池

使用muduo 库来编程还是比较容易的,有兴趣的朋友可以试一下!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: