  • A C++20 Coroutine Example: a Coroutine-Based IOCP Server/Client

    VC has supported coroutines for quite a while now, but I could never quite see what they were actually for. A few days ago it suddenly clicked:

    Here is some pseudocode:

    task server() {
        for (;;) {
            sock_context s = co_await io.accept();
            for (;;) {
                auto buf = co_await io.recv(s);
                if (!buf.length())
                    break;
    
                std::cout << buf.data() << std::endl;
                int n = co_await io.send(s, "收到!", strlen("收到!") + 1);
            }
            co_await io.close(s);
        }
    }

    If an IO library exposed its interface like this, the code would look exactly like the simplest blocking model, yet underneath it is asynchronous: a single thread running this same code can serve a whole pile of connections.
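
    The client side reads the same way; here is a matching sketch, still pseudocode in the same style (the address and message are placeholders):

    task client() {
        sock_context s = co_await io.connect("127.0.0.1:12345");
        int n = co_await io.send(s, "hello", strlen("hello") + 1);
        auto buf = co_await io.recv(s);
        if (buf.length())
            std::cout << buf.data() << std::endl;
        co_await io.close(s);
    }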

    That is what prompted the work below (done purely because I had time to kill), and fortunately it produced a usable result.

    And I finally understood what coroutines are for:

      The more libraries get coroutine-ified, the lower the barrier to entry for C++ programmers: developers working on the upper layers do not need to know the coroutine details, they only need to know how to use the library correctly.

    There are already plenty of articles that properly explain coroutine internals, so I won't write another one. I'll just post the code; if you're interested, you can build your own from my implementation plus those articles:

    2021/12/23: I recently battle-tested this library in a side application; after a round of patching it holds up quite well.

    2021/12/23: Note: avoid using a lambda as the coroutine function. It may or may not blow up; this is compiler-bug voodoo.

    #pragma once
    #include <WinSock2.h>
    #include <MSWSock.h>
    #include <ws2tcpip.h>
    #pragma comment(lib, "ws2_32.lib")
    #include <coroutine>
    #include <string>
    #include <functional>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <list>
    #include <algorithm>
    #include "logger.hpp"
    #include <random>
    
    
    /**
    * I recently spent some time learning C++20 coroutines and did a first pass at coroutine-izing this IOCP network IO library.
    * The previous callback-dispatch design was templated all over, because the upper protocol layers needed all sorts of parsing context.
    * With coroutines, that protocol context can live inside the coroutine function, which removes the templates and makes the code a tiny bit easier to maintain.
    * How many coroutine pitfalls remain is unknown; whether this is a net win remains to be seen.
    * Using coroutines also means this library gives up almost all of its multithreading ability:
    * maintaining an IO library that is multithreaded inside with a coroutine skin outside is beyond my brainpower, I admit.
    * My current stance is to avoid over-engineering and just keep the upper-layer code clean and simple; 100k+ concurrent connections is enough for me.
    * If that is not enough, it is a sign coroutines should be dropped: they are not free. In my tests the coroutine version costs roughly 15% compared with callback dispatch.
    */
    #pragma warning(push)
    #pragma warning(disable:4996)
    namespace aqx{
    
        static int init_winsock() {
            WSADATA wd;
            return WSAStartup(MAKEWORD(2, 2), &wd);
        }
    
        static aqx::log nlog;
    
    #ifndef _nf
    #define _nf ((size_t)-1)
    #endif
    #ifndef __AQX_TIME_HPP
    #define __AQX_NOW_FUNC
        using clock64_t = long long;
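        // _Query_perf_frequency / _Query_perf_counter are internal helpers of MSVC's
        // <chrono> implementation (thin wrappers over QueryPerformanceFrequency/Counter),
        // so this now() only builds with the VC toolchain.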
        template<typename period = std::milli>
        clock64_t now() {
            const clock64_t _Freq = _Query_perf_frequency();
            const clock64_t _Ctr = _Query_perf_counter();
            const clock64_t _Whole = (_Ctr / _Freq) * period::den;
            const clock64_t _Part = (_Ctr % _Freq) * period::den / _Freq;
            return _Whole + _Part;
        }
    #endif
    
        /** 
        * Operation codes and status flags
        */
        struct net_status {
            static constexpr unsigned int s_accept = 0x01;
            static constexpr unsigned int s_connect = 0x02;
            static constexpr unsigned int s_read = 0x04;
            static constexpr unsigned int s_write = 0x08;
            static constexpr unsigned int s_close = 0x10;
            static constexpr unsigned int s_exec = 0x20;
    
            static constexpr unsigned int t_activated = 0x40;
    
            static constexpr unsigned int t_acceptor = 0x0100;
            static constexpr unsigned int t_connector = 0x0200;
            static constexpr unsigned int t_await_undo = 0x0400;
    
            static constexpr unsigned int t_await_accept = 0x010000;
            static constexpr unsigned int t_await_connect = 0x020000;
            static constexpr unsigned int t_await_read = 0x040000;
            static constexpr unsigned int t_await_write = 0x080000;
            static constexpr unsigned int t_await_close = 0x100000;
            static constexpr unsigned int t_await = 0xFF0000;
        };
    
        /** net_base is mainly the glue layer to the operating system.
        * No over-engineering here; it is written rather crudely, but it works.
        */
        class net_base {
        public:
            net_base() {
                fd = INVALID_SOCKET;
                hIocp = NULL;
                AcceptEx = NULL;
                ConnectEx = NULL;
                DisconnectEx = NULL;
                StreamCapacity = 1440;
                Timeout = 0;
                DataBacklog = 0;
                WorkerThreadId = 0;
            }
    
            static bool sockaddr_from_string(sockaddr_in& _Addr, const std::string& _Dest) {
                _Addr.sin_addr.S_un.S_addr = INADDR_NONE;
    
                size_t pos = _Dest.find(":");
                if(pos == _nf) {
                    nlog("%s->错误的目标地址:(%s)\n", __FUNCTION__, _Dest.data());
                    return false;
                }
    
                auto strip = _Dest.substr(0, pos);
                auto strport = _Dest.substr(pos + 1);
                strport.erase(strport.find_last_not_of("\r\n\t ") + 1);
                strport.erase(0, strport.find_first_not_of("\r\n\t "));
                unsigned short port = (unsigned short)atoi(strport.c_str());
                if (!port) {
                    nlog("%s->目标端口号错误:(%s)\n", __FUNCTION__, _Dest.data());
                    return false;
                }
                
                strip.erase(strip.find_last_not_of("\r\n\t ") + 1);
                strip.erase(0, strip.find_first_not_of("\r\n\t "));
                auto it = std::find_if(strip.begin(), strip.end(), [](char c)->bool {
                    return ((c < '0' || c > '9') && (c != '.'));
                    });
                _Addr.sin_family = AF_INET;
                _Addr.sin_port = htons(port);
                if (it != strip.end()) {
                    hostent* host = gethostbyname(strip.c_str());
                    if (!host) {
                        nlog("%s->错误的目标域名:(%s)\n", __FUNCTION__, _Dest.data());
                        return false;
                    }
                    _Addr.sin_addr = *(in_addr*)(host->h_addr_list[0]);
                }
                else {
                    _Addr.sin_addr.S_un.S_addr = inet_addr(strip.c_str());
                }
    
                if (_Addr.sin_addr.S_un.S_addr == INADDR_NONE) {
                    nlog("%s->错误的目标地址:(%s)\n", __FUNCTION__, _Dest.data());
                    return false;
                }
                return true;
            }
    
            static void sockaddr_any(sockaddr_in& _Addr, unsigned short _Port) {
                _Addr.sin_family = AF_INET;
                _Addr.sin_port = htons(_Port);
                _Addr.sin_addr.S_un.S_addr = INADDR_ANY;
            }
    
            static void sockaddr_local(sockaddr_in& _Addr, unsigned short _Port) {
                _Addr.sin_family = AF_INET;
                _Addr.sin_port = htons(_Port);
                _Addr.sin_addr.S_un.S_addr = INADDR_LOOPBACK;
            }
    
            static void* getmswsfunc(SOCKET s, GUID guid) {
                DWORD dwBytes;
                void* lpResult = nullptr;
                WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
                    sizeof(guid), &lpResult, sizeof(lpResult), &dwBytes, NULL, NULL);
                return lpResult;
            }
    
            static std::string sockaddr_to_string(const sockaddr_in &_Addr) {
                char buf[256];
                sprintf(buf, "%d.%d.%d.%d:%d", _Addr.sin_addr.S_un.S_un_b.s_b1,
                    _Addr.sin_addr.S_un.S_un_b.s_b2,
                    _Addr.sin_addr.S_un.S_un_b.s_b3,
                    _Addr.sin_addr.S_un.S_un_b.s_b4,
                    htons(_Addr.sin_port));
                std::string _Result = buf;
                return _Result;
            }
    
        private:
            int init(int _StreamCapacity, int _DataBacklog, int _Timeout) {
                if (fd != INVALID_SOCKET) {
                    return 0;
                }
                auto reterr = [this](int n) {
                    if (fd != INVALID_SOCKET) {
                        closesocket(fd);
                        fd = INVALID_SOCKET;
                    }
                    return n;
                };
                StreamCapacity = _StreamCapacity;
                Timeout = _Timeout;
                if (Timeout < 0) {
                    nlog("%s->Timeout必须>=0", __FUNCTION__);
                    return reterr(-1);
                }
                DataBacklog = _DataBacklog;
                fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
                if (fd == INVALID_SOCKET) {
                    nlog("%s->创建套接字失败:%d", __FUNCTION__, WSAGetLastError());
                    return reterr(-1);
                }
                ConnectEx = (LPFN_CONNECTEX)getmswsfunc(fd, WSAID_CONNECTEX);
                if (!ConnectEx) {
                    nlog("%s->获取 ConnectEx 地址失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                    return reterr(-2);
                }
                AcceptEx = (LPFN_ACCEPTEX)getmswsfunc(fd, WSAID_ACCEPTEX);
                if (!AcceptEx) {
                    nlog("%s->获取 AcceptEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                    return reterr(-3);
                }
                
                // I have tested DisconnectEx more than once, and every time the conclusion was that it does not raise the number of concurrent connections.
                // DisconnectEx only feels faster because it trades the system-wide lock for the IOCP queue lock.
                // Another approach is a dedicated thread with a table that calls DisconnectEx blockingly and then AcceptEx right away, which just shifts the global kernel lock onto a lock of your own.
                // DisconnectEx also behaves differently across Windows versions; the only safe point to reuse a socket with it is when the peer closes the connection,
                // which for IOCP means: when WSARecv or WSASend comes back from GetQueuedCompletionStatus with the second argument transferred == 0.
                // It is also affected by the TCP TIME_WAIT state:
                // when the system holds lots of TIME_WAIT sockets, the net effect is more memory spent for fewer concurrent connections.
    
                /*DisconnectEx = (LPFN_DISCONNECTEX)getmswsfunc(fd, WSAID_DISCONNECTEX);
                if (!DisconnectEx) {
                    nlog("%s->获取 DisconnectEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                    return reterr(-4);
                }*/
    
                hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
                if (!hIocp) {
                    nlog("%s->创建完成端口失败,错误号:%d", __FUNCTION__, GetLastError());
                    return reterr(-5);
                }
                CreateIoCompletionPort((HANDLE)fd, hIocp, 0, 0);
                return 0;
            }
    
            void close() {
                if (fd != INVALID_SOCKET) {
                    closesocket(fd);
                    fd = INVALID_SOCKET;
                }
    
                if (hIocp) {
                    CloseHandle(hIocp);
                    hIocp = NULL;
                }
            }
    
            BOOL Accept(SOCKET s, char* _Data, LPOVERLAPPED _Overlapped) {
                DWORD _Received = 0;
                return AcceptEx(fd, s, _Data, 0, sizeof(SOCKADDR_IN) << 1, sizeof(SOCKADDR_IN) << 1, &_Received, _Overlapped);
            }
    
            BOOL Connect(SOCKET s, sockaddr* _Addr, int _AddrLen, LPOVERLAPPED _Overlapped) {
                DWORD _Sent = 0;
                return ConnectEx(s, _Addr, _AddrLen, nullptr, 0, &_Sent, _Overlapped);
            }
    
            /*BOOL Disconnect(SOCKET s, LPOVERLAPPED _Overlapped) {
                return DisconnectEx(s, _Overlapped, TF_REUSE_SOCKET, 0);
            }*/
    
            /* Synchronous messaging built on a C++11 condition variable and mutex to make IO calls from other threads safe; essentially this is just multi-threaded output.
            * Because the completion port has no synchronous messaging of its own, any such scheme involves at least two locks (the IOCP lock plus one more):
            * 1. Dynamic new/delete: worst case this goes through the big system-wide lock, so no.
            * 2. One lock object of our own: the approach used here.
            * 3. One lock object per socket context: for a workload of "only" 100k-ish concurrent IOs, lock contention should not need to go that far.
            */
            int SafeIOMessage(DWORD dwNumberOfBytesTransferred, ULONG_PTR dwCompletionKey) {
                std::unique_lock<std::mutex> lock(safeIO.mtx);
                safeIO.cv.wait(lock, [this]() {
                    return (safeIO.s & 1);
                });
                if (safeIO.s == -1)
                    return -1;
                safeIO.s = 0;
                PostQueuedCompletionStatus(hIocp, dwNumberOfBytesTransferred, dwCompletionKey, 0);
                safeIO.cv.wait(lock, [this]() {
                    return (safeIO.s & 3);
                });
                if (safeIO.s == -1)
                    return -1;
                int _Result = safeIO.result;
                safeIO.s = 1;
                safeIO.cv.notify_all();
                return _Result;
            }
    
            void InitSafeIO() {
                std::lock_guard<std::mutex> lg(safeIO.mtx);
                safeIO.s = 1;
            }
    
            void ExitSafeIO() {
                std::lock_guard<std::mutex> lg(safeIO.mtx);
                safeIO.s = -1;
                safeIO.cv.notify_all();
            }
    
            void SafeIOResult(int _Result) {
                // In theory the IOCP worker thread does not need to take the lock here; in practice I'm not sure, so I err on the pessimistic side.
                std::lock_guard<std::mutex> lg(safeIO.mtx);
                safeIO.result = _Result;
                safeIO.s = 2;
                safeIO.cv.notify_all();
            }
    
        private:
            friend class sock;
            friend class netio;
            friend class coio;
            SOCKET fd;
            HANDLE hIocp;
            LPFN_ACCEPTEX AcceptEx;
            LPFN_CONNECTEX ConnectEx;
            LPFN_DISCONNECTEX DisconnectEx;
            int StreamCapacity;
            int Timeout;
            int DataBacklog;
            DWORD WorkerThreadId;
    
            struct safeio_send_struct {
                sock* s;
                void* buf;
                int len;
            };
    
            struct SAFEIO {
                std::mutex mtx;
                std::condition_variable cv;
                int s = -1;
                int result = 0;
            }safeIO;
            
        };
    
        /* Derive directly from std::string and use it for the socket's various buffers */
        class sock_buffer : public std::string {
        public:
            using _Basetype = std::string;
            using _Basetype::_Basetype;
            void preset_length(size_t _Length) {
                // Poke VC's std::string layout directly at the binary level to change what std::string::length() returns.
                // The benefit is avoiding the copy/zero-fill that std::string::resize() would perform.
                // Note this only works with VC; G++'s std::string has a different layout.
                struct __stlstr {
                    const char str[0x10];
                    size_t len;
                };
                if (this->capacity() < _Length)
                    this->reserve(_Length);
                ((__stlstr*)this)->len = _Length;
            }
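            // A portable (but slower) alternative would be this->resize(_Length), which
            // zero-fills the newly exposed bytes; the layout hack above skips that cost at
            // the price of depending on VC's std::string internals.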
        };
    
        /**
        * Coroutine task
        */
        template<typename _Ty>
        struct net_task_t {
            struct promise_type;
            using _Hty = std::coroutine_handle<promise_type>;
            struct promise_type {
                net_task_t get_return_object() { return { _Hty::from_promise(*this) }; }
                // Returning std::suspend_always{} from initial_suspend means the coroutine suspends right after it is created.
                // Suspending here leaves time for set_sock to run; otherwise an empty coroutine body would be destroyed immediately after creation.
                auto initial_suspend() { return std::suspend_always{}; }
    
                auto final_suspend() noexcept { 
                    s->on_destroy_coroutine(); 
                    return std::suspend_never{}; 
                }
                void unhandled_exception() { std::terminate(); }
                void return_void() { }
                _Ty* s = nullptr;
            };
            _Hty _Handle;
            void resume() { _Handle.resume(); }
            void destroy() { _Handle.destroy(); }
            void set_sock(_Ty* _s) { _Handle.promise().s = _s; }
        };
    
        /** Socket context */
        class sock {
            // Extended OVERLAPPED structure
            struct binding {
                OVERLAPPED ol;
                int opt;
                sock* s;
            };
    
            /**
            * Type of the object handed back to the coroutine by recv
            */
            class sock_data {
                sock_data(sock* _s) : s(_s) {}
            public:
                char* data() { return s->ibuf.data(); }
                void erase(size_t _Count) { s->ibuf.erase(0, _Count); }
                size_t length() { return s->ibuf.length(); }
                void clear() { s->ibuf.clear(); }
    
            private:
                friend class sock;
                sock* s;
            };
    
            /** Type of the object handed back to the coroutine by connect and accept.
            * Used for asynchronous send and close.
            * Other threads may also use this object to communicate; thread safety is handled, but not efficiently, since it relies on a global lock.
            */
            class asyncsock {
            public:
                /**
                * When called on the IO worker thread this takes the unlocked send path;
                * calls from any other thread go through the lock-protected safe_send.
                */
                int send(void* data, int len) {
                    if (s->v->WorkerThreadId != GetCurrentThreadId()) {
                        return s->safe_send(data, len);
                    }
                    else {
                        return s->send(data, len);
                    }
                }
    
                int send(const void* data, int len) {
                    if (s->v->WorkerThreadId != GetCurrentThreadId()) {
                        return s->safe_send(data, len);
                    }
                    else {
                        return s->send(data, len);
                    }
                }
    
                void close() {
                    if (s->v->WorkerThreadId != GetCurrentThreadId()) {
                        s->safe_close();
                    }
                    else {
                        s->close();
                    }
                }
    
                bool isactivated() { return s->isactivated(); }
    
                operator bool() {
                    return (s != nullptr);
                }
    
                sockaddr_in& getsockaddr() {
                    return s->getsockaddr();
                }
    
                // Response timeout; this exists so that the client side can send heartbeat packets.
                // The heartbeat mechanism is built on the OS function RegisterWaitForSingleObject
                // and fires at roughly 2/3 of the Timeout passed to netio::init.
                // In other words Timeout is not an exact value; that is the price of giving clients a window in which to send heartbeats.
                // For example with Timeout = 6000, a client that has really timed out is detected after roughly 4000-8000 ms.
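                // Hypothetical call site (a capture-less lambda converts to the required
                // function pointer): after `auto s = co_await io.connect(...)` one could write
                //     s.ontimeout([](asyncsock as) { as.send("ping", 5); });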
                void ontimeout(void(*proc)(asyncsock)) {
                    if (!s)
                        return;
                    s->ontimeout = proc;
                }
                
            private:
                bool operator<(const asyncsock& as) const{
                    return (size_t)s < (size_t)as.s;
                }
                friend typename std::less<asyncsock>;
    
            private:
                friend class netio;
                friend class coio;
                friend class sock;
                sock* s = nullptr;
            };
            
            struct recv_awaitable {
                recv_awaitable(sock* s) : data(s) { }
    
                // When the compiler inlines both await_ready and await_suspend, an exception is raised on the coroutine side;
                // forcing await_ready to be noinline makes the exception go away.
                __declspec(noinline) bool await_ready() {
                    // My current VS version is VS 2022 17.0.1.
                    // There is a compiler bug here: whenever await_ready and await_suspend are both inlined,
                    // the final switch from normal execution back into the coroutine ends up using __coro_frame_ptr.__resume_address as the recv_awaitable object,
                    // and an exception follows immediately.
                    // In the end I found that VC's coroutines and lambda functions currently don't get along:
                    // if a lambda used as a coroutine function gets inlined, all sorts of pointer errors can show up.
                    // I reported the issue to the VS community; the answer was that it is under consideration, with no word on when it will be fixed.
    
                    if (data.s->st & net_status::t_await_undo) {
                        data.s->ibuf.clear();
                        data.s->st &= (~net_status::t_await_undo);
                        return true;
                    }
                    return false;
                }
    
                void await_suspend(std::coroutine_handle<> handle) { }
                sock_data await_resume() const { 
                    return data; 
                }
                sock_data data;
            };
    
            struct sock_awaitable {
                sock_awaitable(sock* _s) { s.s = _s; }
                __declspec(noinline) bool await_ready() {
                    if (s.s->st & net_status::t_await_undo) {
                        s.s->st &= (~net_status::t_await_undo);
                        return true;
                    }
                    return false;
                }
                void await_suspend(std::coroutine_handle<> handle) { }
                sock::asyncsock await_resume() { return s; }
                sock::asyncsock s;
            };
    
            struct close_awaitable {
                close_awaitable(bool _IsSuspend) : IsSuspend(_IsSuspend) { }
                __declspec(noinline) bool await_ready() { return (IsSuspend == false); }
                void await_suspend(std::coroutine_handle<> handle) { }
                void await_resume() { }
                bool IsSuspend;
            };
    
            struct send_awaitable {
                send_awaitable(sock* _s) : s(_s) {}
                __declspec(noinline) bool await_ready() {
                    if (s->st & net_status::t_await_undo) {
                        s->st &= (~net_status::t_await_undo);
                        return true;
                    }
                    return false;
                }
                void await_suspend(std::coroutine_handle<> handle) { }
                int await_resume() { return s->syncsendlen; }
                sock* s;
            };
    
        public:
            using opcode = net_status;
            sock(net_base* _v) {
                fd = INVALID_SOCKET;
                v = _v;
                st = 0;
                ontimeout = nullptr;
                hTimer = NULL;      // must be initialized: set_timer() tests it before the first RegisterWaitForSingleObject
                rtime = 0;
                syncsendlen = 0;
                memset(&input.ol, 0, sizeof(input.ol));
                memset(&output.ol, 0, sizeof(output.ol));
                
                if (v->Timeout)
                    output.ol.hEvent = input.ol.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
                else
                    output.ol.hEvent = input.ol.hEvent = NULL;
                output.s = input.s = this;
                output.opt = opcode::s_write;
                ibuf.reserve(v->StreamCapacity);
                obuf.reserve(v->StreamCapacity);
            }
    
            ~sock() {
                close();
                if (!output.ol.hEvent)
                    return;
                CloseHandle(output.ol.hEvent);
                output.ol.hEvent = input.ol.hEvent = NULL;
                if (st & opcode::t_await) 
                    co.destroy();
            }
    
            void on_destroy_coroutine() {
                close();
                st &= (~opcode::t_connector);
            }
    
            bool isactivated() {
                return ((st & opcode::t_activated) != 0);
            }
    
            int send(void* data, int len) {
                if (!len)
                    return len;
                int n = (int)(obuf.capacity() - obuf.length());
                if (n >= len && !obacklog.length()) {
                    obuf.append((char*)data, len);
                }
                else {
                    if (v->DataBacklog != 0 && obacklog.length() + len > v->DataBacklog) {
                        // backlog limit exceeded
                        close();
                        return -1;
                    }
                    obacklog.append((char*)data, len);
                }
                return (write() == 0) ? len : -1;
            }
    
            int send(const void* data, int len) {
                return send((void*)data, len);
            }
    
            int safe_send(void* data, int len) {
                net_base::safeio_send_struct param = { this, data, len };
                return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)&param);
            }
    
            int safe_send(const void* data, int len) {
                net_base::safeio_send_struct param = { this, (void*)data, len };
                return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)&param);
            }
    
            int safe_close() {
                return v->SafeIOMessage(opcode::s_close, (ULONG_PTR)this);
            }
    
            void close() {
                if (INVALID_SOCKET == fd)
                    return;
                ontimeout = nullptr;
                closesocket(fd);
                fd = INVALID_SOCKET;
                st &= ~opcode::t_activated;
                st |= opcode::s_close;
                set_timer(false);
                ibuf.clear();
                if (obacklog.capacity() <= 0x0F)
                    return;
                sock_buffer tmp;
                obacklog.swap(tmp);
            }
    
            sockaddr_in& getsockaddr() { return sa; }
    
        private:
            int initfd() {
                if (INVALID_SOCKET != fd) {
                    return 0;
                }
                    
                fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
                if (INVALID_SOCKET == fd) {
                    nlog("%s->创建套接字失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                    return -1;
                }
                LINGER linger = { 1, 0 };
                setsockopt(fd, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
                int b = 1;
                setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&b, sizeof(b));
                CreateIoCompletionPort((HANDLE)fd, v->hIocp, 0, 0);
                return 0;
            }
    
            int bindlocal() {
                sockaddr_in local;
                local.sin_family = AF_INET;
                local.sin_addr.S_un.S_addr = INADDR_ANY;
                local.sin_port = 0;
                if (SOCKET_ERROR == bind(fd, (LPSOCKADDR)&local, sizeof(local))) {
                    nlog("%s->绑定本地端口失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                    return -1;
                }
                return 0;
            }
    
            bool set_dest(const std::string& _Dest) {
                return net_base::sockaddr_from_string(sa, _Dest);
            }
    
            void set_timer(bool _Enable) {
                if (_Enable) {
                    if (hTimer)
                        return;
                    RegisterWaitForSingleObject(&hTimer, output.ol.hEvent, [](void* Param, BOOLEAN TimerOrWaitFired) {
                        if (!TimerOrWaitFired)
                            return;
                        sock* p = (sock*)Param;
                        PostQueuedCompletionStatus(p->v->hIocp, 0, (ULONG_PTR)p, nullptr);
                    }, this, (ULONG)v->Timeout * 2 / 3, WT_EXECUTEDEFAULT);
                }
                else {
                    if (!hTimer)
                        return;
                    std::ignore = UnregisterWaitEx(hTimer, NULL);
                    hTimer = NULL;
                }
            }
    
            int nat() {
                sockaddr_in _Addr;
                int _AddrLen = sizeof(_Addr);
                if (-1 == getsockname(fd, (sockaddr*)&_Addr, &_AddrLen))
                    return -1;
                SOCKET fdNat = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
                LINGER linger = { 1, 0 };
                setsockopt(fdNat, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
                CreateIoCompletionPort((HANDLE)fdNat, v->hIocp, 0, 0);
                if (-1 == bind(fdNat, (sockaddr*)&_Addr, sizeof(_Addr))) {
                    closesocket(fdNat);
                    return -1;
                }
                close();
                fd = fdNat;
                return connect();
            }
    
            int accept() {
                if (((st & 0xFF) | opcode::s_close) != opcode::s_close) {
                    nlog("%s->当前套接字未断开连接!", __FUNCTION__);
                    return -1;
                }
    
                if (initfd())
                    return -1;
                DWORD _Received = 0;
                input.opt = opcode::s_accept;
                st &= (~opcode::s_close);
                st |= opcode::s_accept;
                if (!v->Accept(fd, ibuf.data(), &input.ol)) {
                    int _Error = WSAGetLastError();
                    if (_Error != ERROR_IO_PENDING) {
                        st &= (~opcode::s_accept);
                        nlog("%s->AcceptEx失败, 错误号:", __FUNCTION__, WSAGetLastError());
                        return -1;
                    }
                }
                return 0;
            }
    
            int connect() {
                if (((st & 0xFF) | opcode::s_close) != opcode::s_close) {
                    nlog("%s->当前套接字未断开连接!", __FUNCTION__);
                    return -1;
                }
                if (INVALID_SOCKET == fd) {
                    if (initfd())
                        return -1;
                    if (bindlocal())
                        return -1;
                }
                input.opt = opcode::s_connect;
                st &= (~opcode::s_close);
                st |= opcode::s_connect;
    
                if (!v->Connect(fd, (sockaddr*)&sa, sizeof(sa), &input.ol)) {
                    int _Error = WSAGetLastError();
                    if (_Error != ERROR_IO_PENDING) {
                        nlog("%s->ConnectEx失败, 错误号:", __FUNCTION__, WSAGetLastError());
                        return -1;
                    }
                }
                return 0;
            }
    
            int write() {
                if (!(st & opcode::t_activated)) {
                    return -1;
                }
                if (st & (opcode::s_write | opcode::s_close | opcode::s_accept | opcode::s_connect))
                    return 0;
                if (obacklog.size()) {
                    size_t rl = obuf.capacity() - obuf.length();
                    if (rl > obacklog.length())
                        rl = obacklog.length();
                    if (rl) {
                        obuf.append(obacklog.data(), rl);
                        obacklog.erase(0, rl);
                    }
                }
                WSABUF buf = { (ULONG)(obuf.length()), obuf.data() };
                if (!buf.len)
                    return 0;
                st |= opcode::s_write;
                DWORD _Sent = 0;
                if (SOCKET_ERROR == WSASend(fd, &buf, 1, &_Sent, 0, &(output.ol), NULL)) {
                    int _Error = WSAGetLastError();
                    if (WSA_IO_PENDING != _Error) {
                        st &= (~opcode::s_write);
                        return -1;
                    }
                }
                return 0;
            }
    
            int read() {
                if (!(st & opcode::t_activated)) {
                    return -1;
                }
                if (st & (opcode::s_read | opcode::s_close | opcode::s_accept | opcode::s_connect))
                    return 0;
                WSABUF buf = {
                    (ULONG)(ibuf.capacity() - ibuf.length()),
                    ibuf.data() + ibuf.length()
                };
                if ((int)buf.len <= 0) {
                    return -1;
                }
                DWORD _Received = 0;
                DWORD _Flags = 0;
                st |= opcode::s_read;
                input.opt = opcode::s_read;
                if (SOCKET_ERROR == WSARecv(fd, &buf, 1, &_Received, &_Flags, &(input.ol), NULL)) {
                    int _Error = WSAGetLastError();
                    if (WSA_IO_PENDING != _Error) {
                        st &= ~(opcode::s_read);
                        return -1;
                    }
                }
                return 0;
            }
    
        private:
            friend class coio;
            friend class netio;
            SOCKET fd;
            sockaddr_in sa;
            net_base* v;
            int st;
            binding input, output;
            sock_buffer ibuf, obuf, obacklog;
            HANDLE hTimer;
            aqx::clock64_t rtime;
            net_task_t<sock> co;
            void (*ontimeout)(asyncsock);
            int syncsendlen;
        };
    
        // coio is the handle object passed to the coroutine function
        class coio {
            coio(sock* _s) : s(_s) {}
    
        public:
            using asyncsock = sock::asyncsock;
            using sock_awaitable = sock::sock_awaitable;
            using close_awaitable = sock::close_awaitable;
            using send_awaitable = sock::send_awaitable;
            using recv_awaitable = sock::recv_awaitable;
    
            struct nat_awaitable {
                nat_awaitable(bool _ret) : ret(_ret) {  }
                __declspec(noinline) bool await_ready() { return (ret == false); }
                void await_suspend(std::coroutine_handle<> handle) { }
                bool await_resume() { return ret; }
                bool ret;
            };
    
            coio() : s(nullptr) {}
    
            sock_awaitable connect(const std::string& _Dest) {
                if (!s->set_dest(_Dest)) {
                    // Setting the destination address failed; undo the await.
                    s->st |= net_status::t_await_undo;
                    return sock_awaitable(s);
                }
    
                // By the time we get here the coroutine is no longer parked at initial_suspend,
                // so the first connect on a socket is almost always triggered from some other thread,
                // and the IOCP completion may well arrive before await_suspend even runs.
                if (GetCurrentThreadId() == s->v->WorkerThreadId) {
                    if (s->connect()) {
                        // Connect failed; undo the await.
                        s->st |= net_status::t_await_undo;
                        return sock_awaitable(s);
                    }
                }
                else {
                    // Hence a connect that was not initiated on the IOCP worker thread is posted to the IOCP queue to be handled there.
                    PostQueuedCompletionStatus(s->v->hIocp, net_status::s_connect, (ULONG_PTR)s, 0);
                }
    
                s->st |= net_status::t_await_connect;
                return sock_awaitable(s);
            }
    
            sock_awaitable accept() {
                // The first accept is also called from another thread (usually main),
                // but at that point the IOCP worker thread has not started yet, so the connect issue above does not apply.
                s->st |= ((!s->accept()) ? net_status::t_await_accept : net_status::t_await_undo);
                return sock_awaitable(s);
            }
    
            /**
            * In the member functions below the asyncsock _s parameter should be identical to the private member s, unless the coio object is forcibly used from outside.
            * Taking a parameter instead of using the private member prevents IO operations from being issued before a connection exists.
            * The private member s is reserved for accept and connect.
            */
            close_awaitable close(asyncsock _s) {
                _s.s->close();
                if ((_s.s->st & 0xFF) == net_status::s_close) {
                    // If the socket has no outstanding IO events, let the awaitable resume the coroutine immediately.
                    // That is the normal case; but if some other thread sends asynchronously, there may be pending IO.
                    return close_awaitable(false);
                }
                _s.s->st |= net_status::t_await_close;
                return close_awaitable(true);
            }
    
            send_awaitable send(asyncsock _s, void *buf, int len) {
                _s.s->syncsendlen = _s.send(buf, len);
                _s.s->st |= ((_s.s->syncsendlen >= 0) ? net_status::t_await_write : net_status::t_await_undo);
                return sock::send_awaitable(_s.s);
            }
    
            send_awaitable send(asyncsock _s, const void* buf, int len) {
                _s.s->syncsendlen = _s.send(buf, len);
                _s.s->st |= ((_s.s->syncsendlen >= 0) ? net_status::t_await_write : net_status::t_await_undo);
                return sock::send_awaitable(_s.s);
            }
    
            recv_awaitable recv(asyncsock _s) {
                int n = _s.s->read();
                if (n < 0) {
                    _s.s->st |= net_status::t_await_undo;
                }
                else {
                    _s.s->st |= net_status::t_await_read;
                }
                return recv_awaitable(_s.s);
            }
    
            nat_awaitable nat(asyncsock _s, const std::string& _Dest) {
                if ((_s.s->st & 0xFF) != net_status::t_activated) {
                    // Before nat, every pending IO must have completed and the socket must still be connected to the rendezvous (hole-punching) server; otherwise it is a failure.
                    // Failing here leaves the connection to the rendezvous server intact.
                    return nat_awaitable(false);
                }
    
                sockaddr_in sa = _s.s->sa;
                if (!_s.s->set_dest(_Dest)) {
                    // Setting the destination address failed.
                    // Failing here still leaves the connection to the rendezvous server intact.
                    _s.s->sa = sa;
                    return nat_awaitable(false);
                }
    
                if (_s.s->nat()) {
                    // If this step fails, the connection to the rendezvous server may already be broken.
                    // When nat fails you should simply close();
                    // I cannot think of a reason to keep clinging to the rendezvous server after a failure.
                    // If every state is right and it still fails, the two peers are probably behind NAT types that simply cannot be traversed.
                    // I have not researched this deeply, few people in the industry really have, and the literature is thin. What I do know is that TCP hole punching looks like this in code:
                    //     1. The socket connected to the rendezvous server has SO_REUSEADDR set, so the local port it is bound to can be reused.
                    //          In this library everything is set reusable, but mainly to ease TIME_WAIT pressure, not for traversal.
                    //     2. The two peers exchange their remote addresses through the rendezvous server.
                    //     3. Each peer creates a new socket and binds it to the local address of its rendezvous connection (obtainable via getsockname).
                    //          Once step 3 is done, the socket connected to the rendezvous server is useless and should be closed.
                    //     4. Finally both peers connect to each other's address.
                    _s.s->sa = sa;
                    return nat_awaitable(false);
                }
    
                s->st |= net_status::t_await_connect;
                return nat_awaitable(true);
            }
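            // Hypothetical hole-punching flow inside a client coroutine (rendezvous_addr and
            // peer_addr are placeholders, exchanged through the rendezvous server):
            //     auto s = co_await io.connect(rendezvous_addr);
            //     ... swap addresses with the peer over s ...
            //     if (co_await io.nat(s, peer_addr)) { /* now connected peer-to-peer */ }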
    
            bool valid() {
                return (s != nullptr);
            }
    
            operator bool () {
                return valid();
            }
    
        private:
            friend class netio;
            sock* s;
        };
    
        /**
        * You can think of netio roughly as a container:
        * it wires up net_base, creates the worker thread and dispatches IO events.
        */
        class netio {
            struct IOCP_STATUS {
                DWORD transferred;
                SIZE_T key;
                typename sock::binding* pb;
                BOOL ok;
            };
    
        public:
            /** listener is just a small parameter bundle to make construction convenient.
            * Constructor parameters:
            * _Dest           the address and port to listen on, in the form "a.b.c.d:port"
            * _ListenBacklog  the second argument to the listen() system call
            * _MaxClients     the maximum number of clients accepted at the same time
            */
            class listener {
            public:
                listener() {
                    max_clients = 0;
                    listen_backlog = 0;
                    addr.sin_addr.S_un.S_addr = INADDR_NONE;
                }
    
                listener(const std::string& _Dest, int _ListenBacklog, size_t _MaxClients) {
                    max_clients = _MaxClients;
                    listen_backlog = _ListenBacklog;
                    net_base::sockaddr_from_string(addr, _Dest);
                }
    
            private:
                friend class netio;
                sockaddr_in addr;
                int listen_backlog;
                size_t max_clients;
            };
    
            using asyncsock = sock::asyncsock;
            using sock_data = sock::sock_data;
            using opcode = net_status;
            using task = net_task_t<sock>;
    
            int init(int _StreamCapacity = 1440, int _DataBacklog = 0, int _Timeout = 0) {
                std::lock_guard<std::mutex> lg(mtx);
                return nwb.init(_StreamCapacity, _DataBacklog, _Timeout);
            }
    
            int server(const std::function<task(coio)> &_func, const listener &param) {
                std::lock_guard<std::mutex> lg(mtx);
                if (thd.joinable()) {
                    nlog("%s->netio已启动, 请勿重复调用!", __FUNCTION__);
                    return 0;
                }
    
                if (nwb.fd == INVALID_SOCKET)
                    return -1;
    
                cofunc = _func;
                if (param.addr.sin_addr.S_un.S_addr != INADDR_NONE) {
                    if (SOCKET_ERROR == bind(nwb.fd, (SOCKADDR*)&param.addr, sizeof(SOCKADDR))) {
                        nlog("%s->绑定端口失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                        nwb.close();
                        return -1;
                    }
    
                    if (SOCKET_ERROR == ::listen(nwb.fd, param.listen_backlog)) {
                        nlog("%s->监听失败,错误号:%d", __FUNCTION__, WSAGetLastError());
                        nwb.close();
                        return -1;
                    }
    
                    for (int i = 0; i < param.max_clients; i++) {
                        sock* psock = new sock(&nwb);
                        a_list.push_back(psock);
                        psock->st |= opcode::t_acceptor;
                        psock->co = cofunc(coio(psock));
                        psock->co.set_sock(psock);
                        psock->co.resume();
                    }
                }
                __start();
                return 0;
            }
    
    
    
            // client is one-shot and intended for the client side.
            // It returns an asyncsock so that scripting languages have something to hold on to:
            // Lua, for instance, could implement the node.js-style pattern of getting an object right after connect (connected or not) and binding events to it.
            asyncsock client(const std::function<task(coio)>& _func) {
                std::lock_guard<std::mutex> lg(mtx);
                coio io;
                asyncsock ret;
                if (!thd.joinable()) {
                    // If the worker thread is not running, try to start it; after this, reclaiming resources requires stop and release.
                    if (nwb.fd == INVALID_SOCKET)
                        return ret;
                    __start();
                }
                io.s = get_connector();
                ret.s = io.s;
                io.s->co = _func(io);
                io.s->co.set_sock(io.s);
                io.s->co.resume();
                return ret;
            }
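            // Hypothetical call site (use a free function, not a lambda, per the note at the top):
            //     aqx::netio::task chat(aqx::coio io) {
            //         auto s = co_await io.connect("127.0.0.1:12345");
            //         if (s.isactivated())
            //             co_await io.send(s, "hello", 6);
            //         co_await io.close(s);
            //     }
            //     ...
            //     auto as = nio.client(chat);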
    
            void exec(const std::function<void()>& _func) {
                if (!thd.joinable()) {
                    // If the worker thread is not running, try to start it; after this, reclaiming resources requires stop and release.
                    if (nwb.fd == INVALID_SOCKET)
                        return;
                    __start();
                }
    
                nwb.SafeIOMessage(opcode::s_exec, (ULONG_PTR)&_func);
            }
    
            void stop() {
                std::lock_guard<std::mutex> lg(mtx);
                if (thd.joinable()) {
                    PostQueuedCompletionStatus(nwb.hIocp, -1, 0, 0);
                    thd.join();
                }
            }
    
            void release() {
                std::lock_guard<std::mutex> lg(mtx);
                if (thd.joinable()) {
                    nlog("%s->nio正在运行,请先stop", __FUNCTION__);
                    return;
                }
    
                for (auto p : a_list) {
                    delete p;
                }
                a_list.clear();
    
                for (auto p : c_list) {
                    delete p;
                }
                c_list.clear();
                nwb.close();
            }
    
        private:
            sock* get_connector() {
                sock* psock = nullptr;
                
                for (auto v : c_list) {
                    if ((v->st & opcode::t_connector) == 0 && ((v->st & 0xFF)| opcode::s_close) == opcode::s_close) {
                        psock = v;
                        break;
                    }
                }
    
                if (!psock) {
                    psock = new sock(&nwb);
                    c_list.push_back(psock);
                }
    
                psock->st |= opcode::t_connector;
                return psock;
            }
    
            void on_connect(sock& s) {
                s.ibuf.clear();
                s.obuf.clear();
                s.obacklog.clear();
                s.rtime = aqx::now();
                if (nwb.Timeout != 0)
                    s.set_timer(true);
                s.st |= opcode::t_activated;
            }
            
            void on_accept(sock &s) {
                // Too lazy to call GetAcceptExSockaddrs; hard-coded offsets do the job.
    #ifndef _WIN64
                s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x26);
    #else
                s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x20);
    #endif
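                // The documented way would be GetAcceptExSockaddrs(s.ibuf.data(), 0,
                // sizeof(SOCKADDR_IN) << 1, sizeof(SOCKADDR_IN) << 1, ...), matching the
                // lengths passed to AcceptEx; the offsets above assume that exact layout.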
                on_connect(s);
            }
            
            bool on_resume(sock& s) {
                if (s.st & opcode::t_await) {
                    // Clear all coroutine await flags
                    s.st &= (~opcode::t_await);
    
                    // Resume the coroutine
                    s.co.resume();
                    return true;
                }
                return false;
            }
    
            void on_close(sock& s) {
                if ((s.st & 0xFF) == opcode::s_close) {
                    s.st &= ~opcode::s_close;
                    on_resume(s);
                }
            }
    
            bool error_resume(sock &s) {
                int st = s.st & opcode::t_await;
                switch (st) {
                case opcode::t_await_accept:
                case opcode::t_await_connect:
                case opcode::t_await_close:
                    s.st &= (~opcode::t_await);
                    s.co.resume();
                    return true;
                case opcode::t_await_read:
                    s.ibuf.clear();
                    s.st &= (~opcode::t_await);
                    s.co.resume();
                    return true;
                case opcode::t_await_write:
                    s.syncsendlen = -1;
                    s.st &= (~opcode::t_await);
                    s.co.resume();
                    return true;
                default:
                    break;
                }
                return false;
            }
    
            void on_reset(sock &s) {
                if ((s.st & 0xFF) == opcode::s_close) {
                    s.st &= ~opcode::s_close;
                    if (s.st & opcode::t_acceptor) {
                        // This happens when a server coroutine is not written as a loop and has already returned (and destroyed itself);
                        // the remedy is to create a fresh coroutine.
                        s.co = cofunc(coio(&s));
                    }
                }
            }
    
            void on_completion(IOCP_STATUS& st) {
                sock& s = *(st.pb->s);
                int op = st.pb->opt;
                s.st &= (~op);
                if (s.st & opcode::s_close)
                    op = 0;
                //nlog("on_completion:%I64X, %d", &s, op);
                switch (op) {
                case 0:
                    break;
                case opcode::s_accept:
                    on_accept(s);
                    break;
                case opcode::s_connect:
                    if (!st.ok && WSAGetLastError() == 1225) {
                        // This error (1225, ERROR_CONNECTION_REFUSED) usually means nothing is listening on the target port and the OS rejected the connection outright.
                        op = 0;
                        break;
                    }
                    on_connect(s);
                    break;
                case opcode::s_read:
                    if (!st.transferred) {
                        op = 0;
                        break;
                    }
                    s.rtime = aqx::now();
                    s.ibuf.preset_length(s.ibuf.length() + st.transferred);
                    break;
                case opcode::s_write:
                    if (!st.transferred) {
                        op = 0;
                        break;
                    }
                    s.rtime = aqx::now();
                    s.obuf.erase(0, st.transferred);
                    if (s.obuf.length() || s.obacklog.length()) {
                        if (s.write()) {
                            op = 0;
                            break;
                        }
                    }
                    // The write may have been issued from outside the coroutine while the coroutine is suspended in recv, so check the flag first.
                    if (!(s.st & opcode::t_await_write))
                        return;
                    break;
                }
                
                //nlog("on_completion2:%I64X, %d", &s, op);
                if (!op) {
                    if (error_resume(s))
                        return;
                    // error_resume only returns false when the coroutine has been destroyed
                    s.close();
                    on_reset(s);
                    return;
                }
                
                on_resume(s);
                if (s.st & opcode::s_close)
                    return on_close(s);
            }
    
            void on_msgtimeout(sock *psock) {
                if (aqx::now() - psock->rtime >= nwb.Timeout && (psock->st & opcode::t_activated)) {
                    psock->close();
                    if (error_resume(*psock))
                        return;
                    on_reset(*psock);
                    return;
                }
    
                if (psock->ontimeout != nullptr) {
                    asyncsock as;
                    as.s = psock;
                    psock->ontimeout(as);
                }
            }
    
            void on_msgconnect(sock* psock) {
                if (psock->connect()) {
                    psock->close();
                    if (error_resume(*psock))
                        return;
                    on_reset(*psock);
                }
            }
    
            void on_msgwrite(net_base::safeio_send_struct* pss) {
                nwb.SafeIOResult(pss->s->send(pss->buf, pss->len));
            }
    
            void on_msgclose(sock* psock) {
                psock->close();
                nwb.SafeIOResult(0);
            }
    
            void __start() {
                thd = std::thread([this]() {
                    nwb.WorkerThreadId = GetCurrentThreadId();
                    srand((unsigned int)aqx::now() + nwb.WorkerThreadId);
                    nwb.InitSafeIO();
                    IOCP_STATUS st = { 0,0,0,0 };
                    //nlog("netio::worker->I/O工作线程 %d 开始!", nwb.WorkerThreadId);
                    
                    for (;;) {
                        st.ok = GetQueuedCompletionStatus(nwb.hIocp,
                            &(st.transferred),
                            &(st.key),
                            (OVERLAPPED**)&(st.pb),
                            INFINITE);
    
                        if (!st.pb) {
    
                            if (st.transferred == -1) 
                                break;
                            
                            switch (st.transferred) {
                            case 0:
                                on_msgtimeout((sock*)st.key);
                                break;
                            case opcode::s_connect:
                                on_msgconnect((sock*)st.key);
                                break;
                            case opcode::s_write: 
                                on_msgwrite((net_base::safeio_send_struct*)st.key);
                                break;
                            case opcode::s_close:
                                on_msgclose((sock*)st.key);
                                break;
                            case opcode::s_exec:
                                (*((std::function<void()>*)st.key))();
                                nwb.SafeIOResult(0);
                                break;
                            }
                            continue;
                        }
                        on_completion(st);
                    }
                    
                    nwb.ExitSafeIO();
                    nwb.WorkerThreadId = 0;
                    //nlog("netio::worker->I/O工作线程 %d 已停止!", nwb.WorkerThreadId);
                });
            }
    
            
    
        private:
            net_base nwb;
            std::list<sock*> a_list;
            std::list<sock*> c_list;
            std::function<task(coio)> cofunc;
            std::thread thd;
            std::mutex mtx;
        };
    }
    
    #pragma warning(pop)
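
    For reference, a minimal usage sketch (assuming the header above is saved as "netio.hpp"; the address, backlog and client count below are placeholder values, and per the 2021/12/23 note the coroutine is a free function rather than a lambda):

    #include "netio.hpp"
    #include <chrono>

    aqx::netio nio;

    // Echo server coroutine: the same structure as the pseudocode at the top of the post.
    aqx::netio::task echo_server(aqx::coio io) {
        for (;;) {
            auto s = co_await io.accept();
            for (;;) {
                auto buf = co_await io.recv(s);
                if (!buf.length())
                    break;
                co_await io.send(s, buf.data(), (int)buf.length());
                buf.clear();
            }
            co_await io.close(s);
        }
    }

    int main() {
        aqx::init_winsock();
        nio.init(1440, 0, 0);   // stream capacity, no backlog limit, no timeout
        nio.server(echo_server, aqx::netio::listener("0.0.0.0:12345", 64, 1000));
        for (;;)                // the IOCP worker runs on its own thread; park main here
            std::this_thread::sleep_for(std::chrono::seconds(1));
    }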

    I have stripped this library of its various couplings, except for the logging library; aqx::log is a simple formatted logging library I wrote myself:

    logger.hpp
    #pragma once
    #include <iostream>
    #include <string>
    #include <string_view>
    #include <time.h>
    #include <stdarg.h>
    #include <mutex>
    #include <vector>
    
    // aqx::log is not coupled to the other aqx libraries
    #if defined(_WIN32) || defined(_WIN64)
    #ifndef _WINDOWS_
    #include <WinSock2.h>
    #endif
    #define __aqxlog_getpid GetCurrentProcessId
    #define __aqxlog_gettid GetCurrentThreadId
    #include <io.h>
    #else
    #if defined(__linux__)
    #include <unistd.h>
    #include <sys/syscall.h>
    #define __aqxlog_getpid getpid
    #define __aqxlog_gettid() syscall(__NR_gettid)
    #endif
    #endif
    
    #pragma warning(push)
    #pragma warning(disable:4996)
    
    namespace aqx {
    
        class log {
        private:
            struct _format_texts {
                std::string time;
                std::string type;
                std::string pid;
                std::string tid;
            };
    
        public:
            static constexpr auto hs_time{ static_cast<int>(1) };
            static constexpr auto hs_type{ static_cast<int>(2) };
            static constexpr auto hs_pid{ static_cast<int>(4) };
            static constexpr auto hs_tid{ static_cast<int>(8) };
    
            log() {
                _stdout_fp = stdout;
                fp = stdout;
                _fmtts = { "%Y/%m/%d %H:%M:%S ", "{%s} ",  "[%d] ",  "(%d) " };
                head_style = log::hs_time;
                head_presize = _gethps();
                _EnableInfo = true;
                _EnableError = false;
                _EnableDebug = false;
                _EnableWarn = false;
                _DefType = "info";
                s.reserve(0x1000);
            }
    
            ~log() {
                if (fp != _stdout_fp)
                    fclose(fp);
            }
    
            void enable(const std::string_view& _Type, bool _Enable) {
                std::lock_guard<std::mutex> lg(_Mtx);
                if (_Type == "info")
                    _EnableInfo = _Enable;
                else if (_Type == "error")
                    _EnableError = _Enable;
                else if (_Type == "debug")
                    _EnableDebug = _Enable;
                else if (_Type == "warn")
                    _EnableWarn = _Enable;
            }
    
            void seths(int hs) {
                std::lock_guard<std::mutex> lg(_Mtx);
                head_style = hs;
                head_presize = _gethps();
            }
    
            void sethfmt(int _Style, const char* _Fmt) {
                std::lock_guard<std::mutex> lg(_Mtx);
                switch (_Style) {
                case hs_time:
                    _fmtts.time = _Fmt;
                    break;
                case hs_type:
                    _fmtts.type = _Fmt;
                    break;
                case hs_pid:
                    _fmtts.pid = _Fmt;
                    break;
                case hs_tid:
                    _fmtts.tid = _Fmt;
                    break;
                }
                head_presize = _gethps();
            }
    
            bool setvfs(const char* _FileName, bool _PutStdout = false) {
                std::lock_guard<std::mutex> lg(_Mtx);
                FILE* _tmp = fopen(_FileName, "ab");
                if (!_tmp)
                    return false;
                if (fp != _stdout_fp)
                    fclose(fp);
                fp = _tmp;
                PutStdout = _PutStdout;
                return true;
            }
    
            log& info(const char* _Fmt, ...) {
                std::lock_guard<std::mutex> lg(_Mtx);
                if (!_EnableInfo)
                    return *this;
                va_list vl;
                va_start(vl, _Fmt);
                _build("info", _Fmt, vl);
                va_end(vl);
                _putlog();
                return *this;
            }
    
            log& debug(const char* _Fmt, ...) {
                std::lock_guard<std::mutex> lg(_Mtx);
                if (!_EnableDebug)
                    return *this;
                va_list vl;
                va_start(vl, _Fmt);
                _build("info", _Fmt, vl);
                va_end(vl);
                _putlog();
                return *this;
            }
    
            log& error(const char* _Fmt, ...) {
                std::lock_guard<std::mutex> lg(_Mtx);
                if (!_EnableError)
                    return *this;
                va_list vl;
                va_start(vl, _Fmt);
                _build("info", _Fmt, vl);
                va_end(vl);
                _putlog();
                return *this;
            }
    
            log& warn(const char* _Fmt, ...) {
                std::lock_guard<std::mutex> lg(_Mtx);
                if (!_EnableWarn)
                    return *this;
                va_list vl;
                va_start(vl, _Fmt);
                _build("info", _Fmt, vl);
                va_end(vl);
                _putlog();
                return *this;
            }
    
            log& operator()(const char* _Fmt, ...) {
                std::lock_guard<std::mutex> lg(_Mtx);
                if (!_EnableInfo)
                    return *this;
                va_list vl;
                va_start(vl, _Fmt);
                _build(_DefType.c_str(), _Fmt, vl);
                va_end(vl);
                _putlog();
                return *this;
            }
    
        private:
            void _putlog() {
                fputs(s.data(), fp);
                if (fp != _stdout_fp) {
                    //fflush(fp);
                    if (PutStdout)
                        fputs(s.data(), _stdout_fp);
                }
            }
    
            size_t _build(const char* _Type, const char* _Fmt, va_list vl) {
                s.clear();
                // Measure the formatted length on a copy first; reusing the same va_list twice is undefined behavior.
                va_list vl2;
                va_copy(vl2, vl);
                int n = vsnprintf(nullptr, 0, _Fmt, vl2);
                va_end(vl2);
                if (n <= 0)
                    return _build_head(_Type);
                if ((size_t)n + head_presize >= s.capacity()) {
                    s.clear();
                    s.reserve(n + head_presize);
                }
                size_t _Pos = _build_head(_Type);
                char* p = (char*)s.data();
                // Leave room for the head already written at the front of the buffer.
                _Pos += vsnprintf(p + _Pos, s.capacity() - _Pos, _Fmt, vl);
                char c = p[_Pos - 1];
    #ifdef _WINDOWS_
                if (c != '\r' && c != '\n') {
                    p[_Pos++] = '\r';
                    p[_Pos++] = '\n';
                    p[_Pos] = '\0';
                }
    
    #else
                if (c != '\r' && c != '\n') {
                    p[_Pos++] = '\n';
                    p[_Pos] = '\0';
                }
    #endif
    
                return _Pos;
            }
    
            size_t _build_time(size_t _Pos) {
                if (!(head_style & log::hs_time))
                    return _Pos;
                time_t t = time(NULL);
                auto _Tm = localtime(&t);
                _Pos += strftime((char*)s.data() + _Pos, head_presize, _fmtts.time.c_str(), _Tm);
                return _Pos;
            }
    
            size_t _build_type(size_t _Pos, const char* _Type) {
                if (!(head_style & log::hs_type))
                    return _Pos;
                _Pos += sprintf((char*)s.data() + _Pos, _fmtts.type.c_str(), _Type);
                return _Pos;
            }
    
            size_t _build_pid(size_t _Pos) {
                if (!(head_style & log::hs_pid))
                    return _Pos;
                auto _Pid = __aqxlog_getpid();
                _Pos += sprintf((char*)s.data() + _Pos, _fmtts.pid.c_str(), _Pid);
                return _Pos;
            }
    
            size_t _build_tid(size_t _Pos) {
                if (!(head_style & log::hs_tid))
                    return _Pos;
                auto _Tid = __aqxlog_gettid();
                _Pos += sprintf((char*)s.data() + _Pos, _fmtts.tid.c_str(), _Tid);
                return _Pos;
            }
    
            size_t _build_head(const char* _Type) {
                return _build_tid(_build_pid(_build_type(_build_time(0), _Type)));
            }
    
            size_t _gethps() {
                size_t _Result = 3;
                if (head_style & log::hs_time)
                    _Result += ((_fmtts.time.length() << 1) + 30);
                if (head_style & log::hs_type)
                    _Result += ((_fmtts.type.length() << 1) + 12);
                if (head_style & log::hs_pid)
                    _Result += ((_fmtts.pid.length() << 1) + 20);
                if (head_style & log::hs_tid)
                    _Result += ((_fmtts.tid.length() << 1) + 20);
                return _Result;
            }
    
        private:
            std::vector<char> s;
            FILE* fp;
            _format_texts _fmtts;
            int head_style;
            size_t head_presize;
            bool PutStdout;
            FILE* _stdout_fp;
            std::mutex _Mtx;
            std::string _DefType;
            bool _EnableInfo;
            bool _EnableDebug;
            bool _EnableError;
            bool _EnableWarn;
        };
    }
    
    static aqx::log logger;
    #pragma warning(pop)
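
    For reference, a minimal usage sketch of the log class above, assuming only the public interface shown in the listing; the file name and the messages are placeholders (seths/sethfmt can further tune the head format, omitted here):

    // logger_demo.cpp -- usage sketch only, assumes logger.hpp is on the include path
    #include "logger.hpp"

    int main()
    {
        // log to a file and echo everything to stdout as well
        logger.setvfs("test.log", true);

        logger.info("listening on %s", "0.0.0.0:55554");
        logger.warn("client %d timed out", 42);
        logger.error("winsock init failed: %d", -1);
        return 0;
    }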

    Finally, the test code. The client and server are kept in one file; to separate them, split at the few places right after nio.init (a server-only sketch follows the full listing).

    // main.cpp
    #include <iostream>
    #include <aqx/netio.hpp>
    
    int main()
    {
        aqx::init_winsock();
    
        aqx::netio nio;
        nio.init(1440, 0x10000);
    
        // A simple echo server example:
    
        nio.server([](aqx::coio io)->aqx::netio::task {
            // The server body should always sit in an infinite loop; otherwise the fallback logic keeps creating new coroutines.
            for (;;) {
                // io.accept returns an object that can be used for asynchronous send and close
                auto s = co_await io.accept();
                logger("客户端连入:%s", aqx::net_base::sockaddr_to_string(s.getsockaddr()));
                for (;;) {
                    auto buf = co_await io.recv(s);
                    if (!buf.length()) {
                        logger("断开连接!");
                        break;
                    }
    
                    puts(buf.data());
                    buf.clear();
                    // Asynchronous send; the coroutine does not suspend here
                    s.send("收到!", 5);
                    
                }
                co_await io.close(s);
                logger("已关闭!");
            }
        }, aqx::netio::listener("0.0.0.0:55554", 100, 100));
    
    
    
        // Client and server are lumped together here out of laziness; split them yourself if you need to
        auto sock1 = nio.client([](aqx::coio io)->aqx::netio::task {
            // The client only needs the outer loop if automatic reconnection is desired
            for (;;) {
                auto s = co_await io.connect("127.0.0.1:55554");
                if (!s) {
                    co_await io.close(s);
                    continue;
                }
    
                for (;;) {
                    auto buf = co_await io.recv(s);
                    if (!buf.length()) {
                        break;
                    }
                    puts(buf.data());
                    buf.clear();
                }
                
                co_await io.close(s);
            }
           
        });
    
        // Client and server are lumped together here out of laziness; split them yourself if you need to
        auto sock2 = nio.client([](aqx::coio io)->aqx::netio::task {
            // The client only needs the outer loop if automatic reconnection is desired
            for (;;) {
                auto s = co_await io.connect("127.0.0.1:55554");
                if (!s) {
                    co_await io.close(s);
                    continue;
                }
    
                for (;;) {
                    auto buf = co_await io.recv(s);
                    if (!buf.length()) {
                        break;
                    }
                    puts(buf.data());
                    buf.clear();
                }
    
                co_await io.close(s);
            }
    
        });
        
        std::string str;
        for (;;) {
            std::cin >> str;
            if (str == "exit")
                break;
    
            std::string sd = "sock1:";
            sd += str;
            sock1.safe_send(sd.data(), (int)sd.length() + 1);
    
            sd = "sock2:";
            sd += str;
            sock2.safe_send(sd.data(), (int)sd.length() + 1);
        }
    
        nio.stop();
        nio.release();
    }
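
    If you only need the server side, the split mentioned above amounts to keeping the nio.server(...) call and dropping the two nio.client(...) blocks. A rough, untested sketch with the same interface as the full listing:

    // server_only.cpp -- sketch of the server-only split
    #include <iostream>
    #include <aqx/netio.hpp>

    int main()
    {
        aqx::init_winsock();

        aqx::netio nio;
        nio.init(1440, 0x10000);

        nio.server([](aqx::coio io)->aqx::netio::task {
            for (;;) {
                auto s = co_await io.accept();
                for (;;) {
                    auto buf = co_await io.recv(s);
                    if (!buf.length())
                        break;
                    // echo the received bytes straight back (asynchronous send, no suspension)
                    s.send(buf.data(), (int)buf.length());
                    buf.clear();
                }
                co_await io.close(s);
            }
        }, aqx::netio::listener("0.0.0.0:55554", 100, 100));

        // block until the user types "exit", then shut down
        std::string str;
        while (std::cin >> str && str != "exit")
            ;
        nio.stop();
        nio.release();
    }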

    To be a little more responsible: since I ran into a compiler bug, I traced it.

    When recv_awaitable::await_ready() is inlined, resuming from normal flow back into the coroutine goes through the following sequence:
    00007FF723AF6000 mov r11,rsp
    00007FF723AF6003 mov qword ptr [r11+10h],rbx
    00007FF723AF6007 mov qword ptr [r11+18h],rsi
    00007FF723AF600B mov qword ptr [r11+20h],rdi
    00007FF723AF600F mov qword ptr [r11+8],rcx
    00007FF723AF6013 push r12
    00007FF723AF6015 push r14
    00007FF723AF6017 push r15
    00007FF723AF6019 sub rsp,90h
    00007FF723AF6020 mov rax,qword ptr [__security_cookie (07FF723AFA008h)]
    00007FF723AF6027 xor rax,rsp
    00007FF723AF602A mov qword ptr [rsp+80h],rax
    00007FF723AF6032 mov rdi,rcx
    00007FF723AF6035 mov qword ptr [rsp+50h],rcx
    00007FF723AF603A movzx eax,word ptr [rdi+2Ch]
    00007FF723AF603E mov word ptr [rsp+48h],ax
    00007FF723AF6043 inc ax
    00007FF723AF6046 cmp ax,0Ah
    00007FF723AF604A ja `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+463h (07FF723AF6463h)
    00007FF723AF6050 movsx rax,ax
    00007FF723AF6054 lea rdx,[__ImageBase (07FF723AF0000h)]
    00007FF723AF605B mov ecx,dword ptr [rdx+rax*4+6494h]
    00007FF723AF6062 add rcx,rdx
    00007FF723AF6065 jmp rcx
    00007FF723AF6067 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
    00007FF723AF6069 xor r15d,r15d
    00007FF723AF606C mov dword ptr [rdi+1B0h],r15d
    00007FF723AF6073 mov r12d,10000h
    00007FF723AF6079 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2E0h (07FF723AF62E0h)
    00007FF723AF607E jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
    00007FF723AF6080 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
    }
    }, aqx::netio::listener("0.0.0.0:55554", 100, 100));
    00007FF723AF6082 cmp word ptr [rdi+0Ah],0
    00007FF723AF6087 je `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
    00007FF723AF608D mov edx,1B4h
    00007FF723AF6092 mov rcx,rdi
    00007FF723AF6095 call operator delete (07FF723AF5504h)
    00007FF723AF609A jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
    00007FF723AF609F xor r15d,r15d
    00007FF723AF60A2 mov r12d,10000h
    00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr] ******************************************** here it loads __coro_frame_ptr.__resume_address
    00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
    00007FF723AF60B2 xor r15d,r15d
    00007FF723AF60B5 mov r12d,10000h
    00007FF723AF60BB jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2C4h (07FF723AF62C4h)
    00007FF723AF60C0 xor r15d,r15d
    00007FF723AF60C3 mov r12d,10000h

    ---------------------------------------------------------------------------------------------------

    00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr]
    It copies __resume_address at offset 0 of the coroutine frame directly.

    00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
    It then jumps straight there and operates on rdx as if it were the recv_awaitable.

    ---------------------------------------------------------------------------------------------------
    I am 100% certain this is a compiler bug. The root cause is not just inlining by itself; there must be deeper compiler logic behind it, but that is Microsoft's problem.
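
    Until that is fixed, one possible mitigation (my assumption, not something verified across MSVC versions) is to keep the awaitable's await_ready out of line, for example with __declspec(noinline). The struct below is only a placeholder showing where the attribute would go; it is not the library's real recv_awaitable:

    // Placeholder sketch: prevent inlining of await_ready to sidestep the suspected compiler bug.
    #include <coroutine>

    struct recv_awaitable_sketch {
        __declspec(noinline) bool await_ready() const noexcept {
            return false;   // placeholder: the real awaitable would check whether the IO already completed
        }
        void await_suspend(std::coroutine_handle<>) const noexcept {
            // placeholder: post the overlapped recv here and remember the handle
        }
        int await_resume() const noexcept {
            return 0;       // placeholder result
        }
    };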
