您的位置:首页 > 其它

使用应用程序测试网卡收发UDP数据包性能

2017-12-20 22:06 1036 查看
通常使用iperf进行网卡性能测试,但是有些情况下需要自己编写应用程序来进行UDP数据包的收发,根据时间间隔统计出网卡的收发包信息。

 

UDP发送端代码如下:

 

#define _GNU_SOURCE // sendmmsg

#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <errno.h>

#include "common.h"

/* Per-thread sender configuration; main() fills one in for every target. */
struct state {
/* Destination address handed to net_connect_udp(). */
struct net_addr *target_addr;
/* Number of messages submitted by each sendmmsg() call. */
int packets_in_buf;
/* Bytes used as the body of every datagram (shared, never written). */
const char *payload;
/* Length of payload in bytes. */
int payload_sz;
/* Source port handed to net_connect_udp(). */
int src_port;
};

/*
 * Sender worker. Connects a UDP socket to state->target_addr and then
 * transmits the same payload forever, state->packets_in_buf datagrams per
 * sendmmsg() call. Never returns; aborts the process on unexpected errors.
 *
 * userdata: pointer to this thread's struct state.
 */
void thread_loop(void *userdata) {
	struct state *state = userdata;

	struct mmsghdr *messages = calloc(state->packets_in_buf, sizeof(*messages));
	struct iovec *iovecs = calloc(state->packets_in_buf, sizeof(*iovecs));
	if (messages == NULL || iovecs == NULL) {
		PFATAL("calloc()");
	}

	int fd = net_connect_udp(state->target_addr, state->src_port);

	/* Every message points at the single shared payload buffer. */
	int i;
	for (i = 0; i < state->packets_in_buf; i++) {
		struct iovec *iovec = &iovecs[i];
		struct mmsghdr *msg = &messages[i];

		msg->msg_hdr.msg_iov = iovec;
		msg->msg_hdr.msg_iovlen = 1;

		iovec->iov_base = (void*)state->payload;
		iovec->iov_len = state->payload_sz;
	}

	while (1) {
		int r = sendmmsg(fd, messages, state->packets_in_buf, 0);
		/* errno is only meaningful when the call actually failed. */
		if (r < 0) {
			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
				continue;
			}

			/* Nobody listening on the target yet: keep sending. */
			if (errno == ECONNREFUSED) {
				continue;
			}
			PFATAL("sendmmsg()");
		}

		/* Clear the per-message fields written back by sendmmsg(). */
		for (i = 0; i < r; i++) {
			struct mmsghdr *msg = &messages[i];
			msg->msg_hdr.msg_flags = 0;
			msg->msg_len = 0;
		}
	}
}

/*
 * udpsender entry point.
 *
 * Every command-line argument is an "ip:port" target. One sender thread is
 * spawned per target; the main thread then only sleeps in one-second steps.
 */
int main(int argc, const char *argv[])
{
	static const char payload[32] = {0};
	int packets_in_buf = 1024;
	int payload_sz = (int)sizeof(payload);

	if (argc < 2) {
		FATAL("Usage: %s [target ip:port] [target ...]", argv[0]);
	}

	int thread_num = argc - 1;
	struct net_addr *target_addrs = calloc(thread_num, sizeof(*target_addrs));
	if (target_addrs == NULL) {
		PFATAL("calloc()");
	}

	int t;
	for (t = 0; t < thread_num; t++) {
		const char *target_addr_str = argv[t+1];
		parse_addr(&target_addrs[t], target_addr_str);

		fprintf(stderr, "[*] Sending to %s, send buffer %i packets\n",
			addr_to_str(&target_addrs[t]), packets_in_buf);
	}

	struct state *array_of_states = calloc(thread_num, sizeof(*array_of_states));
	if (array_of_states == NULL) {
		PFATAL("calloc()");
	}

	for (t = 0; t < thread_num; t++) {
		struct state *state = &array_of_states[t];
		state->target_addr = &target_addrs[t];
		state->packets_in_buf = packets_in_buf;
		state->payload = payload;
		state->payload_sz = payload_sz;
		state->src_port = 11404;
		thread_spawn(thread_loop, state);
	}

	while (1) {
		struct timeval timeout =
			NSEC_TIMEVAL(MSEC_NSEC(1000UL));
		/* select() with no fds is used as a sleep; retry until the
		 * timeout has been fully consumed. */
		while (1) {
			int r = select(0, NULL, NULL, NULL, &timeout);
			if (r != 0) {
				continue;
			}
			if (TIMEVAL_NSEC(&timeout) == 0) {
				break;
			}
		}
		// pass
	}
	return 0;
}


 

 

UDP接收端代码如下:

 

#define _GNU_SOURCE // for recvmmsg

#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include "common.h"

/* Size in bytes of each per-message receive buffer. */
#define MTU_SIZE (2048-64*2)
/* Maximum number of messages requested from one recvmmsg() call. */
#define MAX_MSG 512

/* Per-thread receiver state: socket, counters and the recvmmsg() buffers. */
struct state {
/* Socket this thread receives from. */
int fd;
/* Running total of received bytes (thread_loop adds msg_len values). */
volatile uint64_t bps;
/* Running total of received packets. */
volatile uint64_t pps;
/* Message headers passed to recvmmsg(); entry i uses iovecs[i]. */
struct mmsghdr messages[MAX_MSG];
/* Payload storage; row i backs iovecs[i]. */
char buffers[MAX_MSG][MTU_SIZE];
/* One scatter/gather element per message. */
struct iovec iovecs[MAX_MSG];
} __attribute__ ((aligned (64)));

/*
 * Wire up the receive buffers of s: message i gets exactly one iovec, which
 * covers the whole of buffers[i]. Other fields are left untouched.
 *
 * Returns s so the call can be chained.
 */
struct state *state_init(struct state *s) {
	for (int slot = 0; slot < MAX_MSG; slot++) {
		s->iovecs[slot].iov_base = s->buffers[slot];
		s->iovecs[slot].iov_len = MTU_SIZE;

		s->messages[slot].msg_hdr.msg_iov = &s->iovecs[slot];
		s->messages[slot].msg_hdr.msg_iovlen = 1;
	}
	return s;
}

static void thread_loop(void *userdata)
{
struct state *state = userdata;

while (1) {
/* Blocking recv. */
int r = recvmmsg(state->fd, &state->messages[0], MAX_MSG, MSG_WAITFORONE, NULL);
if (r <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
}
PFATAL("recvmmsg()");
}

int i, bytes = 0;
for (i = 0; i < r; i++) {
struct mmsghdr *msg = &state->messages[i];
/* char *buf = msg->msg_hdr.msg_iov->iov_base; */
int len = msg->msg_len;
msg->msg_hdr.msg_flags = 0;
msg->msg_len = 0;
bytes += len;
}
__atomic_fetch_add(&state->pps, r, 0);
__atomic_fetch_add(&state->bps, bytes, 0);
}
}

/*
 * udpreceiver entry point.
 *
 * Usage: prog [listen ip:port] [thread count] [reuseport]
 *
 * With reuseport == 0 all threads share one bound socket; otherwise every
 * thread binds its own socket with SO_REUSEPORT. The main thread prints the
 * packet and byte rates summed over all threads once per second.
 */
int main(int argc, const char *argv[])
{
	const char *listen_addr_str = "0.0.0.0:4321";
	int recv_buf_size = 4*1024;
	int thread_num = 1;
	int reuseport = 0;

	/* Later arguments imply all earlier ones, hence the fallthroughs. */
	switch (argc) {
	case 4:
		reuseport = atoi(argv[3]);
		/* fallthrough */
	case 3:
		thread_num = atoi(argv[2]);
		/* fallthrough */
	case 2:
		listen_addr_str = argv[1];
		/* fallthrough */
	case 1:
		break;
	default:
		FATAL("Usage: %s [listen ip:port] [fork cnt] [reuseport]", argv[0]);
	}

	/* atoi() yields 0 for garbage; a non-positive count would start no
	 * workers or request a bogus allocation size. */
	if (thread_num < 1) {
		FATAL("Invalid thread count %d", thread_num);
	}

	struct net_addr listen_addr;
	parse_addr(&listen_addr, listen_addr_str);

	int main_fd = -1;
	if (reuseport == 0) {
		fprintf(stderr, "[*] Starting udpreceiver on %s, recv buffer %iKiB\n",
			addr_to_str(&listen_addr), recv_buf_size / 1024);

		main_fd = net_bind_udp(&listen_addr, 0);
		net_set_buffer_size(main_fd, recv_buf_size, 0);
	}

	/* Zeroed memory matters: state_init() only fills the iovec fields.
	 * NOTE(review): struct state is declared aligned(64) but calloc() only
	 * guarantees fundamental alignment - confirm whether an aligned
	 * allocator is wanted here. */
	struct state *array_of_states = calloc(thread_num, sizeof(struct state));
	if (array_of_states == NULL) {
		PFATAL("calloc()");
	}

	int t;
	for (t = 0; t < thread_num; t++) {
		struct state *state = &array_of_states[t];
		state_init(state);
		if (reuseport == 0) {
			state->fd = main_fd;
		} else {
			fprintf(stderr, "[*] Starting udpreceiver on %s, recv buffer %iKiB\n",
				addr_to_str(&listen_addr), recv_buf_size / 1024);

			int fd = net_bind_udp(&listen_addr, 1);
			net_set_buffer_size(fd, recv_buf_size, 0);
			state->fd = fd;
		}
		thread_spawn(thread_loop, state);
	}

	uint64_t last_pps = 0;
	uint64_t last_bps = 0;

	while (1) {
		struct timeval timeout =
			NSEC_TIMEVAL(MSEC_NSEC(1000UL));
		/* select() with no fds is used as a sleep; retry until the
		 * timeout has been fully consumed. */
		while (1) {
			int r = select(0, NULL, NULL, NULL, &timeout);
			if (r != 0) {
				continue;
			}
			if (TIMEVAL_NSEC(&timeout) == 0) {
				break;
			}
		}

		uint64_t now_pps = 0, now_bps = 0;
		for (t = 0; t < thread_num; t++) {
			struct state *state = &array_of_states[t];
			now_pps += __atomic_load_n(&state->pps, 0);
			now_bps += __atomic_load_n(&state->bps, 0);
		}

		/* Counters are cumulative; report the change since the last tick. */
		double delta_pps = now_pps - last_pps;
		double delta_bps = now_bps - last_bps;
		last_pps = now_pps;
		last_bps = now_bps;

		printf("%7.3fM pps %7.3fMiB / %7.3fMb\n",
		       delta_pps / 1000.0 / 1000.0,
		       delta_bps / 1024.0 / 1024.0,
		       delta_bps * 8.0 / 1000.0 / 1000.0 );
	}

	return 0;
}


 所包含的头文件如下:

common.h

/* printf-style message to stderr. */
#define ERRORF(x...) fprintf(stderr, x)

/*
 * Print a "[-] PROGRAM ABORT" message plus the calling function, file and
 * line, then exit(EXIT_FAILURE). The first argument must be a string literal
 * because it is concatenated with the prefix.
 */
#define FATAL(x...)                                                            \
do {                                                                   \
ERRORF("[-] PROGRAM ABORT : " x);                              \
ERRORF("\n\tLocation : %s(), %s:%u\n\n", __FUNCTION__,         \
__FILE__, __LINE__);                                    \
exit(EXIT_FAILURE);                                            \
} while (0)

/*
 * Like FATAL, but prefixed "[-] SYSTEM ERROR" and followed by the perror()
 * text for the current errno. The first argument must be a string literal.
 */
#define PFATAL(x...)                                                           \
do {                                                                   \
ERRORF("[-] SYSTEM ERROR : " x);                               \
ERRORF("\n\tLocation : %s(), %s:%u\n", __FUNCTION__, __FILE__, \
__LINE__);                                              \
perror("      OS message ");                                   \
ERRORF("\n");                                                  \
exit(EXIT_FAILURE);                                            \
} while (0)

/* Nanoseconds held in a struct timespec (argument is a pointer). */
#define TIMESPEC_NSEC(ts) ((ts)->tv_sec * 1000000000ULL + (ts)->tv_nsec)
/* Nanoseconds held in a struct timeval (argument is a pointer). */
#define TIMEVAL_NSEC(ts)                                                       \
((ts)->tv_sec * 1000000000ULL + (ts)->tv_usec * 1000ULL)
/* Build a struct timespec compound literal from a nanosecond count. */
#define NSEC_TIMESPEC(ns)                                                      \
(struct timespec) { (ns) / 1000000000ULL, (ns) % 1000000000ULL }
/* Build a struct timeval compound literal from a nanosecond count. */
#define NSEC_TIMEVAL(ns)                                                       \
(struct timeval)                                                       \
{                                                                      \
(ns) / 1000000000ULL, ((ns) % 1000000000ULL) / 1000ULL         \
}
/* Milliseconds to nanoseconds. */
#define MSEC_NSEC(ms) ((ms)*1000000ULL)

