From 123d63e82448003026d5fc3553db76b786d78319 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 13 Dec 2023 14:57:22 +0100 Subject: [PATCH 01/11] Remove StorageKafka::num_created_consumers (in favor of all_consumers.size()) Signed-off-by: Azat Khuzhin --- src/Storages/Kafka/StorageKafka.cpp | 13 ++++++------- src/Storages/Kafka/StorageKafka.h | 4 ---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 34138b2237f..5b484aca7f0 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -343,7 +343,7 @@ Pipe StorageKafka::read( size_t /* max_block_size */, size_t /* num_streams */) { - if (num_created_consumers == 0) + if (all_consumers.empty()) return {}; if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select) @@ -357,12 +357,12 @@ Pipe StorageKafka::read( /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions. Pipes pipes; - pipes.reserve(num_created_consumers); + pipes.reserve(all_consumers.size()); auto modified_context = Context::createCopy(local_context); modified_context->applySettingsChanges(settings_adjustments); // Claim as many consumers as requested, but don't block - for (size_t i = 0; i < num_created_consumers; ++i) + for (size_t i = 0; i < all_consumers.size(); ++i) { /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block /// TODO: probably that leads to awful performance. @@ -419,7 +419,6 @@ void StorageKafka::startup() auto consumer = createConsumer(i); pushConsumer(consumer); all_consumers.push_back(consumer); - ++num_created_consumers; } catch (const cppkafka::Exception &) { @@ -447,7 +446,7 @@ void StorageKafka::shutdown(bool) } LOG_TRACE(log, "Closing consumers"); - for (size_t i = 0; i < num_created_consumers; ++i) + for (size_t i = 0; i < all_consumers.size(); ++i) auto consumer = popConsumer(); LOG_TRACE(log, "Consumers closed"); @@ -756,7 +755,7 @@ void StorageKafka::threadFunc(size_t idx) mv_attached.store(true); // Keep streaming as long as there are attached views and streaming is not cancelled - while (!task->stream_cancelled && num_created_consumers > 0) + while (!task->stream_cancelled && !all_consumers.empty()) { if (!checkDependencies(table_id)) break; @@ -844,7 +843,7 @@ bool StorageKafka::streamToViews() std::vector> sources; Pipes pipes; - auto stream_count = thread_per_consumer ? 1 : num_created_consumers; + auto stream_count = thread_per_consumer ? 1 : all_consumers.size(); sources.reserve(stream_count); pipes.reserve(stream_count); for (size_t i = 0; i < stream_count; ++i) diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 9280809be0e..66f92a2df88 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -108,10 +108,6 @@ private: std::atomic mv_attached = false; - /// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called). - /// In this case we still need to be able to shutdown() properly. - size_t num_created_consumers = 0; /// number of actually created consumers. 
-
     std::vector<KafkaConsumerPtr> consumers; /// available consumers
     std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers

From 51d4f583e6a6aa2dcf5fadfd75421b0466a96e8e Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Wed, 13 Dec 2023 19:23:52 +0100
Subject: [PATCH 02/11] Properly set shutdown_called in StorageKafka::shutdown()

Fixes: https://github.com/ClickHouse/ClickHouse/pull/42777
Signed-off-by: Azat Khuzhin
---
 src/Storages/Kafka/StorageKafka.cpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 5b484aca7f0..56a2ed81bfd 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -436,6 +436,8 @@ void StorageKafka::startup()

 void StorageKafka::shutdown(bool)
 {
+    shutdown_called = true;
+
     for (auto & task : tasks)
     {
         // Interrupt streaming thread

From e7592c140e2659bbb22cc70279bcaa514e4a44c7 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Wed, 13 Dec 2023 15:32:52 +0100
Subject: [PATCH 03/11] Create consumers for Kafka tables on the fly (but keep
 them for 1 minute after last use)

The pool of pre-created consumers was a problem for librdkafka internal
statistics: the statistics queue has to be consumed continuously, but
ClickHouse created the consumers regardless of whether there were any
readers (attached materialized views or direct SELECTs). Without
readers the statistics messages just got queued and were never
released, which:
- creates a live memory leak
- makes destruction very slow, due to librdkafka internals (it moves
  entries from this queue into another linked list, but with sorting,
  which is incredibly slow for linked lists)

So the idea is simple: create consumers only when they are required,
and destroy them after some timeout (right now it is 60 seconds) if
nobody uses them; that way the problem should be gone.

This should also reduce the number of internal librdkafka threads when
nobody reads from the Kafka tables.
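
The gist, as an illustrative sketch (made-up names, not the actual
StorageKafka interfaces): consumers are created lazily on acquire and
dropped by a periodic cleanup pass once they have been idle longer
than the TTL.

    #include <chrono>
    #include <memory>
    #include <mutex>
    #include <vector>

    using Clock = std::chrono::steady_clock;

    struct Consumer { /* stands in for cppkafka::Consumer */ };

    class ConsumerPool
    {
    public:
        explicit ConsumerPool(size_t max_consumers) : slots(max_consumers) {}

        /// Take a free consumer, creating one lazily if its slot is empty.
        std::shared_ptr<Consumer> acquire()
        {
            std::lock_guard lock(mutex);
            for (auto & slot : slots)
            {
                if (slot.in_use)
                    continue;
                if (!slot.consumer)
                    slot.consumer = std::make_shared<Consumer>();
                slot.in_use = true;
                return slot.consumer;
            }
            return nullptr; /// all busy (the real code waits with a timeout)
        }

        void release(const std::shared_ptr<Consumer> & consumer)
        {
            std::lock_guard lock(mutex);
            for (auto & slot : slots)
            {
                if (slot.consumer == consumer)
                {
                    slot.in_use = false;
                    slot.last_used = Clock::now();
                }
            }
        }

        /// Periodic pass: destroy consumers idle longer than the TTL
        /// (destroying a consumer frees librdkafka threads and queues).
        void cleanup(Clock::duration ttl)
        {
            std::lock_guard lock(mutex);
            for (auto & slot : slots)
                if (slot.consumer && !slot.in_use && Clock::now() - slot.last_used > ttl)
                    slot.consumer.reset();
        }

    private:
        struct Slot
        {
            std::shared_ptr<Consumer> consumer;
            bool in_use = false;
            Clock::time_point last_used;
        };

        std::mutex mutex;
        std::vector<Slot> slots;
    };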
Signed-off-by: Azat Khuzhin --- src/Storages/Kafka/KafkaConsumer.cpp | 1 + src/Storages/Kafka/KafkaConsumer.h | 16 +- src/Storages/Kafka/StorageKafka.cpp | 161 +++++++++++++----- src/Storages/Kafka/StorageKafka.h | 4 +- .../System/StorageSystemKafkaConsumers.cpp | 3 + 5 files changed, 140 insertions(+), 45 deletions(-) diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp index 31d431e27fe..c01c9564f4b 100644 --- a/src/Storages/Kafka/KafkaConsumer.cpp +++ b/src/Storages/Kafka/KafkaConsumer.cpp @@ -604,6 +604,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const .exceptions_buffer = [&](){std::lock_guard lock(exception_mutex); return exceptions_buffer;}(), .in_use = in_use.load(), + .last_used_usec = last_used_usec.load(), .rdkafka_stat = [&](){std::lock_guard lock(rdkafka_stat_mutex); return rdkafka_stat;}(), }; diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h index f9758ff6c90..ecd201f5a6e 100644 --- a/src/Storages/Kafka/KafkaConsumer.h +++ b/src/Storages/Kafka/KafkaConsumer.h @@ -57,6 +57,7 @@ public: UInt64 num_rebalance_revocations; KafkaConsumer::ExceptionsBuffer exceptions_buffer; bool in_use; + UInt64 last_used_usec; std::string rdkafka_stat; }; @@ -113,11 +114,20 @@ public: rdkafka_stat = stat_json_string; } void inUse() { in_use = true; } - void notInUse() { in_use = false; } + void notInUse() + { + in_use = false; + last_used_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + } // For system.kafka_consumers Stat getStat() const; + bool isInUse() const { return in_use; } + UInt64 getLastUsedUsec() const { return last_used_usec; } + + std::string getMemberId() const; + private: using Messages = std::vector; CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers}; @@ -168,6 +178,8 @@ private: std::atomic num_rebalance_assignments = 0; std::atomic num_rebalance_revocations = 0; std::atomic in_use = 0; + /// Last used time (for TTL) + std::atomic last_used_usec = 0; mutable std::mutex rdkafka_stat_mutex; std::string rdkafka_stat; @@ -178,8 +190,6 @@ private: /// Return number of messages with an error. 
size_t filterMessageErrors(); ReadBufferPtr getNextMessage(); - - std::string getMemberId() const; }; } diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 56a2ed81bfd..4496d4a4450 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -76,6 +77,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int QUERY_NOT_ALLOWED; + extern const int ABORTED; } struct StorageKafkaInterceptors @@ -262,7 +264,6 @@ StorageKafka::StorageKafka( , schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info)) , num_consumers(kafka_settings->kafka_num_consumers.value) , log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")")) - , semaphore(0, static_cast(num_consumers)) , intermediate_commit(kafka_settings->kafka_commit_every_batch.value) , settings_adjustments(createSettingsAdjustments()) , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value) @@ -343,8 +344,8 @@ Pipe StorageKafka::read( size_t /* max_block_size */, size_t /* num_streams */) { - if (all_consumers.empty()) - return {}; + if (shutdown_called) + throw Exception(ErrorCodes::ABORTED, "Table is detached"); if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select) throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, @@ -357,12 +358,12 @@ Pipe StorageKafka::read( /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions. Pipes pipes; - pipes.reserve(all_consumers.size()); + pipes.reserve(num_consumers); auto modified_context = Context::createCopy(local_context); modified_context->applySettingsChanges(settings_adjustments); // Claim as many consumers as requested, but don't block - for (size_t i = 0; i < all_consumers.size(); ++i) + for (size_t i = 0; i < num_consumers; ++i) { /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block /// TODO: probably that leads to awful performance. @@ -412,19 +413,7 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & void StorageKafka::startup() { - for (size_t i = 0; i < num_consumers; ++i) - { - try - { - auto consumer = createConsumer(i); - pushConsumer(consumer); - all_consumers.push_back(consumer); - } - catch (const cppkafka::Exception &) - { - tryLogCurrentException(log); - } - } + all_consumers.resize(num_consumers); // Start the reader thread for (auto & task : tasks) @@ -438,21 +427,34 @@ void StorageKafka::shutdown(bool) { shutdown_called = true; - for (auto & task : tasks) { - // Interrupt streaming thread - task->stream_cancelled = true; + LOG_TRACE(log, "Waiting for streaming jobs"); + Stopwatch watch; + for (auto & task : tasks) + { + // Interrupt streaming thread + task->stream_cancelled = true; - LOG_TRACE(log, "Waiting for cleanup"); - task->holder->deactivate(); + LOG_TEST(log, "Waiting for cleanup of a task"); + task->holder->deactivate(); + } + LOG_TRACE(log, "Streaming jobs finished in {} ms.", watch.elapsedMilliseconds()); } - LOG_TRACE(log, "Closing consumers"); - for (size_t i = 0; i < all_consumers.size(); ++i) - auto consumer = popConsumer(); - LOG_TRACE(log, "Consumers closed"); + { + std::lock_guard lock(mutex); + LOG_TRACE(log, "Closing {} consumers", consumers.size()); + Stopwatch watch; + consumers.clear(); + LOG_TRACE(log, "Consumers closed. 
Took {} ms.", watch.elapsedMilliseconds()); + } - rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS); + { + LOG_TRACE(log, "Waiting for final cleanup"); + Stopwatch watch; + rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS); + LOG_TRACE(log, "Final cleanup finished in {} ms (timeout {} ms).", watch.elapsedMilliseconds(), CLEANUP_TIMEOUT_MS); + } } @@ -461,7 +463,7 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer) std::lock_guard lock(mutex); consumer->notInUse(); consumers.push_back(consumer); - semaphore.set(); + cv.notify_one(); CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1); } @@ -474,22 +476,48 @@ KafkaConsumerPtr StorageKafka::popConsumer() KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout) { - // Wait for the first free buffer - if (timeout == std::chrono::milliseconds::zero()) - semaphore.wait(); + std::unique_lock lock(mutex); + + KafkaConsumerPtr consumer_ptr; + + /// 1. There is consumer available. Return one of them. + if (!consumers.empty()) + { + consumer_ptr = consumers.back(); + consumers.pop_back(); + } else { - if (!semaphore.tryWait(timeout.count())) - return nullptr; + auto expired_consumer = std::find_if(all_consumers.begin(), all_consumers.end(), [](const auto & consumer_weak_ptr) + { + return consumer_weak_ptr.expired(); + }); + + /// 2. There is no consumer, but we can create a new one. + if (expired_consumer != all_consumers.end()) + { + size_t consumer_number = std::distance(all_consumers.begin(), expired_consumer); + /// It should be OK to create consumer under lock, since it should be fast (without subscribing). + consumer_ptr = createConsumer(consumer_number); + *expired_consumer = consumer_ptr; + } + /// 3. There is no consumer and num_consumers already created, waiting @timeout. + else + { + if (cv.wait_for(lock, timeout, [&]() { return !consumers.empty(); })) + { + consumer_ptr = consumers.back(); + consumers.pop_back(); + } + } } - // Take the first available buffer from the list - std::lock_guard lock(mutex); - auto consumer = consumers.back(); - consumers.pop_back(); - CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1); - consumer->inUse(); - return consumer; + if (consumer_ptr) + { + CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1); + consumer_ptr->inUse(); + } + return consumer_ptr; } @@ -545,10 +573,59 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number) { kafka_consumer_ptr = std::make_shared(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics); } + LOG_TRACE(log, "Created #{} consumer", consumer_number); + *consumer_weak_ptr_ptr = kafka_consumer_ptr; return kafka_consumer_ptr; } +void StorageKafka::cleanConsumers() +{ + static const UInt64 CONSUMER_TTL_USEC = 60'000'000; + UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + /// Copy consumers for closing to a new vector to close them without a lock + std::vector consumers_to_close; + + { + std::lock_guard lock(mutex); + + for (auto it = consumers.begin(); it != consumers.end();) + { + auto & consumer_ptr = *it; + + UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec(); + chassert(consumer_last_used_usec <= now_usec); + if (now_usec - consumer_last_used_usec > CONSUMER_TTL_USEC) + { + /// We need this only to get the consumer number. 
+ auto weak_it = std::find_if(all_consumers.begin(), all_consumers.end(), [&](const auto & consume_weak_ptr) + { + return consumer_ptr == consume_weak_ptr.lock(); + }); + chassert(weak_it != all_consumers.end()); + size_t consumer_number = std::distance(all_consumers.begin(), weak_it); + + LOG_TRACE(log, "Closing #{} consumer (id: {})", consumer_number, consumer_ptr->getMemberId()); + + consumers_to_close.push_back(std::move(consumer_ptr)); + it = consumers.erase(it); + } + else + ++it; + } + } + + if (!consumers_to_close.empty()) + { + Stopwatch watch; + size_t closed = consumers_to_close.size(); + consumers_to_close.clear(); + LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.", + closed, CONSUMER_TTL_USEC, watch.elapsedMilliseconds()); + } +} + size_t StorageKafka::getMaxBlockSize() const { return kafka_settings->kafka_max_block_size.changed @@ -806,6 +883,8 @@ void StorageKafka::threadFunc(size_t idx) mv_attached.store(false); + cleanConsumers(); + // Wait for attached views if (!task->stream_cancelled) task->holder->scheduleAfter(RESCHEDULE_MS); diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 66f92a2df88..9395c4ff68e 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -9,6 +9,7 @@ #include +#include #include #include #include @@ -102,7 +103,6 @@ private: const String schema_name; const size_t num_consumers; /// total number of consumers Poco::Logger * log; - Poco::Semaphore semaphore; const bool intermediate_commit; const SettingsChanges settings_adjustments; @@ -112,6 +112,7 @@ private: std::vector all_consumers; /// busy (belong to a KafkaSource) and vacant consumers std::mutex mutex; + std::condition_variable cv; // Stream thread struct TaskContext @@ -157,6 +158,7 @@ private: bool streamToViews(); bool checkDependencies(const StorageID & table_id); + void cleanConsumers(); }; } diff --git a/src/Storages/System/StorageSystemKafkaConsumers.cpp b/src/Storages/System/StorageSystemKafkaConsumers.cpp index eb7d84603c0..640b4de4c7e 100644 --- a/src/Storages/System/StorageSystemKafkaConsumers.cpp +++ b/src/Storages/System/StorageSystemKafkaConsumers.cpp @@ -41,6 +41,7 @@ NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes() {"num_rebalance_revocations", std::make_shared()}, {"num_rebalance_assignments", std::make_shared()}, {"is_currently_used", std::make_shared()}, + {"last_used", std::make_shared(6)}, {"rdkafka_stat", std::make_shared()}, }; return names_and_types; @@ -78,6 +79,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context auto & num_rebalance_revocations = assert_cast(*res_columns[index++]); auto & num_rebalance_assigments = assert_cast(*res_columns[index++]); auto & is_currently_used = assert_cast(*res_columns[index++]); + auto & last_used = assert_cast(*res_columns[index++]); auto & rdkafka_stat = assert_cast(*res_columns[index++]); const auto access = context->getAccess(); @@ -144,6 +146,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments); is_currently_used.insert(consumer_stat.in_use); + last_used.insert(consumer_stat.last_used_usec); rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size()); } From db745499404716b484ececb9e66833ef789db78b Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 14 Dec 2023 12:15:24 +0100 Subject: [PATCH 04/11] Enable stats for system.kafka_consumers 
 back by default

Signed-off-by: Azat Khuzhin
---
 src/Storages/Kafka/StorageKafka.cpp | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 4496d4a4450..402e5d2fab2 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -746,12 +746,10 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
         /// materialized view attached.
         ///
         /// So for now it is disabled by default, until properly fixed.
-#if 0
         if (!config.has(config_prefix + "." + "statistics_interval_ms"))
         {
             kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
         }
-#endif

         if (kafka_config.get("statistics.interval.ms") != "0")
         {

From 2ff0bfb0a1e920dccc9ec0a178614c3edbbd7b58 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Fri, 15 Dec 2023 11:13:18 +0100
Subject: [PATCH 05/11] Preserve KafkaConsumer objects

This makes system.kafka_consumers more useful: prior to this patch the
consumer object was removed once its TTL expired, while now all of its
accumulated information is preserved.

Signed-off-by: Azat Khuzhin
---
 src/Storages/Kafka/KafkaConsumer.cpp        |  42 +++-
 src/Storages/Kafka/KafkaConsumer.h          |   7 +-
 src/Storages/Kafka/StorageKafka.cpp         | 222 +++++++++---------
 src/Storages/Kafka/StorageKafka.h           |  30 +--
 .../System/StorageSystemKafkaConsumers.cpp  |  91 ++++---
 5 files changed, 209 insertions(+), 183 deletions(-)

diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp
index c01c9564f4b..4a26e790148 100644
--- a/src/Storages/Kafka/KafkaConsumer.cpp
+++ b/src/Storages/Kafka/KafkaConsumer.cpp
@@ -46,15 +46,13 @@ const auto DRAIN_TIMEOUT_MS = 5000ms;


 KafkaConsumer::KafkaConsumer(
-    ConsumerPtr consumer_,
     Poco::Logger * log_,
     size_t max_batch_size,
     size_t poll_timeout_,
     bool intermediate_commit_,
     const std::atomic<bool> & stopped_,
     const Names & _topics)
-    : consumer(consumer_)
-    , log(log_)
+    : log(log_)
     , batch_size(max_batch_size)
     , poll_timeout(poll_timeout_)
     , intermediate_commit(intermediate_commit_)
     , stopped(stopped_)
     , current(messages.begin())
     , topics(_topics)
     , exceptions_buffer(EXCEPTIONS_DEPTH)
 {
+}
+
+void KafkaConsumer::setConsumer(const ConsumerPtr & consumer_)
+{
+    consumer = consumer_;
+
     // called (synchronously, during poll) when we enter the consumer group
     consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
@@ -135,6 +139,9 @@
 KafkaConsumer::~KafkaConsumer()
 {
+    if (!consumer)
+        return;
+
     try
     {
         if (!consumer->get_subscription().empty())
@@ -568,6 +575,9 @@ void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktr
 */
 std::string KafkaConsumer::getMemberId() const
 {
+    if (!consumer)
+        return "";
+
     char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
     std::string memberid_string = memberid_ptr;
     rd_kafka_mem_free(nullptr, memberid_ptr);
@@ -578,8 +588,14 @@ std::string KafkaConsumer::getMemberId() const
 KafkaConsumer::Stat KafkaConsumer::getStat() const
 {
     KafkaConsumer::Stat::Assignments assignments;
-    auto cpp_assignments = consumer->get_assignment();
-    auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
+    cppkafka::TopicPartitionList cpp_assignments;
+    cppkafka::TopicPartitionList cpp_offsets;
+
+    if (consumer)
+    {
+        cpp_assignments = consumer->get_assignment();
+        cpp_offsets = consumer->get_offsets_position(cpp_assignments);
+    }

     for (size_t num = 0; num < cpp_assignments.size(); ++num)
     {
@@ -591,7 +607,7
@@ KafkaConsumer::Stat KafkaConsumer::getStat() const } return { - .consumer_id = getMemberId() /* consumer->get_member_id() */ , + .consumer_id = getMemberId(), .assignments = std::move(assignments), .last_poll_time = last_poll_timestamp_usec.load(), .num_messages_read = num_messages_read.load(), @@ -601,12 +617,18 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const .num_commits = num_commits.load(), .num_rebalance_assignments = num_rebalance_assignments.load(), .num_rebalance_revocations = num_rebalance_revocations.load(), - .exceptions_buffer = [&](){std::lock_guard lock(exception_mutex); - return exceptions_buffer;}(), + .exceptions_buffer = [&]() + { + std::lock_guard lock(exception_mutex); + return exceptions_buffer; + }(), .in_use = in_use.load(), .last_used_usec = last_used_usec.load(), - .rdkafka_stat = [&](){std::lock_guard lock(rdkafka_stat_mutex); - return rdkafka_stat;}(), + .rdkafka_stat = [&]() + { + std::lock_guard lock(rdkafka_stat_mutex); + return rdkafka_stat; + }(), }; } diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h index ecd201f5a6e..2fd9482bd35 100644 --- a/src/Storages/Kafka/KafkaConsumer.h +++ b/src/Storages/Kafka/KafkaConsumer.h @@ -61,9 +61,7 @@ public: std::string rdkafka_stat; }; -public: KafkaConsumer( - ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, @@ -73,6 +71,11 @@ public: ); ~KafkaConsumer(); + + void setConsumer(const ConsumerPtr & consumer); + bool hasConsumer() const { return consumer.get() != nullptr; } + ConsumerPtr && moveConsumer() { return std::move(consumer); } + void commit(); // Commit all processed messages. void subscribe(); // Subscribe internal consumer to topics. void unsubscribe(); // Unsubscribe internal consumer in case of failure. diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 402e5d2fab2..8e442703567 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -413,7 +414,9 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & void StorageKafka::startup() { - all_consumers.resize(num_consumers); + consumers.resize(num_consumers); + for (size_t i = 0; i < num_consumers; ++i) + consumers[i] = createKafkaConsumer(i); // Start the reader thread for (auto & task : tasks) @@ -462,7 +465,6 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer) { std::lock_guard lock(mutex); consumer->notInUse(); - consumers.push_back(consumer); cv.notify_one(); CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1); } @@ -478,50 +480,106 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout) { std::unique_lock lock(mutex); - KafkaConsumerPtr consumer_ptr; - - /// 1. There is consumer available. Return one of them. - if (!consumers.empty()) + KafkaConsumerPtr ret_consumer_ptr; + std::optional closed_consumer_index; + for (size_t i = 0; i < consumers.size(); ++i) { - consumer_ptr = consumers.back(); - consumers.pop_back(); + auto & consumer_ptr = consumers[i]; + + if (consumer_ptr->isInUse()) + continue; + + if (consumer_ptr->hasConsumer()) + { + ret_consumer_ptr = consumer_ptr; + break; + } + + if (!closed_consumer_index.has_value() && !consumer_ptr->hasConsumer()) + { + closed_consumer_index = i; + } } + + /// 1. There is consumer available - return it. + if (ret_consumer_ptr) + { + /// Noop + } + /// 2. There is no consumer, but we can create a new one. 
+ else if (!ret_consumer_ptr && closed_consumer_index.has_value()) + { + ret_consumer_ptr = consumers[*closed_consumer_index]; + /// It should be OK to create consumer under lock, since it should be fast (without subscribing). + ret_consumer_ptr->setConsumer(createConsumer(*ret_consumer_ptr, *closed_consumer_index)); + } + /// 3. There is no free consumer and num_consumers already created, waiting @timeout. else { - auto expired_consumer = std::find_if(all_consumers.begin(), all_consumers.end(), [](const auto & consumer_weak_ptr) + cv.wait_for(lock, timeout, [&]() { - return consumer_weak_ptr.expired(); - }); - - /// 2. There is no consumer, but we can create a new one. - if (expired_consumer != all_consumers.end()) - { - size_t consumer_number = std::distance(all_consumers.begin(), expired_consumer); - /// It should be OK to create consumer under lock, since it should be fast (without subscribing). - consumer_ptr = createConsumer(consumer_number); - *expired_consumer = consumer_ptr; - } - /// 3. There is no consumer and num_consumers already created, waiting @timeout. - else - { - if (cv.wait_for(lock, timeout, [&]() { return !consumers.empty(); })) + /// Note we are waiting only opened, free, consumers, since consumer cannot be closed right now + auto it = std::find_if(consumers.begin(), consumers.end(), [](const auto & ptr) { - consumer_ptr = consumers.back(); - consumers.pop_back(); + return !ptr->isInUse() && ptr->hasConsumer(); + }); + if (it != consumers.end()) + { + ret_consumer_ptr = *it; + return true; } - } + return false; + }); } - if (consumer_ptr) + if (ret_consumer_ptr) { CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1); - consumer_ptr->inUse(); + ret_consumer_ptr->inUse(); } - return consumer_ptr; + return ret_consumer_ptr; } -KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number) +KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number) +{ + /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage. + auto & stream_cancelled = thread_per_consumer ? tasks[consumer_number]->stream_cancelled : tasks.back()->stream_cancelled; + + KafkaConsumerPtr kafka_consumer_ptr = std::make_shared( + log, + getPollMaxBatchSize(), + getPollTimeoutMillisecond(), + intermediate_commit, + stream_cancelled, + topics); + return kafka_consumer_ptr; +} + +ConsumerPtr StorageKafka::createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number) +{ + cppkafka::Configuration consumer_config = getConsumerConfiguration(consumer_number); + + /// Using KafkaConsumer by reference should be safe, since + /// cppkafka::Consumer can poll messages (including statistics, which will + /// trigger the callback below) only via KafkaConsumer. 
+ if (consumer_config.get("statistics.interval.ms") != "0") + { + consumer_config.set_stats_callback([&kafka_consumer](cppkafka::KafkaHandleBase &, const std::string & stat_json) + { + kafka_consumer.setRDKafkaStat(stat_json); + }); + } + + auto consumer_ptr = std::make_shared(consumer_config); + consumer_ptr->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); + + LOG_TRACE(log, "Created #{} consumer", consumer_number); + + return consumer_ptr; +} + +cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number) { cppkafka::Configuration conf; @@ -546,37 +604,14 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number) size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages)); - /// a reference to the consumer is needed in statistic callback - /// although the consumer does not exist when callback is being registered - /// shared_ptr> comes to the rescue - auto consumer_weak_ptr_ptr = std::make_shared(); - updateConfiguration(conf, consumer_weak_ptr_ptr); + updateConfiguration(conf); // those settings should not be changed by users. conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once. conf.set("enable.partition.eof", "false"); // Ignore EOF messages - // Create a consumer and subscribe to topics - auto consumer_impl = std::make_shared(conf); - consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); - - KafkaConsumerPtr kafka_consumer_ptr; - - /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage. - if (thread_per_consumer) - { - auto& stream_cancelled = tasks[consumer_number]->stream_cancelled; - kafka_consumer_ptr = std::make_shared(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics); - } - else - { - kafka_consumer_ptr = std::make_shared(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics); - } - LOG_TRACE(log, "Created #{} consumer", consumer_number); - - *consumer_weak_ptr_ptr = kafka_consumer_ptr; - return kafka_consumer_ptr; + return conf; } void StorageKafka::cleanConsumers() @@ -585,34 +620,28 @@ void StorageKafka::cleanConsumers() UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); /// Copy consumers for closing to a new vector to close them without a lock - std::vector consumers_to_close; + std::vector consumers_to_close; { std::lock_guard lock(mutex); - for (auto it = consumers.begin(); it != consumers.end();) + for (size_t i = 0; i < consumers.size(); ++i) { - auto & consumer_ptr = *it; + auto & consumer_ptr = consumers[i]; UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec(); chassert(consumer_last_used_usec <= now_usec); + + if (!consumer_ptr->hasConsumer()) + continue; + if (consumer_ptr->isInUse()) + continue; + if (now_usec - consumer_last_used_usec > CONSUMER_TTL_USEC) { - /// We need this only to get the consumer number. 
- auto weak_it = std::find_if(all_consumers.begin(), all_consumers.end(), [&](const auto & consume_weak_ptr) - { - return consumer_ptr == consume_weak_ptr.lock(); - }); - chassert(weak_it != all_consumers.end()); - size_t consumer_number = std::distance(all_consumers.begin(), weak_it); - - LOG_TRACE(log, "Closing #{} consumer (id: {})", consumer_number, consumer_ptr->getMemberId()); - - consumers_to_close.push_back(std::move(consumer_ptr)); - it = consumers.erase(it); + LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId()); + consumers_to_close.push_back(consumer_ptr->moveConsumer()); } - else - ++it; } } @@ -656,8 +685,7 @@ String StorageKafka::getConfigPrefix() const return CONFIG_KAFKA_TAG; } -void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config, - std::shared_ptr kafka_consumer_weak_ptr_ptr) +void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config) { // Update consumer configuration from the configuration. Example: // @@ -737,31 +765,16 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config, LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message); }); - if (kafka_consumer_weak_ptr_ptr) + /// NOTE: statistics should be consumed, otherwise it creates too much + /// entries in the queue, that leads to memory leak and slow shutdown. + /// + /// This is the case when you have kafka table but no SELECT from it or + /// materialized view attached. + /// + /// So for now it is disabled by default, until properly fixed. + if (!config.has(config_prefix + "." + "statistics_interval_ms")) { - /// NOTE: statistics should be consumed, otherwise it creates too much - /// entries in the queue, that leads to memory leak and slow shutdown. - /// - /// This is the case when you have kafka table but no SELECT from it or - /// materialized view attached. - /// - /// So for now it is disabled by default, until properly fixed. - if (!config.has(config_prefix + "." + "statistics_interval_ms")) - { - kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable. - } - - if (kafka_config.get("statistics.interval.ms") != "0") - { - kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string) - { - auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock(); - if (kafka_consumer_ptr) - { - kafka_consumer_ptr->setRDKafkaStat(stat_json_string); - } - }); - } + kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable. 
} // Configure interceptor to change thread name @@ -832,7 +845,7 @@ void StorageKafka::threadFunc(size_t idx) mv_attached.store(true); // Keep streaming as long as there are attached views and streaming is not cancelled - while (!task->stream_cancelled && !all_consumers.empty()) + while (!task->stream_cancelled) { if (!checkDependencies(table_id)) break; @@ -869,13 +882,10 @@ void StorageKafka::threadFunc(size_t idx) LOG_ERROR(log, "{} {}", __PRETTY_FUNCTION__, exception_str); auto safe_consumers = getSafeConsumers(); - for (auto const & consumer_ptr_weak : safe_consumers.consumers) + for (auto const & consumer_ptr : safe_consumers.consumers) { /// propagate materialized view exception to all consumers - if (auto consumer_ptr = consumer_ptr_weak.lock()) - { - consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */); - } + consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */); } } @@ -922,7 +932,7 @@ bool StorageKafka::streamToViews() std::vector> sources; Pipes pipes; - auto stream_count = thread_per_consumer ? 1 : all_consumers.size(); + auto stream_count = thread_per_consumer ? 1 : num_consumers; sources.reserve(stream_count); pipes.reserve(stream_count); for (size_t i = 0; i < stream_count; ++i) diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 9395c4ff68e..84f4022790f 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -13,13 +13,7 @@ #include #include #include - -namespace cppkafka -{ - -class Configuration; - -} +#include namespace DB { @@ -29,7 +23,7 @@ class StorageSystemKafkaConsumers; struct StorageKafkaInterceptors; using KafkaConsumerPtr = std::shared_ptr; -using KafkaConsumerWeakPtr = std::weak_ptr; +using ConsumerPtr = std::shared_ptr; /** Implements a Kafka queue table engine that can be used as a persistent queue / buffer, * or as a basic building block for creating pipelines with a continuous insertion / ETL. @@ -85,10 +79,10 @@ public: { std::shared_ptr storage_ptr; std::unique_lock lock; - std::vector & consumers; + std::vector & consumers; }; - SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; } + SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), consumers}; } private: // Configuration and state @@ -108,8 +102,7 @@ private: std::atomic mv_attached = false; - std::vector consumers; /// available consumers - std::vector all_consumers; /// busy (belong to a KafkaSource) and vacant consumers + std::vector consumers; std::mutex mutex; std::condition_variable cv; @@ -131,7 +124,12 @@ private: std::list> thread_statuses; SettingsChanges createSettingsAdjustments(); - KafkaConsumerPtr createConsumer(size_t consumer_number); + /// Creates KafkaConsumer object without real consumer (cppkafka::Consumer) + KafkaConsumerPtr createKafkaConsumer(size_t consumer_number); + /// Creates real cppkafka::Consumer object + ConsumerPtr createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number); + /// Returns consumer configuration with all changes that had been overwritten in config + cppkafka::Configuration getConsumerConfiguration(size_t consumer_number); /// If named_collection is specified. String collection_name; @@ -139,11 +137,7 @@ private: std::atomic shutdown_called = false; // Update Kafka configuration with values from CH user configuration. 
- void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr); - void updateConfiguration(cppkafka::Configuration & kafka_config) - { - updateConfiguration(kafka_config, std::make_shared()); - } + void updateConfiguration(cppkafka::Configuration & kafka_config); String getConfigPrefix() const; void threadFunc(size_t idx); diff --git a/src/Storages/System/StorageSystemKafkaConsumers.cpp b/src/Storages/System/StorageSystemKafkaConsumers.cpp index 640b4de4c7e..e333f6e2c15 100644 --- a/src/Storages/System/StorageSystemKafkaConsumers.cpp +++ b/src/Storages/System/StorageSystemKafkaConsumers.cpp @@ -98,58 +98,55 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context auto safe_consumers = storage_kafka_ptr->getSafeConsumers(); - for (const auto & weak_consumer : safe_consumers.consumers) + for (const auto & consumer : safe_consumers.consumers) { - if (auto consumer = weak_consumer.lock()) + auto consumer_stat = consumer->getStat(); + + database.insertData(database_str.data(), database_str.size()); + table.insertData(table_str.data(), table_str.size()); + + consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size()); + + const auto num_assignnemts = consumer_stat.assignments.size(); + + for (size_t num = 0; num < num_assignnemts; ++num) { - auto consumer_stat = consumer->getStat(); + const auto & assign = consumer_stat.assignments[num]; - database.insertData(database_str.data(), database_str.size()); - table.insertData(table_str.data(), table_str.size()); + assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size()); - consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size()); - - const auto num_assignnemts = consumer_stat.assignments.size(); - - for (size_t num = 0; num < num_assignnemts; ++num) - { - const auto & assign = consumer_stat.assignments[num]; - - assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size()); - - assigments_partition_id.insert(assign.partition_id); - assigments_current_offset.insert(assign.current_offset); - } - last_assignment_num += num_assignnemts; - - assigments_topics_offsets.push_back(last_assignment_num); - assigments_partition_id_offsets.push_back(last_assignment_num); - assigments_current_offset_offsets.push_back(last_assignment_num); - - for (const auto & exc : consumer_stat.exceptions_buffer) - { - exceptions_text.insertData(exc.text.data(), exc.text.size()); - exceptions_time.insert(exc.timestamp_usec); - } - exceptions_num += consumer_stat.exceptions_buffer.size(); - exceptions_text_offset.push_back(exceptions_num); - exceptions_time_offset.push_back(exceptions_num); - - - last_poll_time.insert(consumer_stat.last_poll_time); - num_messages_read.insert(consumer_stat.num_messages_read); - last_commit_time.insert(consumer_stat.last_commit_timestamp_usec); - num_commits.insert(consumer_stat.num_commits); - last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec); - - num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations); - num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments); - - is_currently_used.insert(consumer_stat.in_use); - last_used.insert(consumer_stat.last_used_usec); - - rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size()); + assigments_partition_id.insert(assign.partition_id); + assigments_current_offset.insert(assign.current_offset); } + last_assignment_num += num_assignnemts; + + 
assigments_topics_offsets.push_back(last_assignment_num);
+        assigments_partition_id_offsets.push_back(last_assignment_num);
+        assigments_current_offset_offsets.push_back(last_assignment_num);
+
+        for (const auto & exc : consumer_stat.exceptions_buffer)
+        {
+            exceptions_text.insertData(exc.text.data(), exc.text.size());
+            exceptions_time.insert(exc.timestamp_usec);
+        }
+        exceptions_num += consumer_stat.exceptions_buffer.size();
+        exceptions_text_offset.push_back(exceptions_num);
+        exceptions_time_offset.push_back(exceptions_num);
+
+
+        last_poll_time.insert(consumer_stat.last_poll_time);
+        num_messages_read.insert(consumer_stat.num_messages_read);
+        last_commit_time.insert(consumer_stat.last_commit_timestamp_usec);
+        num_commits.insert(consumer_stat.num_commits);
+        last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec);
+
+        num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations);
+        num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
+
+        is_currently_used.insert(consumer_stat.in_use);
+        last_used.insert(consumer_stat.last_used_usec);
+
+        rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
     }
 };

From b19b70b8fc1fbb6ab0a6dcab20a1dd64fc56592a Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Fri, 15 Dec 2023 11:41:26 +0100
Subject: [PATCH 06/11] Add ability to configure TTL for kafka consumers

Signed-off-by: Azat Khuzhin
---
 src/Storages/Kafka/KafkaSettings.cpp | 12 ++++++++++++
 src/Storages/Kafka/KafkaSettings.h   |  9 +++++++++
 src/Storages/Kafka/StorageKafka.cpp  | 21 ++++++++++-----------
 3 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/src/Storages/Kafka/KafkaSettings.cpp b/src/Storages/Kafka/KafkaSettings.cpp
index 6ef74511d83..8e6883736dd 100644
--- a/src/Storages/Kafka/KafkaSettings.cpp
+++ b/src/Storages/Kafka/KafkaSettings.cpp
@@ -11,6 +11,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int UNKNOWN_SETTING;
+    extern const int BAD_ARGUMENTS;
 }

 IMPLEMENT_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
@@ -38,4 +39,15 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
     }
 }

+void KafkaSettings::sanityCheck() const
+{
+    if (kafka_consumers_pool_ttl_ms < KAFKA_RESCHEDULE_MS)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be less than the reschedule interval ({})",
+            kafka_consumers_pool_ttl_ms, KAFKA_RESCHEDULE_MS);
+
+    if (kafka_consumers_pool_ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be too big (greater than {}), since this may cause live memory leaks",
+            kafka_consumers_pool_ttl_ms, KAFKA_CONSUMERS_POOL_TTL_MS_MAX);
+}
+
 }

diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h
index 075e79c96f0..0addaf9e3b3 100644
--- a/src/Storages/Kafka/KafkaSettings.h
+++ b/src/Storages/Kafka/KafkaSettings.h
@@ -8,6 +8,12 @@ namespace DB
 {
 class ASTStorage;

+const auto KAFKA_RESCHEDULE_MS = 500;
+const auto KAFKA_CLEANUP_TIMEOUT_MS = 3000;
+// reschedule the job at least once per minute (we can't lock threads in the pool forever)
+const auto KAFKA_MAX_THREAD_WORK_DURATION_MS = 60000;
+// 10min
+const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;

 #define KAFKA_RELATED_SETTINGS(M, ALIAS) \
     M(String, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
@@ -25,6 +31,7 @@ class ASTStorage;
     /* default is stream_poll_timeout_ms */ \
     M(Milliseconds, kafka_poll_timeout_ms,
0, "Timeout for single poll from Kafka.", 0) \ M(UInt64, kafka_poll_max_batch_size, 0, "Maximum amount of messages to be polled in a single Kafka poll.", 0) \ + M(UInt64, kafka_consumers_pool_ttl_ms, 60'000, "TTL for Kafka consumers (in milliseconds)", 0) \ /* default is stream_flush_interval_ms */ \ M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \ M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \ @@ -53,6 +60,8 @@ DECLARE_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS) struct KafkaSettings : public BaseSettings { void loadFromQuery(ASTStorage & storage_def); + + void sanityCheck() const; }; } diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 8e442703567..84ace9a880e 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -175,10 +175,6 @@ struct StorageKafkaInterceptors namespace { - const auto RESCHEDULE_MS = 500; - const auto CLEANUP_TIMEOUT_MS = 3000; - const auto MAX_THREAD_WORK_DURATION_MS = 60000; // once per minute leave do reschedule (we can't lock threads in pool forever) - const String CONFIG_KAFKA_TAG = "kafka"; const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic"; const String CONFIG_NAME_TAG = "name"; @@ -270,11 +266,14 @@ StorageKafka::StorageKafka( , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value) , collection_name(collection_name_) { + kafka_settings->sanityCheck(); + if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) { kafka_settings->input_format_allow_errors_num = 0; kafka_settings->input_format_allow_errors_ratio = 0; } + StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); setInMemoryMetadata(storage_metadata); @@ -455,8 +454,8 @@ void StorageKafka::shutdown(bool) { LOG_TRACE(log, "Waiting for final cleanup"); Stopwatch watch; - rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS); - LOG_TRACE(log, "Final cleanup finished in {} ms (timeout {} ms).", watch.elapsedMilliseconds(), CLEANUP_TIMEOUT_MS); + rd_kafka_wait_destroyed(KAFKA_CLEANUP_TIMEOUT_MS); + LOG_TRACE(log, "Final cleanup finished in {} ms (timeout {} ms).", watch.elapsedMilliseconds(), KAFKA_CLEANUP_TIMEOUT_MS); } } @@ -616,7 +615,7 @@ cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_n void StorageKafka::cleanConsumers() { - static const UInt64 CONSUMER_TTL_USEC = 60'000'000; + UInt64 ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000; UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); /// Copy consumers for closing to a new vector to close them without a lock @@ -637,7 +636,7 @@ void StorageKafka::cleanConsumers() if (consumer_ptr->isInUse()) continue; - if (now_usec - consumer_last_used_usec > CONSUMER_TTL_USEC) + if (now_usec - consumer_last_used_usec > ttl_usec) { LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId()); consumers_to_close.push_back(consumer_ptr->moveConsumer()); @@ -651,7 +650,7 @@ void StorageKafka::cleanConsumers() size_t closed = consumers_to_close.size(); consumers_to_close.clear(); LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). 
Took {} ms.", - closed, CONSUMER_TTL_USEC, watch.elapsedMilliseconds()); + closed, ttl_usec, watch.elapsedMilliseconds()); } } @@ -862,7 +861,7 @@ void StorageKafka::threadFunc(size_t idx) auto ts = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(ts-start_time); - if (duration.count() > MAX_THREAD_WORK_DURATION_MS) + if (duration.count() > KAFKA_MAX_THREAD_WORK_DURATION_MS) { LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule."); break; @@ -895,7 +894,7 @@ void StorageKafka::threadFunc(size_t idx) // Wait for attached views if (!task->stream_cancelled) - task->holder->scheduleAfter(RESCHEDULE_MS); + task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS); } From a7453f7f14a22fb2a6d44f34ac18b2f24ba3fcff Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 15 Dec 2023 16:08:27 +0100 Subject: [PATCH 07/11] Allow setThreadName() to truncate thread name instead of throw an error Signed-off-by: Azat Khuzhin --- src/Common/setThreadName.cpp | 20 +++++++++++++------- src/Common/setThreadName.h | 4 +++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/Common/setThreadName.cpp b/src/Common/setThreadName.cpp index e14abb247f3..aae80272206 100644 --- a/src/Common/setThreadName.cpp +++ b/src/Common/setThreadName.cpp @@ -28,25 +28,31 @@ namespace ErrorCodes static thread_local char thread_name[THREAD_NAME_SIZE]{}; -void setThreadName(const char * name) +void setThreadName(const char * name, bool truncate) { - if (strlen(name) > THREAD_NAME_SIZE - 1) + size_t name_len = strlen(name); + if (!truncate && name_len > THREAD_NAME_SIZE - 1) throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Thread name cannot be longer than 15 bytes"); + size_t name_capped_len = std::min(1 + name_len, THREAD_NAME_SIZE - 1); + char name_capped[THREAD_NAME_SIZE]; + memcpy(name_capped, name, name_capped_len); + name_capped[name_capped_len] = '\0'; + #if defined(OS_FREEBSD) - pthread_set_name_np(pthread_self(), name); + pthread_set_name_np(pthread_self(), name_capped); if ((false)) #elif defined(OS_DARWIN) - if (0 != pthread_setname_np(name)) + if (0 != pthread_setname_np(name_capped)) #elif defined(OS_SUNOS) - if (0 != pthread_setname_np(pthread_self(), name)) + if (0 != pthread_setname_np(pthread_self(), name_capped)) #else - if (0 != prctl(PR_SET_NAME, name, 0, 0, 0)) + if (0 != prctl(PR_SET_NAME, name_capped, 0, 0, 0)) #endif if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments. throw DB::ErrnoException(DB::ErrorCodes::PTHREAD_ERROR, "Cannot set thread name with prctl(PR_SET_NAME, ...)"); - memcpy(thread_name, name, std::min(1 + strlen(name), THREAD_NAME_SIZE - 1)); + memcpy(thread_name, name_capped, name_capped_len); } const char * getThreadName() diff --git a/src/Common/setThreadName.h b/src/Common/setThreadName.h index 1834ea9696f..fdb2717925f 100644 --- a/src/Common/setThreadName.h +++ b/src/Common/setThreadName.h @@ -4,7 +4,9 @@ /** Sets the thread name (maximum length is 15 bytes), * which will be visible in ps, gdb, /proc, * for convenience of observation and debugging. 
+ * + * @param truncate - if true, will truncate to 15 automatically, otherwise throw */ -void setThreadName(const char * name); +void setThreadName(const char * name, bool truncate = false); const char * getThreadName(); From 06a9e9a9ca46f9a4dfaf53ae5dad4204eb2c99bd Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 15 Dec 2023 16:08:54 +0100 Subject: [PATCH 08/11] Use separate thread for kafka consumers cleanup Since pool may exceed threads, while we need to run this thread always to avoid memory leaking. And this should not be a problem since librdkafka has multiple threads for each consumer (5!) anyway. Signed-off-by: Azat Khuzhin --- src/Storages/Kafka/StorageKafka.cpp | 88 +++++++++++++++++++---------- src/Storages/Kafka/StorageKafka.h | 4 ++ 2 files changed, 63 insertions(+), 29 deletions(-) diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 84ace9a880e..07acb15b277 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -47,6 +47,7 @@ #include #include #include +#include #if USE_KRB5 #include @@ -284,6 +285,14 @@ StorageKafka::StorageKafka( task->deactivate(); tasks.emplace_back(std::make_shared(std::move(task))); } + + cleanup_thread = std::make_unique([this]() + { + const auto & table = getStorageID().getTableName(); + const auto & thread_name = std::string("KfkCln:") + table; + setThreadName(thread_name.c_str(), /*truncate=*/ true); + cleanConsumers(); + }); } SettingsChanges StorageKafka::createSettingsAdjustments() @@ -428,6 +437,18 @@ void StorageKafka::startup() void StorageKafka::shutdown(bool) { shutdown_called = true; + cleanup_cv.notify_one(); + + { + LOG_TRACE(log, "Waiting for consumers cleanup thread"); + Stopwatch watch; + if (cleanup_thread) + { + cleanup_thread->join(); + cleanup_thread.reset(); + } + LOG_TRACE(log, "Consumers cleanup thread finished in {} ms.", watch.elapsedMilliseconds()); + } { LOG_TRACE(log, "Waiting for streaming jobs"); @@ -616,42 +637,53 @@ cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_n void StorageKafka::cleanConsumers() { UInt64 ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000; - UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - - /// Copy consumers for closing to a new vector to close them without a lock - std::vector consumers_to_close; + std::unique_lock lock(mutex); + std::chrono::milliseconds timeout(KAFKA_RESCHEDULE_MS); + while (!cleanup_cv.wait_for(lock, timeout, [this]() { return shutdown_called == true; })) { - std::lock_guard lock(mutex); + /// Copy consumers for closing to a new vector to close them without a lock + std::vector consumers_to_close; - for (size_t i = 0; i < consumers.size(); ++i) + UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); { - auto & consumer_ptr = consumers[i]; - - UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec(); - chassert(consumer_last_used_usec <= now_usec); - - if (!consumer_ptr->hasConsumer()) - continue; - if (consumer_ptr->isInUse()) - continue; - - if (now_usec - consumer_last_used_usec > ttl_usec) + for (size_t i = 0; i < consumers.size(); ++i) { - LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId()); - consumers_to_close.push_back(consumer_ptr->moveConsumer()); + auto & consumer_ptr = consumers[i]; + + UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec(); + chassert(consumer_last_used_usec <= 
now_usec); + + if (!consumer_ptr->hasConsumer()) + continue; + if (consumer_ptr->isInUse()) + continue; + + if (now_usec - consumer_last_used_usec > ttl_usec) + { + LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId()); + consumers_to_close.push_back(consumer_ptr->moveConsumer()); + } } } + + if (!consumers_to_close.empty()) + { + lock.unlock(); + + Stopwatch watch; + size_t closed = consumers_to_close.size(); + consumers_to_close.clear(); + LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.", + closed, ttl_usec, watch.elapsedMilliseconds()); + + lock.lock(); + } + + ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000; } - if (!consumers_to_close.empty()) - { - Stopwatch watch; - size_t closed = consumers_to_close.size(); - consumers_to_close.clear(); - LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.", - closed, ttl_usec, watch.elapsedMilliseconds()); - } + LOG_TRACE(log, "Consumers cleanup thread finished"); } size_t StorageKafka::getMaxBlockSize() const @@ -890,8 +922,6 @@ void StorageKafka::threadFunc(size_t idx) mv_attached.store(false); - cleanConsumers(); - // Wait for attached views if (!task->stream_cancelled) task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS); diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 84f4022790f..20f42f9e7df 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -106,6 +107,7 @@ private: std::mutex mutex; std::condition_variable cv; + std::condition_variable cleanup_cv; // Stream thread struct TaskContext @@ -119,6 +121,8 @@ private: std::vector> tasks; bool thread_per_consumer = false; + std::unique_ptr cleanup_thread; + /// For memory accounting in the librdkafka threads. std::mutex thread_statuses_mutex; std::list> thread_statuses; From 1f03a210335b6c394698346fbeff1b00bd8653aa Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 15 Dec 2023 16:35:21 +0100 Subject: [PATCH 09/11] Update comment for statistics.interval.ms librdkafka option Signed-off-by: Azat Khuzhin --- src/Storages/Kafka/StorageKafka.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 07acb15b277..270e31d2787 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -798,14 +798,10 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config) /// NOTE: statistics should be consumed, otherwise it creates too much /// entries in the queue, that leads to memory leak and slow shutdown. - /// - /// This is the case when you have kafka table but no SELECT from it or - /// materialized view attached. - /// - /// So for now it is disabled by default, until properly fixed. if (!config.has(config_prefix + "." + "statistics_interval_ms")) { - kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable. + // every 3 seconds by default. set to 0 to disable. 
+ kafka_config.set("statistics.interval.ms", "3000"); } // Configure interceptor to change thread name From 03218202d3988682300516e9bbde4b38dc01c8f9 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 21 Dec 2023 14:15:10 +0100 Subject: [PATCH 10/11] Fix data-race between StorageKafka::startup() and cleanConsumers() Actually now we can create consumer object in the ctor, no need to do this in startup(), since consumer now do not connects to kafka. Signed-off-by: Azat Khuzhin --- src/Storages/Kafka/StorageKafka.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 270e31d2787..7fe82996205 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -286,6 +286,10 @@ StorageKafka::StorageKafka( tasks.emplace_back(std::make_shared(std::move(task))); } + consumers.resize(num_consumers); + for (size_t i = 0; i < num_consumers; ++i) + consumers[i] = createKafkaConsumer(i); + cleanup_thread = std::make_unique([this]() { const auto & table = getStorageID().getTableName(); @@ -422,10 +426,6 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & void StorageKafka::startup() { - consumers.resize(num_consumers); - for (size_t i = 0; i < num_consumers; ++i) - consumers[i] = createKafkaConsumer(i); - // Start the reader thread for (auto & task : tasks) { From ebad1bf4f3c4af4319faaddfba01c384a9815369 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 21 Dec 2023 14:22:19 +0100 Subject: [PATCH 11/11] Move StorageKafka::createConsumer() into KafkaConsumer Signed-off-by: Azat Khuzhin --- src/Storages/Kafka/KafkaConsumer.cpp | 18 ++++++++++++++++-- src/Storages/Kafka/KafkaConsumer.h | 2 +- src/Storages/Kafka/StorageKafka.cpp | 28 ++++------------------------ src/Storages/Kafka/StorageKafka.h | 2 -- 4 files changed, 21 insertions(+), 29 deletions(-) diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp index 4a26e790148..870877b7207 100644 --- a/src/Storages/Kafka/KafkaConsumer.cpp +++ b/src/Storages/Kafka/KafkaConsumer.cpp @@ -13,6 +13,7 @@ #include #include +#include namespace CurrentMetrics { @@ -63,9 +64,22 @@ KafkaConsumer::KafkaConsumer( { } -void KafkaConsumer::setConsumer(const ConsumerPtr & consumer_) +void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config) { - consumer = consumer_; + chassert(!consumer.get()); + + /// Using this should be safe, since cppkafka::Consumer can poll messages + /// (including statistics, which will trigger the callback below) only via + /// KafkaConsumer. 
+ if (consumer_config.get("statistics.interval.ms") != "0") + { + consumer_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json) + { + setRDKafkaStat(stat_json); + }); + } + consumer = std::make_shared(consumer_config); + consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); // called (synchronously, during poll) when we enter the consumer group consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions) diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h index 2fd9482bd35..a7820924e98 100644 --- a/src/Storages/Kafka/KafkaConsumer.h +++ b/src/Storages/Kafka/KafkaConsumer.h @@ -72,7 +72,7 @@ public: ~KafkaConsumer(); - void setConsumer(const ConsumerPtr & consumer); + void createConsumer(cppkafka::Configuration consumer_config); bool hasConsumer() const { return consumer.get() != nullptr; } ConsumerPtr && moveConsumer() { return std::move(consumer); } diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 7fe82996205..1cb810cf8ad 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -530,8 +530,11 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout) else if (!ret_consumer_ptr && closed_consumer_index.has_value()) { ret_consumer_ptr = consumers[*closed_consumer_index]; + + cppkafka::Configuration consumer_config = getConsumerConfiguration(*closed_consumer_index); /// It should be OK to create consumer under lock, since it should be fast (without subscribing). - ret_consumer_ptr->setConsumer(createConsumer(*ret_consumer_ptr, *closed_consumer_index)); + ret_consumer_ptr->createConsumer(consumer_config); + LOG_TRACE(log, "Created #{} consumer", *closed_consumer_index); } /// 3. There is no free consumer and num_consumers already created, waiting @timeout. else @@ -576,29 +579,6 @@ KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number) return kafka_consumer_ptr; } -ConsumerPtr StorageKafka::createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number) -{ - cppkafka::Configuration consumer_config = getConsumerConfiguration(consumer_number); - - /// Using KafkaConsumer by reference should be safe, since - /// cppkafka::Consumer can poll messages (including statistics, which will - /// trigger the callback below) only via KafkaConsumer. 
- if (consumer_config.get("statistics.interval.ms") != "0") - { - consumer_config.set_stats_callback([&kafka_consumer](cppkafka::KafkaHandleBase &, const std::string & stat_json) - { - kafka_consumer.setRDKafkaStat(stat_json); - }); - } - - auto consumer_ptr = std::make_shared(consumer_config); - consumer_ptr->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); - - LOG_TRACE(log, "Created #{} consumer", consumer_number); - - return consumer_ptr; -} - cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number) { cppkafka::Configuration conf; diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 20f42f9e7df..f60719538cf 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -130,8 +130,6 @@ private: SettingsChanges createSettingsAdjustments(); /// Creates KafkaConsumer object without real consumer (cppkafka::Consumer) KafkaConsumerPtr createKafkaConsumer(size_t consumer_number); - /// Creates real cppkafka::Consumer object - ConsumerPtr createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number); /// Returns consumer configuration with all changes that had been overwritten in config cppkafka::Configuration getConsumerConfiguration(size_t consumer_number);
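
A note on the callback pattern from this last patch: capturing `this`
in the statistics callback is safe only because the cppkafka handle is
owned by the KafkaConsumer and can be polled solely through it, so the
handle can never outlive the object the callback points at. A
self-contained sketch of that ownership pattern (invented names, not
the real cppkafka API):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>

    /// Stand-in for cppkafka::Consumer: polling may invoke a stats callback.
    struct Handle
    {
        std::function<void(const std::string &)> stats_callback;

        void poll()
        {
            if (stats_callback)
                stats_callback("{\"type\": \"stats\"}");
        }
    };

    /// Stand-in for KafkaConsumer: owns the handle and creates it lazily.
    /// Assumed not to be copied or moved after the handle is created,
    /// otherwise the captured `this` would dangle.
    class Wrapper
    {
    public:
        void createHandle()
        {
            handle = std::make_unique<Handle>();
            /// Safe: the handle is destroyed before (or together with) *this,
            /// and nothing else can poll it.
            handle->stats_callback = [this](const std::string & json) { last_stat = json; };
        }

        void poll()
        {
            if (!handle)
                createHandle(); /// lazy creation, as in popConsumer()
            handle->poll();
        }

        std::string last_stat;

    private:
        std::unique_ptr<Handle> handle;
    };

    int main()
    {
        Wrapper wrapper;
        wrapper.poll();
        std::cout << wrapper.last_stat << '\n'; /// prints the recorded stats JSON
    }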