From c61e9a655a9af4a81db5e384372122f68f57fff9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 12 Jan 2021 13:01:19 +0300 Subject: [PATCH] Print log in case of exception in PollingQueue --- src/Processors/Executors/PollingQueue.cpp | 23 ++++++++++++++++++++--- src/Processors/Executors/PollingQueue.h | 9 +++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/Processors/Executors/PollingQueue.cpp b/src/Processors/Executors/PollingQueue.cpp index dfd146fcb0a..0a7f50f5fc2 100644 --- a/src/Processors/Executors/PollingQueue.cpp +++ b/src/Processors/Executors/PollingQueue.cpp @@ -59,9 +59,11 @@ void PollingQueue::addTask(size_t thread_number, void * data, int fd) if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &socket_event)) throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); + + log.emplace_back(Log{.add = true, .key = key, .ptr = data}); } -std::string dumpTasks(const std::unordered_map & tasks) +static std::string dumpTasks(const std::unordered_map & tasks) { WriteBufferFromOwnString res; res << "Tasks = ["; @@ -77,6 +79,19 @@ std::string dumpTasks(const std::unordered_map & log) +{ + WriteBufferFromOwnString res; + for (const auto & item : log) + { + res << (item.add ? '+' : '-') << ' ' << item.key << ' '; + writePointerHex(item.ptr, res); + res << '\n'; + } + + return res.str(); +} + PollingQueue::TaskData PollingQueue::wait(std::unique_lock & lock) { if (is_finished) @@ -105,8 +120,8 @@ PollingQueue::TaskData PollingQueue::wait(std::unique_lock & lock) auto it = tasks.find(key); if (it == tasks.end()) { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Task {} ({}) was not found in task queue: {}", - key, ptr, dumpTasks(tasks)); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Task {} ({}) was not found in task queue: {}\n {}", + key, ptr, dumpTasks(tasks), dumpLog(log)); } auto res = it->second; @@ -115,6 +130,8 @@ PollingQueue::TaskData PollingQueue::wait(std::unique_lock & lock) if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, res.fd, &event)) throwFromErrno("Cannot remove socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); + log.emplace_back(Log{.add = true, .key = key, .ptr = ptr}); + return res; } diff --git a/src/Processors/Executors/PollingQueue.h b/src/Processors/Executors/PollingQueue.h index 9d37bf0a2cc..74f3b414705 100644 --- a/src/Processors/Executors/PollingQueue.h +++ b/src/Processors/Executors/PollingQueue.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -24,11 +25,19 @@ public: explicit operator bool() const { return data; } }; + struct Log + { + bool add; + std::uintptr_t key; + const void * ptr; + }; + private: int epoll_fd; int pipe_fd[2]; std::atomic_bool is_finished = false; std::unordered_map tasks; + std::vector log; public: PollingQueue();