/* net.c */
/* A parsed IPv4 or IPv6 endpoint, filled in by net_gethostbyname(). */
struct net_addr
{
/* IP version of the parsed address: 4 or 6. */
int ipver;
/* Storage used when the address is IPv4. */
struct sockaddr_in sin4;
/* Storage used when the address is IPv6. */
struct sockaddr_in6 sin6;
/* Points at sin4 or sin6, whichever is in use. */
struct sockaddr *sockaddr;
/* Length passed together with sockaddr to bind()/connect(). */
int sockaddr_len;
};

void parse_addr(struct net_addr *netaddr, const char *addr);
const char *addr_to_str(struct net_addr *addr);
int net_bind_udp(struct net_addr *addr, int reuseport);
void net_set_buffer_size(int cd, int max, int send);
void net_gethostbyname(struct net_addr *shost, const char *host, int port);
int net_connect_udp(struct net_addr *addr, int src_port);

struct thread;
struct thread *thread_spawn(void (*callback)(void *), void *userdata);
net.c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "common.h"

/*
 * Return s wrapped in double quotes, truncated so that it fits the internal
 * buffer. The result points at static storage and is overwritten by the
 * next call.
 */
const char *str_quote(const char *s)
{
	enum { QUOTED_CAP = 1024 };
	static char quoted[QUOTED_CAP];

	/* Leave room for both quotes and the terminator. */
	const int max_chars = QUOTED_CAP - 4;

	int written = snprintf(quoted, sizeof quoted, "\"%.*s\"", max_chars, s);
	if (written >= QUOTED_CAP) {
		quoted[QUOTED_CAP - 1] = '\0';
	}
	return quoted;
}

/*
 * Grow a socket buffer until it is at least max bytes.
 *
 * cd:   socket descriptor.
 * max:  target buffer size in bytes.
 * send: non-zero selects SO_SNDBUF, zero selects SO_RCVBUF.
 *
 * The size is re-read after every attempt because the kernel may adjust or
 * cap the requested value; the function gives up quietly after 10 attempts
 * or as soon as setsockopt() refuses. A getsockopt() failure is fatal.
 */
void net_set_buffer_size(int cd, int max, int send)
{
	int flag = send ? SO_SNDBUF : SO_RCVBUF;

	for (int i = 0; i < 10; i++) {
		int current;
		socklen_t current_len = sizeof(current);
		if (getsockopt(cd, SOL_SOCKET, flag, &current, &current_len) < 0) {
			PFATAL("getsockopt(SOL_SOCKET)");
		}
		if (current >= max) {
			break;
		}

		/* Double the size, clamped to max (this also keeps current * 2
		 * from overflowing). */
		int wanted = (current > max / 2) ? max : current * 2;
		if (setsockopt(cd, SOL_SOCKET, flag, &wanted, sizeof(wanted)) < 0) {
			/* Best effort: don't log the error, just stop growing. */
			break;
		}
	}
}

/*
 * Parse "host:port" into netaddr.
 *
 * The string is split at its last ':' so IPv6 literals keep their inner
 * colons. The port must be a decimal number in 0..65535 with nothing after
 * it; the host part (at most 254 characters are used) is handed to
 * net_gethostbyname(). Any error aborts the program via FATAL.
 */
void parse_addr(struct net_addr *netaddr, const char *addr) {
	const char *colon = strrchr(addr, ':');
	if (colon == NULL) {
		FATAL("You forgot to specify port");
	}

	/* strtol() lets us reject empty or non-numeric ports, which atoi()
	 * would silently turn into 0. */
	const char *port_str = colon + 1;
	char *end = NULL;
	errno = 0;
	long port = strtol(port_str, &end, 10);
	if (end == port_str || *end != '\0' || errno == ERANGE ||
	    port < 0 || port > 65535) {
		FATAL("Invalid port number %s", port_str);
	}

	char host[255];
	size_t host_len = (size_t)(colon - addr);
	if (host_len > sizeof(host) - 1) {
		host_len = sizeof(host) - 1;
	}
	memcpy(host, addr, host_len);
	host[host_len] = '\0';

	net_gethostbyname(netaddr, host, (int)port);
}

