您的位置:首页 > 其它

libevent带负载均衡的多线程使用示例

2015-01-03 19:49 393 查看
功能:

主线程根据负载工作线程负载均衡算法,每隔一秒钟向特定的工作线程发送一条字符串信息,工作线程简单的把字符串信息打开出来。

Makefile

eventtest : eventtest.c

gcc -Wall -g -levent -lpthread -o eventtest eventtest.c

.PHONY : clean

clean :

rm eventtest -f

eventtest.c

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <pthread.h>

#include <sys/types.h>

#include <sys/socket.h>

#include <event.h>

typedef struct {

    pthread_t tid;

    struct event_base *base;

    struct event event;

    int read_fd;

    int write_fd;

}LIBEVENT_THREAD;

typedef struct {

    pthread_t tid;

    struct event_base *base;

}DISPATCHER_THREAD;

const int thread_num = 10;

LIBEVENT_THREAD *threads;

DISPATCHER_THREAD dispatcher_thread;

int last_thread = 0;

static void

thread_libevent_process(int fd, short which, void *arg)

{

    int ret;

    char buf[128];

    LIBEVENT_THREAD *me = arg;

    if (fd != me->read_fd) {

        printf("thread_libevent_process error : fd != me->read_fd\n");

        exit(1);

    }

    ret = read(fd, buf, 128);

    if (ret > 0) {

        buf[ret] = '\0';

        printf("thread %llu receive message : %s\n", (unsigned
long long)me->tid, buf);

    }

    return;

}

static void *

worker_thread(void *arg)

{

    LIBEVENT_THREAD *me = arg;

    me->tid = pthread_self();

    event_base_loop(me->base, 0);

    return NULL;

}

static void

timeout_cb(int fd, short event, void *arg)

{

    struct timeval tv;

    struct event *timeout = arg;

    int tid = (last_thread + 1) % thread_num;        //memcached中线程负载均衡算法

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    write(thread->write_fd, "Hello
world!", sizeof("Hello world!") - 1);

    evutil_timerclear(&tv);

    tv.tv_sec = 1;

    event_add(timeout, &tv);

}

int

main (int argc, char *argv[])

{

    int ret;

    int i;

    int fd[2];

    struct event timeout;

    struct timeval tv;

    pthread_t tid;

    dispatcher_thread.base = event_init();

    if (dispatcher_thread.base == NULL) {

        perror("event_init( base )");

        return 1;

    }

    dispatcher_thread.tid = pthread_self();

    threads = calloc(thread_num, sizeof(LIBEVENT_THREAD));

    if (threads == NULL) {

        perror("calloc");

        return 1;

    }

    for (i = 0; i < thread_num; i++) {

        

        ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);

        if (ret == -1) {

            perror("socketpair()");

            return 1;

        }

        threads[i].read_fd = fd[1];

        threads[i].write_fd = fd[0];

        threads[i].base = event_init();

        if (threads[i].base == NULL) {

            perror("event_init()");

            return 1;

        }

        event_set(&threads[i].event, threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);

        event_base_set(threads[i].base, &threads[i].event);

        if (event_add(&threads[i].event, 0) == -1) {

            perror("event_add()");

            return 1;

        }

    }

    for (i = 0; i < thread_num; i++) {

        pthread_create(&tid, NULL, worker_thread, &threads[i]);

    }

    evtimer_set(&timeout, timeout_cb, &timeout);

    event_base_set(dispatcher_thread.base, &timeout);

    evutil_timerclear(&tv);

    tv.tv_sec = 1;

    event_add(&timeout, &tv);

    event_base_loop(dispatcher_thread.base, 0);

    return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: