您的位置:首页 > 理论基础 > 计算机网络

epoll+线程池实现http文件下载

2016-08-01 17:16 274 查看
server.c

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<fcntl.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include"threadpool.h"
#include<dlfcn.h>
#include<time.h>
#define MAX_LEN 1024
#define MAX_SIZE 100

void *worker(void *arg){
char buf[MAX_LEN];
int sock = *(int *)arg;
free(arg);
int len = recv(sock,&buf,sizeof(buf),0);
send(sock,&buf,sizeof(buf),0);
close(sock);
}

int main(){
int server_sockfd;
int client_sockfd;
struct sockaddr_in server_addr;
struct sockaddr_in client_addr;
int sin_size;
int epoll_fd;
struct epoll_event ev;
struct epoll_event events[MAX_SIZE];
int nfds;
struct threadpool *pool;
pool = threadpool_init(16,100);
memset(&server_addr,0,sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(6665);
if( (server_sockfd = socket(PF_INET,SOCK_STREAM,0)) < 0 ){
perror("socket");
exit(EXIT_FAILURE);
}
int on=1;
setsockopt(server_sockfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));
if( bind(server_sockfd,(struct sockaddr *)&server_addr,sizeof(struct sockaddr)) < 0 ){
perror("bind");
exit(EXIT_FAILURE);
}
listen(server_sockfd,5);
int old_option = fcntl(server_sockfd,F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(server_sockfd,F_SETFL,new_option);

if( (epoll_fd = epoll_create(MAX_SIZE)) < 0 ){
perror("create");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = server_sockfd;
if( epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_sockfd,&ev) < 0 ){
perror("ctl");
exit(EXIT_FAILURE);
}
while(1){
nfds = epoll_wait(epoll_fd,events,MAX_SIZE,-1);
if( nfds < 0 ){
perror("wait");
exit(EXIT_FAILURE);
}
int i;
for( i = 0 ; i < nfds ; ++i ){
if( events[i].data.fd == server_sockfd ){
sin_size = sizeof(struct sockaddr_in);
if( (client_sockfd = accept(server_sockfd,(struct sockaddr *)&client_addr,&sin_size)) < 0 ){
perror("accept");
exit(EXIT_FAILURE);
}
fcntl(client_sockfd,F_SETFL,fcntl(client_sockfd,F_GETFD,0)|O_NONBLOCK);
ev.events = EPOLLIN|EPOLLET;
ev.data.fd = client_sockfd;
if( epoll_ctl(epoll_fd,EPOLL_CTL_ADD,client_sockfd,&ev) < 0 ){
perror("CTL");
exit(EXIT_FAILURE);
}
printf("accpet!\n");
}
else{
int *ptr = malloc(sizeof(int));
*ptr = events[i].data.fd;
threadpool_add_job(pool,worker,(void *)ptr);
}
}
}
threadpool_destroy(pool);
close(server_sockfd);
return 0;
}

threadpool.h
#ifndef threadpool_H_
#define threadpool_H_

#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<assert.h>

struct job{
void* (*callback_func)(void *arg);
void *arg;
struct job *next;
};

struct threadpool{
int thread_num;
int queue_max_num;
int queue_cur_num;
int queue_close;
int pool_close;
struct job *head;
struct job *rear;
pthread_t *pthreads;
pthread_mutex_t mutex;
pthread_cond_t queue_empty;
pthread_cond_t queue_not_empty;
pthread_cond_t queue_not_full;
};

void* threadpool_func(void *arg);

struct threadpool* threadpool_init(int thread_num,int queue_max_num);

int threadpool_add_job(struct threadpool *pool,void* (*callback_func)(void *arg),void *arg);

int threadpool_destroy(struct threadpool *pool);

#endifthreadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<assert.h>

void* threadpool_func(void* arg){
struct threadpool *pool = (struct threadpool*)arg;
struct job *pjob = NULL;
while(1){
pthread_mutex_lock(&(pool->mutex));
while( (pool->queue_cur_num == 0) && !pool->pool_close ){
pthread_cond_wait(&(pool->queue_not_empty),&(pool->mutex));
}
if( pool->pool_close ){
pthread_mutex_unlock(&(pool->mutex));
pthread_exit(NULL);
}
pool->queue_cur_num--;
pjob = pool->head;
if( pool->queue_cur_num == 0 ){
pool->head = pool->rear = NULL;
pthread_cond_signal(&(pool->queue_empty));
}
else{
pool->head = pjob->next;
}
if( pool->queue_cur_num == pool->queue_max_num-1 ){
pthread_cond_broadcast(&(pool->queue_not_full));
}
pthread_mutex_unlock(&(pool->mutex));
(*(pjob->callback_func))(pjob->arg);
free(pjob);
pjob = NULL;
}
}

struct threadpool* threadpool_init(int thread_num,int queue_max_num){
struct threadpool *pool = NULL;
pool = malloc(sizeof(struct threadpool));
if( pool == NULL ){
perror("malloc threadpool failed");
return NULL;
}
pool->thread_num = thread_num;
pool->queue_max_num = queue_max_num;
pool->queue_cur_num = 0;
pool->head = NULL;
pool->rear = NULL;
if( pthread_mutex_init(&(pool->mutex),NULL) ){
perror("init mutex failed");
return NULL;
}
if( pthread_cond_init(&(pool->queue_empty),NULL) ){
perror("init queue_empty failed");
return NULL;
}
if( pthread_cond_init(&(pool->queue_not_empty),NULL) ){
perror("init queue_not_empty failed");
return NULL;
}
if( pthread_cond_init(&(pool->queue_not_full),NULL) ){
perror("init queue_not_full failed");
return NULL;
}
pool->pthreads = malloc(sizeof(pthread_t)*thread_num);
if( pool->pthreads == NULL ){
perror("malloc pthreads failed");
return NULL;
}
pool->queue_close = 0;
pool->pool_close = 0;
int i;
for( i = 0 ; i < pool->thread_num ; ++i ){
pthread_create(&(pool->pthreads[i]),NULL,threadpool_func,(void *)pool);
}
return pool;
}

int threadpool_add_job(struct threadpool* pool,void* (*callback_func)(void *arg),void *arg){
assert(pool!=NULL);
assert(callback_func!=NULL);
assert(arg!=NULL);
pthread_mutex_lock(&(pool->mutex));
while( (pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close) ){
pthread_cond_wait(&(pool->queue_not_full),&(pool->mutex));
}
if( pool->queue_close || pool->pool_close ){
pthread_mutex_unlock(&(pool->mutex));
return -1;
}
struct job *pjob = (struct job*)malloc(sizeof(struct job));
if( pjob == NULL ){
pthread_mutex_unlock(&(pool->mutex));
return -1;
}
pjob->callback_func = callback_func;
pjob->arg = arg;
pjob->next = NULL;
if( pool->head == NULL ){
pool->head = pool->rear = pjob;
pthread_cond_broadcast(&(pool->queue_not_empty));
}
else{
pool->rear->next = pjob;
pool->rear = pjob;
}
pool->queue_cur_num++;
pthread_mutex_unlock(&(pool->mutex));
return 0;
}

int threadpool_destroy(struct threadpool *pool){
assert(pool!=NULL);
pthread_mutex_lock(&(pool->mutex));
if( pool->queue_close || pool->pool_close ){
pthread_mutex_unlock(&(pool->mutex));
return -1;
}
pool->queue_close = 1;
while( pool->queue_cur_num != 0 ){
pthread_cond_wait(&(pool->queue_empty),&(pool->mutex));
}
pool->pool_close = 1;
pthread_mutex_unlock(&(pool->mutex));
pthread_cond_broadcast(&(pool->queue_not_empty));
pthread_cond_broadcast(&(pool->queue_not_full));
int i;
for( i = 0 ; i < pool->thread_num ; ++i ){
pthread_join(pool->pthreads[i],NULL);
}
pthread_mutex_destroy(&(pool->mutex));
pthread_cond_destroy(&(pool->queue_empty));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
free(pool->pthreads);
struct job *p;
while( pool->rear != NULL ){
p = pool->head;
pool->head = p->next;
free(p);
}
free(pool);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: