Visual C++ has supported coroutines for quite a while now, but for the longest time I couldn't figure out what the point of them was. Then, a few days ago, inspiration struck while I was sitting on the toilet:
The following is pseudocode:
task server() {
    for (;;) {
        sock_context s = co_await io.accept();
        for (;;) {
            auto buf = co_await io.recv(s);
            if (!buf.length())
                break;
            std::cout << buf.data() << std::endl;
            int n = co_await io.send(s, "收到!", strlen("收到!") + 1);
        }
        co_await io.close(s);
    }
}
If an IO library exposed its public interface like the above, wouldn't the code have exactly the same structure as the simplest blocking model, while underneath it is actually asynchronous? The very same code, running on a single thread, could then service a whole pile of connections.
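To make that mechanism concrete, here is a minimal, self-contained sketch of the idea. It is not the library below: fake_async_read and demo_task are made-up names, and a detached thread stands in for the IOCP completion queue. The point is only that co_await hands the coroutine handle to an asynchronous operation, the calling thread returns immediately, and the completion callback resumes the coroutine later.

#include <coroutine>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>

struct demo_task {
    struct promise_type {
        demo_task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

// Awaitable that pretends to read data asynchronously.
struct fake_async_read {
    std::string result;
    bool await_ready() { return false; }          // always suspend
    void await_suspend(std::coroutine_handle<> h) {
        // "Post" the operation; the completion callback resumes the coroutine.
        std::thread([this, h]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            result = "hello from the completion callback";
            h.resume();                           // switch back into the coroutine
        }).detach();
    }
    std::string await_resume() { return result; } // value produced by co_await
};

demo_task demo() {
    // Reads like blocking code, but the thread that called demo() is never blocked.
    std::string data = co_await fake_async_read{};
    std::cout << data << std::endl;
}

int main() {
    demo();
    std::this_thread::sleep_for(std::chrono::seconds(1)); // keep main alive for the demo
}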
So that is what led to the research that followed (done purely because I was bored out of my mind), and luckily it produced a working result.
In the end, I finally understood what coroutines are for:
The more libraries become coroutine-based, the lower the barrier to entry becomes for C++ programmers: developers working at the application layer don't need to understand coroutine internals at all, they only need to know how to use the library correctly.
Anyway, there are already plenty of articles that explain coroutine internals in detail, so I don't need to write another one. I'll just post the code; anyone interested can use my implementation together with those articles as a reference and build their own:
2021/12/23: I recently took this library for a trial run in a peripheral application; after a round of patching and fixing, it turned out to be quite pleasant to use.
2021/12/23: Note: it's best not to use a lambda as the coroutine function. It may or may not blow up with an exception; that's the kind of voodoo a compiler bug brings with it.
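A minimal sketch of the workaround implied by that note, assuming the aqx::netio API shown later in this post (nio.server, aqx::coio, aqx::netio::task): write the coroutine as a named free function instead of a lambda and pass that to nio.server. The echo logic mirrors the sample at the end of the post.

#include <aqx/netio.hpp>

// Named coroutine function, not a lambda, to sidestep the compiler bug noted above.
aqx::netio::task echo_session(aqx::coio io)
{
    for (;;) {
        auto s = co_await io.accept();          // asyncsock usable for send/close
        for (;;) {
            auto buf = co_await io.recv(s);
            if (!buf.length())
                break;                          // peer closed the connection
            s.send(buf.data(), (int)buf.length()); // echo the received bytes back
            buf.clear();
        }
        co_await io.close(s);
    }
}

// Usage (instead of passing a lambda):
// nio.server(echo_session, aqx::netio::listener("0.0.0.0:55554", 100, 100));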
#pragma once#include <WinSock2.h>#include <MSWSock.h>#include <ws2tcpip.h>#pragma comment(lib, "ws2_32.lib")#include <coroutine>#include <string>#include <functional>#include <thread>#include "logger.hpp"#include <random>/*** 最近花了点时间学习了一下C++20协程,初步改造实现了IOCP协程化的网络IO库* 此前基于回调分发的机制,由于上层协议解析所需的各种上下文,导致这个库是模板化的,* 现在有了协程,上层协议上下文已经可以在协程函数中实现,消除了模板化,也变得易于维护了一丢丢。* 但目前协程还有多少坑是未知的,是好是坏还得再看。* 使用协程,就意味着,这个库几乎完全失去了多线程的能力,* 要维护好一个内部是多线程,外皮是协程的IO库,我承认我没那个脑子。* 我个人当前的状态是不考虑过度设计,只追求上层代码优雅简洁,10几万并发对我而言已经满足了。* 如果这还不够用,那就意味着该放弃协程了,协程不是完全没有损耗的,根据我的测试,协程相比回调函数分发的方式,有15%左右的性能损耗。*/#pragma warning(push)#pragma warning(disable:4996)namespace aqx{ static int init_winsock() { WSADATA wd; return WSAStartup(MAKEWORD(2, 2), &wd); } static aqx::log nlog;#ifndef _nf#define _nf ((size_t)-1)#endif#ifndef __AQX_TIME_HPP#define __AQX_NOW_FUNC using clock64_t = long long; template<typename period = std::milli> clock64_t now() { const clock64_t _Freq = _Query_perf_frequency(); const clock64_t _Ctr = _Query_perf_counter(); const clock64_t _Whole = (_Ctr / _Freq) * period::den; const clock64_t _Part = (_Ctr % _Freq) * period::den / _Freq; return _Whole + _Part; }#endif /** * 操作码与状态码定义 */ struct net_status { static constexpr unsigned int s_accept = 0x01; static constexpr unsigned int s_connect = 0x02; static constexpr unsigned int s_read = 0x04; static constexpr unsigned int s_write = 0x08; static constexpr unsigned int s_close = 0x10; static constexpr unsigned int s_exec = 0x20; static constexpr unsigned int t_activated = 0x40; static constexpr unsigned int t_acceptor = 0x0100; static constexpr unsigned int t_connector = 0x0200; static constexpr unsigned int t_await_undo = 0x0400; static constexpr unsigned int t_await_accept = 0x010000; static constexpr unsigned int t_await_connect = 0x020000; static constexpr unsigned int t_await_read = 0x040000; static constexpr unsigned int t_await_write = 0x080000; static constexpr unsigned int t_await_close = 0x100000; static constexpr unsigned int t_await = 0xFF0000; }; /** net_base 主要负责衔接操作系统 * 不考虑过度设计,写得比较辣鸡,能用就行。 */ class net_base { public: net_base() { fd = INVALID_SOCKET; hIocp = NULL; AcceptEx = NULL; ConnectEx = NULL; DisconnectEx = NULL; StreamCapacity = 1440; Timeout = 0; DataBacklog = 0; WorkerThreadId = 0; } static bool sockaddr_from_string(sockaddr_in& _Addr, const std::string& _Dest) { _Addr.sin_addr.S_un.S_addr = INADDR_NONE; size_t pos = _Dest.find(":"); if(pos == _nf) { nlog("%s->错误的目标地址:(%s)\n", __FUNCTION__, _Dest.data()); return false; } auto strip = _Dest.substr(0, pos); auto strport = _Dest.substr(pos + 1); strport.erase(strport.find_last_not_of("\r\n\t ") + 1); strport.erase(0, strport.find_first_not_of("\r\n\t ")); unsigned short port = (unsigned short)atoi(strport.c_str()); if (!port) { nlog("%s->目标端口号错误:(%s)\n", __FUNCTION__, _Dest.data()); return false; } strip.erase(strip.find_last_not_of("\r\n\t ") + 1); strip.erase(0, strip.find_first_not_of("\r\n\t ")); auto it = std::find_if(strip.begin(), strip.end(), [](char c)->bool { return ((c < '0' || c > '9') && (c != '.')); }); _Addr.sin_family = AF_INET; _Addr.sin_port = htons(port); if (it != strip.end()) { hostent* host = gethostbyname(strip.c_str()); if (!host) { nlog("%s->错误的目标域名:(%s)\n", __FUNCTION__, _Dest.data()); return false; } _Addr.sin_addr = *(in_addr*)(host->h_addr_list[0]); } else { _Addr.sin_addr.S_un.S_addr = inet_addr(strip.c_str()); } if (_Addr.sin_addr.S_un.S_addr == INADDR_NONE) { nlog("%s->错误的目标地址:(%s)\n", __FUNCTION__, _Dest.data()); return false; } return true; } static void 
sockaddr_any(sockaddr_in& _Addr, unsigned short _Port) { _Addr.sin_family = AF_INET; _Addr.sin_port = htons(_Port); _Addr.sin_addr.S_un.S_addr = INADDR_ANY; } static void sockaddr_local(sockaddr_in& _Addr, unsigned short _Port) { _Addr.sin_family = AF_INET; _Addr.sin_port = htons(_Port); _Addr.sin_addr.S_un.S_addr = INADDR_LOOPBACK; } static void* getmswsfunc(SOCKET s, GUID guid) { DWORD dwBytes; void* lpResult = nullptr; WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &lpResult, sizeof(lpResult), &dwBytes, NULL, NULL); return lpResult; } static std::string sockaddr_to_string(const sockaddr_in &_Addr) { char buf[256]; sprintf(buf, "%d.%d.%d.%d:%d", _Addr.sin_addr.S_un.S_un_b.s_b1, _Addr.sin_addr.S_un.S_un_b.s_b2, _Addr.sin_addr.S_un.S_un_b.s_b3, _Addr.sin_addr.S_un.S_un_b.s_b4, htons(_Addr.sin_port)); std::string _Result = buf; return _Result; } private: int init(int _StreamCapacity, int _DataBacklog, int _Timeout) { if (fd != INVALID_SOCKET) { return 0; } auto reterr = [this](int n) { if (fd != INVALID_SOCKET) { closesocket(fd); fd = INVALID_SOCKET; } return n; }; StreamCapacity = _StreamCapacity; Timeout = _Timeout; if (Timeout < 0) { nlog("%s->Timeout必须>=0", __FUNCTION__); return reterr(-1); } DataBacklog = _DataBacklog; fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (fd == INVALID_SOCKET) { nlog("%s->创建套接字失败:%d", __FUNCTION__, WSAGetLastError()); return reterr(-1); } ConnectEx = (LPFN_CONNECTEX)getmswsfunc(fd, WSAID_CONNECTEX); if (!ConnectEx) { nlog("%s->获取 ConnectEx 地址失败,错误号:%d", __FUNCTION__, WSAGetLastError()); return reterr(-2); } AcceptEx = (LPFN_ACCEPTEX)getmswsfunc(fd, WSAID_ACCEPTEX); if (!AcceptEx) { nlog("%s->获取 AcceptEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError()); return reterr(-3); } // 我已经不止一次做过DisconnectEx的测试,最终结论都是DisconnectEx并不能提高并发连接数。 // DisconnectEx 在想象中会更快是因为用IOCP队列锁去换系统全局锁带来了性能提升。 // 还有一种方法是开一个线程搞个表去阻塞调用DisconnectEx,完事之后直接AcceptEx,也就最终把全局内核锁完全转嫁成你自己的锁了。 // DisconnectEx首先是不同的操作系统行为不一致,真正保险的做法只能在对方关闭连接时,调用DisconnectEx来复用。 // 对于IOCP来说,也就是在WSARecv或者WSASend 从 GetQueuedCompletionStatus 返回之后,第2个参数transferred == 0时 // 同时它受到TCP TIME_WAIT状态的影响 // 系统存在大量TIME_WAIT套接字时,最终得到的效果是,用了更多内存,去换来了更少的并发连接数。 /*DisconnectEx = (LPFN_DISCONNECTEX)getmswsfunc(fd, WSAID_DISCONNECTEX); if (!DisconnectEx) { nlog("%s->获取 DisconnectEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError()); return reterr(-4); }*/ hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (!hIocp) { nlog("%s->创建完成端口失败,错误号:%d", __FUNCTION__, GetLastError()); return reterr(-5); } CreateIoCompletionPort((HANDLE)fd, hIocp, 0, 0); return 0; } void close() { if (fd != INVALID_SOCKET) { closesocket(fd); fd = INVALID_SOCKET; } if (hIocp) { CloseHandle(hIocp); hIocp = NULL; } } BOOL Accept(SOCKET s, char* _Data, LPOVERLAPPED _Overlapped) { DWORD _Received = 0; return AcceptEx(fd, s, _Data, 0, sizeof(SOCKADDR_IN) << 1, sizeof(SOCKADDR_IN) << 1, &_Received, _Overlapped); } BOOL Connect(SOCKET s, sockaddr* _Addr, int _AddrLen, LPOVERLAPPED _Overlapped) { DWORD _Sent = 0; return ConnectEx(s, _Addr, _AddrLen, nullptr, 0, &_Sent, _Overlapped); } /*BOOL Disconnect(SOCKET s, LPOVERLAPPED _Overlapped) { return DisconnectEx(s, _Overlapped, TF_REUSE_SOCKET, 0); }*/ /* 使用了C++11的条件变量与互斥锁实现了同步消息来保证多线程安全IO处理,本质上只是多线程Output * 因为完成端口未实现同步消息机制,所以这种操作无论如何都至少要涉及到两个锁(一个IOCP锁,一个其他锁): * 1、采用动态new delete,这种方式最坏的情况要经过那把系统全局的大锁,不可取。 * 2、采用一个我们自己的锁对象,当前使用的方式。 * 3、每个套接字上下文拥有一个独立的锁对象,总觉得在这种了不起就才10几万并发IO的场景,锁竞争带来的性能损耗不该发展到这一步。 */ int SafeIOMessage(DWORD dwNumberOfBytesTransferred, ULONG_PTR 
dwCompletionKey) { std::unique_lock<std::mutex> lock(safeIO.mtx); safeIO.cv.wait(lock, [this]() { return (safeIO.s & 1); }); if (safeIO.s == -1) return -1; safeIO.s = 0; PostQueuedCompletionStatus(hIocp, dwNumberOfBytesTransferred, dwCompletionKey, 0); safeIO.cv.wait(lock, [this]() { return (safeIO.s & 3); }); if (safeIO.s == -1) return -1; int _Result = safeIO.result; safeIO.s = 1; safeIO.cv.notify_all(); return _Result; } void InitSafeIO() { std::lock_guard<std::mutex> lg(safeIO.mtx); safeIO.s = 1; } void ExitSafeIO() { std::lock_guard<std::mutex> lg(safeIO.mtx); safeIO.s = -1; safeIO.cv.notify_all(); } void SafeIOResult(int _Result) { // 理论上来说,IOCP工作者线程不需要在此处加锁,实际情况未知,我个人是以悲观的态度对待这个问题 std::lock_guard<std::mutex> lg(safeIO.mtx); safeIO.result = _Result; safeIO.s = 2; safeIO.cv.notify_all(); } private: friend class sock; friend class netio; friend class coio; SOCKET fd; HANDLE hIocp; LPFN_ACCEPTEX AcceptEx; LPFN_CONNECTEX ConnectEx; LPFN_DISCONNECTEX DisconnectEx; int StreamCapacity; int Timeout; int DataBacklog; DWORD WorkerThreadId; struct safeio_send_struct { sock* s; void* buf; int len; }; struct SAFEIO { std::mutex mtx; std::condition_variable cv; int s = -1; int result = 0; }safeIO; }; /*直接继承一个std::string来作为套接字的各种缓冲区*/ class sock_buffer : public std::string { public: using _Basetype = std::string; using _Basetype::_Basetype; void preset_length(size_t _Length) { // 直接在二进制层面去搞VC的std::string结构,修改std::string::length()的返回值 // 这么做的好处是,免去了std::string::resize()的拷贝问题。 // 注意这段代码仅适用于VC,G++的std::string结构和VC不一样。 struct __stlstr { const char str[0x10]; size_t len; }; if (this->capacity() < _Length) this->reserve(_Length); ((__stlstr*)this)->len = _Length; } }; /** * 协程task */ template<typename _Ty> struct net_task_t { struct promise_type; using _Hty = std::coroutine_handle<promise_type>; struct promise_type { net_task_t get_return_object() { return { _Hty::from_promise(*this) }; } // initial_suspend 里返回return std::suspend_always{};表示协程初始化成功之后就挂起 // 这里就挂起,是为了给set_sock留出操作的时间,否则一个空函数协程,会在创建完之后直接就销毁。 auto initial_suspend() { return std::suspend_always{}; } auto final_suspend() noexcept { s->on_destroy_coroutine(); return std::suspend_never{}; } void unhandled_exception() { std::terminate(); } void return_void() { } _Ty* s = nullptr; }; _Hty _Handle; void resume() { _Handle.resume(); } void destroy() { _Handle.destroy(); } void set_sock(_Ty* _s) { _Handle.promise().s = _s; } }; /**套接字上下文*/ class sock { // 这是扩展OVERLAPPED结构 struct binding { OVERLAPPED ol; int opt; sock* s; }; /** * 返回给协程recv的对象类型 */ class sock_data { sock_data(sock* _s) : s(_s) {} public: char* data() { return s->ibuf.data(); } void erase(size_t _Count) { s->ibuf.erase(0, _Count); } size_t length() { return s->ibuf.length(); } void clear() { s->ibuf.clear(); } private: friend class sock; sock* s; }; /**返回给协程connect和accept的对象类型 * 用于异步send与close, * 其他线程也可以利用这个对象通信,已经处理了线程安全问题,但不太效率,因为使用了全局锁。 */ class asyncsock { public: /** * send 是未加锁的发送数据 * 没有多线程需求时,send是安全的 */ int send(void* data, int len) { if (s->v->WorkerThreadId != GetCurrentThreadId()) { return s->safe_send(data, len); } else { return s->send(data, len); } } int send(const void* data, int len) { if (s->v->WorkerThreadId != GetCurrentThreadId()) { return s->safe_send(data, len); } else { return s->send(data, len); } } void close() { if (s->v->WorkerThreadId != GetCurrentThreadId()) { s->safe_close(); } else { s->close(); } } bool isactivated() { return s->isactivated(); } operator bool() { return (s != nullptr); } sockaddr_in& getsockaddr() { return s->getsockaddr(); } // 
响应超时,这是用来给客户端发送心跳包的 // 心跳机制是基于操作系统函数 RegisterWaitForSingleObject实现的 // 会基于netio::init传入的Timeout参数的2/3的频率发送消息 // 也就是说,Timeout并不是一个绝对准确的数值,这就是为了要给客户端留出发心跳包的切入点的代价。 // 例如Timeout设置为6000, 真正超时的客户端,将会再4000-8000ms后被检查出来 void ontimeout(void(*proc)(asyncsock)) { if (!s) return; s->ontimeout = proc; } private: bool operator<(const asyncsock& as) const{ return (size_t)s < (size_t)as.s; } friend typename std::less<asyncsock>; private: friend class netio; friend class coio; friend class sock; sock* s = nullptr; }; struct recv_awaitable { recv_awaitable(sock* s) : data(s) { } // 当编译器自动将await_ready以及await_suspend优化为inline时,协程态引发异常 // 使await_ready强制noline时,没有异常。 __declspec(noinline) bool await_ready() { // 我当前的vs版本是: vs 2022 17.0.1 // 这里发现一个编译bug,只要await_ready与await_suspend同时被inline优化 // 最后从流程态切换回协程态时,会获取 __coro_frame_ptr.__resume_address 做为recv_awaitable对象来使用 // 紧接着就会引发异常 // 最终我发现,目前vc的协程与lambda函数之间存在bug, // 使用lambda作为协程函数时,如果此lambda函数inline,就可能会有各种指针错误。 // 我已向vs社区报告过此问题,得到的答复时考虑中,也不知道何时修复。 if (data.s->st & net_status::t_await_undo) { data.s->ibuf.clear(); data.s->st &= (~net_status::t_await_undo); return true; } return false; } void await_suspend(std::coroutine_handle<> handle) { } sock_data await_resume() const { return data; } sock_data data; }; struct sock_awaitable { sock_awaitable(sock* _s) { s.s = _s; } __declspec(noinline) bool await_ready() { if (s.s->st & net_status::t_await_undo) { s.s->st &= (~net_status::t_await_undo); return true; } return false; } void await_suspend(std::coroutine_handle<> handle) { } sock::asyncsock await_resume() { return s; } sock::asyncsock s; }; struct close_awaitable { close_awaitable(bool _IsSuspend) : IsSuspend(_IsSuspend) { } __declspec(noinline) bool await_ready() { return (IsSuspend == false); } void await_suspend(std::coroutine_handle<> handle) { } void await_resume() { } bool IsSuspend; }; struct send_awaitable { send_awaitable(sock* _s) : s(_s) {} __declspec(noinline) bool await_ready() { if (s->st & net_status::t_await_undo) { s->st &= (~net_status::t_await_undo); return true; } return false; } void await_suspend(std::coroutine_handle<> handle) { } int await_resume() { return s->syncsendlen; } sock* s; }; public: using opcode = net_status; sock(net_base* _v) { fd = INVALID_SOCKET; v = _v; st = 0; ontimeout = nullptr; memset(&input.ol, 0, sizeof(input.ol)); memset(&output.ol, 0, sizeof(output.ol)); if (v->Timeout) output.ol.hEvent = input.ol.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); else output.ol.hEvent = input.ol.hEvent = NULL; output.s = input.s = this; output.opt = opcode::s_write; ibuf.reserve(v->StreamCapacity); obuf.reserve(v->StreamCapacity); } ~sock() { close(); if (!output.ol.hEvent) return; CloseHandle(output.ol.hEvent); output.ol.hEvent = output.ol.hEvent = NULL; if (st & opcode::t_await) co.destroy(); } void on_destroy_coroutine() { close(); st &= (~opcode::t_connector); } bool isactivated() { return ((st & opcode::t_activated) != 0); } int send(void* data, int len) { if (!len) return len; int n = (int)(obuf.capacity() - obuf.length()); if (n >= len && !obacklog.length()) { obuf.append((char*)data, len); } else { if (v->DataBacklog != 0 && obacklog.length() + len > v->DataBacklog) { //积压值超过限制 close(); return -1; } obacklog.append((char*)data, len); } return (write() == 0) ? 
len : -1; } int send(const void* data, int len) { return send((void*)data, len); } int safe_send(void* data, int len) { net_base::safeio_send_struct param = { this, data, len }; return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)¶m); } int safe_send(const void* data, int len) { net_base::safeio_send_struct param = { this, (void*)data, len }; return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)¶m); } int safe_close() { return v->SafeIOMessage(opcode::s_close, (ULONG_PTR)this); } void close() { if (INVALID_SOCKET == fd) return; ontimeout = nullptr; closesocket(fd); fd = INVALID_SOCKET; st &= ~opcode::t_activated; st |= opcode::s_close; set_timer(false); ibuf.clear(); if (obacklog.capacity() <= 0x0F) return; sock_buffer tmp; obacklog.swap(tmp); } sockaddr_in& getsockaddr() { return sa; } private: int initfd() { if (INVALID_SOCKET != fd) { return 0; } fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET == fd) { nlog("%s->创建套接字失败,错误号:%d", __FUNCTION__, WSAGetLastError()); return -1; } LINGER linger = { 1, 0 }; setsockopt(fd, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger)); int b = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&b, sizeof(b)); CreateIoCompletionPort((HANDLE)fd, v->hIocp, 0, 0); return 0; } int bindlocal() { sockaddr_in local; local.sin_family = AF_INET; local.sin_addr.S_un.S_addr = INADDR_ANY; local.sin_port = 0; if (SOCKET_ERROR == bind(fd, (LPSOCKADDR)&local, sizeof(local))) { nlog("%s->绑定本地端口失败,错误号:%d", __FUNCTION__, WSAGetLastError()); return -1; } return 0; } bool set_dest(const std::string& _Dest) { return net_base::sockaddr_from_string(sa, _Dest); } void set_timer(bool _Enable) { if (_Enable) { if (hTimer) return; RegisterWaitForSingleObject(&hTimer, output.ol.hEvent, [](void* Param, BOOLEAN TimerOrWaitFired) { if (!TimerOrWaitFired) return; sock* p = (sock*)Param; PostQueuedCompletionStatus(p->v->hIocp, 0, (ULONG_PTR)p, nullptr); }, this, (ULONG)v->Timeout * 2 / 3, WT_EXECUTEDEFAULT); } else { if (!hTimer) return; std::ignore = UnregisterWaitEx(hTimer, NULL); hTimer = NULL; } } int nat() { sockaddr_in _Addr; int _AddrLen = sizeof(_Addr); if (-1 == getsockname(fd, (sockaddr*)&_Addr, &_AddrLen)) return -1; SOCKET fdNat = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); LINGER linger = { 1, 0 }; setsockopt(fdNat, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger)); CreateIoCompletionPort((HANDLE)fdNat, v->hIocp, 0, 0); if (-1 == bind(fdNat, (sockaddr*)&_Addr, sizeof(_Addr))) { closesocket(fdNat); return -1; } close(); fd = fdNat; return connect(); } int accept() { if (((st & 0xFF) | opcode::s_close) != opcode::s_close) { nlog("%s->当前套接字未断开连接!", __FUNCTION__); return -1; } if (initfd()) return -1; DWORD _Received = 0; input.opt = opcode::s_accept; st &= (~opcode::s_close); st |= opcode::s_accept; if (!v->Accept(fd, ibuf.data(), &input.ol)) { int _Error = WSAGetLastError(); if (_Error != ERROR_IO_PENDING) { st &= (~opcode::s_accept); nlog("%s->AcceptEx失败, 错误号:", __FUNCTION__, WSAGetLastError()); return -1; } } return 0; } int connect() { if (((st & 0xFF) | opcode::s_close) != opcode::s_close) { nlog("%s->当前套接字未断开连接!", __FUNCTION__); return -1; } if (INVALID_SOCKET == fd) { if (initfd()) return -1; if (bindlocal()) return -1; } input.opt = opcode::s_connect; st &= (~opcode::s_close); st |= opcode::s_connect; if (!v->Connect(fd, (sockaddr*)&sa, sizeof(sa), &input.ol)) { int _Error = WSAGetLastError(); if (_Error != ERROR_IO_PENDING) { nlog("%s->ConnectEx失败, 错误号:", __FUNCTION__, 
WSAGetLastError()); return -1; } } return 0; } int write() { if (!(st & opcode::t_activated)) { return -1; } if (st & (opcode::s_write | opcode::s_close | opcode::s_accept | opcode::s_connect)) return 0; if (obacklog.size()) { size_t rl = obuf.capacity() - obuf.length(); if (rl > obacklog.length()) rl = obacklog.length(); if (rl) { obuf.append(obacklog.data(), rl); obacklog.erase(0, rl); } } WSABUF buf = { (ULONG)(obuf.length()), obuf.data() }; if (!buf.len) return 0; st |= opcode::s_write; DWORD _Sent = 0; if (SOCKET_ERROR == WSASend(fd, &buf, 1, &_Sent, 0, &(output.ol), NULL)) { int _Error = WSAGetLastError(); if (WSA_IO_PENDING != _Error) { st &= (~opcode::s_write); return -1; } } return 0; } int read() { if (!(st & opcode::t_activated)) { return -1; } if (st & (opcode::s_read | opcode::s_close | opcode::s_accept | opcode::s_connect)) return 0; WSABUF buf = { (ULONG)(ibuf.capacity() - ibuf.length()), ibuf.data() + ibuf.length() }; if ((int)buf.len <= 0) { return -1; } DWORD _Received = 0; DWORD _Flags = 0; st |= opcode::s_read; input.opt = opcode::s_read; if (SOCKET_ERROR == WSARecv(fd, &buf, 1, &_Received, &_Flags, &(input.ol), NULL)) { int _Error = WSAGetLastError(); if (WSA_IO_PENDING != _Error) { st &= ~(opcode::s_read); return -1; } } return 0; } private: friend class coio; friend class netio; SOCKET fd; sockaddr_in sa; net_base* v; int st; binding input, output; sock_buffer ibuf, obuf, obacklog; HANDLE hTimer; aqx::clock64_t rtime; net_task_t<sock> co; void (*ontimeout)(asyncsock); int syncsendlen; }; // coio是传参给协程函数的操作对象 class coio { coio(sock* _s) : s(_s) {} public: using asyncsock = sock::asyncsock; using sock_awaitable = sock::sock_awaitable; using close_awaitable = sock::close_awaitable; using send_awaitable = sock::send_awaitable; using recv_awaitable = sock::recv_awaitable; struct nat_awaitable { nat_awaitable(bool _ret) : ret(_ret) { } __declspec(noinline) bool await_ready() { return (ret == false); } void await_suspend(std::coroutine_handle<> handle) { } bool await_resume() { return ret; } bool ret; }; coio() : s(nullptr) {} sock_awaitable connect(const std::string& _Dest) { if (!s->set_dest(_Dest)) { // 设置目标地址失败时,撤销等待。 s->st |= net_status::t_await_undo; return sock_awaitable(s); } // 我使用的协程initial_suspend中是不挂起的, // 所以一个套接字的首次connect操作基本都是由其他线程引发的 // 而且很可能在await_suspend之前,IOCP队列就已经完成 if (GetCurrentThreadId() == s->v->WorkerThreadId) { if (s->connect()) { // 连接失败时,撤销等待。 s->st |= net_status::t_await_undo; return sock_awaitable(s); } } else { // 因此,不是IOCP队列线程引发的connect就发送到IOCP队列去处理 PostQueuedCompletionStatus(s->v->hIocp, net_status::s_connect, (ULONG_PTR)s, 0); } s->st |= net_status::t_await_connect; return sock_awaitable(s); } sock_awaitable accept() { // 首次accept虽然也是其他线程调用的(一般是main线程) // 但首次accept时,IOCP工作线程尚未启动,因此可以无视掉connect的那个问题。 s->st |= ((!s->accept()) ? net_status::t_await_accept : net_status::t_await_undo); return sock_awaitable(s); } /** * 以下几个成员函数中的参数asyncsock _s应该等同于私有成员s,除非强行在外部使用syncio对象 * 使用参数而不是私有成员的原因是防止在尚未连接前调用IO操作。 * 私有成员s将专用于accept与connect */ close_awaitable close(asyncsock _s) { _s.s->close(); if ((_s.s->st & 0xFF) == net_status::s_close) { // 如果套接字上已经没有任何IO事件,就让awaitable直接唤醒协程 // 通常这才是正常状态,但如果有其他线程异步send时,可能就会有未决IO存在了。 return close_awaitable(false); } _s.s->st |= net_status::t_await_close; return close_awaitable(true); } send_awaitable send(asyncsock _s, void *buf, int len) { _s.s->syncsendlen = _s.send(buf, len); _s.s->st |= ((_s.s->syncsendlen >= 0) ? 
net_status::t_await_write : net_status::t_await_undo); return sock::send_awaitable(_s.s); } send_awaitable send(asyncsock _s, const void* buf, int len) { _s.s->syncsendlen = _s.send(buf, len); _s.s->st |= ((_s.s->syncsendlen >= 0) ? net_status::t_await_write : net_status::t_await_undo); return sock::send_awaitable(_s.s); } recv_awaitable recv(asyncsock _s) { int n = _s.s->read(); if (n < 0) { _s.s->st |= net_status::t_await_undo; } else { _s.s->st |= net_status::t_await_read; } return recv_awaitable(_s.s); } nat_awaitable nat(asyncsock _s, const std::string& _Dest) { if ((_s.s->st & 0xFF) != net_status::t_activated) { // nat之前必须保证所有未决IO都已经返回,与打洞服务器保持正常连接状态,否则就是失败。 // 到这里失败时,依旧与打洞服务器保持着正常连接。 return nat_awaitable(false); } sockaddr_in sa = _s.s->sa; if (!_s.s->set_dest(_Dest)) { // 设置目标地址失败 // 到这里失败时,依旧与打洞服务器保持着正常连接。 _s.s->sa = sa; return nat_awaitable(false); } if (_s.s->nat()) { // 到这一步失败时,与打洞服务器的连接就有可能会断掉 // nat失败时,本就应该直接close(); // 都失败了,我想不出还要跟打洞服务器继续苟合的理由。 // 如果所有状态全都对,还失败,可能就是双方正好属于无法穿透的NAT类型环境下。 // 我对此研究不多,业界内真正懂行的也不多,资料更是少得可怜,我只知道TCP NAT在代码上的表现为: // 1、与打洞服务器保持连接的这个套接字设置了SO_REUSEADDR,确保这个套接字绑定的本地端口可复用。 // 在这个库里我全都设置了可复用,但主要目的是为了缓解TIME_WAIT,并不是为了穿透。 // 2、双方通过打洞服务器沟通好各自的远端地址 // 3、双方都创建一个新的套接字,并将该套接字绑定到本地与打洞服务器进行连接的那个地址(getsockname可以获得) // 到第 3 步处理好之后,与打洞服务器连接的那个套接字,已经废了,无法再进行通信,此时应该把它close掉。 // 4、最后双方都connect对方的地址。 _s.s->sa = sa; return nat_awaitable(false); } s->st |= net_status::t_await_connect; return nat_awaitable(true); } bool valid() { return (s != nullptr); } operator bool () { return valid(); } private: friend class netio; sock* s; }; /** * 可以简单把netio看成是一个容器的作用 * 它主要用于对接net_base,创建线程,处理IO事件。 */ class netio { struct IOCP_STATUS { DWORD transferred; SIZE_T key; typename sock::binding* pb; BOOL ok; }; public: /**listener 只是一种简单的参数包装,只是为了方便构造而已 * 构造参数: * _Dest 要监听的地址和端口,格式为:"a.b.c.d:port" * _ListenBacklog 系统函数listen的第2个参数 * _MaxClients 最多同时接受的客户端数量 */ class listener { public: listener() { max_clients = 0; listen_backlog = 0; addr.sin_addr.S_un.S_addr = INADDR_NONE; } listener(const std::string& _Dest, int _ListenBacklog, size_t _MaxClients) { max_clients = _MaxClients; listen_backlog = _ListenBacklog; net_base::sockaddr_from_string(addr, _Dest); } private: friend class netio; sockaddr_in addr; int listen_backlog; size_t max_clients; }; using asyncsock = sock::asyncsock; using sock_data = sock::sock_data; using opcode = net_status; using task = net_task_t<sock>; int init(int _StreamCapacity = 1440, int _DataBacklog = 0, int _Timeout = 0) { std::lock_guard<std::mutex> lg(mtx); return nwb.init(_StreamCapacity, _DataBacklog, _Timeout); } int server(const std::function<task(coio)> &_func, const listener ¶m) { std::lock_guard<std::mutex> lg(mtx); if (thd.joinable()) { nlog("%s->netio已启动, 请勿重复调用!", __FUNCTION__); return 0; } if (nwb.fd == INVALID_SOCKET) return -1; cofunc = _func; if (param.addr.sin_addr.S_un.S_addr != INADDR_NONE) { if (SOCKET_ERROR == bind(nwb.fd, (SOCKADDR*)¶m.addr, sizeof(SOCKADDR))) { nlog("%s->绑定端口失败,错误号:%d", __FUNCTION__, WSAGetLastError()); nwb.close(); return -1; } if (SOCKET_ERROR == ::listen(nwb.fd, param.listen_backlog)) { nlog("%s->监听失败,错误号:%d", __FUNCTION__, WSAGetLastError()); nwb.close(); return -1; } for (int i = 0; i < param.max_clients; i++) { sock* psock = new sock(&nwb); a_list.push_back(psock); psock->st |= opcode::t_acceptor; psock->co = cofunc(coio(psock)); psock->co.set_sock(psock); psock->co.resume(); } } __start(); return 0; } // client是一次性的,专用于客户端 // 让它返回asyncsock对象的理由是为了给脚本语言预留的 // 例如可以使用lua去实现类似node.js的那种connect之后不管连没连上就先得到对象去绑定事件的机制。 
asyncsock client(const std::function<task(coio)>& _func) { std::lock_guard<std::mutex> lg(mtx); coio io; asyncsock ret; if (!thd.joinable()) { // 如果线程未启动,尝试启动线程,这之后如果要回收资源,是需要stop和release的 if (nwb.fd == INVALID_SOCKET) return ret; __start(); } io.s = get_connector(); ret.s = io.s; io.s->co = _func(io); io.s->co.set_sock(io.s); io.s->co.resume(); return ret; } void exec(const std::function<void()>& _func) { if (!thd.joinable()) { // 如果线程未启动,尝试启动线程,这之后如果要回收资源,是需要stop和release的 if (nwb.fd == INVALID_SOCKET) return; __start(); } nwb.SafeIOMessage(opcode::s_exec, (ULONG_PTR)&_func); } void stop() { std::lock_guard<std::mutex> lg(mtx); if (thd.joinable()) { PostQueuedCompletionStatus(nwb.hIocp, -1, 0, 0); thd.join(); } } void release() { std::lock_guard<std::mutex> lg(mtx); if (thd.joinable()) { nlog("%s->nio正在运行,请先stop", __FUNCTION__); return; } for (auto p : a_list) { delete p; } a_list.clear(); for (auto p : c_list) { delete p; } c_list.clear(); nwb.close(); } private: sock* get_connector() { sock* psock = nullptr; for (auto v : c_list) { if ((v->st & opcode::t_connector) == 0 && ((v->st & 0xFF)| opcode::s_close) == opcode::s_close) { psock = v; break; } } if (!psock) { psock = new sock(&nwb); c_list.push_back(psock); } psock->st |= opcode::t_connector; return psock; } void on_connect(sock& s) { s.ibuf.clear(); s.obuf.clear(); s.obacklog.clear(); s.rtime = aqx::now(); if (nwb.Timeout != 0) s.set_timer(true); s.st |= opcode::t_activated; } void on_accept(sock &s) { // 懒得去调用GetAcceptExSockAddrs,有硬编码可用#ifndef _WIN64 s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x26);#else s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x20);#endif on_connect(s); } bool on_resume(sock& s) { if (s.st & opcode::t_await) { // 清除所有协程等待标志 s.st &= (~opcode::t_await); // 唤醒协程 s.co.resume(); return true; } return false; } void on_close(sock& s) { if ((s.st & 0xFF) == opcode::s_close) { s.st &= ~opcode::s_close; on_resume(s); } } bool error_resume(sock &s) { int st = s.st & opcode::t_await; switch (st) { case opcode::t_await_accept: case opcode::t_await_connect: case opcode::t_await_close: s.st &= (~opcode::t_await); s.co.resume(); return true; case opcode::t_await_read: s.ibuf.clear(); s.st &= (~opcode::t_await); s.co.resume(); return true; case opcode::t_await_write: s.syncsendlen = -1; s.st &= (~opcode::t_await); s.co.resume(); return true; default: break; } return false; } void on_reset(sock &s) { if ((s.st & 0xFF) == opcode::s_close) { s.st &= ~opcode::s_close; if (s.st & opcode::t_acceptor) { // 如果服务端协程不在一个循环里,协程返回自动销毁后就会这样 // 此时的挽救措施就是创建一个新的协程 s.co = cofunc(coio(&s)); } } } void on_completion(IOCP_STATUS& st) { sock& s = *(st.pb->s); int op = st.pb->opt; s.st &= (~op); if (s.st & opcode::s_close) op = 0; //nlog("on_completion:%I64X, %d", &s, op); switch (op) { case 0: break; case opcode::s_accept: on_accept(s); break; case opcode::s_connect: if (!st.ok && WSAGetLastError() == 1225) { // 出现这种错误,一般是由于服务端没有在监听指定端口,直接被操作系统拒绝了。 op = 0; break; } on_connect(s); break; case opcode::s_read: if (!st.transferred) { op = 0; break; } s.rtime = aqx::now(); s.ibuf.preset_length(s.ibuf.length() + st.transferred); break; case opcode::s_write: if (!st.transferred) { op = 0; break; } s.rtime = aqx::now(); s.obuf.erase(0, st.transferred); if (s.obuf.length() || s.obacklog.length()) { if (s.write()) { op = 0; break; } } // write操作可能是非协程发起的,协程很可能挂起在recv,因此需要判断一下。 if (!(s.st & opcode::t_await_write)) return; break; } //nlog("on_completion2:%I64X, %d", &s, op); if (!op) { if (error_resume(s)) return; // 只有当协程被销毁时,error_resume才会返回false s.close(); 
on_reset(s); return; } on_resume(s); if (s.st & opcode::s_close) return on_close(s); } void on_msgtimeout(sock *psock) { if (aqx::now() - psock->rtime >= nwb.Timeout && (psock->st & opcode::t_activated)) { psock->close(); if (error_resume(*psock)) return; on_reset(*psock); return; } if (psock->ontimeout != nullptr) { asyncsock as; as.s = psock; psock->ontimeout(as); } } void on_msgconnect(sock* psock) { if (psock->connect()) { psock->close(); if (error_resume(*psock)) return; on_reset(*psock); } } void on_msgwrite(net_base::safeio_send_struct* pss) { nwb.SafeIOResult(pss->s->send(pss->buf, pss->len)); } void on_msgclose(sock* psock) { psock->close(); nwb.SafeIOResult(0); } void __start() { thd = std::thread([this]() { nwb.WorkerThreadId = GetCurrentThreadId(); srand((unsigned int)aqx::now() + nwb.WorkerThreadId); nwb.InitSafeIO(); IOCP_STATUS st = { 0,0,0,0 }; //nlog("netio::worker->I/O工作线程 %d 开始!", nwb.WorkerThreadId); for (;;) { st.ok = GetQueuedCompletionStatus(nwb.hIocp, &(st.transferred), &(st.key), (OVERLAPPED**)&(st.pb), INFINITE); if (!st.pb) { if (st.transferred == -1) break; switch (st.transferred) { case 0: on_msgtimeout((sock*)st.key); break; case opcode::s_connect: on_msgconnect((sock*)st.key); break; case opcode::s_write: on_msgwrite((net_base::safeio_send_struct*)st.key); break; case opcode::s_close: on_msgclose((sock*)st.key); break; case opcode::s_exec: (*((std::function<void()>*)st.key))(); nwb.SafeIOResult(0); break; } continue; } on_completion(st); } nwb.ExitSafeIO(); nwb.WorkerThreadId = 0; //nlog("netio::worker->I/O工作线程 %d 已停止!", nwb.WorkerThreadId); }); } private: net_base nwb; std::list<sock*> a_list; std::list<sock*> c_list; std::function<task(coio)> cofunc; std::thread thd; std::mutex mtx; };}#pragma warning(pop)
I have removed all the various couplings from this library, except for the logging library: aqx::log is a simple formatted logging library I wrote myself:
logger.hpp#pragma once#include <iostream>#include <string>#include <time.h>#include <stdarg.h>#include <mutex>#include <vector>//aqx::log不与aqx其他库耦合#if defined(_WIN32) || defined(_WIN64)#ifndef _WINDOWS_#include <WinSock2.h>#endif#define __aqxlog_getpid GetCurrentProcessId#define __aqxlog_gettid GetCurrentThreadId#include <io.h>#else#if defined(__linux__)#include <unistd.h>#include <sys/syscall.h>#define __aqxlog_getpid getpid#define __aqxlog_gettid() syscall(__NR_gettid)#endif#endif#pragma warning(push)#pragma warning(disable:4996)namespace aqx { class log { private: struct _format_texts { std::string time; std::string type; std::string pid; std::string tid; }; public: static constexpr auto hs_time{ static_cast<int>(1) }; static constexpr auto hs_type{ static_cast<int>(2) }; static constexpr auto hs_pid{ static_cast<int>(4) }; static constexpr auto hs_tid{ static_cast<int>(8) }; log() { _stdout_fp = stdout; fp = stdout; _fmtts = { "%Y/%m/%d %H:%M:%S ", "{%s} ", "[%d] ", "(%d) " }; head_style = log::hs_time; head_presize = _gethps(); _EnableInfo = true; _EnableError = false; _EnableDebug = false; _EnableWarn = false; _DefType = "info"; s.reserve(0x1000); } ~log() { if (fp != _stdout_fp) fclose(fp); } void enable(const std::string_view& _Type, bool _Enable) { std::lock_guard<std::mutex> lg(_Mtx); if (_Type == "info") _EnableInfo = _Enable; else if (_Type == "error") _EnableError = _Enable; else if (_Type == "debug") _EnableDebug = _Enable; else if (_Type == "warn") _EnableWarn = _Enable; } void seths(int hs) { std::lock_guard<std::mutex> lg(_Mtx); head_style = hs; head_presize = _gethps(); } void sethfmt(int _Style, const char* _Fmt) { std::lock_guard<std::mutex> lg(_Mtx); switch (_Style) { case hs_time: _fmtts.time = _Fmt; break; case hs_type: _fmtts.type = _Fmt; break; case hs_pid: _fmtts.pid = _Fmt; break; case hs_tid: _fmtts.tid = _Fmt; break; } head_presize = _gethps(); } bool setvfs(const char* _FileName, bool _PutStdout = false) { std::lock_guard<std::mutex> lg(_Mtx); FILE* _tmp = fopen(_FileName, "ab"); if (!_tmp) return false; if (fp != _stdout_fp) fclose(fp); fp = _tmp; PutStdout = _PutStdout; return true; } log& info(const char* _Fmt, ...) { std::lock_guard<std::mutex> lg(_Mtx); if (!_EnableInfo) return *this; va_list vl; va_start(vl, _Fmt); _build("info", _Fmt, vl); va_end(vl); _putlog(); return *this; } log& debug(const char* _Fmt, ...) { std::lock_guard<std::mutex> lg(_Mtx); if (!_EnableDebug) return *this; va_list vl; va_start(vl, _Fmt); _build("info", _Fmt, vl); va_end(vl); _putlog(); return *this; } log& error(const char* _Fmt, ...) { std::lock_guard<std::mutex> lg(_Mtx); if (!_EnableError) return *this; va_list vl; va_start(vl, _Fmt); _build("info", _Fmt, vl); va_end(vl); _putlog(); return *this; } log& warn(const char* _Fmt, ...) { std::lock_guard<std::mutex> lg(_Mtx); if (!_EnableWarn) return *this; va_list vl; va_start(vl, _Fmt); _build("info", _Fmt, vl); va_end(vl); _putlog(); return *this; } log& operator()(const char* _Fmt, ...) 
{ std::lock_guard<std::mutex> lg(_Mtx); if (!_EnableInfo) return *this; va_list vl; va_start(vl, _Fmt); _build(_DefType.c_str(), _Fmt, vl); va_end(vl); _putlog(); return *this; } private: void _putlog() { fputs(s.data(), fp); if (fp != _stdout_fp) { //fflush(fp); if (PutStdout) fputs(s.data(), _stdout_fp); } } size_t _build(const char* _Type, const char* _Fmt, va_list vl) { s.clear(); size_t n = vsnprintf(nullptr, 0, _Fmt, vl); if (n <= 0) return _build_head(_Type); if (n >= s.capacity()) { s.clear(); s.reserve(n + head_presize); } size_t _Pos = _build_head(_Type); char* p = (char*)s.data(); _Pos += vsnprintf(p + _Pos, s.capacity(), _Fmt, vl); char c = p[_Pos - 1];#ifdef _WINDOWS_ if (c != '\r' && c != '\n') { p[_Pos++] = '\r'; p[_Pos++] = '\n'; p[_Pos] = '\0'; }#else if (c != '\r' && c != '\n') { p[_Pos++] = '\n'; p[_Pos] = '\0'; }#endif return _Pos; } size_t _build_time(size_t _Pos) { if (!(head_style & log::hs_time)) return _Pos; time_t t = time(NULL); auto _Tm = localtime(&t); _Pos += strftime((char*)s.data() + _Pos, head_presize, _fmtts.time.c_str(), _Tm); return _Pos; } size_t _build_type(size_t _Pos, const char* _Type) { if (!(head_style & log::hs_type)) return _Pos; _Pos += sprintf((char*)s.data() + _Pos, _fmtts.type.c_str(), _Type); return _Pos; } size_t _build_pid(size_t _Pos) { if (!(head_style & log::hs_pid)) return _Pos; auto _Pid = __aqxlog_getpid(); _Pos += sprintf((char*)s.data() + _Pos, _fmtts.pid.c_str(), _Pid); return _Pos; } size_t _build_tid(size_t _Pos) { if (!(head_style & log::hs_tid)) return _Pos; auto _Tid = __aqxlog_gettid(); _Pos += sprintf((char*)s.data() + _Pos, _fmtts.tid.c_str(), _Tid); return _Pos; } size_t _build_head(const char* _Type) { return _build_tid(_build_pid(_build_type(_build_time(0), _Type))); } size_t _gethps() { size_t _Result = 3; if (head_style & log::hs_time) _Result += ((_fmtts.time.length() << 1) + 30); if (head_style & log::hs_type) _Result += ((_fmtts.pid.length() << 1) + 12); if (head_style & log::hs_pid) _Result += ((_fmtts.pid.length() << 1) + 20); if (head_style & log::hs_tid) _Result += ((_fmtts.pid.length() << 1) + 20); return _Result; } private: std::vector<char> s; FILE* fp; _format_texts _fmtts; int head_style; size_t head_presize; bool PutStdout; FILE* _stdout_fp; std::mutex _Mtx; std::string _DefType; bool _EnableInfo; bool _EnableDebug; bool _EnableError; bool _EnableWarn; };}static aqx::log logger;#pragma warning(pop)
Finally, the test code. The client and server are bundled together here; to separate them, just split things up at the few spots after nio.init.
// main.cpp#include <iostream>#include <aqx/netio.hpp>int main(){ aqx::init_winsock(); aqx::netio nio; nio.init(1440, 0x10000); // 一个简单的echo服务器例子: nio.server([](aqx::coio io)->aqx::netio::task { // 服务端始终应该放在一个死循环里,否则兜底逻辑会反复创建新协程。 for (;;) { // io.accept会返回一个可用于异步send和close的对象 auto s = co_await io.accept(); logger("客户端连入:%s", aqx::net_base::sockaddr_to_string(s.getsockaddr())); for (;;) { auto buf = co_await io.recv(s); if (!buf.length()) { logger("断开连接!"); break; } puts(buf.data()); buf.clear(); // 异步发送,协程不会在这里挂起 s.send("收到!", 5); } co_await io.close(s); logger("已关闭!"); } }, aqx::netio::listener("0.0.0.0:55554", 100, 100)); // 我已经懒到让客户端和服务端都放在一起了,要分自己分 auto sock1 = nio.client([](aqx::coio io)->aqx::netio::task { // 客户端只有需要自动重连,才放在循环里处理 for (;;) { auto s = co_await io.connect("127.0.0.1:55554"); if (!s) { co_await io.close(s); continue; } for (;;) { auto buf = co_await io.recv(s); if (!buf.length()) { break; } puts(buf.data()); buf.clear(); } co_await io.close(s); } }); // 我已经懒到让客户端和服务端都放在一起了,要分自己分 auto sock2 = nio.client([](aqx::coio io)->aqx::netio::task { // 客户端只有需要自动重连,才放在循环里处理 for (;;) { auto s = co_await io.connect("127.0.0.1:55554"); if (!s) { co_await io.close(s); continue; } for (;;) { auto buf = co_await io.recv(s); if (!buf.length()) { break; } puts(buf.data()); buf.clear(); } co_await io.close(s); } }); std::string str; for (;;) { std::cin >> str; if (str == "exit") break; std::string sd = "sock1:"; sd += str; sock1.safe_send(sd.data(), (int)sd.length() + 1); sd = "sock2:"; sd += str; sock2.safe_send(sd.data(), (int)sd.length() + 1); } nio.stop(); nio.release();}
To be at least somewhat responsible: since I discovered a compiler bug, I should track it down a bit.
When recv_awaitable::await_ready() is inlined, resuming from the normal flow of execution back into the coroutine goes through the following sequence:
00007FF723AF6000 mov r11,rsp
00007FF723AF6003 mov qword ptr [r11+10h],rbx
00007FF723AF6007 mov qword ptr [r11+18h],rsi
00007FF723AF600B mov qword ptr [r11+20h],rdi
00007FF723AF600F mov qword ptr [r11+8],rcx
00007FF723AF6013 push r12
00007FF723AF6015 push r14
00007FF723AF6017 push r15
00007FF723AF6019 sub rsp,90h
00007FF723AF6020 mov rax,qword ptr [__security_cookie (07FF723AFA008h)]
00007FF723AF6027 xor rax,rsp
00007FF723AF602A mov qword ptr [rsp+80h],rax
00007FF723AF6032 mov rdi,rcx
00007FF723AF6035 mov qword ptr [rsp+50h],rcx
00007FF723AF603A movzx eax,word ptr [rdi+2Ch]
00007FF723AF603E mov word ptr [rsp+48h],ax
00007FF723AF6043 inc ax
00007FF723AF6046 cmp ax,0Ah
00007FF723AF604A ja `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+463h (07FF723AF6463h)
00007FF723AF6050 movsx rax,ax
00007FF723AF6054 lea rdx,[__ImageBase (07FF723AF0000h)]
00007FF723AF605B mov ecx,dword ptr [rdx+rax*4+6494h]
00007FF723AF6062 add rcx,rdx
00007FF723AF6065 jmp rcx
00007FF723AF6067 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
00007FF723AF6069 xor r15d,r15d
00007FF723AF606C mov dword ptr [rdi+1B0h],r15d
00007FF723AF6073 mov r12d,10000h
00007FF723AF6079 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2E0h (07FF723AF62E0h)
00007FF723AF607E jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
00007FF723AF6080 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
}
}, aqx::netio::listener("0.0.0.0:55554", 100, 100));
00007FF723AF6082 cmp word ptr [rdi+0Ah],0
00007FF723AF6087 je `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
00007FF723AF608D mov edx,1B4h
00007FF723AF6092 mov rcx,rdi
00007FF723AF6095 call operator delete (07FF723AF5504h)
00007FF723AF609A jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
00007FF723AF609F xor r15d,r15d
00007FF723AF60A2 mov r12d,10000h
00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr] ******************************************** here it fetches __coro_frame_ptr.__resume_address
00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
00007FF723AF60B2 xor r15d,r15d
00007FF723AF60B5 mov r12d,10000h
00007FF723AF60BB jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2C4h (07FF723AF62C4h)
00007FF723AF60C0 xor r15d,r15d
00007FF723AF60C3 mov r12d,10000h
---------------------------------------------------------------------------------------------------
00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr]
It directly copies __resume_address at offset 0 of the coroutine frame structure,
00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
then immediately jumps there and proceeds to operate on rdx as if it were the recv_awaitable object.
---------------------------------------------------------------------------------------------------
I'm 100% certain this is a compiler bug. The root cause is definitely not something as simple as inlining alone; there must be some deeper compilation logic behind it, but that's Microsoft's problem.
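For reference, the mitigation already applied throughout the library above is to keep await_ready from being inlined. A minimal sketch of just that pattern (MSVC-specific, since __declspec(noinline) is a Microsoft extension; this shows only the shape of the workaround, not a complete awaitable):

#include <coroutine>

// Forcing await_ready out of inlining prevents the miscompiled awaitable pointer
// described above when switching back into the coroutine.
struct noinline_awaitable {
    __declspec(noinline) bool await_ready() { return false; }
    void await_suspend(std::coroutine_handle<>) { }
    void await_resume() { }
};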