您的位置:首页 > 其它

libevent带负载均衡的多线程使用示例

2014-04-23 18:33 375 查看
转自: http://blog.chinaunix.net/uid-756931-id-353318.html
libevent带负载均衡的多线程使用示例
2011-06-03 11:17:35

分类: LINUX

功能:
主线程根据工作线程负载均衡算法,每隔一秒钟向选定的工作线程发送一条字符串信息,工作线程简单地把收到的字符串信息打印出来。

Makefile

# Build the libevent load-balancing demo.
# Libraries are listed after the source file: the linker resolves symbols
# left to right, so a -l option placed before the code that uses it may be
# dropped and the link can fail with undefined references.
# Recipe lines must start with a TAB character.
eventtest : eventtest.c
	gcc -Wall -g -o eventtest eventtest.c -levent -lpthread
.PHONY : clean
clean :
	rm -f eventtest

eventtest.c

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <pthread.h>

#include <sys/types.h>

#include <sys/socket.h>

#include <event.h>

/* Per-worker state: one event loop plus the socket pair used to feed it. */
typedef struct {
    pthread_t tid;            /* thread id, stored by the worker itself on startup */
    struct event_base *base;  /* event base this worker runs its loop on */
    struct event event;       /* persistent read event registered on read_fd */
    int read_fd;              /* socket end the worker reads messages from */
    int write_fd;             /* socket end the dispatcher writes messages to */
} LIBEVENT_THREAD;

/* State of the dispatcher thread (the thread that runs main()). */
typedef struct {
    pthread_t tid;            /* thread id of the dispatcher */
    struct event_base *base;  /* event base the dispatch timer is attached to */
} DISPATCHER_THREAD;

/* Number of worker threads created by main(). */
const int thread_num = 10;

/* Array of thread_num worker descriptors, allocated in main(). */
LIBEVENT_THREAD *threads;

/* Dispatcher (main thread) state. */
DISPATCHER_THREAD dispatcher_thread;

/* Index of the worker that received the most recent message. */
int last_thread = 0;

/*
 * Read callback for a worker's notification socket.
 *
 * fd    - descriptor that became readable; must be the worker's read_fd
 * which - event flags supplied by libevent (unused)
 * arg   - the LIBEVENT_THREAD that owns the event
 *
 * Reads one chunk from the socket and prints it as a string together with
 * the worker's thread id. Terminates the process if it is invoked for a
 * descriptor other than the worker's own read_fd.
 */
static void
thread_libevent_process(int fd, short which, void *arg)
{
    LIBEVENT_THREAD *me = arg;
    char buf[128];
    ssize_t ret;

    (void)which;

    if (fd != me->read_fd) {
        printf("thread_libevent_process error : fd != me->read_fd\n");
        exit(1);
    }

    /*
     * Read at most sizeof(buf) - 1 bytes so the terminator written below
     * always stays inside the buffer, even when the read fills it.
     */
    ret = read(fd, buf, sizeof(buf) - 1);
    if (ret > 0) {
        buf[ret] = '\0';
        printf("thread %llu receive message : %s\n",
               (unsigned long long)me->tid, buf);
    } else if (ret < 0) {
        /* Report the failure instead of dropping it silently. */
        perror("read()");
    }
}

static void *

worker_thread(void
*arg)

{

LIBEVENT_THREAD *me
= arg;

me->tid
= pthread_self();

event_base_loop(me->base, 0);

return NULL;

}

/*
 * Dispatcher timer callback: sends one message to the next worker and
 * re-arms itself to fire again in one second.
 *
 * fd    - unused (timer events carry no descriptor of interest here)
 * event - event flags supplied by libevent (unused)
 * arg   - the struct event of the timer itself, so it can be re-added
 */
static void
timeout_cb(int fd, short event, void *arg)
{
    static const char msg[] = "Hello world!";
    struct event *timeout = arg;
    struct timeval tv;
    /* Round-robin worker selection, the load-balancing scheme used by memcached. */
    int tid = (last_thread + 1) % thread_num;
    LIBEVENT_THREAD *thread = threads + tid;

    (void)fd;
    (void)event;

    last_thread = tid;

    /* Send the text without its trailing NUL; report a failed send. */
    if (write(thread->write_fd, msg, sizeof(msg) - 1) == -1) {
        perror("write()");
    }

    /* The timer is one-shot, so schedule the next tick explicitly. */
    evutil_timerclear(&tv);
    tv.tv_sec = 1;
    if (event_add(timeout, &tv) == -1) {
        perror("event_add()");
    }
}

/*
 * Program entry point.
 *
 * Creates the dispatcher event base, then for each of the thread_num
 * workers creates a socket pair and an event base and registers a
 * persistent read event on the worker's end of the pair. After that it
 * starts the worker threads, arms a one-second timer on the dispatcher
 * base and runs the dispatcher loop.
 *
 * Returns 0 when the dispatcher loop returns, 1 if any setup step fails.
 */
int
main(int argc, char *argv[])
{
    struct event timeout;
    struct timeval tv;
    int fd[2];
    int ret;
    int i;

    (void)argc;
    (void)argv;

    dispatcher_thread.base = event_init();
    if (dispatcher_thread.base == NULL) {
        perror("event_init( base )");
        return 1;
    }
    dispatcher_thread.tid = pthread_self();

    threads = calloc(thread_num, sizeof(LIBEVENT_THREAD));
    if (threads == NULL) {
        perror("calloc");
        return 1;
    }

    /* Prepare every worker completely before any thread is started. */
    for (i = 0; i < thread_num; i++) {
        ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
        if (ret == -1) {
            perror("socketpair()");
            return 1;
        }
        threads[i].read_fd = fd[1];
        threads[i].write_fd = fd[0];

        threads[i].base = event_init();
        if (threads[i].base == NULL) {
            perror("event_init()");
            return 1;
        }

        event_set(&threads[i].event, threads[i].read_fd,
                  EV_READ | EV_PERSIST, thread_libevent_process,
                  &threads[i]);
        event_base_set(threads[i].base, &threads[i].event);
        if (event_add(&threads[i].event, 0) == -1) {
            perror("event_add()");
            return 1;
        }
    }

    for (i = 0; i < thread_num; i++) {
        pthread_t tid;

        /* pthread_create() returns an error number rather than setting errno. */
        ret = pthread_create(&tid, NULL, worker_thread, &threads[i]);
        if (ret != 0) {
            fprintf(stderr, "pthread_create(): error %d\n", ret);
            return 1;
        }
    }

    /* The timer passes itself as the callback argument so it can re-arm. */
    evtimer_set(&timeout, timeout_cb, &timeout);
    event_base_set(dispatcher_thread.base, &timeout);

    evutil_timerclear(&tv);
    tv.tv_sec = 1;
    if (event_add(&timeout, &tv) == -1) {
        perror("event_add()");
        return 1;
    }

    event_base_loop(dispatcher_thread.base, 0);

    return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: