From d24bfce93fda2d35360213adc3f90936d8cab010 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 7 Oct 2021 17:03:54 +0300 Subject: [PATCH] Add coroutines example. --- src/Core/examples/CMakeLists.txt | 3 + src/Core/examples/coro.cpp | 202 +++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 src/Core/examples/coro.cpp diff --git a/src/Core/examples/CMakeLists.txt b/src/Core/examples/CMakeLists.txt index 6b07dfbbfa6..c8846eb1743 100644 --- a/src/Core/examples/CMakeLists.txt +++ b/src/Core/examples/CMakeLists.txt @@ -13,3 +13,6 @@ target_link_libraries (mysql_protocol PRIVATE dbms) if(USE_SSL) target_include_directories (mysql_protocol SYSTEM PRIVATE ${OPENSSL_INCLUDE_DIR}) endif() + +add_executable (coro coro.cpp) +target_link_libraries (coro PRIVATE clickhouse_common_io) diff --git a/src/Core/examples/coro.cpp b/src/Core/examples/coro.cpp new file mode 100644 index 00000000000..c8e2f7418e4 --- /dev/null +++ b/src/Core/examples/coro.cpp @@ -0,0 +1,202 @@ +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#if defined(__clang__) + +#include + +template +using coroutine_handle = std::experimental::coroutine_handle; + +using default_coroutine_handle = std::experimental::coroutine_handle<>; + +using suspend_never = std::experimental::suspend_never; +using suspend_always = std::experimental::suspend_always; + +#else + +#include + +template +using coroutine_handle = std::coroutine_handle; + +using default_coroutine_handle = std::coroutine_handle<>; + +using suspend_never = std::suspend_never; +using suspend_always = std::suspend_always; + +#endif + + +template +struct suspend_never_val +{ + constexpr bool await_ready() const noexcept { return true; } + constexpr void await_suspend(default_coroutine_handle) const noexcept {} + constexpr T await_resume() const noexcept + { + std::cout << " ret " << val << std::endl; + return val; + } + + T val; +}; + +template +struct resumable +{ + struct promise_type + { + using coro_handle = coroutine_handle; + auto get_return_object() { return coro_handle::from_promise(*this); } + auto initial_suspend() { return suspend_never(); } + auto final_suspend() noexcept { return suspend_never_val{*r->value}; } + //void return_void() {} + void return_value(T value_) { r->value = value_; } + void unhandled_exception() + { + DB::tryLogCurrentException("Logger"); + r->exception = std::current_exception(); + } + + explicit promise_type(std::string tag_) : tag(tag_) {} + ~promise_type() { std::cout << "~promise_type " << tag << std::endl; } + std::string tag; + coro_handle next; + resumable * r = nullptr; + }; + + using coro_handle = coroutine_handle; + + bool await_ready() const noexcept { return false; } + void await_suspend(coro_handle g) noexcept + { + std::cout << " await_suspend " << my.promise().tag << std::endl; + std::cout << " g tag " << g.promise().tag << std::endl; + g.promise().next = my; + } + T await_resume() noexcept + { + std::cout << " await_res " << my.promise().tag << std::endl; + return *value; + } + + resumable(coro_handle handle) : my(handle), tag(handle.promise().tag) + { + assert(handle); + my.promise().r = this; + std::cout << " resumable " << tag << std::endl; + } + resumable(resumable &) = delete; + resumable(resumable &&rhs) : my(rhs.my), tag(rhs.tag) + { + rhs.my = {}; + std::cout << " resumable&& " << tag << std::endl; + } + static bool resume_impl(resumable *r) + { + if (r->value) + return false; + + auto & next = r->my.promise().next; + + if (next) + { + if (resume_impl(next.promise().r)) + return true; + next = {}; + } + + if (!r->value) + { + r->my.resume(); + if (r->exception) + std::rethrow_exception(r->exception); + } + return !r->value; + } + + bool resume() + { + return resume_impl(this); + } + + T res() + { + return *value; + } + + ~resumable() + { + std::cout << " ~resumable " << tag << std::endl; + } + +private: + coro_handle my; + std::string tag; + std::optional value; + std::exception_ptr exception; +}; + +resumable boo(std::string tag) +{ + std::cout << "x" << std::endl; + co_await suspend_always(); + std::cout << StackTrace().toString(); + std::cout << "y" << std::endl; + co_return 1; +} + +resumable bar(std::string tag) +{ + std::cout << "a" << std::endl; + int res1 = co_await boo("boo1"); + std::cout << "b " << res1 << std::endl; + int res2 = co_await boo("boo2"); + if (res2 == 1) + throw DB::Exception(1, "hello"); + std::cout << "c " << res2 << std::endl; + co_return res1 + res2; // 1 + 1 = 2 +} + +resumable foo(std::string tag) { + std::cout << "Hello" << std::endl; + auto res1 = co_await bar("bar1"); + std::cout << "Coro " << res1 << std::endl; + auto res2 = co_await bar("bar2"); + std::cout << "World " << res2 << std::endl; + co_return res1 * res2; // 2 * 2 = 4 +} + +int main() +{ + Poco::AutoPtr app_channel(new Poco::ConsoleChannel(std::cerr)); + Poco::Logger::root().setChannel(app_channel); + Poco::Logger::root().setLevel("trace"); + + LOG_INFO(&Poco::Logger::get(""), "Starting"); + + try + { + auto t = foo("foo"); + std::cout << ".. started" << std::endl; + while (t.resume()) + std::cout << ".. yielded" << std::endl; + std::cout << ".. done: " << t.res() << std::endl; + } + catch (DB::Exception & e) + { + std::cout << "Got exception " << e.what() << std::endl; + std::cout << e.getStackTraceString() << std::endl; + } +}