system_kafka_consumers: statistics.interval.ms default, tiny cleanup

Ilya Golshtein 2023-07-03 21:33:52 +00:00
parent dd6727be3e
commit a86548a733
2 changed files with 3 additions and 7 deletions

@@ -52,8 +52,6 @@ KafkaSource::KafkaSource(
 KafkaSource::~KafkaSource()
 {
-    LOG_TRACE(&Poco::Logger::get("KafkaSource"), "dtor, pushConsumer");
     if (!consumer)
         return;
@@ -80,8 +78,6 @@ Chunk KafkaSource::generateImpl()
 {
     if (!consumer)
     {
-        LOG_TRACE(&Poco::Logger::get("KafkaSource"), "calling popConsumer");
         auto timeout = std::chrono::milliseconds(context->getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
         consumer = storage.popConsumer(timeout);

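The hunks above show the consumer lifecycle around the removed traces: KafkaSource::generateImpl() lazily borrows a consumer from the storage's pool with a timeout taken from kafka_max_wait_ms, and the destructor hands it back (the "pushConsumer" the deleted log line referred to). A rough standalone sketch of that borrow-with-timeout / return pattern, using hypothetical Consumer and ConsumerPool types rather than the actual StorageKafka API:

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>

struct Consumer {}; /// stand-in for the real Kafka consumer wrapper

class ConsumerPool
{
public:
    /// Wait up to `timeout` for a free consumer; returns nullptr on timeout
    /// (roughly what storage.popConsumer(timeout) does in the hunk above).
    std::shared_ptr<Consumer> pop(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mutex);
        if (!available.wait_for(lock, timeout, [this] { return !consumers.empty(); }))
            return nullptr;
        auto consumer = consumers.back();
        consumers.pop_back();
        return consumer;
    }

    /// Hand a borrowed consumer back, as the KafkaSource destructor does.
    void push(std::shared_ptr<Consumer> consumer)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            consumers.push_back(std::move(consumer));
        }
        available.notify_one();
    }

private:
    std::mutex mutex;
    std::condition_variable available;
    std::vector<std::shared_ptr<Consumer>> consumers;
};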

@@ -646,15 +646,15 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
         LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
     });
-    if (!kafka_config.has_property("statistics.interval.ms"))
+    if (!config.has(config_prefix + "." + "statistics_interval_ms"))
     {
-        kafka_config.set("statistics.interval.ms", "10"); // every 10 milliseconds
+        kafka_config.set("statistics.interval.ms", "600"); // every 600 milliseconds
     }
     if (kafka_config.get("statistics.interval.ms") != "0")
     {
         kafka_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
         {
             // LOG_DEBUG(log, "kafka statistics {}", stat_json_string);
             rdkafka_stat = std::make_shared<const String>(stat_json_string);
         });
     }
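For reference, a minimal standalone sketch of how statistics.interval.ms and the stats callback interact in cppkafka/librdkafka: the client serializes its internal statistics to a JSON string at the configured interval and delivers it via the callback during polling; the code above stores the latest snapshot in rdkafka_stat, presumably to back the system_kafka_consumers view this branch is about. Broker address, group id and topic below are made up; this is not ClickHouse code.

#include <cppkafka/cppkafka.h>
#include <chrono>
#include <memory>
#include <string>

int main()
{
    cppkafka::Configuration config = {
        {"metadata.broker.list", "localhost:9092"},  // assumed local broker
        {"group.id", "stats_example"},               // made-up consumer group
        {"statistics.interval.ms", "600"}            // emit a stats JSON blob every 600 ms; "0" disables it
    };

    // Latest statistics snapshot, analogous to rdkafka_stat in the diff above.
    // A real implementation needs synchronization if another thread reads it.
    std::shared_ptr<const std::string> latest_stats;

    config.set_stats_callback(
        [&latest_stats](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
        {
            latest_stats = std::make_shared<const std::string>(stat_json_string);
        });

    cppkafka::Consumer consumer(config);
    consumer.subscribe({"some_topic"});              // hypothetical topic

    // The callback fires from inside poll(), roughly once per configured interval.
    while (true)
        consumer.poll(std::chrono::milliseconds(100));
}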