zoukankan      html  css  js  c++  java
  • Linux C++ 网络编程学习系列(6)——多路IO之epoll高级用法

    epoll实现多路IO

    1. 源码地址:https://github.com/whuwzp/linuxc/tree/master/epoll_libevent
    2. 源码说明:

    1. 概要

    1.1 libevent的优势

    1. 采用了回调函数,这里就是epoll_event.data.ptr的使用,使用ptr而不是fd, 指向自定义的结构体,结构体中含有回调函数和需要的参数
    2. 把read和write分割开,提高效率(因为对方如果接收端缓冲区满,我们是不能发送的,设定等待EPOLLOUT信号有利于提升效率),read完就删除节点,添加write节点;write完就删除,添加read

    因为EPOLLIN_ET事件处理完之后,我们暂时不会再read了,所以把监听的事件修改为EPOLLOUT,等到对端可写时再write

    EPOLLOUT的用处

    2. 核心代码

    //myepoll.h
    #ifndef MyEPOLL_H_ 
    #define MyEPOLL_H_
    
    #include <string.h> 
    #include <stdio.h> 
    #include <sys/epoll.h> 
    #define EPOLLIN_ET (EPOLLIN | EPOLLET)
    //#define EPOLLIN_ET EPOLLIN
    // Per-descriptor record that is stored in epoll_event.data.ptr, so the
    // event loop can recover both the handler and its state from a single
    // pointer.
    struct myevent_s {
        int      fd;                 // descriptor being monitored
        uint32_t events;             // epoll event mask currently requested for fd
        int      status;             // 1 once event_set() ran, 0 after event_del()
        char     buf[1024];          // payload buffer shared by the read/write handlers
        int      len;                // number of valid bytes in buf
        void    *arg;                // argument handed to callback (event_set points it at this record)
        void   (*callback)(void *);  // handler invoked by the event loop
    };
    
    // Initialise *myevt for fd: stores the event mask and callback, clears the
    // buffer and length, marks the record as in use and points arg at the
    // record itself.
    void event_set(struct myevent_s* myevt, int fd, uint32_t events, void(*callback)(void*)); 
    
    // Register myevt->fd with the epoll instance epfd, using myevt->events as
    // the mask and myevt as the user data pointer.
    void event_add(int epfd,  struct myevent_s * myevt);
    // Mark the record as unused and remove myevt->fd from the epoll instance.
    void event_del(int epfd,  struct myevent_s * myevt);
    // Flip the record between waiting for input (EPOLLIN_ET) and waiting for
    // output (EPOLLOUT) and update the epoll registration accordingly.
    void event_mod(int epfd,  struct myevent_s * myevt);
    #endif
    

    核心: myevent_s这个结构体中有回调函数,还有arg,这个将会指向myevent_s自己,这样callback就可以调用结构体中的数据了(相当于全都封装在一起了)

    //myepoll.cpp
    #include "myepoll.h"
    
    // Prepare an event record for `fd`.
    //
    // The payload buffer and length are reset, the requested event mask and
    // handler are stored, and the record is flagged as in use (status = 1).
    // `arg` is pointed back at the record itself so the handler receives all
    // of its own state through its single void* parameter.
    void event_set(struct myevent_s *myevt, int fd, uint32_t events,
                   void (*callback)(void *arg)) {
        // Start with an empty payload.
        memset(myevt->buf, 0, sizeof(myevt->buf));
        myevt->len = 0;

        // What to watch and whom to call.
        myevt->fd       = fd;
        myevt->events   = events;
        myevt->callback = callback;

        // Self-reference: the callback argument is the record itself.
        myevt->arg    = myevt;
        myevt->status = 1;
    }
    
    // Register myevt->fd with the epoll instance `epfd`.
    //
    // The record's event mask is used as the interest set and the record
    // itself is stored in data.ptr so the event loop can get back to it.
    // A failing epoll_ctl() is reported on stderr instead of being dropped
    // silently; the function still returns normally.
    void event_add(int epfd, struct myevent_s *myevt) {
        struct epoll_event evt;
        evt.events   = myevt->events;
        evt.data.ptr = myevt;
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, myevt->fd, &evt) == -1) {
            perror("epoll_ctl(EPOLL_CTL_ADD)");
        }
    }
    // Stop monitoring a descriptor.
    //
    // The record is flagged as unused (status = 0) and its descriptor is
    // removed from the epoll instance `epfd`. The result of epoll_ctl() is not
    // inspected.
    void event_del(int epfd, struct myevent_s *myevt) {
        const int watched_fd = myevt->fd;

        myevt->status = 0;
        epoll_ctl(epfd, EPOLL_CTL_DEL, watched_fd, nullptr);
    }
    // Switch a record between "waiting to read" and "waiting to write".
    //
    // If the current mask contains any EPOLLIN_ET bit the record is switched
    // to EPOLLOUT, otherwise to EPOLLIN_ET. The kernel registration is then
    // updated with a single EPOLL_CTL_MOD instead of a DEL followed by an ADD.
    // If the descriptor turns out not to be registered (MOD fails), it is
    // added, which keeps the effect of the previous delete-then-add sequence.
    // A failure of both calls is reported on stderr.
    void event_mod(int epfd, struct myevent_s *myevt) {
        struct epoll_event evt;

        if (myevt->events & EPOLLIN_ET) {
            myevt->events = EPOLLOUT;
        } else {
            myevt->events = EPOLLIN_ET;
        }
        evt.events   = myevt->events;
        evt.data.ptr = myevt;

        if (epoll_ctl(epfd, EPOLL_CTL_MOD, myevt->fd, &evt) == 0) {
            return;
        }
        // Not registered (or MOD failed for another reason): try a fresh ADD.
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, myevt->fd, &evt) == -1) {
            perror("epoll_ctl(EPOLL_CTL_MOD/EPOLL_CTL_ADD)");
        }
    }
    
    //server.cpp
    #include "include/myepoll.h"
    #include "include/wrap.h"
    #include <ctype.h>
    #include <fcntl.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <sys/types.h> /* See NOTES */
    #include <unistd.h>
    #define MAX_CLIENT 1024
    #define PORT 6666
    
    static int                g_epfd;
    static struct epoll_event g_epoll_evts[MAX_CLIENT];
    static struct myevent_s   g_my_evts[MAX_CLIENT];
    
    void callback_write(void *arg);
    void callback_read(void *arg);
    // Upper-case the NUL-terminated string `in` into `out`.
    //
    // `out` must have room for strlen(in) + 1 bytes; the terminator is copied
    // as well. `in` and `out` may be the same buffer. The length is taken
    // from `in` once, so `out` is never read and may be uninitialised.
    void handler(char *in, char *out) {
        const size_t n = strlen(in);
        for (size_t i = 0; i <= n; ++i) {
            // toupper() requires a value representable as unsigned char;
            // passing a negative plain char is undefined behaviour.
            out[i] = (char)toupper((unsigned char)in[i]);
        }
    }
    
    void callback_write(void *arg) {
        printf("callback_write...
    ");
        struct myevent_s *myevt = (struct myevent_s *)arg;
        Write(myevt->fd, myevt->buf, (size_t)myevt->len);
        printf("send: %s
    ", myevt->buf);
        memset(myevt->buf, 0, sizeof(myevt->buf));
    
        event_mod(g_epfd, myevt); //删除write, 添加read
        myevt->callback = callback_read; //修改回调函数为read
        printf("change to read
    ");
    }
    
    void callback_read(void *arg) {
        printf("callback_read...
    ");
        struct myevent_s *myevt = (struct myevent_s *)arg;
        int               ret   = 0;
        myevt->len              = 0;
    
        while (true) {
            if (myevt->len >= (int)sizeof(myevt->buf)) break;
    
            ret = (int)Read(myevt->fd, myevt->buf + myevt->len, 2);
            // ret = (int)Read(myevt->fd, myevt->buf + ret,
            // sizeof(myevt->buf) - (unsigned long)myevt->len);
            printf("ret: %d
    ", ret);
            if (ret > 0) { //继续读
                myevt->len += ret;
            } else if (ret == 0) { //异常
                Close(myevt->fd);
                event_del(g_epfd, myevt);
                return;
            } else if ((ret == -1) && (myevt->len == 0)) {//说明一个都没读
                return;
            } else if ((ret == -1) && (myevt->len > 0)) {//非阻塞时返回-1为正常
                break;
            }
        }
    
        printf("recv: %s
    ", myevt->buf);
        handler(myevt->buf, myevt->buf); //转大写处理
    
        event_mod(g_epfd, myevt); //删除read,添加write
        myevt->callback = callback_write;//改为write
        printf("change to write
    ");
    }
    
    void callback_accept(void *arg) {
        printf("callback_accept...
    ");
        struct myevent_s * myevt = (struct myevent_s *)arg;
        struct sockaddr_in addr;
        int                cfd = -1;
        int                i   = 0;
        socklen_t          len = sizeof(addr);
    
        cfd = Accept(myevt->fd, (struct sockaddr *)&addr, &len);
        printf("=========new client: socket %d %s:%d=============
    ", cfd,
               inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
    
    	fcntl(cfd, F_SETFL, O_NONBLOCK); //设置为非阻塞
        for (i = 0; i < MAX_CLIENT; ++i) {
            if (g_my_evts[i].status == 0) { break; }
        }
        event_set(&g_my_evts[i], cfd, EPOLLIN_ET, callback_read); //设置myevent_s
        event_add(g_epfd, &g_my_evts[i]);//添加cfd的监听,最初为read
    }
    
    // Create the epoll instance and the listening socket.
    //
    // lfd:  receives the listening descriptor.
    // port: TCP port to bind on 127.0.0.1.
    //
    // The socket is made non-blocking, bound, put into the listening state and
    // then registered (edge-triggered read) in the last slot of g_my_evts with
    // callback_accept as its handler.
    //
    // Returns 0 on success, -1 if the epoll instance could not be created.
    int initsocket(int &lfd, int port) {
        int                opt   = 1;
        int                flags = 0;
        struct sockaddr_in addr;

        g_epfd = epoll_create(MAX_CLIENT);
        if (g_epfd == -1) {
            perror("epoll_create");
            return -1;
        }

        lfd = Socket(AF_INET, SOCK_STREAM, 0);
        // Add O_NONBLOCK without discarding the flags that are already set.
        flags = fcntl(lfd, F_GETFL, 0);
        fcntl(lfd, F_SETFL, (flags == -1 ? 0 : flags) | O_NONBLOCK);
        Setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, (socklen_t)sizeof(opt));

        memset(&addr, 0, sizeof(addr));
        addr.sin_family      = AF_INET;
        addr.sin_port        = htons((uint16_t)port);
        addr.sin_addr.s_addr = inet_addr("127.0.0.1");
        Bind(lfd, (struct sockaddr *)&addr, sizeof(addr));
        Listen(lfd, 20);

        // Register only once the socket is actually listening.
        event_set(&g_my_evts[MAX_CLIENT - 1], lfd, EPOLLIN_ET, callback_accept);
        event_add(g_epfd, &g_my_evts[MAX_CLIENT - 1]);

        return 0;
    }
    
    int main() {
        int               lfd     = 0;
        int               i       = 0;
        int               nselect = 0;
        struct myevent_s *myevt   = nullptr;
    
        for (i = 0; i < MAX_CLIENT; ++i) {
            g_my_evts[i].status = 0;
        }
    
        initsocket(lfd, PORT);
        while (true) {
            printf("waiting...
    ");
            nselect = epoll_wait(g_epfd, g_epoll_evts, MAX_CLIENT + 1, 1000);
            for (i = 0; i < nselect; ++i) {
                myevt = (struct myevent_s *)(g_epoll_evts[i].data.ptr);
    
                if (myevt->fd == lfd) {
                    myevt->callback(myevt->arg);//回调
                    continue;
                }
    
                if ((g_epoll_evts[i].events & EPOLLIN_ET) &&
                    (myevt->events & EPOLLIN_ET)) {
                    myevt->callback(myevt->arg);//回调
                }
                if ((g_epoll_evts[i].events & EPOLLOUT) &&
                    (myevt->events & EPOLLOUT)) {
                    myevt->callback(myevt->arg);//回调
                }
            }
            sleep(1);
        }
     
    }
    

    3. 参考网址

    1. https://www.bilibili.com/video/av53016117?p=69
  • 相关阅读:
    Java异常简介
    mysql索引
    关于this问题
    Centos 7 安装jdk1.7
    Java 将html导出word格式
    tomcat生成ssl证书
    数据库连接池的作用
    jquery的异步获取返回值为中文时乱码解决方法
    JQuery中ajax跨域问题
    JAVA 非对称加密算法RSA
  • 原文地址:https://www.cnblogs.com/whuwzp/p/linux_cpp_network_6_epoll_libevent.html
Copyright © 2011-2022 走看看