zoukankan      html  css  js  c++  java
  • libevent带负载均衡的多线程使用示例

    功能:
    主线程根据负载工作线程负载均衡算法,每隔一秒钟向特定的工作线程发送一条字符串信息,工作线程简单的把字符串信息打开出来。
     
    Makefile
     
    1. eventtest : eventtest.c
    2. gcc -Wall -g -levent -lpthread -o eventtest eventtest.c
    3. .PHONY : clean
    4. clean :
    5. rm eventtest -f
     
    eventtest.c
     
    1. #include <stdio.h>
    2. #include <stdlib.h>
    3. #include <unistd.h>
    4. #include <pthread.h>
    5. #include <sys/types.h>
    6. #include <sys/socket.h>
    7. #include <event.h>
    8. typedef struct {
    9.     pthread_t tid;
    10.     struct event_base *base;
    11.     struct event event;
    12.     int read_fd;
    13.     int write_fd;
    14. }LIBEVENT_THREAD;
    15. typedef struct {
    16.     pthread_t tid;
    17.     struct event_base *base;
    18. }DISPATCHER_THREAD;
    19. const int thread_num = 10;
    20. LIBEVENT_THREAD *threads;
    21. DISPATCHER_THREAD dispatcher_thread;
    22. int last_thread = 0;
    23. static void
    24. thread_libevent_process(int fd, short which, void *arg)
    25. {
    26.     int ret;
    27.     char buf[128];
    28.     LIBEVENT_THREAD *me = arg;
    29.     if (fd != me->read_fd) {
    30.         printf("thread_libevent_process error : fd != me->read_fd ");
    31.         exit(1);
    32.     }
    33.     ret = read(fd, buf, 128);
    34.     if (ret > 0) {
    35.         buf[ret] = '0';
    36.         printf("thread %llu receive message : %s ", (unsigned long long)me->tid, buf);
    37.     }
    38.     return;
    39. }
    40. static void *
    41. worker_thread(void *arg)
    42. {
    43.     LIBEVENT_THREAD *me = arg;
    44.     me->tid = pthread_self();
    45.     event_base_loop(me->base, 0);
    46.     return NULL;
    47. }
    48. static void
    49. timeout_cb(int fd, short event, void *arg)
    50. {
    51.     struct timeval tv;
    52.     struct event *timeout = arg;
    53.     int tid = (last_thread + 1) % thread_num;        //memcached中线程负载均衡算法
    54.     LIBEVENT_THREAD *thread = threads + tid;
    55.     last_thread = tid;
    56.     write(thread->write_fd, "Hello world!", sizeof("Hello world!") - 1);
    57.     evutil_timerclear(&tv);
    58.     tv.tv_sec = 1;
    59.     event_add(timeout, &tv);
    60. }
    61. int
    62. main (int argc, char *argv[])
    63. {
    64.     int ret;
    65.     int i;
    66.     int fd[2];
    67.     struct event timeout;
    68.     struct timeval tv;
    69.     pthread_t tid;
    70.     dispatcher_thread.base = event_init();
    71.     if (dispatcher_thread.base == NULL) {
    72.         perror("event_init( base )");
    73.         return 1;
    74.     }
    75.     dispatcher_thread.tid = pthread_self();
    76.     threads = calloc(thread_num, sizeof(LIBEVENT_THREAD));
    77.     if (threads == NULL) {
    78.         perror("calloc");
    79.         return 1;
    80.     }
    81.     for (i = 0; i < thread_num; i++) {
    82.         
    83.         ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
    84.         if (ret == -1) {
    85.             perror("socketpair()");
    86.             return 1;
    87.         }
    88.         threads[i].read_fd = fd[1];
    89.         threads[i].write_fd = fd[0];
    90.         threads[i].base = event_init();
    91.         if (threads[i].base == NULL) {
    92.             perror("event_init()");
    93.             return 1;
    94.         }
    95.         event_set(&threads[i].event, threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);
    96.         event_base_set(threads[i].base, &threads[i].event);
    97.         if (event_add(&threads[i].event, 0) == -1) {
    98.             perror("event_add()");
    99.             return 1;
    100.         }
    101.     }
    102.     for (i = 0; i < thread_num; i++) {
    103.         pthread_create(&tid, NULL, worker_thread, &threads[i]);
    104.     }
    105.     evtimer_set(&timeout, timeout_cb, &timeout);
    106.     event_base_set(dispatcher_thread.base, &timeout);
    107.     evutil_timerclear(&tv);
    108.     tv.tv_sec = 1;
    109.     event_add(&timeout, &tv);
    110.     event_base_loop(dispatcher_thread.base, 0);
    111.     return 0;
    112. }


  • 相关阅读:
    匿名内部类详解
    成员内部类详解
    内部类
    局部内部类详解
    switch
    Enum 类型
    循环
    标号
    软件开发模型
    RUP
  • 原文地址:https://www.cnblogs.com/lchb/p/3235215.html
Copyright © 2011-2022 走看看