Socket通信过程中环形缓冲区应用
2013-11-11 20:15
239 查看
实验室的项目需求,需要做一个转发服务器,简单来说服务器由客户端A接收数据,转发给客户端B,可以有多个A与多个B存在,他们之间存在一个对应关系,主要是通过查询数据库来得到对应的关系,这里不细说。但是当客户端A的数量到达100个时,B接收到的数据会有延迟,这里说一下A每秒发送的数据量在256B左右。通过netstat命令,发现数据大量缓存在服务器的接收缓冲区中。
经过网上的查询,发现瓶颈在于:我用同一个线程去接收数据与转发数据(用epoll实现,通过epoll获得读数据事件,然后读取数据转发)。但是这样效率不高,应该用两个线程来实现,一个线程用来转发数据,另一个用来接收数据,中间通过一个缓冲区来进行沟通。这个模型就与我们操作系统中学习的消费者和生产者模式相似。
由于以前也没有写过环形缓冲区的例子,这里先用一个小的实例来做实验,实例中同样包括服务器和客户端。服务器一个线程用来接收数据放入缓冲区,一个线程用来读取缓冲区中的数据并进行打印。
至于环形缓冲区的代码,我是参考先人的代码,在此基础上加以应用,先人代码网址如下:
/article/6311281.html
先人的代码写的真心不错,代码风格也很规范,将条件同步应用到了缓冲区的同步问题中。
现在奉上在下的代码:
这是服务器端的代码,其中有两个线程,一个接收数据,另一个打印接收的数据。
客户端代码,客户端十分简单,就是不断向服务器发送数据:
环形缓冲区主要代码和socket通信粘包问题处理的代码:
下面是运行结果:
我编译了两个客户端,同时向服务器发送数据,一个发送Data Number is [0-9] ,另一个发送Data Id is [0-9];
服务器的输出如下:
客户端输出:
达到了预期的效果,下阶段的目标是把环形缓冲区应用到服务器中,希望一且顺利。。。
源代码:
ringBuffer.zip
经过网上的查询,发现瓶颈在于:我用同一个线程去接收数据与转发数据(用epoll实现,通过epoll获得读数据事件,然后读取数据转发)。但是这样效率不高,应该用两个线程来实现,一个线程用来转发数据,另一个用来接收数据,中间通过一个缓冲区来进行沟通。这个模型就与我们操作系统中学习的消费者和生产者模式相似。
由于以前也没有写过环形缓冲区的例子,这里先用一个小的实例来做实验,实例中同样包括服务器和客户端。服务器一个线程用来接收数据放入缓冲区,一个线程用来读取缓冲区中的数据并进行打印。
至于环形缓冲区的代码,我是参考先人的代码,在此基础上加以应用,先人代码网址如下:
/article/6311281.html
先人的代码写的真心不错,代码风格也很规范,将条件同步应用到了缓冲区的同步问题中。
现在奉上在下的代码:
这是服务器端的代码,其中有两个线程,一个接收数据,另一个打印接收的数据。
#include<stdio.h> #include<sys/types.h> #include<sys/socket.h> #include<string.h> #include<netinet/in.h> #include<pthread.h> #include"ytsSocket.h" #define SERV_IP "127.0.0.1" #define CLIENT_PORT 6666 #define MONITOR_PORT 5555 int Bind_socket(); void *Client_Service(void*); void *Output_Service(void*); struct prodcons buffer; /* 全局的环形缓冲区 */ int main(int argc, char *argv[]) { int clifd,clientfd; socklen_t clilen; pthread_t ntid; int err; char buf[32]; // struct prodcons buffer; /* 全局的环形缓冲区 */ /* 初始化全局的缓冲区 */ init_buffer(&buffer); struct sockaddr_in cliaddr; if((clifd=Bind_socket())==-1) /* 绑定套接字 */ { printf ("创建套接字失败\n"); pthread_exit((void*)-1); } if((err=pthread_create(&ntid,NULL,Output_Service,NULL))!=0) { printf("can not create thread:%s\n",strerror(err)); return -1; } for( ; ; ) { // printf ("waiting for connect...\n"); clilen=sizeof(cliaddr); if((clientfd=accept(clifd,(struct sockaddr*)&cliaddr,&clilen))<0) { perror("accept"); pthread_exit((void*)-1); } inet_ntop(AF_INET,&cliaddr.sin_addr,buf,sizeof(buf)); printf ("Recieve connection from %s\n",buf); if((err=pthread_create(&ntid,NULL,Client_Service,(void*)&clientfd))!=0) { printf("can not create thread:%s\n",strerror(err)); return -1; } int rst; } return 0; } /* 为客户端服务的线程 */ void* Client_Service(void*arg) { int clientfd; clientfd=*(int *)arg; int rstn; for( ; ; ) { rstn=receive_message(clientfd,&buffer); if(rstn==-1) { close(clientfd); pthread_exit((void*)-1); }else if(rstn==0) { close(clientfd); printf ("peer close connection\n"); pthread_exit((void*)-1); } else { // buffer[rstn]=0; //printf("receive from client data: %s\n",buffer); } sleep(1); } } /* 建立套接字 */ int Bind_socket() { int clifd; socklen_t clilen; struct sockaddr_in servaddr_cli,servaddr_mon,cliaddr; if((clifd=socket(AF_INET,SOCK_STREAM,0))<0) { perror("socket"); return -1; } bzero(&servaddr_cli,sizeof(servaddr_cli)); servaddr_cli.sin_family=AF_INET; servaddr_cli.sin_port=htons(CLIENT_PORT); servaddr_cli.sin_addr.s_addr=htonl(INADDR_ANY); if( bind(clifd,(struct sockaddr*)&servaddr_cli,sizeof(servaddr_cli))<0) { perror("bind"); return -1; } if(listen(clifd,5)<0) { perror("listen"); return -1; } return clifd; } void* Output_Service(void*arg) { for( ; ;) { print_buffer(&buffer); } }
客户端代码,客户端十分简单,就是不断向服务器发送数据:
#include<stdio.h> #include<sys/types.h> #include<sys/socket.h> #include<string.h> #include<netinet/in.h> #include<pthread.h> #include<unistd.h> #include "ytsSocket.h" #define SERV_IP "127.0.0.1" #define CLIENT_PORT 6666 #define MONITOR_PORT 5555 int main(int argc, char *argv[]) { int clifd; struct sockaddr_in servaddr; if( (clifd=socket(AF_INET,SOCK_STREAM,0))<0) perror("socket"); bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.sin_port=htons(CLIENT_PORT); inet_pton(AF_INET,SERV_IP,&servaddr.sin_addr); if(connect(clifd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0) perror("connect"); char buffer[32]; int i,rstn; for( i=0;i<10;i++) { snprintf(buffer,sizeof(buffer),"Data Id is : %d",i); printf ("%s\n",buffer); /* if((rstn=write(clifd,buffer,strlen(buffer)))<0) */ /* perror("write"); */ SendMessageClient(clifd,buffer,strlen(buffer)); usleep(1000000); } return 0; }
环形缓冲区主要代码和socket通信粘包问题处理的代码:
#include"ytsSocket.h" /* 向对端发送数据,由共享缓冲区中读取数据 */ int SendMessage(int socketFd,char * sendBuffer) { char len[5]; int sendLen=strlen(sendBuffer); snprintf(len,sizeof(len),"%04d",sendLen); writen(socketFd,len,sizeof(len)-1); int sendRst=writen(socketFd,sendBuffer,sendLen); return sendRst; } int SendMessageClient(int socketFd,char * sendBuffer,int sendLen) { char len[5]; snprintf(len,sizeof(len),"%04d",sendLen); //printf("len:%s\n",len); if(send(socketFd,len,sizeof(len)-1,0)<0) { perror("send :"); return -1; } int sendRecv; if((sendRecv=send(socketFd,sendBuffer,sendLen,0))<0) { perror("send sendBuffer:"); return -1; } return sendRecv; } int ReceiveMessage(int sockFd,char recvMsg[]) { char msgSize[5]; int sizeLen=sizeof(msgSize); readn(sockFd,msgSize,sizeLen-1); msgSize[sizeLen-1]=0; int msgLen=atoi(msgSize); int rcvLen=readn(sockFd,recvMsg,msgLen); if(rcvLen>0) { recvMsg[rcvLen]=0; /* 如果成功接收数据,在数据结尾后加入‘0/’ */ } return rcvLen; } int readn(int fd,void *vptr,int n) { int nleft; int nread; char *ptr; ptr=(char*)vptr; nleft=n; while(nleft>0) { if( (nread=read(fd,ptr,nleft))<0) { if(errno==EINTR)//read again { nread=0; } else { return -1; //pthread_exit((void*)0);//end thread } } else if (nread==0)//receive FIN { close(fd); //close socket //pthread_exit((void*)0);//end thread return 0; } nleft-=nread; ptr+=nread; } return (n-nleft); } int writen(int fd,const void * vptr,int n) { int nleft; int nwritten; const char *ptr; ptr=(char*)vptr; nleft=n; while(nleft>0) { if( (nwritten=write(fd,ptr,nleft))<=0) { if(nwritten<0 && errno==EINTR) nwritten=0; else { return -1; //pthread_exit((void*)0);//end thread } } nleft-=nwritten; ptr+=nwritten; } } /* 接收数据,并向共享缓冲区中写入数据 */ int receive_message(int fd,struct prodcons*buffer) { // printf ("recevie message\n"); int rstn; /* 接收数据 */ pthread_mutex_lock(&buffer->lock); //printf ("size:%d\n",buffer->size); /* 等待缓冲区未满 */ while(buffer->size==BUFFER_SIZE) { pthread_cond_wait(&buffer->notfull,&buffer->lock); } if((rstn=ReceiveMessage(fd,buffer->buffer[buffer->writepos]))<=0) { pthread_mutex_unlock(&buffer->lock); return rstn; } else { buffer->writepos++; if(buffer->writepos>=BUFFER_SIZE) { buffer->writepos=0; } buffer->size++; pthread_cond_signal(&buffer->notempty); pthread_mutex_unlock(&buffer->lock); return rstn; } } /* 发送数据,读共享缓冲区 */ int send_message(int fd,struct prodcons *buffer) { int rstn; /* 记录发送数据的个数 */ pthread_mutex_lock(&buffer->lock); while(buffer->size==0) { pthread_cond_wait(&buffer->notempty,&buffer->lock); } /* 读取缓冲区中的数据,进行发送 */ if((rstn=SendMessage(fd,buffer->buffer[buffer->readpos]))<=0) { pthread_mutex_unlock(&buffer->lock); return rstn; } else { buffer->readpos++; if(buffer->readpos>=BUFFER_SIZE) { buffer->readpos=0; } buffer->size--; pthread_cond_signal(&buffer->notfull); pthread_mutex_unlock(&buffer->lock); return rstn; } } /* 初始化缓冲区相关数据 */ void init_buffer(struct prodcons*buffer) { pthread_mutex_init(&buffer->lock,NULL); pthread_cond_init(&buffer->notempty,NULL); pthread_cond_init(&buffer->notfull,NULL); buffer->readpos=0; buffer->writepos=0; buffer->size=0; } //输出共享buffer中的信息 void print_buffer(struct prodcons*buffer) { /* 记录发送数据的个数 */ pthread_mutex_lock(&buffer->lock); while(buffer->size==0) { pthread_cond_wait(&buffer->notempty,&buffer->lock); } /* 读取缓冲区中的数据,进行发送 */ printf ("%s\n",buffer->buffer[buffer->readpos]); buffer->readpos++; if(buffer->readpos>=BUFFER_SIZE) { buffer->readpos=0; } buffer->size--; pthread_cond_signal(&buffer->notfull); pthread_mutex_unlock(&buffer->lock); }
下面是运行结果:
我编译了两个客户端,同时向服务器发送数据,一个发送Data Number is [0-9] ,另一个发送Data Id is [0-9];
服务器的输出如下:
[yts@ytsCentos ringBuffer]$ ./main Recieve connection from 127.0.0.1 Data Id is : 0 Recieve connection from 127.0.0.1 Data Number is : 0 Data Id is : 1 Data Number is : 1 Data Id is : 2 Data Number is : 2 Data Id is : 3 Data Number is : 3 Data Id is : 4 Data Number is : 4 Data Id is : 5 Data Number is : 5 Data Id is : 6 Data Number is : 6 Data Id is : 7 Data Number is : 7 Data Id is : 8 Data Number is : 8 Data Id is : 9 Data Number is : 9 peer close connection peer close connection
客户端输出:
[yts@ytsCentos ringBuffer]$ ./client Data Number is : 0 Data Number is : 1 Data Number is : 2 Data Number is : 3 Data Number is : 4 Data Number is : 5 Data Number is : 6 Data Number is : 7 Data Number is : 8 Data Number is : 9
[yts@ytsCentos ringBuffer]$ ./client1 Data Id is : 0 Data Id is : 1 Data Id is : 2 Data Id is : 3 Data Id is : 4 Data Id is : 5 Data Id is : 6 Data Id is : 7 Data Id is : 8 Data Id is : 9
达到了预期的效果,下阶段的目标是把环形缓冲区应用到服务器中,希望一且顺利。。。
源代码:
ringBuffer.zip
相关文章推荐
- JAVA Socket应用学习——基于TCP协议实现的网络通信
- Linux socket编程入门及客户端服务器端通信实现 – 提高篇:TCP连接过程分析
- tcp通讯的方式并使用socket建立客户端与服务器的通信的过程
- socket通信的环形缓冲 .
- 简单socket通信过程(TCP)
- 手写JAVA NIO实现Socket通信及其过程中注意的问题
- 使用android进行Socket通信实现多人聊天应用
- socket 的通信过程
- JAVA TCP/IP Socket通信机制以及应用
- 基于TCP/UDP的Socket网络通信系列之Java中的InetAddress的应用(二)
- 无废话WCF入门教程二[WCF应用的通信过程]
- ucenter应用通信过程
- 无废话WCF入门教程二[WCF应用的通信过程]
- 应用通信协议的处理过程
- Android应用与framework的socket通信实例
- Socket通信过程和函数详解
- 调试一个socket通信bug的心理过程和反思
- WCF系统之WCF应用的通信过程
- 一起talk C栗子吧(第一百四十七回:C语言实例--流socket通信过程)
- 无废话WCF入门教程二[WCF应用的通信过程]