zoukankan      html  css  js  c++  java
  • libev简单使用

    libev简单使用

    https://zhuanlan.zhihu.com/p/163282654

    socket编程最紧迫的需求就是有一个好用的buffer对象,类似libevent的evbuffer。下面我们定义一个简单的buffer:

    buffer.h

    #pragma once
    
    #include <stddef.h>
    
    #ifdef __cplusplus
    extern "C" {
    #endif
    
    /*
     * Growable byte buffer described by four pointers into a single
     * allocation. The stored bytes occupy [data, tail).
     */
    struct buffer {
        unsigned char *head; /* start of the allocation */
        unsigned char *data; /* first stored (not yet drained) byte */
        unsigned char *tail; /* one past the last stored byte */
        unsigned char *end;  /* one past the end of the allocation */
    };
    
    /* Put buf into the empty state (all pointers zeroed); nothing is allocated. */
    void buffer_init(struct buffer *buf);
    /* Release the allocation owned by buf and return it to the empty state. */
    void buffer_free(struct buffer *buf);
    
    /* Number of stored bytes, i.e. tail - data. */
    size_t buffer_length(const struct buffer *buf);
    /* Total size of the allocation, i.e. end - head. */
    size_t buffer_size(const struct buffer *buf);
    /* Pointer to the first stored byte; the stored bytes are contiguous. */
    unsigned char *buffer_data(const struct buffer *buf);
    
    /* Append len bytes from data after the stored bytes, growing the allocation if needed. */
    void buffer_append_data(struct buffer *buf, const void *data, size_t len);
    /* Append a single NUL byte. */
    void buffer_append_null(struct buffer *buf);
    /* Append the single byte c. */
    void buffer_append_char(struct buffer *buf, char c);
    /* Append printf-style formatted text; the terminating NUL is not appended. */
    void buffer_append_printf(struct buffer *buf, const char *fmt, ...);
    
    /* Consume len bytes from the front of the stored data. */
    void buffer_drain(struct buffer *buf, size_t len);
    /* Discard all stored bytes while keeping the allocation. */
    void buffer_reset(struct buffer *buf);
    
    /* Print the stored bytes to stdout as two-digit hex values followed by a newline. */
    void buffer_hexdump(const struct buffer *buf);
    
    #ifdef __cplusplus
    }
    #endif

    我们对buffer的简单需求,就是可以在末尾添加数据(buffer_append_data),获取一块连续的内存(buffer_data),消费指定长度的数据(buffer_drain),这些操作内部都自动完成内存管理,使用者不需要考虑内存问题。

    C语言事件循环库除了大名鼎鼎的libevent,还有libev,后者更加轻量。下面我们以libev为例,实现简单的io_copy功能,也即从一个描述符读数据,透传到另一个描述符。

     

    先看使用示例:

    开一个终端运行

    $ nc -v -l 9999
    Listening on [0.0.0.0] (family 0, port 9999)

    运行

    $ cc *.c -lev
    $ ./a.out 127.0.0.1 9999 < /var/log/syslog

    上面的示例程序把标准输入拷贝到127.0.0.1:9999所标识的socket上

    下面看main.c文件

    // main.c

    #include "io.h"
    #include "xnet.h"
    #include <ev.h>
    #include <stdio.h>
    #include <stdlib.h>
    
    /* Report the failed operation named by s via perror() and exit with status 1. */
    static void die(const char *s)
    {
        perror(s);
        exit(1);
    }
    
    /* Print the command-line synopsis for program `name` to stderr and exit with status 1. */
    static void usage(const char *name)
    {
        /* The newline escape had been lost, leaving an unterminated string literal. */
        fprintf(stderr, "usage: %s ip port\n", name);
        exit(1);
    }
    
    /* Completion callback: report on stderr that the copy has finished. */
    static void completion(void)
    {
        /* The newline escape had been lost, leaving an unterminated string literal. */
        fprintf(stderr, "done\n");
    }
    
    /* Error callback: print the error description errmsg on stderr. */
    static void error(const char *errmsg)
    {
        /* The newline escape had been lost, leaving an unterminated string literal. */
        fprintf(stderr, "error: %s\n", errmsg);
    }
    
    /*
     * Connect to the ip/port given on the command line and copy standard
     * input to that connection until end of input or an error.
     */
    int main(int argc, char *argv[])
    {
        struct ev_loop *loop;
        int connfd;
        struct io io;
        char *end;
        long port;
    
        if (argc != 3)
            usage(argv[0]);
    
        /* strtol instead of atoi so that garbage and out-of-range ports are rejected. */
        port = strtol(argv[2], &end, 10);
        if (end == argv[2] || *end != '\0' || port < 1 || port > 65535)
            usage(argv[0]);
    
        connfd = dial_tcp(argv[1], (int)port);
        if (connfd == -1)
            die("dial_tcp");
    
        loop = ev_default_loop(0);
        if (!loop) {
            fprintf(stderr, "error: could not initialise libev\n");
            return 1;
        }
    
        io_init(&io, loop, completion, error);
        io_copy(&io, connfd, 0); /* copy from descriptor 0 to descriptor connfd */
    
        ev_run(loop, 0);
        return 0;
    }
    
    

     

    定义struct io

    // io.h

    #pragma once
    
    #include "buffer.h"
    #include <ev.h>
    
    #ifdef __cplusplus
    extern "C" {
    #endif
    
    /* Called once the copy has finished: source at end of input and buffer fully written. */
    typedef void (*completion_cb)();
    /* Called when a read or write fails; errmsg is the strerror() text for errno. */
    typedef void (*error_cb)(const char *errmsg);
    
    /* State of one copy operation from a source descriptor to a destination descriptor. */
    struct io {
        struct ev_io src_watcher; /* EV_READ watcher on the source descriptor */
        struct ev_io dst_watcher; /* EV_WRITE watcher on the destination descriptor */
        struct buffer buf;        /* data read from the source and not yet written */
        struct ev_loop *loop;     /* event loop both watchers are registered with */
        completion_cb completion; /* optional; called when the copy has finished */
        error_cb error;           /* optional; called when a read or write fails */
        int src_done;             /* set to 1 once read() on the source returned 0 */
    };
    
    /* Initialise io: empty buffer, the given loop and callbacks (either may be NULL), src_done = 0. */
    void io_init(struct io *io, struct ev_loop *loop, completion_cb completion, error_cb error);
    /* Start copying from descriptor src to descriptor dst by starting both watchers on io's loop. */
    void io_copy(struct io *io, int dst, int src);
    
    #ifdef __cplusplus
    }
    #endif

    // io.c

    #include "io.h"
    #include <errno.h>
    #include <string.h>
    #include <unistd.h>
    
    /* Reading from the source stops once more than this many bytes are buffered. */
    #define HIGH_WATER_MARK (1024 * 1024 * 4)
    /* Reading from the source resumes once fewer than this many bytes are buffered. */
    #define LOW_WATER_MARK  (1024 * 1024 * 1)
    
    static void io_error(struct io *io)
    {
        ev_io_stop(io->loop, &io->src_watcher);
        ev_io_stop(io->loop, &io->dst_watcher);
    
        if (io->error)
            io->error(strerror(errno));
    }
    
    static void io_completion(struct io *io)
    {
        ev_io_stop(io->loop, &io->src_watcher);
        ev_io_stop(io->loop, &io->dst_watcher);
    
        if (io->completion)
            io->completion();
    }
    
    /*
     * Read-ready callback for the source descriptor. Reads up to 4096 bytes
     * and appends them to the buffer, making sure the write watcher is running.
     * At end of input it marks the source as done and stops the read watcher.
     * Reading is paused while the buffer holds more than HIGH_WATER_MARK bytes.
     */
    static void src_read_cb(struct ev_loop *loop, ev_io *w, int revents)
    {
        char chunk[4096];
        ssize_t nread;
        struct io *io = (struct io *)w->data;
    
        (void)revents;
    
        nread = read(w->fd, chunk, sizeof(chunk));
        if (nread == -1) {
            /* Interrupted or spurious wake-up: not a failure, wait for the next event. */
            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
                return;
            io_error(io);
            return;
        }
    
        if (nread == 0) {
            ev_io_stop(loop, &io->src_watcher);
            /* Wake the writer so it can flush what is left and report completion. */
            ev_io_start(loop, &io->dst_watcher);
            io->src_done = 1; /* the source has been read to the end */
            return;
        }
    
        buffer_append_data(&io->buf, chunk, (size_t)nread); /* queue the data for writing */
        ev_io_start(loop, &io->dst_watcher);
    
        if (buffer_length(&io->buf) > HIGH_WATER_MARK) /* too much buffered: pause reading */
            ev_io_stop(loop, &io->src_watcher);
    }
    
    /*
     * Write-ready callback for the destination descriptor. Writes as much of
     * the buffered data as the descriptor accepts and drains it from the
     * buffer. Resumes reading once the buffer is below LOW_WATER_MARK. When
     * the buffer is empty it either reports completion (source done) or stops
     * the write watcher until new data arrives, so the loop does not spin on
     * an always-writable descriptor.
     */
    static void dst_write_cb(struct ev_loop *loop, ev_io *w, int revents)
    {
        ssize_t nwritten;
        size_t pending;
        struct io *io = (struct io *)w->data;
    
        (void)revents;
    
        pending = buffer_length(&io->buf);
        if (pending > 0) {
            nwritten = write(w->fd, buffer_data(&io->buf), pending);
            if (nwritten == -1) {
                /* Interrupted or spurious wake-up: not a failure, wait for the next event. */
                if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
                    return;
                io_error(io);
                return;
            }
    
            buffer_drain(&io->buf, (size_t)nwritten); /* consume the bytes just written */
            pending = buffer_length(&io->buf);
        }
    
        if (pending < LOW_WATER_MARK && !io->src_done) /* buffer is small again: resume reading */
            ev_io_start(loop, &io->src_watcher);
    
        if (pending == 0) {
            if (io->src_done)
                io_completion(io);
            else
                ev_io_stop(loop, &io->dst_watcher); /* restarted by src_read_cb when data arrives */
        }
    }
    
    /*
     * Prepare io for a copy: remember the loop and the two callbacks, clear
     * the end-of-source flag and start with an empty buffer.
     */
    void io_init(struct io *io, struct ev_loop *loop, completion_cb completion, error_cb error)
    {
        io->src_done = 0;
        io->error = error;
        io->completion = completion;
        io->loop = loop;
    
        buffer_init(&io->buf);
    }
    
    void io_copy(struct io *io, int dst, int src)
    {
        ev_io_init(&io->src_watcher, src_read_cb, src, EV_READ);
        ev_io_init(&io->dst_watcher, dst_write_cb, dst, EV_WRITE);
    
        io->src_watcher.data = io;
        io->dst_watcher.data = io;
    
        ev_io_start(io->loop, &io->src_watcher);
        ev_io_start(io->loop, &io->dst_watcher);
    }

    buffer.c实现

    #include "buffer.h"
    #include <assert.h>
    #include <stdarg.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    
    /*
     * realloc() wrapper that never returns NULL: on allocation failure it
     * prints a diagnostic to stderr and exits with status 1.
     */
    static inline void *xrealloc(void *ptr, size_t size)
    {
        void *p = realloc(ptr, size);
        if (!p) {
            /* The newline escape had been lost, leaving an unterminated string literal. */
            fprintf(stderr, "realloc(%p, %zu): out of memory\n", ptr, size);
            exit(1);
        }
        return p;
    }
    
    /* Bytes between the start of the allocation and the first stored byte. */
    static inline size_t buffer_headroom(const struct buffer *buf)
    {
        const size_t gap = (size_t)(buf->data - buf->head);
    
        return gap;
    }
    
    /* Bytes between the end of the stored data and the end of the allocation. */
    static inline size_t buffer_tailroom(const struct buffer *buf)
    {
        const size_t room = (size_t)(buf->end - buf->tail);
    
        return room;
    }
    
    /* Slide the stored bytes down to the start of the allocation, turning headroom into tailroom. */
    static inline void move_data(struct buffer *buf)
    {
        const size_t used = (size_t)(buf->tail - buf->data);
    
        /* Source and destination may overlap, hence memmove. */
        memmove(buf->head, buf->data, used);
    
        buf->data = buf->head;
        buf->tail = buf->head + used;
    }
    
    /*
     * Enlarge the allocation to the smallest power-of-two multiple of 1024
     * that is at least the current size plus len. The stored data must
     * already sit at the start of the allocation (no headroom).
     */
    static void buffer_grow(struct buffer *buf, size_t len)
    {
        size_t wanted, capacity, used;
        unsigned char *mem;
    
        assert(buf->head == buf->data);
    
        wanted = buffer_size(buf) + len;
        for (capacity = 1024; capacity < wanted; capacity <<= 1)
            ;
    
        /* Remember the length first: realloc may move the block. */
        used = buffer_length(buf);
        mem = xrealloc(buf->head, capacity);
    
        buf->head = mem;
        buf->data = mem;
        buf->tail = mem + used;
        buf->end = mem + capacity;
    }
    
    /*
     * Make sure at least len bytes can be written at buf->tail. Compacting the
     * stored data to the front is tried first; the allocation is enlarged only
     * if that still leaves too little room.
     */
    static void grow_if_needed(struct buffer *buf, size_t len)
    {
        if (buffer_tailroom(buf) < len) {
            if (buffer_headroom(buf) > 0)
                move_data(buf);
    
            if (buffer_tailroom(buf) < len)
                buffer_grow(buf, len);
        }
    }
    
    /* Put buf into the empty state by zeroing all four pointers; nothing is allocated. */
    void buffer_init(struct buffer *buf)
    {
        memset(buf, 0, sizeof *buf);
    }
    
    /* Release the allocation owned by buf and leave buf zeroed (empty state). */
    void buffer_free(struct buffer *buf)
    {
        unsigned char *mem = buf->head;
    
        memset(buf, 0, sizeof *buf);
        free(mem);
    }
    
    /* Number of stored bytes: the distance from data to tail. */
    size_t buffer_length(const struct buffer *buf)
    {
        const size_t used = (size_t)(buf->tail - buf->data);
    
        return used;
    }
    
    /* Total size of the allocation: the distance from head to end. */
    size_t buffer_size(const struct buffer *buf)
    {
        const size_t capacity = (size_t)(buf->end - buf->head);
    
        return capacity;
    }
    
    /* Return a pointer to the first stored byte; buffer_length() bytes are readable from it. */
    unsigned char *buffer_data(const struct buffer *buf)
    {
        return buf->data;
    }
    
    /*
     * Append len bytes from data after the stored bytes, making room first
     * (by compacting or enlarging the allocation) when necessary.
     * Appending zero bytes is a no-op.
     */
    void buffer_append_data(struct buffer *buf, const void *data, size_t len)
    {
        /* A never-allocated buffer has tail == NULL; memcpy to NULL is undefined even for 0 bytes. */
        if (len == 0)
            return;
    
        grow_if_needed(buf, len);
        memcpy(buf->tail, data, len);
        buf->tail += len;
    }
    
    /* Append a single NUL byte, e.g. to terminate the stored bytes as a C string. */
    void buffer_append_null(struct buffer *buf)
    {
        /* The escape had been lost, leaving an empty (invalid) character constant. */
        const char nul = '\0';
    
        buffer_append_data(buf, &nul, 1);
    }
    
    /* Append the single byte c after the stored bytes. */
    void buffer_append_char(struct buffer *buf, char c)
    {
        buffer_append_data(buf, &c, 1);
    }
    
    void buffer_append_printf(struct buffer *buf, const char *fmt, ...)
    {
        va_list ap;
        int npr;
        char *str;
    
        va_start(ap, fmt);
        npr = vsnprintf(NULL, 0, fmt, ap);
        va_end(ap);
    
        str = alloca(npr);
    
        va_start(ap, fmt);
        vsprintf(str, fmt, ap);
        va_end(ap);
    
        buffer_append_data(buf, str, npr);
    }
    
    /*
     * Consume len bytes from the front of the stored data. Draining at least
     * as many bytes as are stored empties the buffer and rewinds it to the
     * start of the allocation so the whole capacity is available again.
     */
    void buffer_drain(struct buffer *buf, size_t len)
    {
        /* Clamp: data must never move past tail. */
        if (len >= buffer_length(buf)) {
            buf->data = buf->head;
            buf->tail = buf->head;
            return;
        }
    
        buf->data += len;
    }
    
    /* Discard all stored bytes; the allocation itself is kept for reuse. */
    void buffer_reset(struct buffer *buf)
    {
        unsigned char *start = buf->head;
    
        buf->tail = start;
        buf->data = start;
    }
    
    /* Print every stored byte to stdout as two lowercase hex digits, followed by a newline. */
    void buffer_hexdump(const struct buffer *buf)
    {
        size_t len, i;
    
        len = buffer_length(buf);
        for (i = 0; i < len; i++)
            printf("%02x", buf->data[i]);
        /* The newline escape had been lost, leaving an unterminated string literal. */
        printf("\n");
    }

    xnet.h

    #pragma once
    
    #ifdef __cplusplus
    extern "C" {
    #endif
    
    /* Open a TCP (SOCK_STREAM, AF_INET) connection to ip:port; returns the connected descriptor, or -1 on failure. */
    int dial_tcp(const char *ip, int port);
    
    #ifdef __cplusplus
    }
    #endif

    xnet.c

    #include "xnet.h"
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>
    
    /*
     * Create an AF_INET socket of the given type and connect it to ip:port.
     *
     * ip must be an IPv4 address in dotted-decimal notation. Returns the
     * connected descriptor, or -1 if the address is malformed or socket() or
     * connect() fails. errno is not set for a malformed address.
     */
    static int dial_net(int socktype, const char *ip, uint16_t port)
    {
        int fd;
        struct sockaddr_in addr;
    
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
    
        /*
         * Validate the address before creating a socket. inet_addr() signals
         * failure with INADDR_NONE, which the previous code passed straight to
         * connect() as 255.255.255.255.
         */
        if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1)
            return -1;
    
        fd = socket(AF_INET, socktype, 0);
        if (fd == -1)
            return -1;
    
        if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
            close(fd);
            return -1;
        }
        return fd;
    }
    
    /*
     * Open a TCP connection to ip:port. Returns the connected descriptor, or
     * -1 on failure, including a port outside 0..65535.
     */
    int dial_tcp(const char *ip, int port)
    {
        /* Reject rather than silently truncate when narrowing to uint16_t. */
        if (port < 0 || port > 65535)
            return -1;
    
        return dial_net(SOCK_STREAM, ip, (uint16_t)port);
    }

    主要思想就是一边从源描述符读,写到buffer里,一边从buffer里消费数据,写到socket。

    ==================== End

  • 相关阅读:
    关于MySQL与SQLLite的Group By排序原理的差别
    利用Java针对MySql封装的jdbc框架类 JdbcUtils 完整实现(包括增删改查、JavaBean反射原理,附源代码)
    深入java并发Lock一
    windows 7 SDK和DDK下载地址
    用cocos2d-x 3.2 实现的FlappyBird
    Java替代C语言的可能性
    Spring Quartz结合Spring mail定期发送邮件
    web小流量实验方案
    paip.微信菜单直接跳转url和获取openid流程总结
    【java web】java运行预编译Groovy脚本
  • 原文地址:https://www.cnblogs.com/lsgxeva/p/14579828.html
Copyright © 2011-2022 走看看