diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp index c01c9564f4b..4a26e790148 100644 --- a/src/Storages/Kafka/KafkaConsumer.cpp +++ b/src/Storages/Kafka/KafkaConsumer.cpp @@ -46,15 +46,13 @@ const auto DRAIN_TIMEOUT_MS = 5000ms; KafkaConsumer::KafkaConsumer( - ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, bool intermediate_commit_, const std::atomic & stopped_, const Names & _topics) - : consumer(consumer_) - , log(log_) + : log(log_) , batch_size(max_batch_size) , poll_timeout(poll_timeout_) , intermediate_commit(intermediate_commit_) @@ -63,6 +61,12 @@ KafkaConsumer::KafkaConsumer( , topics(_topics) , exceptions_buffer(EXCEPTIONS_DEPTH) { +} + +void KafkaConsumer::setConsumer(const ConsumerPtr & consumer_) +{ + consumer = consumer_; + // called (synchronously, during poll) when we enter the consumer group consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions) { @@ -135,6 +139,9 @@ KafkaConsumer::KafkaConsumer( KafkaConsumer::~KafkaConsumer() { + if (!consumer) + return; + try { if (!consumer->get_subscription().empty()) @@ -568,6 +575,9 @@ void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktr */ std::string KafkaConsumer::getMemberId() const { + if (!consumer) + return ""; + char * memberid_ptr = rd_kafka_memberid(consumer->get_handle()); std::string memberid_string = memberid_ptr; rd_kafka_mem_free(nullptr, memberid_ptr); @@ -578,8 +588,14 @@ std::string KafkaConsumer::getMemberId() const KafkaConsumer::Stat KafkaConsumer::getStat() const { KafkaConsumer::Stat::Assignments assignments; - auto cpp_assignments = consumer->get_assignment(); - auto cpp_offsets = consumer->get_offsets_position(cpp_assignments); + cppkafka::TopicPartitionList cpp_assignments; + cppkafka::TopicPartitionList cpp_offsets; + + if (consumer) + { + cpp_assignments = consumer->get_assignment(); + cpp_offsets = consumer->get_offsets_position(cpp_assignments); + } for (size_t num = 0; num < cpp_assignments.size(); ++num) { @@ -591,7 +607,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const } return { - .consumer_id = getMemberId() /* consumer->get_member_id() */ , + .consumer_id = getMemberId(), .assignments = std::move(assignments), .last_poll_time = last_poll_timestamp_usec.load(), .num_messages_read = num_messages_read.load(), @@ -601,12 +617,18 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const .num_commits = num_commits.load(), .num_rebalance_assignments = num_rebalance_assignments.load(), .num_rebalance_revocations = num_rebalance_revocations.load(), - .exceptions_buffer = [&](){std::lock_guard lock(exception_mutex); - return exceptions_buffer;}(), + .exceptions_buffer = [&]() + { + std::lock_guard lock(exception_mutex); + return exceptions_buffer; + }(), .in_use = in_use.load(), .last_used_usec = last_used_usec.load(), - .rdkafka_stat = [&](){std::lock_guard lock(rdkafka_stat_mutex); - return rdkafka_stat;}(), + .rdkafka_stat = [&]() + { + std::lock_guard lock(rdkafka_stat_mutex); + return rdkafka_stat; + }(), }; } diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h index ecd201f5a6e..2fd9482bd35 100644 --- a/src/Storages/Kafka/KafkaConsumer.h +++ b/src/Storages/Kafka/KafkaConsumer.h @@ -61,9 +61,7 @@ public: std::string rdkafka_stat; }; -public: KafkaConsumer( - ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, @@ -73,6 +71,11 @@ public: ); ~KafkaConsumer(); + + void setConsumer(const ConsumerPtr & consumer); + bool hasConsumer() const { return consumer.get() != nullptr; } + ConsumerPtr && moveConsumer() { return std::move(consumer); } + void commit(); // Commit all processed messages. void subscribe(); // Subscribe internal consumer to topics. void unsubscribe(); // Unsubscribe internal consumer in case of failure. diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 402e5d2fab2..8e442703567 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -413,7 +414,9 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & void StorageKafka::startup() { - all_consumers.resize(num_consumers); + consumers.resize(num_consumers); + for (size_t i = 0; i < num_consumers; ++i) + consumers[i] = createKafkaConsumer(i); // Start the reader thread for (auto & task : tasks) @@ -462,7 +465,6 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer) { std::lock_guard lock(mutex); consumer->notInUse(); - consumers.push_back(consumer); cv.notify_one(); CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1); } @@ -478,50 +480,106 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout) { std::unique_lock lock(mutex); - KafkaConsumerPtr consumer_ptr; - - /// 1. There is consumer available. Return one of them. - if (!consumers.empty()) + KafkaConsumerPtr ret_consumer_ptr; + std::optional closed_consumer_index; + for (size_t i = 0; i < consumers.size(); ++i) { - consumer_ptr = consumers.back(); - consumers.pop_back(); + auto & consumer_ptr = consumers[i]; + + if (consumer_ptr->isInUse()) + continue; + + if (consumer_ptr->hasConsumer()) + { + ret_consumer_ptr = consumer_ptr; + break; + } + + if (!closed_consumer_index.has_value() && !consumer_ptr->hasConsumer()) + { + closed_consumer_index = i; + } } + + /// 1. There is consumer available - return it. + if (ret_consumer_ptr) + { + /// Noop + } + /// 2. There is no consumer, but we can create a new one. + else if (!ret_consumer_ptr && closed_consumer_index.has_value()) + { + ret_consumer_ptr = consumers[*closed_consumer_index]; + /// It should be OK to create consumer under lock, since it should be fast (without subscribing). + ret_consumer_ptr->setConsumer(createConsumer(*ret_consumer_ptr, *closed_consumer_index)); + } + /// 3. There is no free consumer and num_consumers already created, waiting @timeout. else { - auto expired_consumer = std::find_if(all_consumers.begin(), all_consumers.end(), [](const auto & consumer_weak_ptr) + cv.wait_for(lock, timeout, [&]() { - return consumer_weak_ptr.expired(); - }); - - /// 2. There is no consumer, but we can create a new one. - if (expired_consumer != all_consumers.end()) - { - size_t consumer_number = std::distance(all_consumers.begin(), expired_consumer); - /// It should be OK to create consumer under lock, since it should be fast (without subscribing). - consumer_ptr = createConsumer(consumer_number); - *expired_consumer = consumer_ptr; - } - /// 3. There is no consumer and num_consumers already created, waiting @timeout. - else - { - if (cv.wait_for(lock, timeout, [&]() { return !consumers.empty(); })) + /// Note we are waiting only opened, free, consumers, since consumer cannot be closed right now + auto it = std::find_if(consumers.begin(), consumers.end(), [](const auto & ptr) { - consumer_ptr = consumers.back(); - consumers.pop_back(); + return !ptr->isInUse() && ptr->hasConsumer(); + }); + if (it != consumers.end()) + { + ret_consumer_ptr = *it; + return true; } - } + return false; + }); } - if (consumer_ptr) + if (ret_consumer_ptr) { CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1); - consumer_ptr->inUse(); + ret_consumer_ptr->inUse(); } - return consumer_ptr; + return ret_consumer_ptr; } -KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number) +KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number) +{ + /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage. + auto & stream_cancelled = thread_per_consumer ? tasks[consumer_number]->stream_cancelled : tasks.back()->stream_cancelled; + + KafkaConsumerPtr kafka_consumer_ptr = std::make_shared( + log, + getPollMaxBatchSize(), + getPollTimeoutMillisecond(), + intermediate_commit, + stream_cancelled, + topics); + return kafka_consumer_ptr; +} + +ConsumerPtr StorageKafka::createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number) +{ + cppkafka::Configuration consumer_config = getConsumerConfiguration(consumer_number); + + /// Using KafkaConsumer by reference should be safe, since + /// cppkafka::Consumer can poll messages (including statistics, which will + /// trigger the callback below) only via KafkaConsumer. + if (consumer_config.get("statistics.interval.ms") != "0") + { + consumer_config.set_stats_callback([&kafka_consumer](cppkafka::KafkaHandleBase &, const std::string & stat_json) + { + kafka_consumer.setRDKafkaStat(stat_json); + }); + } + + auto consumer_ptr = std::make_shared(consumer_config); + consumer_ptr->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); + + LOG_TRACE(log, "Created #{} consumer", consumer_number); + + return consumer_ptr; +} + +cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number) { cppkafka::Configuration conf; @@ -546,37 +604,14 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number) size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages)); - /// a reference to the consumer is needed in statistic callback - /// although the consumer does not exist when callback is being registered - /// shared_ptr> comes to the rescue - auto consumer_weak_ptr_ptr = std::make_shared(); - updateConfiguration(conf, consumer_weak_ptr_ptr); + updateConfiguration(conf); // those settings should not be changed by users. conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once. conf.set("enable.partition.eof", "false"); // Ignore EOF messages - // Create a consumer and subscribe to topics - auto consumer_impl = std::make_shared(conf); - consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); - - KafkaConsumerPtr kafka_consumer_ptr; - - /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage. - if (thread_per_consumer) - { - auto& stream_cancelled = tasks[consumer_number]->stream_cancelled; - kafka_consumer_ptr = std::make_shared(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics); - } - else - { - kafka_consumer_ptr = std::make_shared(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics); - } - LOG_TRACE(log, "Created #{} consumer", consumer_number); - - *consumer_weak_ptr_ptr = kafka_consumer_ptr; - return kafka_consumer_ptr; + return conf; } void StorageKafka::cleanConsumers() @@ -585,34 +620,28 @@ void StorageKafka::cleanConsumers() UInt64 now_usec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); /// Copy consumers for closing to a new vector to close them without a lock - std::vector consumers_to_close; + std::vector consumers_to_close; { std::lock_guard lock(mutex); - for (auto it = consumers.begin(); it != consumers.end();) + for (size_t i = 0; i < consumers.size(); ++i) { - auto & consumer_ptr = *it; + auto & consumer_ptr = consumers[i]; UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec(); chassert(consumer_last_used_usec <= now_usec); + + if (!consumer_ptr->hasConsumer()) + continue; + if (consumer_ptr->isInUse()) + continue; + if (now_usec - consumer_last_used_usec > CONSUMER_TTL_USEC) { - /// We need this only to get the consumer number. - auto weak_it = std::find_if(all_consumers.begin(), all_consumers.end(), [&](const auto & consume_weak_ptr) - { - return consumer_ptr == consume_weak_ptr.lock(); - }); - chassert(weak_it != all_consumers.end()); - size_t consumer_number = std::distance(all_consumers.begin(), weak_it); - - LOG_TRACE(log, "Closing #{} consumer (id: {})", consumer_number, consumer_ptr->getMemberId()); - - consumers_to_close.push_back(std::move(consumer_ptr)); - it = consumers.erase(it); + LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId()); + consumers_to_close.push_back(consumer_ptr->moveConsumer()); } - else - ++it; } } @@ -656,8 +685,7 @@ String StorageKafka::getConfigPrefix() const return CONFIG_KAFKA_TAG; } -void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config, - std::shared_ptr kafka_consumer_weak_ptr_ptr) +void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config) { // Update consumer configuration from the configuration. Example: // @@ -737,31 +765,16 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config, LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message); }); - if (kafka_consumer_weak_ptr_ptr) + /// NOTE: statistics should be consumed, otherwise it creates too much + /// entries in the queue, that leads to memory leak and slow shutdown. + /// + /// This is the case when you have kafka table but no SELECT from it or + /// materialized view attached. + /// + /// So for now it is disabled by default, until properly fixed. + if (!config.has(config_prefix + "." + "statistics_interval_ms")) { - /// NOTE: statistics should be consumed, otherwise it creates too much - /// entries in the queue, that leads to memory leak and slow shutdown. - /// - /// This is the case when you have kafka table but no SELECT from it or - /// materialized view attached. - /// - /// So for now it is disabled by default, until properly fixed. - if (!config.has(config_prefix + "." + "statistics_interval_ms")) - { - kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable. - } - - if (kafka_config.get("statistics.interval.ms") != "0") - { - kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string) - { - auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock(); - if (kafka_consumer_ptr) - { - kafka_consumer_ptr->setRDKafkaStat(stat_json_string); - } - }); - } + kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable. } // Configure interceptor to change thread name @@ -832,7 +845,7 @@ void StorageKafka::threadFunc(size_t idx) mv_attached.store(true); // Keep streaming as long as there are attached views and streaming is not cancelled - while (!task->stream_cancelled && !all_consumers.empty()) + while (!task->stream_cancelled) { if (!checkDependencies(table_id)) break; @@ -869,13 +882,10 @@ void StorageKafka::threadFunc(size_t idx) LOG_ERROR(log, "{} {}", __PRETTY_FUNCTION__, exception_str); auto safe_consumers = getSafeConsumers(); - for (auto const & consumer_ptr_weak : safe_consumers.consumers) + for (auto const & consumer_ptr : safe_consumers.consumers) { /// propagate materialized view exception to all consumers - if (auto consumer_ptr = consumer_ptr_weak.lock()) - { - consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */); - } + consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */); } } @@ -922,7 +932,7 @@ bool StorageKafka::streamToViews() std::vector> sources; Pipes pipes; - auto stream_count = thread_per_consumer ? 1 : all_consumers.size(); + auto stream_count = thread_per_consumer ? 1 : num_consumers; sources.reserve(stream_count); pipes.reserve(stream_count); for (size_t i = 0; i < stream_count; ++i) diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 9395c4ff68e..84f4022790f 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -13,13 +13,7 @@ #include #include #include - -namespace cppkafka -{ - -class Configuration; - -} +#include namespace DB { @@ -29,7 +23,7 @@ class StorageSystemKafkaConsumers; struct StorageKafkaInterceptors; using KafkaConsumerPtr = std::shared_ptr; -using KafkaConsumerWeakPtr = std::weak_ptr; +using ConsumerPtr = std::shared_ptr; /** Implements a Kafka queue table engine that can be used as a persistent queue / buffer, * or as a basic building block for creating pipelines with a continuous insertion / ETL. @@ -85,10 +79,10 @@ public: { std::shared_ptr storage_ptr; std::unique_lock lock; - std::vector & consumers; + std::vector & consumers; }; - SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; } + SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), consumers}; } private: // Configuration and state @@ -108,8 +102,7 @@ private: std::atomic mv_attached = false; - std::vector consumers; /// available consumers - std::vector all_consumers; /// busy (belong to a KafkaSource) and vacant consumers + std::vector consumers; std::mutex mutex; std::condition_variable cv; @@ -131,7 +124,12 @@ private: std::list> thread_statuses; SettingsChanges createSettingsAdjustments(); - KafkaConsumerPtr createConsumer(size_t consumer_number); + /// Creates KafkaConsumer object without real consumer (cppkafka::Consumer) + KafkaConsumerPtr createKafkaConsumer(size_t consumer_number); + /// Creates real cppkafka::Consumer object + ConsumerPtr createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number); + /// Returns consumer configuration with all changes that had been overwritten in config + cppkafka::Configuration getConsumerConfiguration(size_t consumer_number); /// If named_collection is specified. String collection_name; @@ -139,11 +137,7 @@ private: std::atomic shutdown_called = false; // Update Kafka configuration with values from CH user configuration. - void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr); - void updateConfiguration(cppkafka::Configuration & kafka_config) - { - updateConfiguration(kafka_config, std::make_shared()); - } + void updateConfiguration(cppkafka::Configuration & kafka_config); String getConfigPrefix() const; void threadFunc(size_t idx); diff --git a/src/Storages/System/StorageSystemKafkaConsumers.cpp b/src/Storages/System/StorageSystemKafkaConsumers.cpp index 640b4de4c7e..e333f6e2c15 100644 --- a/src/Storages/System/StorageSystemKafkaConsumers.cpp +++ b/src/Storages/System/StorageSystemKafkaConsumers.cpp @@ -98,58 +98,55 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context auto safe_consumers = storage_kafka_ptr->getSafeConsumers(); - for (const auto & weak_consumer : safe_consumers.consumers) + for (const auto & consumer : safe_consumers.consumers) { - if (auto consumer = weak_consumer.lock()) + auto consumer_stat = consumer->getStat(); + + database.insertData(database_str.data(), database_str.size()); + table.insertData(table_str.data(), table_str.size()); + + consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size()); + + const auto num_assignnemts = consumer_stat.assignments.size(); + + for (size_t num = 0; num < num_assignnemts; ++num) { - auto consumer_stat = consumer->getStat(); + const auto & assign = consumer_stat.assignments[num]; - database.insertData(database_str.data(), database_str.size()); - table.insertData(table_str.data(), table_str.size()); + assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size()); - consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size()); - - const auto num_assignnemts = consumer_stat.assignments.size(); - - for (size_t num = 0; num < num_assignnemts; ++num) - { - const auto & assign = consumer_stat.assignments[num]; - - assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size()); - - assigments_partition_id.insert(assign.partition_id); - assigments_current_offset.insert(assign.current_offset); - } - last_assignment_num += num_assignnemts; - - assigments_topics_offsets.push_back(last_assignment_num); - assigments_partition_id_offsets.push_back(last_assignment_num); - assigments_current_offset_offsets.push_back(last_assignment_num); - - for (const auto & exc : consumer_stat.exceptions_buffer) - { - exceptions_text.insertData(exc.text.data(), exc.text.size()); - exceptions_time.insert(exc.timestamp_usec); - } - exceptions_num += consumer_stat.exceptions_buffer.size(); - exceptions_text_offset.push_back(exceptions_num); - exceptions_time_offset.push_back(exceptions_num); - - - last_poll_time.insert(consumer_stat.last_poll_time); - num_messages_read.insert(consumer_stat.num_messages_read); - last_commit_time.insert(consumer_stat.last_commit_timestamp_usec); - num_commits.insert(consumer_stat.num_commits); - last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec); - - num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations); - num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments); - - is_currently_used.insert(consumer_stat.in_use); - last_used.insert(consumer_stat.last_used_usec); - - rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size()); + assigments_partition_id.insert(assign.partition_id); + assigments_current_offset.insert(assign.current_offset); } + last_assignment_num += num_assignnemts; + + assigments_topics_offsets.push_back(last_assignment_num); + assigments_partition_id_offsets.push_back(last_assignment_num); + assigments_current_offset_offsets.push_back(last_assignment_num); + + for (const auto & exc : consumer_stat.exceptions_buffer) + { + exceptions_text.insertData(exc.text.data(), exc.text.size()); + exceptions_time.insert(exc.timestamp_usec); + } + exceptions_num += consumer_stat.exceptions_buffer.size(); + exceptions_text_offset.push_back(exceptions_num); + exceptions_time_offset.push_back(exceptions_num); + + + last_poll_time.insert(consumer_stat.last_poll_time); + num_messages_read.insert(consumer_stat.num_messages_read); + last_commit_time.insert(consumer_stat.last_commit_timestamp_usec); + num_commits.insert(consumer_stat.num_commits); + last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec); + + num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations); + num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments); + + is_currently_used.insert(consumer_stat.in_use); + last_used.insert(consumer_stat.last_used_usec); + + rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size()); } };