您的位置:首页 > 其它

Socket通信过程中环形缓冲区应用

2013-11-11 20:15 239 查看
  实验室的项目需求,需要做一个转发服务器,简单来说服务器由客户端A接收数据,转发给客户端B,可以有多个A与多个B存在,他们之间存在一个对应关系,主要是通过查询数据库来得到对应的关系,这里不细说。但是当客户端A的数量到达100个时,B接收到的数据会有延迟,这里说一下A每秒发送的数据量在256B左右。通过netstat命令,发现数据大量缓存在服务器的接收缓冲区中。

  经过网上的查询,发现瓶颈在于:我用同一个线程去接收数据与转发数据(用epoll实现,通过epoll获得读数据事件,然后读取数据转发)。但是这样效率不高,应该用两个线程来实现,一个线程用来转发数据,另一个用来接收数据,中间通过一个缓冲区来进行沟通。这个模型就与我们操作系统中学习的消费者和生产者模式相似。

  由于以前也没有写过环形缓冲区的例子,这里先用一个小的实例来做实验,实例中同样包括服务器和客户端。服务器一个线程用来接收数据放入缓冲区,一个线程用来读取缓冲区中的数据并进行打印。

  至于环形缓冲区的代码,我是参考先人的代码,在此基础上加以应用,先人代码网址如下:

  /article/6311281.html

  先人的代码写的真心不错,代码风格也很规范,将条件同步应用到了缓冲区的同步问题中。

  现在奉上在下的代码:

  这是服务器端的代码,其中有两个线程,一个接收数据,另一个打印接收的数据。

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<string.h>
#include<netinet/in.h>
#include<pthread.h>

#include"ytsSocket.h"

#define SERV_IP "127.0.0.1"
#define CLIENT_PORT 6666
#define MONITOR_PORT 5555

int Bind_socket();
void *Client_Service(void*);
void *Output_Service(void*);

struct prodcons buffer;         /* 全局的环形缓冲区 */

int main(int argc, char *argv[])
{

int clifd,clientfd;
socklen_t clilen;
pthread_t ntid;
int err;
char buf[32];

// struct prodcons buffer;         /* 全局的环形缓冲区 */

/* 初始化全局的缓冲区 */
init_buffer(&buffer);

struct sockaddr_in cliaddr;
if((clifd=Bind_socket())==-1) /* 绑定套接字 */
{
printf ("创建套接字失败\n");
pthread_exit((void*)-1);
}
if((err=pthread_create(&ntid,NULL,Output_Service,NULL))!=0)
{
printf("can not create thread:%s\n",strerror(err));
return -1;
}

for( ; ; )
{

// printf ("waiting for connect...\n");
clilen=sizeof(cliaddr);
if((clientfd=accept(clifd,(struct sockaddr*)&cliaddr,&clilen))<0)
{
perror("accept");
pthread_exit((void*)-1);
}
inet_ntop(AF_INET,&cliaddr.sin_addr,buf,sizeof(buf));
printf ("Recieve connection from %s\n",buf);

if((err=pthread_create(&ntid,NULL,Client_Service,(void*)&clientfd))!=0)
{
printf("can not create thread:%s\n",strerror(err));
return -1;
}

int rst;
}

return 0;
}

/* 为客户端服务的线程 */
void* Client_Service(void*arg)
{
int clientfd;

clientfd=*(int *)arg;

int rstn;

for( ;  ; )
{
rstn=receive_message(clientfd,&buffer);

if(rstn==-1)
{
close(clientfd);
pthread_exit((void*)-1);

}else if(rstn==0)
{
close(clientfd);
printf ("peer close connection\n");
pthread_exit((void*)-1);
}
else
{
// buffer[rstn]=0;
//printf("receive from client data: %s\n",buffer);
}

sleep(1);
}

}

/* 建立套接字 */
int Bind_socket()
{
int clifd;

socklen_t clilen;

struct sockaddr_in servaddr_cli,servaddr_mon,cliaddr;

if((clifd=socket(AF_INET,SOCK_STREAM,0))<0)
{
perror("socket");
return -1;
}

bzero(&servaddr_cli,sizeof(servaddr_cli));
servaddr_cli.sin_family=AF_INET;
servaddr_cli.sin_port=htons(CLIENT_PORT);
servaddr_cli.sin_addr.s_addr=htonl(INADDR_ANY);

if( bind(clifd,(struct sockaddr*)&servaddr_cli,sizeof(servaddr_cli))<0)
{
perror("bind");
return -1;
}

if(listen(clifd,5)<0)
{
perror("listen");
return -1;
}

return clifd;
}

