Async Scope
之前的章节只是讲解了协程中的各种概念
本章我们来实现一个async scope,即一个可以在其中使用co_await和co_return的函数
最后我们要在这个基础上实现一个单元素的回调转协程
Promise实现
这里我们直接把整个Scope建模为一个Task
先看看整体的样式
Task<int> simple_task2() {
std::cout << __func__ << "\n";
co_return 2;
}
Task<int> simple_task1() {
std::cout << __func__ << "\n";
co_return 1;
}
Task<int> simple_task() {
// result2 == 2
auto result2 = co_await simple_task1();
std::cout << __func__ << " co_await task1 \n";
// result3 == 3
auto result3 = co_await simple_task2();
std::cout << __func__ << " co_await task2 \n";
co_return 1 + result2 + result3;
}
这里可以推断出两个要做的事情
- co_await 对应的await_transform允许接受Task作为参数并返回一个Awaiter
- co_return 对应的return_value允许接受T来作为整个任务的结果值
先考虑一个单独异步任务,即我们最常见的那种Promise/Future模型,promise负责支持complete触发回调,future负责挂载回调,那么我们就可以这样考虑:
promise::complete 的实现就是 return_value 的实现
void return_value(ResultType value) {
done = true;
res = Result<ResultType>(std::move(value));
for (auto &callback : completion_callbacks) {
callback(res);
}
}
那么下一步的设计就该考虑completion_callbacks放在哪里了
首先按照习惯性设计外部的Future(这里是C++的协程概念中的future,就是promise外面那个)是会持有当前的coroutine_handle的通过这个很容易拿到promise,但是promise是不知道future的,所以我们直接把completion_callbacks放在promise内部即可。
这里跟co_await实现倒是没什么关系。。。只是给一个外部可以给task挂载回调的机制罢了
Task &finally(std::function<void()> &&func) {
handle.promise().on_completed([func](auto result) { func(); });
return *this;
}
接下来就是暂存结果值和实现三大件了
都很简单
struct promise_type {
auto initial_suspend() { return std::suspend_never{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
auto unhandled_exception() {
done = true;
res = Result<ResultType>(std::current_exception());
for (auto &callback : completion_callbacks) {
callback(res);
}
}
// 定制co_return行为 当调用这个方法时意味着此时
void return_value(ResultType value) {
done = true;
res = Result<ResultType>(std::move(value));
for (auto &callback : completion_callbacks) {
callback(res);
}
}
Task<ResultType> get_return_object() {
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
template <typename _ResultType>
TaskAwaiter<_ResultType> await_transform(Task<_ResultType> &&task) {
return TaskAwaiter<_ResultType>(std::move(task));
}
void on_completed(std::function<void(Result<ResultType>)> &&func) {
if (done) {
// result 已经有值
auto value = res;
// 解锁之后再调用 func
func(value);
} else {
// 否则添加回调函数,等待调用
completion_callbacks.push_back(func);
}
}
auto get_result() { return res.get_or_throw(); }
public:
Result<ResultType> res;
bool done = false;
std::list<std::function<void(Result<ResultType>)>> completion_callbacks;
};
Awaitable
剩下的就是awaitable实现,这个主要用于co_await这个操作符的返回值,此时我们需要根据其右侧的状态来决定是否挂起以及返回值了,就是等待子任务·
我们再看一眼这个函数的签名,注意Task是个C++的Future哦
template <typename _ResultType>
TaskAwaiter<_ResultType> await_transform(Task<_ResultType> &&task) {
return TaskAwaiter<_ResultType>(std::move(task));
}
直接看代码吧 很简单
//因为task是future所以我们有个handle指针再获取到promise再获取到done变量状态
bool await_ready() const noexcept {
return task.handle.promise().done;
}
//注意这个handle代指的是当前continuation,是调用方的句柄,当我resume时,等价于从co_await处向下执行 而task则是代指的子任务
void await_suspend(std::coroutine_handle<> handle) noexcept {
// 当 task 执行完之后调用 resume
task.finally([handle]() { handle.resume(); });
}
// 协程恢复执行时,被等待的子 Task 已经执行完,调用 get_result 来获取结果
R await_resume() noexcept { return task.get_result(); }
简单的一个async scope就完成了
回调转协程
这个就更简单啦
我们要实现的结构类似于js中的
fn(args,(i) => {fun1(i)});
fun1(await fn(args))
状态储存
这个就有点像我们Vert.x的Promise/Future实现了
template <typename T> struct AsyncResult {
explicit AsyncResult() = default;
// 当 Task 正常返回时用结果初始化 Result
explicit AsyncResult(T &&value)
: res(value),
completion_callbacks(std::move(value.completion_callbacks)) {}
// 这里先暂时一把大锁控制一下 好理解
void complete(Result<T> &&async_value) {
std::cout << __func__ << "AsyncResult 准备触发回调 \n";
auto scope = std::lock_guard(queue_lock);
done = true;
res = async_value;
for (auto fn : completion_callbacks) {
fn(res);
}
completion_callbacks.clear();
}
void on_completed(std::function<void(Result<T>)> &&func) {
auto scope = std::lock_guard(queue_lock);
if (done) {
// result 已经有值
auto value = res;
// 解锁之后再调用 func
func(value);
} else {
// 否则添加回调函数,等待调用
completion_callbacks.push_back(func);
}
}
bool is_done() { return done; }
Result<T> get_result() { return res; }
private:
std::mutex queue_lock;
std::list<std::function<void(Result<T>)>> completion_callbacks;
Result<T> res;
bool done = false;
};
awaitable
既然要允许await那就得把之前实现的AsyncResult添加点awaitable实现
这里我们采用组合的方式来做
这里是一个多所有权的场景 直接用share_ptr把这个玩意让协程和异步任务共同持有
我思考了一下,这里因为存在一定的顺序性 raw ptr也是可行的
异步任务持有指针不释放
等待awaitable析构时自然释放即可
template <typename T> struct AsyncResultAwaiter {
//省略构造参数
bool await_ready() const noexcept { return res->is_done(); }
void await_suspend(std::coroutine_handle<> handle) noexcept {
// 当 task 执行完之后调用 resume
res->on_completed([handle](auto a) {
std::cout << __func__ << "AsyncResultAwaiter 回调 \n";
std::cout << handle.address();
handle.resume();
});
}
Result<T> await_resume() noexcept {
return res.get()->get_result();
}
std::shared_ptr<AsyncResult<T>> res;
std::coroutine_handle<> handle;
};
这里就相当于kt的那种做法,直接拿到coroutine_handle来用
最后我们在之前实现的task的promise中添加对它的支持就行了
这里的参数名为future的参数就是与异步任务共享的那个回调,这里只是为了能拿到协程句柄丢到对应的异步任务回调中罢了
template <typename _ResultType>
AsyncResultAwaiter<_ResultType>
await_transform(std::shared_ptr<AsyncResult<_ResultType>> future) {
return AsyncResultAwaiter(
future, std::coroutine_handle<promise_type>::from_promise(*this));
}
用法
Task<int> async_task() {
// result2 == 2
auto shared_ptr = std::make_shared<AsyncResult<int>>();
auto thread = std::thread([shared_ptr]() {
std::cout << "thread :in lambda \n";
std::this_thread::sleep_for(std::chrono::seconds(1));
shared_ptr->complete(Result(12));
});
thread.detach();
std::cout << "before await count:" << shared_ptr.use_count() << "\n";
auto res = co_await shared_ptr;
std::cout << "res:" << res.get_or_throw() << "\n";
co_return res.get_or_throw() + 3;
}
总结
这里面我只是实现了 而非打磨好了 其中还有一些内存所有权的问题还没有解决甚至存在内存泄漏
但是先学会再打磨
完整代码参考codepieces/async_scope_and_future.cpp at main · dreamlike-ocean/codepieces · GitHub