system_kafka_consumers: per-consumer librdkafka stat

This commit is contained in:
Ilya Golshtein 2023-08-10 22:27:01 +00:00
parent 7d7fbc9049
commit ade9c3d970
5 changed files with 55 additions and 31 deletions

View File

@ -559,7 +559,7 @@ void KafkaConsumer::setExceptionInfo(const String & text)
* is merged,
* because consumer->get_member_id() contains a leak
*/
std::string KafkaConsumer::getMemberId()
std::string KafkaConsumer::getMemberId() const
{
char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
std::string memberid_string = memberid_ptr;
@ -568,7 +568,7 @@ std::string KafkaConsumer::getMemberId()
}
KafkaConsumer::Stat KafkaConsumer::getStat()
KafkaConsumer::Stat KafkaConsumer::getStat() const
{
KafkaConsumer::Stat::Assignments assignments;
auto cpp_assignments = consumer->get_assignment();
@ -596,7 +596,9 @@ KafkaConsumer::Stat KafkaConsumer::getStat()
.num_rebalance_revocations = num_rebalance_revocations.load(),
.exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
return exceptions_buffer;}(),
.in_use = in_use.load()
.in_use = in_use.load(),
.rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
return rdkafka_stat;}(),
};
}

View File

@ -57,6 +57,7 @@ public:
UInt64 num_rebalance_revocations;
KafkaConsumer::ExceptionsBuffer exceptions_buffer;
bool in_use;
std::string rdkafka_stat;
};
public:
@ -106,11 +107,16 @@ public:
/// Payload of the most recently consumed message; `current` presumably points
/// one past the last returned message, so current[-1] is it — TODO confirm
/// against the polling code (not visible in this hunk).
String currentPayload() const { return current[-1].get_payload(); }
void setExceptionInfo(const cppkafka::Error & err);
void setExceptionInfo(const String & text);
/// Stores the latest librdkafka statistics JSON string for this consumer.
/// Called from the stats callback registered in StorageKafka::updateConfiguration,
/// presumably on a librdkafka-internal thread — hence the mutex, which
/// getStat() also takes when copying the string out for system.kafka_consumers.
void setRDKafkaStat(const std::string & stat_json_string)
{
std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
rdkafka_stat = stat_json_string;
}
/// Marks the consumer as taken; in_use is std::atomic<bool>, so no lock is needed.
void inUse() { in_use = true; }
/// Marks the consumer as released back (reported via getStat().in_use).
void notInUse() { in_use = false; }
// For system.kafka_consumers
Stat getStat();
Stat getStat() const;
private:
using Messages = std::vector<cppkafka::Message>;
@ -163,6 +169,9 @@ private:
std::atomic<UInt64> num_rebalance_revocations = 0;
std::atomic<bool> in_use = 0;
mutable std::mutex rdkafka_stat_mutex;
std::string rdkafka_stat;
void drain();
void cleanUnprocessed();
void resetIfStopped();
@ -170,7 +179,7 @@ private:
size_t filterMessageErrors();
ReadBufferPtr getNextMessage();
std::string getMemberId();
std::string getMemberId() const;
};
}

View File

@ -516,7 +516,11 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
size_t default_queued_min_messages = 100000; // we don't want to decrease the default
conf.set("queued.min.messages", std::max(getMaxBlockSize(),default_queued_min_messages));
updateConfiguration(conf);
/// A reference to the consumer is needed inside the statistics callback,
/// but the consumer does not exist yet when the callback is registered;
/// a shared_ptr<weak_ptr<KafkaConsumer>>, filled in later, solves this.
auto consumer_weak_ptr_ptr = std::make_shared<KafkaConsumerWeakPtr>();
updateConfiguration(conf, consumer_weak_ptr_ptr);
// those settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
@ -527,13 +531,20 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
auto consumer_impl = std::make_shared<cppkafka::Consumer>(conf);
consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
KafkaConsumerPtr kafka_consumer_ptr;
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
if (thread_per_consumer)
{
auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
return std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
}
return std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
else
{
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
}
*consumer_weak_ptr_ptr = kafka_consumer_ptr;
return kafka_consumer_ptr;
}
size_t StorageKafka::getMaxBlockSize() const
@ -566,7 +577,8 @@ String StorageKafka::getConfigPrefix() const
return CONFIG_KAFKA_TAG;
}
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
std::shared_ptr<KafkaConsumerWeakPtr> kafka_consumer_weak_ptr_ptr)
{
// Update consumer configuration from the configuration. Example:
// <kafka>
@ -646,6 +658,8 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
});
if (kafka_consumer_weak_ptr_ptr)
{
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
{
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
@ -653,11 +667,16 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
if (kafka_config.get("statistics.interval.ms") != "0")
{
kafka_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
{
rdkafka_stat = std::make_shared<const String>(stat_json_string);
auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock();
if (kafka_consumer_ptr)
{
kafka_consumer_ptr->setRDKafkaStat(stat_json_string);
}
});
}
}
// Configure interceptor to change thread name
//

View File

@ -79,7 +79,6 @@ public:
NamesAndTypesList getVirtuals() const override;
Names getVirtualColumnNames() const;
HandleKafkaErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
std::shared_ptr<const String> getRdkafkaStat() const { return rdkafka_stat; }
struct SafeConsumers
{
@ -143,7 +142,12 @@ private:
std::atomic<bool> shutdown_called = false;
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & kafka_config);
void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr<KafkaConsumerWeakPtr>);
/// Convenience overload: updates the Kafka configuration without wiring a
/// consumer into the statistics callback. The holder passed is non-null but
/// contains an empty weak_ptr, so if the stats callback fires, lock() yields
/// nullptr and the statistics string is simply dropped.
void updateConfiguration(cppkafka::Configuration & kafka_config)
{
updateConfiguration(kafka_config, std::make_shared<KafkaConsumerWeakPtr>());
}
String getConfigPrefix() const;
void threadFunc(size_t idx);
@ -157,7 +161,6 @@ private:
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
std::shared_ptr<const String> rdkafka_stat;
};
}

View File

@ -145,16 +145,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
is_currently_used.insert(consumer_stat.in_use);
auto stat_string_ptr = storage_kafka_ptr->getRdkafkaStat();
if (stat_string_ptr)
{
rdkafka_stat.insertData(stat_string_ptr->data(), stat_string_ptr->size());
}
else
{
const std::string empty_stat = "{}";
rdkafka_stat.insertData(empty_stat.data(), empty_stat.size());
}
rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
}
}
};