翻译自Asyncio, asyncio is a c++20 library to write concurrent code using the async/await syntax.,仅做个人学习笔记
安装
配置环境
-
小水管编译时内存不够,增加swap
Linux系统如何增加虚拟内存 -
需要用到C++20的新特性,Ubuntu自带的g++非常的老,需要下载11/12
想切换版本就链接到不同版本
Ubuntu升级gcc和g++到10 -
将自带的Cmake删除,换用较新的
编译安装Cmake
编译项目源码
- Build
$ git clone --recursive https://github.com/netcan/asyncio.git
$ cd asyncio
$ mkdir build
$ cd build
$ cmake ..
$ make -j
- Test
$cd asyncio
$cmake .
$make
- Run
在Test文件夹下可以看到生成的测试程序,hello_world
,echo_client
,echo_server
等
例子
Hello World
Task<> hello_world() {
fmt::print("hello\n");
co_await asyncio::sleep(1s);
fmt::print("world\n");
}
int main() {
asyncio::run(hello_world());
}
output:
hello
world
Dump callstack
打印当前栈信息
Task<int> factorial(int n) {
if (n <= 1) {
co_await dump_callstack();
co_return 1;
}
co_return (co_await factorial(n - 1)) * n;
}
int main() {
fmt::print("run result: {}\n", asyncio::run(factorial(10)));
return 0;
}
output:
[0] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:17
[1] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[2] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[3] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[4] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[5] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[6] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[7] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[8] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
[9] void factorial(factorial(int)::_Z9factoriali.Frame*) at asyncio/test/st/hello_world.cpp:20
run result: 3628800
TCP Echo
Client
Task<> tcp_echo_client(std::string_view message) {
auto stream = co_await asyncio::open_connection("127.0.0.1", 8888);
fmt::print("Send: '{}'\n", message);
co_await stream.write(Stream::Buffer(message.begin(), message.end()));
auto data = co_await stream.read(100);
fmt::print("Received: '{}'\n", data.data());
fmt::print("Close the connection\n");
stream.close(); // unneeded, just imitate python
}
int main(int argc, char** argv) {
asyncio::run(tcp_echo_client("hello world!"));
return 0;
}
output:
Send: 'hello world!'
Received: 'hello world!'
Close the connection
Server
Task<> handle_echo(Stream stream) {
auto& sockinfo = stream.get_sock_info();
auto sa = reinterpret_cast<const sockaddr*>(&sockinfo);
char addr[INET6_ADDRSTRLEN] {};
auto data = co_await stream.read(100);
fmt::print("Received: '{}' from '{}:{}'\n", data.data(),
inet_ntop(sockinfo.ss_family, get_in_addr(sa), addr, sizeof addr),
get_in_port(sa));
fmt::print("Send: '{}'\n", data.data());
co_await stream.write(data);
fmt::print("Close the connection\n");
stream.close(); // unneeded, just imitate python
}
Task<void> amain() {
auto server = co_await asyncio::start_server(
handle_echo, "127.0.0.1", 8888);
fmt::print("Serving on 127.0.0.1:8888\n");
co_await server.serve_forever();
}
int main() {
asyncio::run(amain());
return 0;
}
output:
Serving on 127.0.0.1:8888
Received: 'Hello World!' from '127.0.0.1:49588'
Send: 'Hello World!'
Close the connection
Gather
auto factorial(std::string_view name, int number) -> Task<int> {
int r = 1;
for (int i = 2; i <= number; ++i) {
fmt::print("Task {}: Compute factorial({}), currently i={}...\n", name, number, i);
co_await asyncio::sleep(500ms);
r *= i;
}
fmt::print("Task {}: factorial({}) = {}\n", name, number, r);
co_return r;
};
auto test_void_func() -> Task<> {
fmt::print("this is a void value\n");
co_return;
};
int main() {
asyncio::run([&]() -> Task<> {
auto&& [a, b, c, _void] = co_await asyncio::gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
test_void_func());
assert(a == 2);
assert(b == 6);
assert(c == 24);
}());
}
output:
Task A: Compute factorial(2), currently i=2...
Task B: Compute factorial(3), currently i=2...
Task C: Compute factorial(4), currently i=2...
this is a void value
Task C: Compute factorial(4), currently i=3...
Task A: factorial(2) = 2
Task B: Compute factorial(3), currently i=3...
Task B: factorial(3) = 6
Task C: Compute factorial(4), currently i=4...
Task C: factorial(4) = 24
FQA
如何处理被取消的协程
Q:从技术上讲,您可以添加一个事件循环队列中不存在的handle。在这种情况下,取消的event是否会成为一个危险的问题?
void cancel_handle(Handle& handle) {
cancelled_.insert(&handle);
}
A:在某些情况下可能会出现内存泄露,但它是安全的,cancelled set保存被销毁的handle,它会通知eventloop当handle就绪的时候,然后eventloop会跳过这个handle,并将它从cancelled set中删除
A:你说的也对。我发现release版本下的一个Bug,当一个handle被销毁然后插入cancelled set中。然后创建一个新的coroutine,它和被销毁的coroutine handle有相同的地址!!Loop将会删除已创建的coroutine. fixed patch: https://github.com/netcan/asyncio/commit/23e6a38f5d00b55037f9560845c4e44948e41709
和其他方法比,coroutine的性能和不同
Q:首先,很棒的工作!但是我应该何时使用coroutine何时不使用,它们太新了,不知道能带来多大性能,以及和其他方法的比较。
A:问得好。就我的观点而言,协程只是回调的语法糖,换句话说,任何需要回调接口的场景都可以被协程所取代,一个典型的异步编程模式涉及大量回调,因此使用协程代码比回调风格更具可读性。
A:就性能而言,协程只是可恢复的函数,它支持挂起和恢复,我测试了协程的调用/挂起/恢复,它只需要几十纳秒,与回调编程相比,反而具有负开销。更多详细的信息可见https://www.youtube.com/watch?v=_fu0gx-xseY,协程的作者的讲解
为什么即使在单线程模式下也需要一些原语(异步互斥锁/异步条件变量)?
Q:我很好奇,你能分享一下这些原语(异步互斥、同步等待)会做什么吗?
A:为了能够创建整个异步应用程序,我们不应该阻止线程池中的任何线程。使用操作系统调度器的传统同步原语(如std::mutex std::condition_变量)在这种情况下是无用的,我们需要这些原语与内部应用程序调度器协作。
Q: OK。好,不管这个了。我对协程库好奇,我的只是单线程
A:你仍然需要他们,尽管你只有一个线程(你只是不需要担心并发问题),你需要考虑例如 如何等待一个条件变量,并从其他地方通知它?如何join多个异步任务
A:这些原语是需要的。例如在游戏场景中,服务器必须等待收集客户端的所有命令,然后coroutine继续执行游戏逻辑,这需要条件变量。
io_uring
比 epoll
更好
Q:这个结果是难以置信的,但是有可能的,io的量级是毫秒
看情况,使用io_uring或者用户态的网络栈,可以获得微秒/纳秒级范围内的io。到目前为止,我看过的最佳的ping-pong程序,发送第一个字节的时间是1.2微秒,这包括了:网卡接收字节、PCI总线将字节传输到CPU、CPU读取请求并写入response、PCI总线将字节传输到网卡、网卡发送字节
当今IO的主要问题是系统调用开销,如果你不使用系统调用,io_uring是最经济的替代品,您可以得到一个数量级的加速。
A:如果我们记错的话,一个系统调用大概100ns(empty epoll_wait benchmark),但是io_uring可能是比epoll快,我也看到一些相关的比较
为什么python asyncio的性能如此优异?
Q:为什么python asyncio的性能如此优异?
A:因为有许多大佬已经优化python asyncio很多年了,在固定开销的意义上,它总是比C++慢,但是在可伸缩性和边界情况处理方面,它应该接近最优。
如何打印协程的调用栈?
Q:有个例子中你打印了调用栈,可以理解为"async calll stack"和传统的调用栈是不同的吗?你是如何获取这些信息的?我对这个很好奇,因为一直在想实现一个帮助调式的工具。
A:是的,它是async callstack,重点是利用协程promise_type的await_transform()
函数,它可以保存一个协程的source_location i
信息,换句话说,当用户调用co_await
时,就会保存await的位置信息(https://github.com/netcan/asyncio/blob/5ae5fdffcd065df4d9bf758741ac75647cf2f19a/include/asyncio/task.h#L113) , dump backtrace
非常简单,只需要递归的打印出协程的source_location
和 它的其他信息