Print log in case of exception in PollingQueue

This commit is contained in:
Nikolai Kochetov 2021-01-12 13:01:19 +03:00
parent 1d6bfe8ad7
commit c61e9a655a
2 changed files with 29 additions and 3 deletions

View File

@ -59,9 +59,11 @@ void PollingQueue::addTask(size_t thread_number, void * data, int fd)
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &socket_event)) if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &socket_event))
throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
log.emplace_back(Log{.add = true, .key = key, .ptr = data});
} }
std::string dumpTasks(const std::unordered_map<std::uintptr_t, PollingQueue::TaskData> & tasks) static std::string dumpTasks(const std::unordered_map<std::uintptr_t, PollingQueue::TaskData> & tasks)
{ {
WriteBufferFromOwnString res; WriteBufferFromOwnString res;
res << "Tasks = ["; res << "Tasks = [";
@ -77,6 +79,19 @@ std::string dumpTasks(const std::unordered_map<std::uintptr_t, PollingQueue::Tas
return res.str(); return res.str();
} }
static std::string dumpLog(const std::vector<PollingQueue::Log> & log)
{
WriteBufferFromOwnString res;
for (const auto & item : log)
{
res << (item.add ? '+' : '-') << ' ' << item.key << ' ';
writePointerHex(item.ptr, res);
res << '\n';
}
return res.str();
}
PollingQueue::TaskData PollingQueue::wait(std::unique_lock<std::mutex> & lock) PollingQueue::TaskData PollingQueue::wait(std::unique_lock<std::mutex> & lock)
{ {
if (is_finished) if (is_finished)
@ -105,8 +120,8 @@ PollingQueue::TaskData PollingQueue::wait(std::unique_lock<std::mutex> & lock)
auto it = tasks.find(key); auto it = tasks.find(key);
if (it == tasks.end()) if (it == tasks.end())
{ {
throw Exception(ErrorCodes::LOGICAL_ERROR, "Task {} ({}) was not found in task queue: {}", throw Exception(ErrorCodes::LOGICAL_ERROR, "Task {} ({}) was not found in task queue: {}\n {}",
key, ptr, dumpTasks(tasks)); key, ptr, dumpTasks(tasks), dumpLog(log));
} }
auto res = it->second; auto res = it->second;
@ -115,6 +130,8 @@ PollingQueue::TaskData PollingQueue::wait(std::unique_lock<std::mutex> & lock)
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, res.fd, &event)) if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, res.fd, &event))
throwFromErrno("Cannot remove socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); throwFromErrno("Cannot remove socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
log.emplace_back(Log{.add = true, .key = key, .ptr = ptr});
return res; return res;
} }

View File

@ -4,6 +4,7 @@
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <unordered_map> #include <unordered_map>
#include <vector>
namespace DB namespace DB
{ {
@ -24,11 +25,19 @@ public:
explicit operator bool() const { return data; } explicit operator bool() const { return data; }
}; };
struct Log
{
bool add;
std::uintptr_t key;
const void * ptr;
};
private: private:
int epoll_fd; int epoll_fd;
int pipe_fd[2]; int pipe_fd[2];
std::atomic_bool is_finished = false; std::atomic_bool is_finished = false;
std::unordered_map<std::uintptr_t, TaskData> tasks; std::unordered_map<std::uintptr_t, TaskData> tasks;
std::vector<Log> log;
public: public:
PollingQueue(); PollingQueue();