Use separate thread for kafka consumers cleanup

The pool may run out of threads, but this cleanup must always be able to run
to avoid leaking memory.

And this should not be a problem since librdkafka has multiple threads
for each consumer (5!) anyway.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-12-15 16:08:54 +01:00
parent a7453f7f14
commit 06a9e9a9ca
2 changed files with 63 additions and 29 deletions

View File

@ -47,6 +47,7 @@
#include <Common/config_version.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <base/sleep.h>
#if USE_KRB5
#include <Access/KerberosInit.h>
@ -284,6 +285,14 @@ StorageKafka::StorageKafka(
task->deactivate();
tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
}
/// Start a dedicated thread that runs cleanConsumers() for this table.
/// It is joined and reset in shutdown().
cleanup_thread = std::make_unique<ThreadFromGlobalPool>([this]()
{
/// Name the thread "KfkCln:<table name>"; truncate=true is passed so that
/// a long table name does not make the thread name invalid.
const auto & table = getStorageID().getTableName();
const auto & thread_name = std::string("KfkCln:") + table;
setThreadName(thread_name.c_str(), /*truncate=*/ true);
/// Runs the cleanup loop, which waits on cleanup_cv until shutdown_called is set.
cleanConsumers();
});
}
SettingsChanges StorageKafka::createSettingsAdjustments()
@ -428,6 +437,18 @@ void StorageKafka::startup()
void StorageKafka::shutdown(bool)
{
shutdown_called = true;
cleanup_cv.notify_one();
{
LOG_TRACE(log, "Waiting for consumers cleanup thread");
Stopwatch watch;
if (cleanup_thread)
{
cleanup_thread->join();
cleanup_thread.reset();
}
LOG_TRACE(log, "Consumers cleanup thread finished in {} ms.", watch.elapsedMilliseconds());
}
{
LOG_TRACE(log, "Waiting for streaming jobs");
@ -616,42 +637,53 @@ cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_n
void StorageKafka::cleanConsumers()
{
UInt64 ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000;
UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
/// Copy consumers for closing to a new vector to close them without a lock
std::vector<ConsumerPtr> consumers_to_close;
std::unique_lock lock(mutex);
std::chrono::milliseconds timeout(KAFKA_RESCHEDULE_MS);
while (!cleanup_cv.wait_for(lock, timeout, [this]() { return shutdown_called == true; }))
{
std::lock_guard lock(mutex);
/// Copy consumers for closing to a new vector to close them without a lock
std::vector<ConsumerPtr> consumers_to_close;
for (size_t i = 0; i < consumers.size(); ++i)
UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
{
auto & consumer_ptr = consumers[i];
UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec();
chassert(consumer_last_used_usec <= now_usec);
if (!consumer_ptr->hasConsumer())
continue;
if (consumer_ptr->isInUse())
continue;
if (now_usec - consumer_last_used_usec > ttl_usec)
for (size_t i = 0; i < consumers.size(); ++i)
{
LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId());
consumers_to_close.push_back(consumer_ptr->moveConsumer());
auto & consumer_ptr = consumers[i];
UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec();
chassert(consumer_last_used_usec <= now_usec);
if (!consumer_ptr->hasConsumer())
continue;
if (consumer_ptr->isInUse())
continue;
if (now_usec - consumer_last_used_usec > ttl_usec)
{
LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId());
consumers_to_close.push_back(consumer_ptr->moveConsumer());
}
}
}
if (!consumers_to_close.empty())
{
lock.unlock();
Stopwatch watch;
size_t closed = consumers_to_close.size();
consumers_to_close.clear();
LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.",
closed, ttl_usec, watch.elapsedMilliseconds());
lock.lock();
}
ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000;
}
if (!consumers_to_close.empty())
{
Stopwatch watch;
size_t closed = consumers_to_close.size();
consumers_to_close.clear();
LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.",
closed, ttl_usec, watch.elapsedMilliseconds());
}
LOG_TRACE(log, "Consumers cleanup thread finished");
}
size_t StorageKafka::getMaxBlockSize() const
@ -890,8 +922,6 @@ void StorageKafka::threadFunc(size_t idx)
mv_attached.store(false);
cleanConsumers();
// Wait for attached views
if (!task->stream_cancelled)
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/ThreadPool_fwd.h>
#include <Common/Macros.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
@ -106,6 +107,7 @@ private:
std::mutex mutex;
std::condition_variable cv;
std::condition_variable cleanup_cv;
// Stream thread
struct TaskContext
@ -119,6 +121,8 @@ private:
std::vector<std::shared_ptr<TaskContext>> tasks;
bool thread_per_consumer = false;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
/// For memory accounting in the librdkafka threads.
std::mutex thread_statuses_mutex;
std::list<std::shared_ptr<ThreadStatus>> thread_statuses;