void* Output_Service(void*arg)
{

for( ; ;)
{
print_buffer(&buffer);
}

}


  客户端代码,客户端十分简单,就是不断向服务器发送数据:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<string.h>
#include<netinet/in.h>
#include<pthread.h>
#include<unistd.h>

#include "ytsSocket.h"
#define SERV_IP "127.0.0.1"
#define CLIENT_PORT 6666
#define MONITOR_PORT 5555

int main(int argc, char *argv[])
{
int clifd;
struct sockaddr_in servaddr;

if( (clifd=socket(AF_INET,SOCK_STREAM,0))<0)
perror("socket");

bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_port=htons(CLIENT_PORT);
inet_pton(AF_INET,SERV_IP,&servaddr.sin_addr);

if(connect(clifd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0)
perror("connect");

char buffer[32];
int i,rstn;
for( i=0;i<10;i++)
{
snprintf(buffer,sizeof(buffer),"Data Id is : %d",i);
printf ("%s\n",buffer);

/* if((rstn=write(clifd,buffer,strlen(buffer)))<0) */
/*     perror("write"); */
SendMessageClient(clifd,buffer,strlen(buffer));
usleep(1000000);
}

return 0;
}


  环形缓冲区主要代码和socket通信粘包问题处理的代码:

#include"ytsSocket.h"

/* 向对端发送数据,由共享缓冲区中读取数据 */
int SendMessage(int socketFd,char * sendBuffer)
{

char len[5];
int sendLen=strlen(sendBuffer);
snprintf(len,sizeof(len),"%04d",sendLen);
writen(socketFd,len,sizeof(len)-1);

int sendRst=writen(socketFd,sendBuffer,sendLen);
return sendRst;
}

int SendMessageClient(int socketFd,char * sendBuffer,int sendLen)
{
char len[5];
snprintf(len,sizeof(len),"%04d",sendLen);
//printf("len:%s\n",len);
if(send(socketFd,len,sizeof(len)-1,0)<0)
{
perror("send :");
return -1;
}

int sendRecv;
if((sendRecv=send(socketFd,sendBuffer,sendLen,0))<0)
{
perror("send sendBuffer:");
return -1;
}

return sendRecv;
}

int ReceiveMessage(int sockFd,char recvMsg[])
{

char msgSize[5];
int sizeLen=sizeof(msgSize);
readn(sockFd,msgSize,sizeLen-1);
msgSize[sizeLen-1]=0;
int msgLen=atoi(msgSize);

int rcvLen=readn(sockFd,recvMsg,msgLen);
if(rcvLen>0)
{
recvMsg[rcvLen]=0;      /* 如果成功接收数据,在数据结尾后加入‘0/’ */
}
return rcvLen;

}

int readn(int fd,void *vptr,int n)
{
int nleft;
int nread;
char *ptr;
ptr=(char*)vptr;
nleft=n;
while(nleft>0)
{
if( (nread=read(fd,ptr,nleft))<0)
{
if(errno==EINTR)//read again
{
nread=0;
}
else
{

return -1;

//pthread_exit((void*)0);//end thread
}
}
else if (nread==0)//receive FIN
{
close(fd);   //close socket
//pthread_exit((void*)0);//end thread
return 0;
}

nleft-=nread;
ptr+=nread;
}

return (n-nleft);
}

int writen(int fd,const void * vptr,int n)
{
int nleft;
int nwritten;
const char *ptr;

ptr=(char*)vptr;
nleft=n;
while(nleft>0)
{
if( (nwritten=write(fd,ptr,nleft))<=0)
{
if(nwritten<0 && errno==EINTR)
nwritten=0;
else
{
return -1;
//pthread_exit((void*)0);//end thread
}

}

nleft-=nwritten;
ptr+=nwritten;
}
}

/* 接收数据,并向共享缓冲区中写入数据 */
int receive_message(int fd,struct prodcons*buffer)
{
// printf ("recevie message\n");
int rstn;                   /* 接收数据 */
pthread_mutex_lock(&buffer->lock);
//printf ("size:%d\n",buffer->size);
/* 等待缓冲区未满 */
while(buffer->size==BUFFER_SIZE)
{
pthread_cond_wait(&buffer->notfull,&buffer->lock);
}

if((rstn=ReceiveMessage(fd,buffer->buffer[buffer->writepos]))<=0)
{
pthread_mutex_unlock(&buffer->lock);
return rstn;
}
else
{
buffer->writepos++;

if(buffer->writepos>=BUFFER_SIZE)
{
buffer->writepos=0;
}
buffer->size++;
pthread_cond_signal(&buffer->notempty);
pthread_mutex_unlock(&buffer->lock);

return rstn;
}

}

/* 发送数据,读共享缓冲区 */
int send_message(int fd,struct prodcons *buffer)
{
int rstn;                   /* 记录发送数据的个数 */
pthread_mutex_lock(&buffer->lock);

while(buffer->size==0)
{
pthread_cond_wait(&buffer->notempty,&buffer->lock);
}

/* 读取缓冲区中的数据,进行发送 */
if((rstn=SendMessage(fd,buffer->buffer[buffer->readpos]))<=0)
{
pthread_mutex_unlock(&buffer->lock);
return rstn;
}
else
{
buffer->readpos++;
if(buffer->readpos>=BUFFER_SIZE)
{
buffer->readpos=0;
}
buffer->size--;

pthread_cond_signal(&buffer->notfull);
pthread_mutex_unlock(&buffer->lock);
return rstn;
}
}

/* 初始化缓冲区相关数据 */
void init_buffer(struct prodcons*buffer)
{

pthread_mutex_init(&buffer->lock,NULL);
pthread_cond_init(&buffer->notempty,NULL);
pthread_cond_init(&buffer->notfull,NULL);
buffer->readpos=0;
buffer->writepos=0;
buffer->size=0;

}

//输出共享buffer中的信息
void print_buffer(struct prodcons*buffer)
{
/* 记录发送数据的个数 */
pthread_mutex_lock(&buffer->lock);

while(buffer->size==0)
{
pthread_cond_wait(&buffer->notempty,&buffer->lock);
}

/* 读取缓冲区中的数据,进行发送 */
printf ("%s\n",buffer->buffer[buffer->readpos]);
buffer->readpos++;
if(buffer->readpos>=BUFFER_SIZE)
{
buffer->readpos=0;
}

buffer->size--;

pthread_cond_signal(&buffer->notfull);
pthread_mutex_unlock(&buffer->lock);

}


下面是运行结果:

我编译了两个客户端,同时向服务器发送数据,一个发送Data Number is [0-9] ,另一个发送Data Id is [0-9];

服务器的输出如下:

[yts@ytsCentos ringBuffer]$ ./main
Recieve connection from 127.0.0.1
Data Id is : 0
Recieve connection from 127.0.0.1
Data Number is : 0
Data Id is : 1
Data Number is : 1
Data Id is : 2
Data Number is : 2
Data Id is : 3
Data Number is : 3
Data Id is : 4
Data Number is : 4
Data Id is : 5
Data Number is : 5
Data Id is : 6
Data Number is : 6
Data Id is : 7
Data Number is : 7
Data Id is : 8
Data Number is : 8
Data Id is : 9
Data Number is : 9
peer close connection
peer close connection


客户端输出:

[yts@ytsCentos ringBuffer]$ ./client
Data Number is : 0
Data Number is : 1
Data Number is : 2
Data Number is : 3
Data Number is : 4
Data Number is : 5
Data Number is : 6
Data Number is : 7
Data Number is : 8
Data Number is : 9


[yts@ytsCentos ringBuffer]$ ./client1
Data Id is : 0
Data Id is : 1
Data Id is : 2
Data Id is : 3
Data Id is : 4
Data Id is : 5
Data Id is : 6
Data Id is : 7
Data Id is : 8
Data Id is : 9


达到了预期的效果,下阶段的目标是把环形缓冲区应用到服务器中,希望一且顺利。。。

源代码:

ringBuffer.zip
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: