zoukankan      html  css  js  c++  java
  • Linux C++ TCP通信基于poll 和 select模型

    wrap.h 

    #ifndef __WRAP_H_
    #define __WRAP_H_
    
    #include <stdlib.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <sys/socket.h>
    
    void perr_exit(const char *s);
    int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr);
    int Bind(int fd, const struct sockaddr *sa, socklen_t salen);
    int Connect(int fd, const struct sockaddr *sa, socklen_t salen);
    int Listen(int fd, int backlog);
    int Socket(int family, int type, int protocol);
    ssize_t Read(int fd, void *ptr, size_t nbytes);
    ssize_t Write(int fd, const void *ptr, size_t nbytes);
    int Close(int fd);
    ssize_t Readn(int fd, void *vptr, size_t n);
    ssize_t Writen(int fd, const void *vptr, size_t n);
    ssize_t my_read(int fd, char *ptr);
    ssize_t Readline(int fd, void *vptr, size_t maxlen);
    
    #endif // __WRAP_H_

    wrap.cpp

    #include "wrap.h"
    
    void perr_exit(const char *s)
    {
        perror(s);
        exit(-1);
    }
    
    int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)
    {
        int n;
    
    again:
        if ((n = accept(fd, sa, salenptr)) < 0)
        {
            if ((errno == ECONNABORTED) || (errno == EINTR))
            {
                goto again;
            }
            else
            {
                perr_exit("accept error");
            }
        }
        return n;
    }
    
    int Bind(int fd, const struct sockaddr *sa, socklen_t salen)
    {
        int n;
    
        if ((n = bind(fd, sa, salen)) < 0)
            perr_exit("bind error");
    
        return n;
    }
    
    int Connect(int fd, const struct sockaddr *sa, socklen_t salen)
    {
        int n;
        n = connect(fd, sa, salen);
        if (n < 0) {
            perr_exit("connect error");
        }
    
        return n;
    }
    
    int Listen(int fd, int backlog)
    {
        int n;
    
        if ((n = listen(fd, backlog)) < 0)
            perr_exit("listen error");
    
        return n;
    }
    
    int Socket(int family, int type, int protocol)
    {
        int n;
    
        if ((n = socket(family, type, protocol)) < 0)
            perr_exit("socket error");
    
        return n;
    }
    
    ssize_t Read(int fd, void *ptr, size_t nbytes)
    {
        ssize_t n;
    
    again:
        if ( (n = read(fd, ptr, nbytes)) == -1) {
            if (errno == EINTR)
                goto again;
            else
                return -1;
        }
    
        return n;
    }
    
    ssize_t Write(int fd, const void *ptr, size_t nbytes)
    {
        ssize_t n;
    
    again:
        if ((n = write(fd, ptr, nbytes)) == -1) {
            if (errno == EINTR)
                goto again;
            else
                return -1;
        }
        return n;
    }
    
    int Close(int fd)
    {
        int n;
        if ((n = close(fd)) == -1)
            perr_exit("close error");
    
        return n;
    }
    
    /*参三: 应该读取的字节数*/                          //socket 4096  readn(cfd, buf, 4096)   nleft = 4096-1500
    ssize_t Readn(int fd, void *vptr, size_t n)
    {
        size_t  nleft;              //usigned int 剩余未读取的字节数
        ssize_t nread;              //int 实际读到的字节数
    
        char *ptr = (char*)vptr;
        // n 未读取字节数
        nleft = n;
    
        while (nleft > 0) {
            if ((nread = read(fd, ptr, nleft)) < 0) {
                if (errno == EINTR)
                    nread = 0;
                else
                    return -1;
            } else if (nread == 0)
                break;
    
            nleft -= nread;   //nleft = nleft - nread 
            ptr += nread;
        }
        return n - nleft;
    }
    
    ssize_t Writen(int fd, const void *vptr, size_t n)
    {
        size_t nleft;
        ssize_t nwritten;
    
    
        const char *ptr = (const char*)vptr;
        nleft = n;
        while (nleft > 0) {
            if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
                if (nwritten < 0 && errno == EINTR)
                    nwritten = 0;
                else
                    return -1;
            }
            nleft -= nwritten;
            ptr += nwritten;
        }
        return n;
    }
    
    ssize_t my_read(int fd, char *ptr)
    {
        static int read_cnt;
        static char *read_ptr;
        static char read_buf[100];
    
        if (read_cnt <= 0)
        {
    again:
            if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) {   //"hello
    "
                if (errno == EINTR)
                    goto again;
                return -1;
            } else if (read_cnt == 0)
                return 0;
    
            read_ptr = read_buf;
        }
        read_cnt--;
        *ptr = *read_ptr++;
    
        return 1;
    }
    
    /*readline --- fgets*/    
    //传出参数 vptr
    ssize_t Readline(int fd, void *vptr, size_t maxlen)
    {
        ssize_t n, rc;
        char c;
        char *ptr = (char*)vptr;
    
        for (n = 1; n < maxlen; n++) {
            if ((rc = my_read(fd, &c)) == 1) {   //ptr[] = hello
    
                *ptr++ = c;
                if (c == '
    ')
                    break;
            } else if (rc == 0) {
                *ptr = 0;
                return n-1;
            } else
                return -1;
        }
        *ptr = 0;
    
        return n;
    }

    客户端代码

    #include <sys/types.h>
    #include <unistd.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <time.h>
    #include <iostream>
    using namespace std;
    
    #include "wrap.h"
    
    #define SERV_IP "127.0.0.1"
    #define SERV_PORT 6666
    
    char *randstr(char *str, const int len)
    {
        srand(time(NULL));
        int i;
        for (i = 0; i < len; ++i)
        {
            switch ((rand() % 3))
            {
            case 1:
                str[i] = 'A' + rand() % 26;
                break;
            case 2:
                str[i] = 'a' + rand() % 26;
                break;
            default:
                str[i] = '0' + rand() % 10;
                break;
            }
        }
        str[++i] = '';
        return str;
    }
    
    int main(void)
    {
        char buf[BUFSIZ];
        // 创建 TCP 套接字
        int sfd = Socket(AF_INET, SOCK_STREAM, 0);
    
        sockaddr_in serv_addr;
        bzero(&serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
    
        // 将文本字符串格式转换成网络字节序的二进制地址
        inet_pton(AF_INET, SERV_IP, &serv_addr.sin_addr.s_addr);
        // 返回以网络字节序表示的16位整数
        serv_addr.sin_port = htons(SERV_PORT);
    
        Connect(sfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
    
        while (true)
        {
            randstr(buf, 5);
            string str = "PID = ";
            str.append(to_string(getpid()));
            str.append(", ");
            str.append(buf);
            int r = Write(sfd, str.c_str(), str.size());
            cout << "PID = " << getpid() << ", Write len = " << r << endl;
            int len = Read(sfd, buf, sizeof(buf));
            cout << "PID = " << getpid() << ", Read len = " << len << endl;
            cout << buf << endl << endl;
    
            sleep(rand() % 3 + 1);
        }
    
        // 关闭套接字
        Close(sfd);
    
        return 0;
    }

    服务器代码

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <poll.h>
    #include <arpa/inet.h>
    #include <ctype.h>
    #include <iostream>
    using namespace std;
    
    #include "wrap.h"
    
    #define MAXLINE 80
    #define SERV_PORT 6666
    #define OPEN_MAX 1024
    
    /**** 1.select 模型 ****/
    // (1) select能监听的文件描述符个数受限于FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数.
    // (2)解决1024以下客户端时使用select是很合适的,但如果链接客户端过多,select采用的是轮询模型,会大大降低服务器响应效率,不应在select上投入更多精力.
    void testSelect()
    {
        int i, maxi;
    
        // 自定义数组client, 防止遍历1024个文件描述符  FD_SETSIZE默认为1024
        int client[FD_SETSIZE];
        int listenfd, connfd, sockfd;
        char buf[BUFSIZ], str[INET_ADDRSTRLEN];         /* #define INET_ADDRSTRLEN 16 */
    
        struct sockaddr_in clie_addr, serv_addr;
        socklen_t clie_addr_len;
        fd_set rset, allset;                            /* rset 读事件文件描述符集合 allset用来暂存 */
    
        listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    
        int opt = 1;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    
        bzero(&serv_addr, sizeof(serv_addr));
        serv_addr.sin_family= AF_INET;
        serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        serv_addr.sin_port= htons(SERV_PORT);
    
        Bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
        Listen(listenfd, 128);
    
        // 将来用作client[]的下标, 初始值指向0个元素之前下标位置
        maxi = -1;
        for (i = 0; i < FD_SETSIZE; i++)
        {
            /* 用-1初始化client[] */
            client[i] = -1;
        }
    
    
        FD_ZERO(&allset);
        // 构造select监控文件描述符集
        FD_SET(listenfd, &allset);
        // 起初 listenfd 即为最大文件描述符
        int maxfd = listenfd;
    
        while(true)
        {
            /* 每次循环时都从新设置select监控信号集 */
            rset = allset;
            int nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
            if (nready < 0)
            {
                perror("select error");
                exit(-1);
            }
    
            if (FD_ISSET(listenfd, &rset))
            {
                // 说明有新的客户端链接请求
                clie_addr_len = sizeof(clie_addr);
                connfd = Accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len);       /* Accept 不会阻塞 */
                printf("received from %s at PORT %d
    ", inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)), ntohs(clie_addr.sin_port));
    
                for (i = 0; i < FD_SETSIZE; i++)
                {
                    if (client[i] < 0)
                    {
                        // 找client[]中没有使用的位置,保存accept返回的文件描述符到client[]里
                        client[i] = connfd;
                        break;
                    }
                }
    
    
                if (i == FD_SETSIZE)
                {
                    // 达到select能监控的文件个数上限 1024
                    fputs("too many clients
    ", stderr);
                    exit(1);
                }
    
                // 向监控文件描述符集合allset添加新的文件描述符connfd
                FD_SET(connfd, &allset);
                if (connfd > maxfd)
                {
                    maxfd = connfd;
                }
    
    
                // 保证maxi存的总是client[]最后一个元素下标
                if (i > maxi)
                {
                    maxi = i;
                }
    
                nready--;
                if (nready == 0)
                {
                    continue;
                }
            }
    
            for (i = 0; i <= maxi; i++)
            {
                // 检测哪个clients 有数据就绪
                if ((sockfd = client[i]) < 0)
                {
                    continue;
                }
    
                if (FD_ISSET(sockfd, &rset))
                {
                    int n = Read(sockfd, buf, sizeof(buf));
                    if (n == 0)
                    {
                        // 当client关闭链接时,服务器端也关闭对应链接
                        Close(sockfd);
                        // 解除select对此文件描述符的监控
                        FD_CLR(sockfd, &allset);
                        client[i] = -1;
                    }
                    else if (n > 0)
                    {
                        for (int j = 0; j < n; j++)
                        {
                            // 把小写字母转换为大写字母
                            buf[j] = toupper(buf[j]);
                        }
                        Write(sockfd, buf, n);
                        cout << buf << endl;
                    }
    
                    nready--;
                    if (nready == 0)
                    {
                        // 跳出for循环, 但还在while循环中
                        break;
                    }
                }
            }
        }
        Close(listenfd);
    }
    
    /**** 2.poll 模型 ****/
    void testPoll()
    {
        int i, listenfd, connfd, sockfd;
    
        char buf[MAXLINE], str[INET_ADDRSTRLEN];
        socklen_t clilen;
        struct sockaddr_in cliaddr, servaddr;
    
        listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    
        // 端口復用
        int opt = 1;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        servaddr.sin_port = htons(SERV_PORT);
    
        Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
        Listen(listenfd, 128);
    
        struct pollfd client[OPEN_MAX];
        // 要监听的第一个文件描述符 存入client[0]
        client[0].fd = listenfd;
        // listenfd监听普通读事件
        client[0].events = POLLIN;
    
        for (i = 1; i < OPEN_MAX; i++)
        {
            // 用-1初始化client[]里剩下元素,0也是文件描述符,不能用
            client[i].fd = -1;
        }
    
        // client[]数组有效元素中最大元素下标
        int maxi = 0;
    
        while(true)
        {
            // 阻塞监听是否有客户端链接请求
            int nready = poll(client, maxi+1, -1);
    
            // listenfd有读事件就绪
            if (client[0].revents & POLLIN)
            {
                clilen = sizeof(cliaddr);
                // 接收客户端请求 Accept 不会阻塞
                connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);
                printf("received from %s at PORT %d
    ", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));
    
                for (i = 1; i < OPEN_MAX; i++)
                {
                    if (client[i].fd < 0)
                    {
                         // 找到client[]中空闲的位置,存放accept返回的connfd
                        client[i].fd = connfd;
                        break;
                    }
                }
    
                if (i == OPEN_MAX)
                {
                    // 达到了最大客户端数
                    perr_exit("too many clients");
                }
    
                // 设置刚刚返回的connfd,监控读事件
                client[i].events = POLLIN;
                if (i > maxi)
                {
                    // 更新client[]中最大元素下标
                    maxi = i;
                }
    
                nready--;
                if (nready == 0)
                {
                    // 没有更多就绪事件时,继续回到poll阻塞
                    continue;
                }
            }
    
            for (i = 1; i <= maxi; i++)
            {
                // 前面的if没满足,说明没有listenfd满足. 检测client[] 看是那个connfd就绪
                if ((sockfd = client[i].fd) < 0)
                {
                    continue;
                }
    
                if (client[i].revents & POLLIN)
                {
                    ssize_t n = Read(sockfd, buf, MAXLINE);
                    if (n < 0)
                    {
                        // connection reset by client
                        if (errno == ECONNRESET)
                        {
                            /* 收到RST标志 */
                            printf("client[%d] aborted connection
    ", i);
                            Close(sockfd);
                            // poll中不监控该文件描述符,直接置为-1即可,不用像select中那样移除
                            client[i].fd = -1;
                        }
                        else
                        {
                            perr_exit("read error");
                        }
                    }
                    else if (n == 0)
                    {
                        // 说明客户端先关闭链接
                        printf("client[%d] closed connection
    ", i);
                        Close(sockfd);
                        client[i].fd = -1;
                    }
                    else
                    {
                        for (int j = 0; j < n; j++)
                        {
                            buf[j] = toupper(buf[j]);
                        }
                        Writen(sockfd, buf, n);
                        cout << buf << endl;
                    }
    
                    nready--;
                    if (nready == 0)
                    {
                        // 跳出for循环, 但还在while循环中
                        break;
                    }
                }
            }
        }
        return;
    }
    
    int main()
    {
        testPoll();
        return 0;
    }
  • 相关阅读:
    更新Centos 8 内核
    Docker安装
    微服务学习实战笔记 4.1-系统部署篇-Centos 8 下 安装配置K8S
    安装supervisor
    微服务学习实战笔记 4.2-系统部署篇-搭建 Harbor 镜像仓库服务器
    SRS流媒体服务器安装
    微服务学习实战笔记 1.1-系统架构篇-技术选型
    .Net Core 3.0 使用 Serilog 把日志记录到 SqlServer
    IdentityServer4 自定义授权模式
    IdentityServer4 保护.net framework webapi
  • 原文地址:https://www.cnblogs.com/duxie/p/15091595.html
Copyright © 2011-2022 走看看