remove stale loogs from asynchronous insertion queue

This commit is contained in:
Anton Popov 2021-08-30 16:37:27 +03:00
parent 7aec53bb8f
commit 8f60c4b8d2
2 changed files with 30 additions and 13 deletions

View File

@ -94,7 +94,6 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextMutablePtr context_, siz
, max_data_size(max_data_size_) , max_data_size(max_data_size_)
, busy_timeout(timeouts.busy) , busy_timeout(timeouts.busy)
, stale_timeout(timeouts.stale) , stale_timeout(timeouts.stale)
, lock(RWLockImpl::create())
, queue(new Queue) , queue(new Queue)
, pool(pool_size) , pool(pool_size)
, dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this) , dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this)
@ -124,7 +123,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id) void AsynchronousInsertQueue::push(const ASTPtr & query, const Settings & settings, const String & query_id)
{ {
auto write_lock = lock->getLock(RWLockImpl::Write, String()); std::unique_lock lock(rwlock);
InsertQuery key{query, settings}; InsertQuery key{query, settings};
auto it = queue->find(key); auto it = queue->find(key);
@ -165,20 +164,38 @@ void AsynchronousInsertQueue::busyCheck()
{ {
std::this_thread::sleep_for(timeout); std::this_thread::sleep_for(timeout);
auto read_lock = lock->getLock(RWLockImpl::Read, String());
/// TODO: use priority queue instead of raw unsorted queue. /// TODO: use priority queue instead of raw unsorted queue.
timeout = busy_timeout; timeout = busy_timeout;
for (auto & [_, data] : *queue) std::vector<InsertQuery> keys_to_remove;
{ {
std::unique_lock<std::mutex> data_lock(data->mutex); std::shared_lock read_lock(rwlock);
auto lag = std::chrono::steady_clock::now() - data->first_update; for (auto & [key, data] : *queue)
{
std::unique_lock<std::mutex> data_lock(data->mutex);
if (lag >= busy_timeout) auto lag = std::chrono::steady_clock::now() - data->first_update;
pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); });
else if (data->is_reset)
timeout = std::min(timeout, std::chrono::ceil<std::chrono::seconds>(busy_timeout - lag)); keys_to_remove.push_back(key);
else if (lag >= busy_timeout)
pool.scheduleOrThrowOnError([data = data, global_context = getContext()] { processData(data, global_context); });
else
timeout = std::min(timeout, std::chrono::ceil<std::chrono::seconds>(busy_timeout - lag));
}
}
if (!keys_to_remove.empty())
{
std::unique_lock write_lock(rwlock);
for (const auto & key : keys_to_remove)
{
auto it = queue->find(key);
if (it != queue->end() && it->second->is_reset)
queue->erase(it);
}
} }
} }
} }
@ -189,7 +206,7 @@ void AsynchronousInsertQueue::staleCheck()
{ {
std::this_thread::sleep_for(stale_timeout); std::this_thread::sleep_for(stale_timeout);
auto read_lock = lock->getLock(RWLockImpl::Read, String()); std::shared_lock read_lock(rwlock);
for (auto & [_, data] : *queue) for (auto & [_, data] : *queue)
{ {

View File

@ -65,7 +65,7 @@ class AsynchronousInsertQueue : public WithMutableContext
const size_t max_data_size; /// in bytes const size_t max_data_size; /// in bytes
const std::chrono::seconds busy_timeout, stale_timeout; const std::chrono::seconds busy_timeout, stale_timeout;
RWLock lock; std::shared_mutex rwlock;
std::unique_ptr<Queue> queue; std::unique_ptr<Queue> queue;
std::atomic<bool> shutdown{false}; std::atomic<bool> shutdown{false};