/*
 * Fill shost from a numeric address literal and a port.
 *
 * host is tried as an IPv4 literal first, then as an IPv6 literal; no name
 * resolution is performed. On success shost->sockaddr points at the matching
 * sin4/sin6 member and shost->sockaddr_len holds that member's size. If host
 * is neither, the program aborts via FATAL.
 */
void net_gethostbyname(struct net_addr *shost, const char *host, int port)
{
	memset(shost, 0, sizeof(*shost));

	struct in_addr v4;
	if (inet_pton(AF_INET, host, &v4) == 1) {
		shost->ipver = 4;
		shost->sockaddr = (struct sockaddr *)&shost->sin4;
		shost->sockaddr_len = sizeof(shost->sin4);
		shost->sin4.sin_family = AF_INET;
		shost->sin4.sin_port = htons(port);
		shost->sin4.sin_addr = v4;
		return;
	}

	struct in6_addr v6;
	if (inet_pton(AF_INET6, host, &v6) == 1) {
		shost->ipver = 6;
		shost->sockaddr = (struct sockaddr *)&shost->sin6;
		/* Must be the IPv6 structure size, or bind()/connect() reject it. */
		shost->sockaddr_len = sizeof(shost->sin6);
		shost->sin6.sin6_family = AF_INET6;
		shost->sin6.sin6_port = htons(port);
		shost->sin6.sin6_addr = v6;
		return;
	}

	FATAL("inet_pton(%s)", str_quote(host));
}

/*
 * Format addr as "ip:port".
 *
 * addr->ipver selects the family (4 or 6, as set by net_gethostbyname()).
 * An unknown version or a conversion failure prints "?" for the address.
 * The result points at static storage and is overwritten by the next call.
 */
const char *addr_to_str(struct net_addr *addr) {
	char dst[INET6_ADDRSTRLEN + 1];
	const char *text = NULL;
	int port = 0;

	switch (addr->ipver) {
	case 4:
		text = inet_ntop(AF_INET, &addr->sin4.sin_addr, dst, INET6_ADDRSTRLEN);
		port = ntohs(addr->sin4.sin_port);
		break;
	case 6:
		text = inet_ntop(AF_INET6, &addr->sin6.sin6_addr, dst, INET6_ADDRSTRLEN);
		port = ntohs(addr->sin6.sin6_port);
		break;
	default:
		break;
	}

	/* inet_ntop() returns NULL on failure and leaves dst unspecified. */
	if (text == NULL) {
		text = "?";
	}

	static char buf[255];
	snprintf(buf, sizeof(buf), "%s:%i", text, port);
	return buf;
}

/*
 * Create a UDP socket bound to shost.
 *
 * The socket is created in the address family of shost so that IPv6
 * addresses can be bound as well. SO_REUSEADDR is always set; SO_REUSEPORT
 * is set too when reuseport is non-zero.
 *
 * Returns the socket descriptor. Any failure aborts the program via PFATAL.
 */
int net_bind_udp(struct net_addr *shost, int reuseport)
{
	int sd = socket(shost->sockaddr->sa_family, SOCK_DGRAM, IPPROTO_UDP);
	if (sd < 0) {
		PFATAL("socket()");
	}

	int one = 1;
	if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
		PFATAL("setsockopt(SO_REUSEADDR)");
	}

	if (reuseport) {
		one = 1;
		if (setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0) {
			PFATAL("setsockopt(SO_REUSEPORT)");
		}
	}

	if (bind(sd, shost->sockaddr, shost->sockaddr_len) < 0) {
		PFATAL("bind()");
	}

	return sd;
}

/* Handle for a thread started by thread_spawn(). */
struct thread {
/* Identifier filled in by pthread_create(). */
pthread_t thread_id;
/* Function the new thread runs. */
void (*callback)(void *userdata);
/* Argument passed to callback. */
void *userdata;
};

/*
 * pthread entry trampoline: blocks every signal in the new thread and then
 * runs the user callback stored in the struct thread passed as userdata.
 * Always returns NULL.
 */
