网络安全传输系统(4)-线程池优化
2017-06-27 13:03
597 查看
1、线程池原理
在客户机/服务器模型中,对每个客户机的连接都创建一个线程来为期服务,这种方式好像没什么不妥之处。但是当我们的任务需要大量进行大量线程的创建和销毁操作时,这个消耗就会变成的相当大。比如说每秒中需要创建及销毁10000个线程,这对服务器来说压力会非常大,同时如果处理时间很短,以至于创建和销毁的时间远大于处理的时候,这就非常不划算了。因此这里引入线程池的概念:线程池就是有一堆已经创建好了的线程,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放回池中,以供后面的任务使用,当池子里的线程全都处理忙碌状态时,这时任务需要稍作等待。
线程池的好处就在于线程复用,一个任务处理完成后,当前线程可以直接处理下一个任务,而不是销毁后再创建,非常适用于连续产生大量并发任务的场合。
2、线程池的实现
先采用一个结构体来描述线程池/*线程池结构*/ typedef struct { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; /*链表结构,线程池中所有等待任务*/ Cthread_task *queue_head; /*是否销毁线程池*/ int shutdown; /*存放线程id的指针*/ pthread_t *threadid; /*线程池中线程数目*/ int max_thread_num; /*当前等待的任务数*/ int cur_task_size; } Cthread_pool;这里采用一个链表来保存线程池中等待的任务,当有新任务加入则唤醒一个线程,取下头结点的任务,然后开始工作。如果当前没有任务,则所有的线程都在睡觉,等待新任务加入然后被唤醒。
每个任务也采用一个结构体来保存:
typedef struct task { //任务需要执行的函数 void *(*process) (void *arg); //执行函数的参数 void *arg; //下一个任务的地址 struct task *next; } Cthread_task;
对线程池进行初始化,主要完成对参数的初始化和创建线程,在线程创建时需要调用下面的线程运行函数:
static Cthread_pool *pool = NULL; void pool_init (int max_thread_num) { int i = 0; pool = (Cthread_pool *) malloc (sizeof (Cthread_pool)); pthread_mutex_init (&(pool->queue_lock), NULL); /*初始化条件变量*/ pthread_cond_init (&(pool->queue_ready), NULL); //没有任务,头结点为空 pool->queue_head = NULL; //最大线程个数 pool->max_thread_num = max_thread_num; //现在任务为0 pool->cur_task_size = 0; //线程池开始工作 pool->shutdown = 0; //申请存放线程池id的数组 pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t)); for (i = 0; i < max_thread_num; i++) { //创建线程,线程属性为空,参数也设置为空 pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL); } }
线程运行函数,线程运行函数编程遵循如下步骤:
如果当前没有任务,线程被阻塞,等待任务加入唤醒线程
如果有任务加入,线程会被唤醒,需要取下链表的头任务并对相应的参数做修改,注意这里需要加上互斥锁,最后运行任务函数。
如果线程池要销毁了,需要做相应的操作。
void * thread_routine (void *arg) { printf ("starting thread 0x%x\n", pthread_self ()); while (1) { //加上互斥锁 pthread_mutex_lock (&(pool->queue_lock)); //如果没有任务,则阻塞,等待被唤醒 while (pool->cur_task_size == 0 && !pool->shutdown) { printf ("thread 0x%x is waiting\n", pthread_self ()); pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock)); } /*线程池要销毁了*/ if (pool->shutdown) { /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/ pthread_mutex_unlock (&(pool->queue_lock)); printf ("thread 0x%x will exit\n", pthread_self ()); pthread_exit (NULL); } printf ("thread 0x%x is starting to work\n", pthread_self ()); /*待处理任务减1,并取出链表中的头元素*/ pool->cur_task_size--; Cthread_task *task = pool->queue_head; pool->queue_head = task->next; //解锁 pthread_mutex_unlock (&(pool->queue_lock)); /*调用回调函数,执行任务*/ (*(task->process)) (task->arg); free (task); task = NULL; } /*这一句应该是不可达的*/ pthread_exit (NULL); }
往任务列表中加入一个任务,加入任务需要对当前任务分配一个任务节点并登记,然后加入任务列表,最后唤醒一个线程:
/*向线程池中加入任务*/ int pool_add_task (void *(*process) (void *arg), void *arg) { /*构造一个新任务*/ Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task)); task->process = process; task->arg = arg; task->next = NULL; pthread_mutex_lock (&(pool->queue_lock)); /*将任务加入到等待队列中*/ Cthread_task *member = pool->queue_head; if (member != NULL) { while (member->next != NULL) member = member->next; member->next = task; } else { pool->queue_head = task; } pool->cur_task_size++; pthread_mutex_unlock (&(pool->queue_lock)); //唤醒一个线程 //加入 pthread_cond_signal (&(pool->queue_ready)); return 0; }
线程池销毁函数,线程池销毁时,需要等待正在运行的线程退出,然后把阻塞的线程唤醒,最后销毁线程和其他一些参数:
/*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直 把任务运行完后再退出*/ int pool_destroy () { if (pool->shutdown) return -1;/*防止两次调用*/ pool->shutdown = 1; /*唤醒所有等待线程,线程池要销毁了*/ pthread_cond_broadcast (&(pool->queue_ready)); /*阻塞等待线程退出,否则就成僵尸了*/ int i; for (i = 0; i < pool->max_thread_num; i++) pthread_join (pool->threadid[i], NULL); free (pool->threadid); /*销毁等待队列*/ Cthread_task *head = NULL; while (pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free (head); } /*条件变量和互斥量也别忘了销毁*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); free (pool); /*销毁后指针置空是个好习惯*/ pool=NULL; return 0; }
3、线程池编程实例
由于采用了线程,在编译的时候需要加上-lpthread#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
typedef struct task
{
//任务需要执行的函数
void *(*process) (void *arg);
//执行函数的参数
void *arg;
//下一个任务的地址
struct task *next;
} Cthread_task;
/*线程池结构*/
typedef struct
{
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;
/*链表结构,线程池中所有等待任务*/
Cthread_task *queue_head;
/*是否销毁线程池*/
int shutdown;
/*存放线程id的指针*/
pthread_t *threadid;
/*线程池中线程数目*/
int max_thread_num;
/*当前等待的
4000
任务数*/
int cur_task_size;
} Cthread_pool;
void *thread_routine (void *arg);
static Cthread_pool *pool = NULL;
void pool_init (int max_thread_num)
{
int i = 0;
pool = (Cthread_pool *) malloc (sizeof (Cthread_pool));
pthread_mutex_init (&(pool->queue_lock), NULL);
/*初始化条件变量*/
pthread_cond_init (&(pool->queue_ready), NULL);
//没有任务,头结点为空
pool->queue_head = NULL;
//最大线程个数
pool->max_thread_num = max_thread_num;
//现在任务为0
pool->cur_task_size = 0;
//线程池开始工作
pool->shutdown = 0;
//申请存放线程池id的数组
pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
for (i = 0; i < max_thread_num; i++)
{
//创建线程,线程属性为空,参数也设置为空
pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL);
}
}
/*向线程池中加入任务*/
int pool_add_task (void *(*process) (void *arg), void *arg)
{
/*构造一个新任务*/
Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task));
task->process = process;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock (&(pool->queue_lock));
/*将任务加入到等待队列中*/
Cthread_task *member = pool->queue_head;
if (member != NULL)
{
while (member->next != NULL)
member = member->next;
member->next = task;
}
else
{
pool->queue_head = task;
}
pool->cur_task_size++;
pthread_mutex_unlock (&(pool->queue_lock));
//唤醒一个线程
//加入
pthread_cond_signal (&(pool->queue_ready));
return 0;
}
/*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直 把任务运行完后再退出*/
int pool_destroy ()
{
if (pool->shutdown)
return -1;/*防止两次调用*/
pool->shutdown = 1;
/*唤醒所有等待线程,线程池要销毁了*/
pthread_cond_broadcast (&(pool->queue_ready));
/*阻塞等待线程退出,否则就成僵尸了*/
int i;
for (i = 0; i < pool->max_thread_num; i++)
pthread_join (pool->threadid[i], NULL);
free (pool->threadid);
/*销毁等待队列*/
Cthread_task *head = NULL;
while (pool->queue_head != NULL)
{
head = pool->queue_head;
pool->queue_head = pool->queue_head->next;
free (head);
}
/*条件变量和互斥量也别忘了销毁*/
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready));
free (pool);
/*销毁后指针置空是个好习惯*/
pool=NULL;
return 0;
}
void * thread_routine (void *arg) { printf ("starting thread 0x%x\n", pthread_self ()); while (1) { //加上互斥锁 pthread_mutex_lock (&(pool->queue_lock)); //如果没有任务,则阻塞,等待被唤醒 while (pool->cur_task_size == 0 && !pool->shutdown) { printf ("thread 0x%x is waiting\n", pthread_self ()); pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock)); } /*线程池要销毁了*/ if (pool->shutdown) { /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/ pthread_mutex_unlock (&(pool->queue_lock)); printf ("thread 0x%x will exit\n", pthread_self ()); pthread_exit (NULL); } printf ("thread 0x%x is starting to work\n", pthread_self ()); /*待处理任务减1,并取出链表中的头元素*/ pool->cur_task_size--; Cthread_task *task = pool->queue_head; pool->queue_head = task->next; //解锁 pthread_mutex_unlock (&(pool->queue_lock)); /*调用回调函数,执行任务*/ (*(task->process)) (task->arg); free (task); task = NULL; } /*这一句应该是不可达的*/ pthread_exit (NULL); }
void * myprocess (void *arg)
{
printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);
sleep (1);/*休息一秒,延长任务的执行时间*/
return NULL;
}
int main (int argc, char **argv)
{
pool_init (3);/*线程池中最多三个活动线程*/
/*连续向池中投入10个任务*/
int *workingnum = (int *) malloc (sizeof (int) * 10);
int i;
for (i = 0; i < 10; i++)
{
workingnum[i] = i;
pool_add_task (myprocess, &workingnum[i]);
}
/*等待所有任务完成*/
sleep (5);
/*销毁线程池*/
pool_destroy ();
free (workingnum);
return 0;
}
4、改进后的服务器代码
#include<stdio.h> #include<string.h> #include<sys/socket.h> #include<sys/types.h> #include<netinet/in.h> #include<errno.h> #include<fcntl.h> #include<unistd.h> #include <openssl/err.h> #include <openssl/ssl.h> #include <pthread.h> #define port 3333 typedef struct task { void *(*process) (int arg); int arg; struct task *next; } Cthread_task; /*线程池结构*/ typedef struct { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; /*链表结构,线程池中所有等待任务*/ Cthread_task *queue_head; /*是否销毁线程池*/ int shutdown; pthread_t *threadid; /*线程池中线程数目*/ int max_thread_num; /*当前等待的任务数*/ int cur_task_size; } Cthread_pool; static Cthread_pool *pool = NULL; void *thread_routine (void *arg); int sockfd; struct sockaddr_in sockaddr; struct sockaddr_in client_addr; int sin_size; SSL_CTX *ctx; void pool_init (int max_thread_num) { int i = 0; pool = (Cthread_pool *) malloc (sizeof (Cthread_pool)); pthread_mutex_init (&(pool->queue_lock), NULL); /*初始化条件变量*/ pthread_cond_init (&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = max_thread_num; pool->cur_task_size = 0; pool->shutdown = 0; pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t)); for (i = 0; i < max_thread_num; i++) { pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL); } } void * thread_routine (void *arg) { printf ("starting thread 0x%x\n", pthread_self ()); while (1) { pthread_mutex_lock (&(pool->queue_lock)); while (pool->cur_task_size == 0 && !pool->shutdown) { printf ("thread 0x%x is waiting\n", pthread_self ()); pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock)); } /*线程池要销毁了*/ if (pool->shutdown) { /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/ pthread_mutex_unlock (&(pool->queue_lock)); printf ("thread 0x%x will exit\n", pthread_self ()); pthread_exit (NULL); } printf ("thread 0x%x is starting to work\n", pthread_self ()); /*待处理任务减1,并取出链表中的头元素*/ pool->cur_task_size--; Cthread_task *task = pool->queue_head; pool->queue_head = task->next; pthread_mutex_unlock (&(pool->queue_lock)); /*调用回调函数,执行任务*/ (*(task->process)) (task->arg); free (task); task = NULL; } /*这一句应该是不可达的*/ pthread_exit (NULL); } /*向线程池中加入任务*/ int pool_add_task (void *(*process) (int arg), int arg) { /*构造一个新任务*/ Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task)); task->process = process; task->arg = arg; task->next = NULL; pthread_mutex_lock (&(pool->queue_lock)); /*将任务加入到等待队列中*/ Cthread_task *member = pool->queue_head; if (member != NULL) { while (member->next != NULL) member = member->next; member->next = task; } else { pool->queue_head = task; } pool->cur_task_size++; pthread_mutex_unlock (&(pool->queue_lock)); //唤醒一个线程 //加入 pthread_cond_signal (&(pool->queue_ready)); return 0; } void handle(char cmd,SSL *ssl) { char filename[30]={0}; int FileNameSize=0; int fd; int filesize=0; int count=0,totalrecv=0; char buf[1024]; struct stat fstat; switch(cmd) { case 'U': { //接收文件名 SSL_read(ssl, &FileNameSize, 4); SSL_read(ssl, (void *)filename, FileNameSize); filename[FileNameSize]='\0'; //创建文件 if((fd = open(filename,O_RDWR|O_CREAT)) == -1) { perror("creat:"); _exit(0); } //接收文件长度 SSL_read(ssl, &filesize, 4); //接收文件 while((count = SSL_read(ssl,(void *)buf,1024)) > 0) { write(fd,&buf,count); totalrecv += count; if(totalrecv == filesize) break; } //关闭文件 close(fd); } break; case 'D': { //接收文件名 SSL_read(ssl, &FileNameSize, 4); SSL_read(ssl, filename, FileNameSize); filename[FileNameSize]='\0'; //打开文件 if((fd = open(filename,O_RDONLY)) == -1) { perror("creat:"); _exit(0); } //发送文件长度和文件名 if((stat(filename,&fstat)) == -1) return; SSL_write(ssl,&fstat.st_size,4); while((count = read(fd,(void *)buf,1024)) > 0) { SSL_write(ssl,&buf,count); } close(fd); } break; } } void *myprocess(int args) { SSL *ssl; int tmp_fd = args; char cmd; //产生新的SSL ssl = SSL_new(ctx); SSL_set_fd(ssl,tmp_fd); SSL_accept(ssl); //处理事件 while(1) { SSL_read(ssl,&cmd,1); if(cmd == 'Q') { SSL_shutdown(ssl); SSL_free(ssl); close(tmp_fd); break; } else { handle(cmd,ssl); } } return NULL; } int main() { int newfd; //初始化线程池 pool_init(5); //建立连接 //SSL连接 SSL_library_init(); OpenSSL_add_all_algorithms(); SSL_load_error_strings(); ctx = SSL_CTX_new(SSLv23_server_method()); //载入数字证书 SSL_CTX_use_certificate_file(ctx,"./cacert.pem",SSL_FILETYPE_PEM); //载入私钥 SSL_CTX_use_PrivateKey_file(ctx,"./privkey.pem",SSL_FILETYPE_PEM); SSL_CTX_check_private_key(ctx); //创建socket if((sockfd = socket(AF_INET,SOCK_STREAM,0)) == -1) { perror("socket:"); _exit(0); } memset(&sockaddr,0, sizeof(sockaddr)); sockaddr.sin_family = AF_INET; sockaddr.sin_port = htons(port); sockaddr.sin_addr.s_addr = htonl(INADDR_ANY); //绑定地址 if(bind(sockfd,(struct sockaddr *)&sockaddr,sizeof(sockaddr)) == -1) { perror("bind:"); _exit(0); } //监听 if(listen(sockfd,10) == -1) { perror("listen"); } while(1) { //连接 if((newfd = accept(sockfd, (struct sockaddr *)(&client_addr),&sin_size)) == -1) { perror("accept:"); _exit(0); } //给线程池添加任务 pool_add_task(myprocess,newfd); } close(sockfd); SSL_CTX_free(ctx); return 0; }
相关文章推荐
- 网络安全传输系统(6)-其它改进
- 网络安全传输系统(7)-总结
- 网络安全传输系统(2)-框架搭建
- 网络安全传输系统(5)-账号管理系统
- [国嵌攻略][178][网络安全传输系统框架搭建]
- 下载基于REST、SPDY、异步IO深入规模化网络、信息安全、通信优化企业应用系统
- 负载均衡 性能优化,网络安全,https,分布式系统,日志分析,离线数据分析视频教程
- 网络安全传输系统(1)-功能介绍
- 网络安全传输系统(3)-加密传输
- [国嵌攻略][177][网络安全传输系统模型设计]
- 熟悉Linux系统安全和优化(全)
- 六大必杀技 系统安全之IE全面优化设置
- Linux 网络安全和优化 (第一部分)--- Jephe Wu 翻译整理
- 任天行网络安全管理系统
- 网络传输过程中三种安全机制综述
- 网络安全讲座之八:IDS系统
- 网络卫星股市行情实时传输系统 v2.3.25 下载
- 【信息安全】怎样保证银行的IT系统安全--网络安全老三样的断想
- 系统优化安全加速度