zoukankan      html  css  js  c++  java
  • libevent笔记4:Filter_bufferevent过滤器

    Filter_bufferevent是一种基于bufferevent的过滤器,其本身也是一个bufferevent。能够对底层bufferevent输入缓存区中的数据进行操作(加/解密等)后再读取,同样也能在一定的操作后再将数据写入底层bufferevent的输出缓存区。需要注意的是,在创建Filter_bufferevent后,底层bufferevent的读写回调函数就不会再生效了,而缓存区的回调函数依旧有效。

    Filter_bufferevent相关函数

    struct bufferevent *bufferevent_filter_new (struct bufferevent underlying, bufferevent_filter_cb input_filter, bufferevent_filter_cb output_filter, int options, void(free_context)(void *), void *ctx):创建一个过滤器,参数列表如下:

    • struct bufferevent *underlying:需要过滤的底层bufferevent;
    • bufferevent_filter_cb input_filter/bufferevent_filter_cb output_filter:对底层bufferevent的输入/输出缓存器进行操作的过滤器函数,这两个过滤器函数会在其源缓存区中有数据时触发;
    • int option:关于bufferevent的设置;
    • void(*free_context)(void *):释放时调用的函数,返回值及参数列表均为空;
    • void *ctx:传递给过滤器函数的参数;

    typedef enum bufferevent_filter_result(* bufferevent_filter_cb)(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit, enum bufferevent_flush_mode mode, void *ctx):需要自定义,并传送给bufferevent_filter_new()的过滤器函数,参数列表如下:

    • struct evbuffer *src:需要处理的数据的源缓存区,即从这个缓存中取需要过滤的数据;
    • struct evbuffer *dst:需要处理的数据的目的缓存区,即将处理后的数据放入这个缓存中;
    • ev_ssize_t dst_limit:目标缓存区的长度,过滤器可以忽略这个参数;
    • enum bufferevent_flush_mode mode:过滤模式,来告诉过滤器数据是因为什么进过滤器的。
    • void *ctx:上一个函数中给定的参数;
    • typedef enum bufferevent_filter_result:过滤函数的返回值,包括:BEV_OK(正常);BEV_NEED_MORE(需要继续读数据);BEV_ERROR(发生错误);

    过滤器的源/目缓存区

    过滤器函数的参数列表中包含了过滤器的源/目的缓存区。而对于输入/输出过滤器,源/目缓存区是不同,而这对理解过滤器至关重要。可以使用一段简单的程序来确定过滤器的参数与底层bufferevent及Filter_bufferevent的关系,客户端正常连接服务器并发生数据,服务器创建一个Filter_bufferevent并两个bufferevent的一共4个缓存区,过滤器则输出源/目的缓存区的地址。

    • 客户端:
    #include <stdio.h>
    #include <signal.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <string.h>
    #include <event2/event.h>
    #include <event2/bufferevent.h>
     
    void read_cb(struct bufferevent *bev, void *arg)
    {
        char buf[1024] = {0}; 
        bufferevent_read(bev, buf, sizeof(buf));
        printf(buf);
    }
     
    void write_cb(struct bufferevent *bev, void *arg)
    {
       printf("我是写缓冲区的回调函数...您已发送
    "); 
    }
     
    void event_cb(struct bufferevent *bev, short events, void *arg)
    {
        if (events & BEV_EVENT_EOF)
        {
            printf("connection closed
    ");  
        }
        else if(events & BEV_EVENT_ERROR)   
        {
            printf("some other error
    ");
        }
        else if(events & BEV_EVENT_CONNECTED)
        {
            printf("服务器已连接
    ");
            return;
        }
        
        bufferevent_free(bev);
        printf("free bufferevent...
    ");
    }
     
    void send_cb(evutil_socket_t fd, short what, void *arg)
    {
        char buf[1024] = {0}; 
        struct bufferevent* bev = (struct bufferevent*)arg;
        read(fd, buf, sizeof(buf));
        bufferevent_write(bev, buf, strlen(buf)-1);
    }
     
    void signal_cb(evutil_socket_t sig, short events, void *user_data)
    {
        struct event_base *base = user_data;
        struct timeval delay = { 2, 0 };
        printf("Caught an interrupt signal; exiting cleanly in two seconds.
    ");
        event_base_loopexit(base, &delay);
    }
    
    int main(int argc, const char* argv[])
    {
        struct event_base* base;
        base = event_base_new();
     
     
        struct bufferevent* bev;
        bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
     
        // 连接服务器
        struct sockaddr_in serv;
        memset(&serv, 0, sizeof(serv));
        serv.sin_family = AF_INET;
        serv.sin_port = htons(9995);
        evutil_inet_pton(AF_INET, "127.0.0.1", &serv.sin_addr.s_addr);
        bufferevent_socket_connect(bev, (struct sockaddr*)&serv, sizeof(serv));
     
        // 设置回调
        bufferevent_setcb(bev, read_cb, NULL, event_cb, NULL);
        bufferevent_enable(bev, EV_READ | EV_PERSIST);
        // 创建一个事件
        struct event* ev = event_new(base, STDIN_FILENO, 
                                     EV_READ|EV_PERSIST, send_cb, bev);
    
        //for Ctrl+C
        struct event *signal_event;
        signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
        event_add(signal_event, NULL);
        event_add(ev, NULL);  
        event_base_dispatch(base);
        event_base_free(base);
        return 0;
    }
    
    • 服务端:
    #include <event2/bufferevent.h>
    #include <event2/event.h>
    #include <event2/buffer.h>
    #include <arpa/inet.h>// interner address
    #include <unistd.h>//os
    #include <stdio.h>
    #include <malloc.h>
    #include <string.h>
    #include <event2/listener.h>
    #include <sys/types.h>
    #include <errno.h>
    
    enum bufferevent_filter_result input_cb(struct evbuffer *src, struct evbuffer *dst, 
    		ev_ssize_t dst_limit, enum bufferevent_flush_mode mode, void *ctx)
    {
       //输出缓存区的地址
       printf("the src and dst in fun input_cb: %ld, %ld
    ", src, dst);
       //清空源缓存区,避免被一直调用
       evbuffer_drain(src, 1024);
       return BEV_OK;   
    }
    
    enum bufferevent_filter_result output_cb(struct evbuffer *src, struct evbuffer *dst,
                    ev_ssize_t dst_limit, enum bufferevent_flush_mode mode, void *ctx)
    {
       //输出缓存区的地址
       printf("the src and dst in fun output_cb: %ld, %ld
    ", src, dst);
       //清空源缓存区,避免被一直调用
       evbuffer_drain(src, 1024);
       return BEV_OK;
    }
    
    
    void read_cb(struct bufferevent *bev, void *arg)
    {
       char buf[1024] = {0};
       bufferevent_read(bev, buf, 1024);
       printf("%s
    ", buf);
    }
    
    void write_cb(struct bufferevnet *bev, void *arg)
    {
       printf("write_cb
    "); 
    }
    
    void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    		 struct sockaddr *addr, int len, void *ptr)
    {
       struct sockaddr_in *caddr = (struct sockaddr_in *)addr;
       struct event_base *base = (struct event_base *)ptr;
       
       //init bufferevent
       struct bufferevent *bev;
       bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
       
       bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
       bufferevent_enable(bev, EV_WRITE | EV_READ);
       //bufferevent_write(bev, "Hello client!", strlen("Hello client!")+1);
       
       struct bufferevent *filter_bev = bufferevent_filter_new(bev, input_cb, output_cb, BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
       //设置Filter_bufferevent的读写函数
       bufferevent_setcb(filter_bev, read_cb, write_cb, NULL, NULL);
       bufferevent_enable(filter_bev, EV_READ|EV_WRITE);
       //输出底层bufferevent的缓存区地址
       printf("the input evbuffer and output evbuffer of underlying bufferevent: %ld, %ld
    ", bufferevent_get_input(bev), bufferevent_get_output(bev));
       //输出Filter_bufferevent的缓存区地址
       printf("the input evbuffer and output evbuffer of filter bufferevent: %ld, %ld
    ", bufferevent_get_input(filter_bev), bufferevent_get_output(filter_bev));
       //先Filter_bufferevent写
       bufferevent_write(filter_bev, "abc", sizeof("abc"));
    }
    
    
    int main(int argc, const char *argv[])
    {
       //init server
       struct sockaddr_in servaddr;
       memset(&servaddr, 0, sizeof(servaddr));
       servaddr.sin_family = AF_INET;
       servaddr.sin_port = htons(9995);
       servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    
       //init event_base
       struct event_base *base;
       base = event_base_new();
    
       //init linstener
       struct evconnlistener *listener;
       listener = evconnlistener_new_bind(base, listener_cb, base, 
    		   LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE_PORT, 36, 
    		   (struct socketaddr *)&servaddr, sizeof(servaddr));
       event_base_dispatch(base);
    
       evconnlistener_free(listener);
       event_base_free(base);
    }
    

    服务端运行结果:

    sunminming@sunminming:~/libevent/filter$ ./addrdemo 
    the input evbuffer and output evbuffer of underlying bufferevent: 94493149330208, 94493149330352
    the input evbuffer and output evbuffer of filter bufferevent: 94493149331184, 94493149331328
    the src and dst in fun output_cb: 94493149331328, 94493149330352
    the src and dst in fun input_cb: 94493149330208, 94493149331184
    

    可以看出来:

    • 输入过滤器中的源缓存区与底层buferevent的输入缓存区地址一致,目的缓存区与Filter_bufferevent的输入缓存区地址一致。
    • 输出过滤器中的源缓存区与Filter_bufferevent的输出缓存区地址一致,目的缓存区与底层bufferevent的输出缓存区地址一致。
      因此,4个缓存区与2个过滤器函数的关系可以画成下图:

    Demo

    之后就是过滤器正常使用的Demo:输入过滤器会将底层bufferevent的接受到的数据加1处理,而输出过滤器则会将输出的数据复制一份加在原数据的末尾。

    • 服务端:
    #include <event2/bufferevent.h>
    #include <event2/event.h>
    #include <event2/buffer.h>
    #include <arpa/inet.h>// interner address
    #include <unistd.h>//os
    #include <stdio.h>
    #include <malloc.h>
    #include <string.h>
    #include <event2/listener.h>
    #include <sys/types.h>
    #include <errno.h>
    
    enum bufferevent_filter_result input_cb(struct evbuffer *src, struct evbuffer *dst, 
    		ev_ssize_t dst_limit, enum bufferevent_flush_mode mode, void *ctx)
    {
       //处理输入的数据:从源缓存区读,并加1后放入目的缓存区
       char buf[1024];
       memset(buf, '', sizeof(buf));
       evbuffer_remove(src, buf, sizeof(buf));
       int len = strlen(buf);
       printf("%d", len);
       for(int i = 0; i < len; ++i)
       {
          ++buf[i];
       }
       buf[2*len] = '
    ';
       evbuffer_add(dst, buf, len);
       return BEV_OK;   
    }
    
    enum bufferevent_filter_result output_cb(struct evbuffer *src, struct evbuffer *dst,
                    ev_ssize_t dst_limit, enum bufferevent_flush_mode mode, void *ctx)
    {
       //处理输出的数据:从源缓存区读,复制后放入目的缓存区
       char buf[1024] = {0};
       memset(buf, '', sizeof(buf));
       evbuffer_remove(src, buf, sizeof(buf));
       int len = strlen(buf);
       for(int i = len; i < 2*len; ++i)
       {
          buf[i] = buf[i - len];
       }
       evbuffer_add(dst, buf, 2*len);
       return BEV_OK;
    }
    
    
    void read_cb(struct bufferevent *bev, void *arg)
    {
       char buf[1024] = {0};
       bufferevent_read(bev, buf, 1024);
       printf("%s
    ", buf);
    }
    
    void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    		 struct sockaddr *addr, int len, void *ptr)
    {
       struct sockaddr_in *caddr = (struct sockaddr_in *)addr;
       struct event_base *base = (struct event_base *)ptr;
       
       //init bufferevent
       struct bufferevent *bev;
       bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
       
       bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
       bufferevent_enable(bev, EV_WRITE | EV_READ);
       
       struct bufferevent *filter_bev = bufferevent_filter_new(bev, input_cb, output_cb, BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
       bufferevent_setcb(filter_bev, read_cb, NULL, NULL, NULL);
       bufferevent_enable(filter_bev, EV_READ|EV_WRITE);
    
       bufferevent_write(filter_bev, "abc", sizeof("abc"));
    }
    
    
    int main(int argc, const char *argv[])
    {
       //init server
       struct sockaddr_in servaddr;
       memset(&servaddr, 0, sizeof(servaddr));
       servaddr.sin_family = AF_INET;
       servaddr.sin_port = htons(9995);
       servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    
       //init event_base
       struct event_base *base;
       base = event_base_new();
    
       //init linstener
       struct evconnlistener *listener;
       listener = evconnlistener_new_bind(base, listener_cb, base, 
    		   LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE_PORT, 36, 
    		   (struct socketaddr *)&servaddr, sizeof(servaddr));
       event_base_dispatch(base);
    
       evconnlistener_free(listener);
       event_base_free(base);
    }
    

    客户端与上一节相同。流程见注释。

  • 相关阅读:
    hibernate -- 分页模糊查询中setParameter 和setParameterList
    HTTP协议状态码详解(HTTP Status Code)
    远程桌面全屏显示
    将中文标点符号替换成英文标点符号
    MySQL 三种关联查询的方式: ON vs USING vs 传统风格
    java如何遍历map的所有的元素(各种方法)
    JS处理Cookie
    js追加子元素
    JAVA编程思想(2)
    1047. Student List for Course (25)
  • 原文地址:https://www.cnblogs.com/sunminming/p/12004089.html
Copyright © 2011-2022 走看看