diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 84ace9a880e..07acb15b277 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -47,6 +47,7 @@ #include #include #include +#include #if USE_KRB5 #include @@ -284,6 +285,14 @@ StorageKafka::StorageKafka( task->deactivate(); tasks.emplace_back(std::make_shared(std::move(task))); } + + cleanup_thread = std::make_unique([this]() + { + const auto & table = getStorageID().getTableName(); + const auto & thread_name = std::string("KfkCln:") + table; + setThreadName(thread_name.c_str(), /*truncate=*/ true); + cleanConsumers(); + }); } SettingsChanges StorageKafka::createSettingsAdjustments() @@ -428,6 +437,18 @@ void StorageKafka::startup() void StorageKafka::shutdown(bool) { shutdown_called = true; + cleanup_cv.notify_one(); + + { + LOG_TRACE(log, "Waiting for consumers cleanup thread"); + Stopwatch watch; + if (cleanup_thread) + { + cleanup_thread->join(); + cleanup_thread.reset(); + } + LOG_TRACE(log, "Consumers cleanup thread finished in {} ms.", watch.elapsedMilliseconds()); + } { LOG_TRACE(log, "Waiting for streaming jobs"); @@ -616,42 +637,53 @@ cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_n void StorageKafka::cleanConsumers() { UInt64 ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000; - UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - - /// Copy consumers for closing to a new vector to close them without a lock - std::vector consumers_to_close; + std::unique_lock lock(mutex); + std::chrono::milliseconds timeout(KAFKA_RESCHEDULE_MS); + while (!cleanup_cv.wait_for(lock, timeout, [this]() { return shutdown_called == true; })) { - std::lock_guard lock(mutex); + /// Copy consumers for closing to a new vector to close them without a lock + std::vector consumers_to_close; - for (size_t i = 0; i < consumers.size(); ++i) + UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); { - auto & consumer_ptr = consumers[i]; - - UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec(); - chassert(consumer_last_used_usec <= now_usec); - - if (!consumer_ptr->hasConsumer()) - continue; - if (consumer_ptr->isInUse()) - continue; - - if (now_usec - consumer_last_used_usec > ttl_usec) + for (size_t i = 0; i < consumers.size(); ++i) { - LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId()); - consumers_to_close.push_back(consumer_ptr->moveConsumer()); + auto & consumer_ptr = consumers[i]; + + UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec(); + chassert(consumer_last_used_usec <= now_usec); + + if (!consumer_ptr->hasConsumer()) + continue; + if (consumer_ptr->isInUse()) + continue; + + if (now_usec - consumer_last_used_usec > ttl_usec) + { + LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId()); + consumers_to_close.push_back(consumer_ptr->moveConsumer()); + } } } + + if (!consumers_to_close.empty()) + { + lock.unlock(); + + Stopwatch watch; + size_t closed = consumers_to_close.size(); + consumers_to_close.clear(); + LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.", + closed, ttl_usec, watch.elapsedMilliseconds()); + + lock.lock(); + } + + ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000; } - if (!consumers_to_close.empty()) - { - Stopwatch watch; - size_t closed = consumers_to_close.size(); - consumers_to_close.clear(); - LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.", - closed, ttl_usec, watch.elapsedMilliseconds()); - } + LOG_TRACE(log, "Consumers cleanup thread finished"); } size_t StorageKafka::getMaxBlockSize() const @@ -890,8 +922,6 @@ void StorageKafka::threadFunc(size_t idx) mv_attached.store(false); - cleanConsumers(); - // Wait for attached views if (!task->stream_cancelled) task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS); diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 84f4022790f..20f42f9e7df 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -106,6 +107,7 @@ private: std::mutex mutex; std::condition_variable cv; + std::condition_variable cleanup_cv; // Stream thread struct TaskContext @@ -119,6 +121,8 @@ private: std::vector> tasks; bool thread_per_consumer = false; + std::unique_ptr cleanup_thread; + /// For memory accounting in the librdkafka threads. std::mutex thread_statuses_mutex; std::list> thread_statuses;