static void *_thread_start(void *userdata)
{
	struct thread *thread = userdata;

	/* Block all signals here so they are delivered to another thread. */
	sigset_t set;
	sigfillset(&set);
	int r = pthread_sigmask(SIG_SETMASK, &set, NULL);
	if (r != 0) {
		/* pthread functions return the error number instead of setting
		 * errno; PFATAL reports errno through perror(). */
		errno = r;
		PFATAL("pthread_sigmask()");
	}

	thread->callback(thread->userdata);
	return NULL;
}

/*
 * Start a new thread that runs callback(userdata).
 *
 * Returns a heap-allocated handle, which this file never frees. Allocation
 * or thread-creation failure aborts the program via PFATAL.
 */
struct thread *thread_spawn(void (*callback)(void *), void *userdata)
{
	struct thread *thread = calloc(1, sizeof(*thread));
	if (thread == NULL) {
		PFATAL("calloc()");
	}
	thread->callback = callback;
	thread->userdata = userdata;

	int r = pthread_create(&thread->thread_id, NULL, _thread_start, thread);
	if (r != 0) {
		/* pthread_create() returns the error number; hand it to perror(). */
		errno = r;
		PFATAL("pthread_create()");
	}
	return thread;
}

/*
 * Create a UDP socket connected to shost.
 *
 * The socket uses the address family of shost. When src_port lies in
 * 2..65535 the socket is first bound to that port on the wildcard address
 * of the same family; any other value leaves the source port to the kernel.
 *
 * Returns the socket descriptor. Any failure aborts the program via PFATAL.
 */
int net_connect_udp(struct net_addr *shost, int src_port)
{
	int family = shost->sockaddr->sa_family;

	int sd = socket(family, SOCK_DGRAM, IPPROTO_UDP);
	if (sd < 0) {
		PFATAL("socket()");
	}

	int one = 1;
	if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
		PFATAL("setsockopt(SO_REUSEADDR)");
	}

	if (src_port > 1 && src_port < 65536) {
		struct sockaddr_in src4;
		struct sockaddr_in6 src6;
		const struct sockaddr *src;
		socklen_t src_len;

		if (family == AF_INET6) {
			memset(&src6, 0, sizeof(src6));
			src6.sin6_family = AF_INET6;
			src6.sin6_port = htons((uint16_t)src_port);
			src6.sin6_addr = in6addr_any;
			src = (const struct sockaddr *)&src6;
			src_len = sizeof(src6);
		} else {
			memset(&src4, 0, sizeof(src4));
			src4.sin_family = AF_INET;
			src4.sin_port = htons((uint16_t)src_port);
			src4.sin_addr.s_addr = htonl(INADDR_ANY);
			src = (const struct sockaddr *)&src4;
			src_len = sizeof(src4);
		}

		if (bind(sd, src, src_len) < 0) {
			PFATAL("bind()");
		}
	}

	if (connect(sd, shost->sockaddr, shost->sockaddr_len) == -1) {
		/* EINPROGRESS is tolerated; anything else is fatal. */
		if (errno != EINPROGRESS) {
			PFATAL("connect()");
		}
	}

	return sd;
}


编译脚本如下:

build.sh

#!/bin/sh
# Build the UDP receiver and sender test programs; both link against net.c.
# Stop at the first failing command so a broken build is not overlooked.
set -e

clang -O3 -Wall -Wextra -Wno-unused-parameter \
-ggdb -g -pthread \
-o udpreceiver1 udpreceiver1.c \
net.c

clang -O3 -Wall -Wextra -Wno-unused-parameter \
-ggdb -g -pthread \
-o udpsender udpsender.c \
net.c


命令如下:

发送端 ./udpsender 192.168.100.100:5000 […]
可指定多个目标地址,每个目标地址对应一个发送线程

绑核使用 

taskset -c 1,2 ./udpsender 192.168.100.100:5000 192.168.100.100:5000


 

接收端 ./udpreceiver1 192.168.100.100:5000

绑核使用 

taskset -c 1 ./udpreceiver1 192.168.100.100:5000


 

代码中可修改的地方如下:

1、  发送数据包大小,默认为32

2、  一次发送数据包的个数,默认为1024

3、  接收数据包的缓存区,默认为4K

4、  一次接收的最大数据包个数,默认为512

5、  UDP数据包MTU大小,默认为2048-128

实际运行情况如下:



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: