您的位置:首页 > 理论基础 > 计算机网络

简单进程池实现多TCP客户服务

2012-03-28 09:31 253 查看
根据预定义进程数创建进程池。

父子进程通信使用的IPC方式为:UNIX域套接字

父进程listen,aeecpt,并将连接套接字发送到子进程,交由子进程处理该连接。

子进程处理完毕,与父进程通信,实现资源回收,并在下一连接到来交由该完毕的子进程。

#include "unp.h"
#include <assert.h>
#define IDLE		0
#define BUSY		1
#define END		2

typedef struct pro {
pid_t	id;		/*进程的进程ID*/
int 		fd;		/*与父进程通信的描述符*/
int		flags;	/*进程当前状态*/
int		cnt;		/*进程服务的次数*/
}process;
process	*mypid;		/*进程池*/

ssize_t write__the_fd(int fd, void *ptr, size_t nbytes, int sendfd);
pid_t make_child( int i, int listenfd );
ssize_t read_the_fd(int fd, void *ptr, size_t nbytes, int *recvfd);
void child_func( void);
void clr(int signo);
int	main( int argc, char **argv ) {
int 		nchild, fdmax, listenfd, connfd, i, len, nselect;
char		c;
struct sockaddr_in		addr, clientaddr;
signal(SIGCHLD, clr);
signal(SIGINT, clr);
fd_set	rset, oset;
if(argc != 2)
err_quit("args");
listenfd = socket( AF_INET, SOCK_STREAM, 0 );/*服务器监听套接字*/
if( listenfd < 0 ) {
err_sys("socket");
}
nchild = atoi(argv[1]);
mypid = (struct pro *)malloc( nchild * sizeof( struct pro) );
bzero( &addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port	 = htons(SERV_PORT);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(listenfd, (SA *)&addr, sizeof(struct sockaddr));
fdmax = listenfd; 						/*用于select */
for( i = 0; i < nchild; i++ ) {
make_child(i, listenfd);				/*创建子进程并等待给客户服务*/
}									/*传入listenfd子进程关闭监听套接字*/
int idle = nchild;						/*当前空闲的子进程*/
len = sizeof(struct sockaddr);
FD_ZERO(&oset);
FD_ZERO(&rset);
listen(listenfd,5);
FD_SET(listenfd, &oset);
for( ; ;) {
if(idle == 0)
FD_CLR(listenfd, &oset);		/*若没有空闲子进程,则禁止accept新连接*/
else
FD_SET(listenfd, &oset);
rset = oset;
nselect = select(fdmax+1, &rset, NULL, NULL, NULL);
if( FD_ISSET(listenfd, &rset) ) {		/*收到客户端连接请求*/
connfd = accept(listenfd, (SA *)&clientaddr, &len);
for( i = 0; i < nchild; i++) {
if(mypid[i].flags == IDLE) {	/*查找可供服务的子进程*/
break;
}
}
/*  */
assert(i < nchild);
/*  */
//connfd = accept(listenfd, (SA *)&clientaddr, &len);
printf("the id is %d \n",i);
mypid[i].flags = BUSY;  				/*改变子进程状态*/
mypid[i].cnt ++;						/*子进程服务次数*/
FD_SET(mypid[i].fd, &oset);				/*加入描述集,等待子进程*/
fdmax = max(fdmax ,mypid[i].fd);
idle --;								/*空闲进程减少*/
/*通知子进程*/
write_the_fd(mypid[i].fd, " ", 1,connfd);/*把连接套接字传送给子进程*/
close(connfd);						/*因为有有一个confd在飞,所以这个可以关闭*/
if(--nselect  ==0)
continue;
}
else {
for(i = 0; i < nchild; i++ ) {
if(FD_ISSET(mypid[i].fd , &rset)) {
read(mypid[i].fd, &c, 1);
mypid[i].flags = IDLE;
FD_CLR(mypid[i].fd, &oset);
idle ++;
}
if(--nselect  == 0)
break;
}

}

}
}
void clr(int signo) {
pid_t pid;
while( (pid = waitpid(-1, NULL, WNOHANG)) > 0 ) {
printf("The child %d is done\n", pid);
}
sleep(2);
kill(getpid(), SIGTERM);
return;
}
pid_t make_child( int i, int listenfd ) {
pid_t 	child;
int		fd[2];
socketpair( AF_LOCAL, SOCK_STREAM,0, fd);
if( (child = fork( ) ) < 0 ) {
err_quit("fork ");
}
if( child > 0 ) {  /*父进程*/
//printf("the  i is  %d\n", i);
close( fd[1] );
mypid[i].id = child;
mypid[i].fd= fd[0];
mypid[i].flags = IDLE;
mypid[i].cnt = 0;
return child;
}
/*子进程*/
close( fd[0] );
close(listenfd);
dup2( fd[1], STDERR_FILENO);
close( fd[1]);
child_func();
}

void child_func( void) {
int 		connfd, n;
char		c;
char		buf[MAXLINE];
for( ; ; ) {
n = read_fd(STDERR_FILENO, &c, 1, &connfd);
if(n <0 )
err_sys("read_fd");
again:
while( (n = read(connfd, buf, sizeof(buf))) > 0 ){
writen(connfd, buf ,n);
}
if((n < 0)&&( errno == EINTR))
goto again;
else if(n < 0)
err_sys("error");
else if(n ==0) {
close(connfd);
write(STDERR_FILENO, " ", 1);
}
}
}
ssize_t write_the_fd(int fd, void *ptr, size_t nbytes, int sendfd) {
struct msghdr	msg;
struct iovec	iov[1];
union {
struct cmsghdr	cm;
char				control[CMSG_SPACE(sizeof(int))];
} control_un;
struct cmsghdr	*cmptr;
msg.msg_control = control_un.control;
msg.msg_controllen = sizeof(control_un.control);
cmptr = CMSG_FIRSTHDR(&msg);
cmptr->cmsg_len = CMSG_LEN(sizeof(int));
cmptr->cmsg_level = SOL_SOCKET;
cmptr->cmsg_type = SCM_RIGHTS;
*((int *) CMSG_DATA(cmptr)) = sendfd;
msg.msg_name = NULL;
msg.msg_namelen = 0;
iov[0].iov_base = ptr;
iov[0].iov_len = nbytes;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
return(sendmsg(fd, &msg, 0));
}
ssize_t read_the_fd(int fd, void *ptr, size_t nbytes, int *recvfd) {
struct msghdr	msg;
struct iovec	iov[1];
ssize_t			n;
int				newfd;
union {
struct cmsghdr	cm;
char				control[CMSG_SPACE(sizeof(int))];
} control_un;
struct cmsghdr	*cmptr;

msg.msg_control = control_un.control;
msg.msg_controllen = sizeof(control_un.control);
msg.msg_name = NULL;
msg.msg_namelen = 0;

iov[0].iov_base = ptr;
iov[0].iov_len = nbytes;
msg.msg_iov = iov;
msg.msg_iovlen = 1;

if ( (n = recvmsg(fd, &msg, 0)) <= 0)
return(n);
if ( (cmptr = CMSG_FIRSTHDR(&msg)) != NULL &&
cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
if (cmptr->cmsg_level != SOL_SOCKET)
err_quit("control level != SOL_SOCKET");
if (cmptr->cmsg_type != SCM_RIGHTS)
err_quit("control type != SCM_RIGHTS");
*recvfd = *((int *) CMSG_DATA(cmptr));
} else
*recvfd = -1;
return(n);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: