From 8f60c4b8d264cb1ec3fc1f81b89ba455d6b6c28d Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 30 Aug 2021 16:37:27 +0300 Subject: [PATCH] remove stale loogs from asynchronous insertion queue --- .../AsynchronousInsertionQueue.cpp | 41 +++++++++++++------ src/Interpreters/AsynchronousInsertionQueue.h | 2 +- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/src/Interpreters/AsynchronousInsertionQueue.cpp b/src/Interpreters/AsynchronousInsertionQueue.cpp index 6af913d3b89..56fdea8f6b4 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.cpp +++ b/src/Interpreters/AsynchronousInsertionQueue.cpp @@ -94,7 +94,6 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextMutablePtr context_, siz , max_data_size(max_data_size_) , busy_timeout(timeouts.busy) , stale_timeout(timeouts.stale) - , lock(RWLockImpl::create()) , queue(new Queue) , pool(pool_size) , dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this) @@ -124,7 +123,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id) { - auto write_lock = lock->getLock(RWLockImpl::Write, String()); + std::unique_lock lock(rwlock); InsertQuery key{query, settings}; auto it = queue->find(key); @@ -165,20 +164,38 @@ void AsynchronousInsertQueue::busyCheck() { std::this_thread::sleep_for(timeout); - auto read_lock = lock->getLock(RWLockImpl::Read, String()); - /// TODO: use priority queue instead of raw unsorted queue. timeout = busy_timeout; - for (auto & [_, data] : *queue) + std::vector keys_to_remove; + { - std::unique_lock data_lock(data->mutex); + std::shared_lock read_lock(rwlock); - auto lag = std::chrono::steady_clock::now() - data->first_update; + for (auto & [key, data] : *queue) + { + std::unique_lock data_lock(data->mutex); - if (lag >= busy_timeout) - pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); }); - else - timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); + auto lag = std::chrono::steady_clock::now() - data->first_update; + + if (data->is_reset) + keys_to_remove.push_back(key); + else if (lag >= busy_timeout) + pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); }); + else + timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); + } + } + + if (!keys_to_remove.empty()) + { + std::unique_lock write_lock(rwlock); + + for (const auto & key : keys_to_remove) + { + auto it = queue->find(key); + if (it != queue->end() && it->second->is_reset) + queue->erase(it); + } } } } @@ -189,7 +206,7 @@ void AsynchronousInsertQueue::staleCheck() { std::this_thread::sleep_for(stale_timeout); - auto read_lock = lock->getLock(RWLockImpl::Read, String()); + std::shared_lock read_lock(rwlock); for (auto & [_, data] : *queue) { diff --git a/src/Interpreters/AsynchronousInsertionQueue.h b/src/Interpreters/AsynchronousInsertionQueue.h index d89006e0684..5d1c449eae5 100644 --- a/src/Interpreters/AsynchronousInsertionQueue.h +++ b/src/Interpreters/AsynchronousInsertionQueue.h @@ -65,7 +65,7 @@ class AsynchronousInsertQueue : public WithMutableContext const size_t max_data_size; /// in bytes const std::chrono::seconds busy_timeout, stale_timeout; - RWLock lock; + std::shared_mutex rwlock; std::unique_ptr queue; std::atomic shutdown{false};