面向连接的Socket Server的简单实现
2015-09-25 20:56
441 查看
一、基本原理
有时候我们需要实现一个公共的模块,需要对多个其他的模块提供服务,最常用的方式就是实现一个Socket Server,接受客户的请求,并返回给客户结果。这经常涉及到如果管理多个连接及如何多线程的提供服务的问题,常用的方式就是连接池和线程池,基本流程如下:
首先服务器端有一个监听线程,不断监听来自客户端的连接。
当一个客户端连接到监听线程后,便建立了一个新的连接。
监听线程将新建立的连接放入连接池进行管理,然后继续监听新来的连接。
线程池中有多个服务线程,每个线程都监听一个任务队列,一个建立的连接对应一个服务任务,当服务线程发现有新的任务的时候,便用此连接向客户端提供服务。
一个Socket Server所能够提供的连接数可配置,如果超过配置的个数则拒绝新的连接。
当服务线程完成服务的时候,客户端关闭连接,服务线程关闭连接,空闲并等待处理新的任务。
连接池的监控线程清除其中关闭的连接对象,从而可以建立新的连接。
二、对Socket的封装
Socket的调用主要包含以下的步骤:调用比较复杂,我们首先区分两类Socket,一类是Listening Socket,一类是Connected Socket.
Listening Socket由MySocketServer负责,一旦accept,则生成一个Connected Socket,又MySocket负责。
MySocket主要实现的方法如下:
int MySocket::write(const char * buf, int length) { int ret = 0; int left = length; int index = 0; while(left > 0) { ret = send(m_socket, buf + index, left, 0); if(ret == 0) break; else if(ret == -1) { break; } left -= ret; index += ret; } if(left > 0) return -1; return 0; } |
int MySocket::read(char * buf, int length) { int ret = 0; int left = length; int index = 0; while(left > 0) { ret = recv(m_socket, buf + index, left, 0); if(ret == 0) break; else if(ret == -1) return -1; left -= ret; index += ret; } return index; } |
int MySocket::status() { int status; int ret; fd_set checkset; struct timeval timeout; FD_ZERO(&checkset); FD_SET(m_socket, &checkset); timeout.tv_sec = 10; timeout.tv_usec = 0; status = select((int)m_socket + 1, &checkset, 0, 0, &timeout); if(status < 0) ret = -1; else if(status == 0) ret = 0; else ret = 0; return ret; } |
int MySocket::close() { struct linger lin; lin.l_onoff = 1; lin.l_linger = 0; setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin)); ::close(m_socket); return 0; } |
int MySocketServer::init(int port) { if((m_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1) { 4000 return -1; } struct sockaddr_in serverAddr; memset(&serverAddr, 0, sizeof(struct sockaddr_in)); serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(port); if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1) { ::close(m_socket); return -1; } if(listen(m_socket, SOMAXCONN) == -1) { ::close(m_socket); return -1; } struct linger lin; lin.l_onoff = 1; lin.l_linger = 0; setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin)); m_port = port; m_inited = true; return 0; } |
MySocket * MySocketServer::accept() { int sock; struct sockaddr_in clientAddr; socklen_t clientAddrSize = sizeof(clientAddr); if((sock = ::accept(m_socket, (struct sockaddr *)&clientAddr, &clientAddrSize)) == -1) { return NULL; } MySocket* socket = new MySocket(sock); return socket; } |
MySocket * MySocketServer::accept(int timeout) { struct timeval timeout; timeout.tv_sec = timeout; timeout.tv_usec = 0; fd_set checkset; FD_ZERO(&checkset); FD_SET(m_socket, &checkset); int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout); if(status < 0) return NULL; else if(status == 0) return NULL; if(FD_ISSET(m_socket, &checkset)) { return accept(); } } |
三、线程池的实现
一个线程池一般有一个任务队列,启动的各个线程从任务队列中竞争任务,得到的线程则进行处理:list<MyTask *> m_taskQueue;任务队列由锁保护,使得线程安全:pthread_mutex_t m_queueMutex
任务队列需要条件变量来支持生产者消费者模式:pthread_cond_t m_cond
如果任务列表为空,则线程等待,等待中的线程个数为:m_numWaitThreads
需要一个列表来维护线程池中的线程:vector<MyThread *> m_threads
每个线程需要一个线程运行函数:
void * __thread_new_proc(void *p) { ((MyThread *)p)->run(); return 0; } |
int MyThread::start() { pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setschedpolicy(&attr, SCHED_FIFO); int ret = pthread_create(&m_thread, &attr, thread_func, args); pthread_attr_destroy(&attr); if(ret != 0) return –1; } |
int MyThread::stop() { int ret = pthread_kill(m_thread, SIGINT); if(ret != 0) return –1; } |
int MyThread::join() { int ret = pthread_join(m_thread, NULL); if(ret != 0) return –1; } |
void MyThread::run() { while (false == m_bStop) { MyTask *pTask = m_threadPool->getNextTask(); if (NULL != pTask) { pTask->process(); } } } |
int MyThreadPool::init() { pthread_condattr_t cond_attr; pthread_condattr_init(&cond_attr); pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); int ret = pthread_cond_init(&m_cond, &cond_attr); pthread_condattr_destroy(&cond_attr); if (ret_val != 0) return –1; pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); ret = pthread_mutex_init(&m_queueMutex, &attr); pthread_mutexattr_destroy(&attr); if (ret_val != 0) return –1; for (int i = 0; i< m_poolSize; ++i) { MyThread *thread = new MyThread(i+1, this); m_threads.push_back(thread); } return 0; } |
int MyThreadPool::start() { int ret; for (int i = 0; i< m_poolSize; ++i) { ret = m_threads[i]->start(); if (ret != 0) break; } ret = pthread_cond_broadcast(&m_cond); if(ret != 0) return –1; return 0; } |
void MyThreadPool::addTask(MyTask *ptask) { if (NULL == ptask) return; pthread_mutex_lock(&m_queueMutex); m_taskQueue.push_back(ptask); if (m_waitingThreadCount > 0) pthread_cond_signal(&m_cond); pthread_mutex_unlock(&m_queueMutex); } |
MyTask * MyThreadPool::getNextTask() { MyTask *pTask = NULL; pthread_mutex_lock(&m_queueMutex); while (m_taskQueue.begin() == m_taskQueue.end()) { ++m_waitingThreadCount; pthread_cond_wait(&n_cond, &m_queueMutex); --m_waitingThreadCount; } pTask = m_taskQueue.front(); m_taskQueue.pop_front(); pthread_mutex_unlock(&m_queueMutex); return pTask; } |
void MyTask::process() { //用read从客户端读取指令 //对指令进行处理 //用write向客户端写入结果 } |
四、连接池的实现
每个连接池保存一个链表保存已经建立的连接:list<MyConnection *> * m_connections当然这个链表也需要锁来进行多线程保护:pthread_mutex_t m_connectionMutex;
此处一个MyConnection也是一个MyTask,由一个线程来负责。
线程池也作为连接池的成员变量:MyThreadPool * m_threadPool
连接池由类MyConnectionPool负责,其主要函数如下:
void MyConnectionPool::addConnection(MyConnection * pConn) { pthread_mutex_lock(&m_connectionMutex); m_connections->push_back(pConn); pthread_mutex_unlock(&m_connectionMutex); m_threadPool->addTask(pConn); } |
void MyConnectionPool::managePool() { pthread_mutex_lock(&m_connectionMutex); for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); ) { MyConnection *conn = *itr; if (conn->isFinish()) { delete conn; conn = NULL; list<MyConnection *>::iterator pos = itr++; m_connections->erase(pos); } else if (conn->isError()) { //处理错误的连接 ++itr; } else { ++itr; } } pthread_mutex_unlock(&m_connectionMutex); } |
五、监听线程的实现
监听线程需要有一个MySocketServer来监听客户端的连接,每当形成一个新的连接,查看是否超过设置的最大连接数,如果超过则关闭连接,如果未超过设置的最大连接数,则形成一个新的MyConnection,将其加入连接池和线程池。MySocketServer *pServer = new MySocketServer(port); MyConnectionPool *pPool = new MyConnectionPool(); while (!stopFlag) { MySocket * sock = pServer->acceptConnection(5); if(sock != null) { if(m_connections.size > maxConnectionSize) { sock.close(); } MyTask *pTask = new MyConnection(); pPool->addConnection(pTask); } } |