Merge pull request #46752 from ClickHouse/rs/periods-in-kafka-topics

Allow configuration of Kafka topics with periods
commit 5e68d50b39 by Robert Schulze, 2023-02-26 07:49:28 +01:00 (committed by GitHub)
5 changed files with 166 additions and 49 deletions


@@ -162,22 +162,59 @@ If you want to change the target table by using `ALTER`, we recommend disabling
## Configuration {#configuration}
Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (`kafka`) and topic-level (`kafka_*`). The global configuration is applied first, and then the topic-level configuration is applied (if it exists).
Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (below `<kafka>`) and topic-level (below `<kafka><kafka_topic>`). The global configuration is applied first, and then the topic-level configuration is applied (if it exists).
``` xml
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
<!-- Configuration specific to topics "logs" and "stats" -->
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</kafka>
```
<details markdown="1">
<summary>Example in deprecated syntax</summary>
``` xml
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific for topic "logs" -->
<!-- Configuration specific to topics "logs" and "stats" -->
<!-- Does NOT support periods in topic names, e.g. "logs.security"> -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
<kafka_stats>
<retry_backoff_ms>400</retry_backoff_ms>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_stats>
```
</details>
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
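For instance, the two rules combine as follows in the new syntax. This is an illustrative sketch (the topic name `sports.football` and the setting values are arbitrary): librdkafka names are written with underscores, a `<kafka_topic>` block may name a topic containing periods, and a topic-level tag overrides the global value for that topic only:

``` xml
<kafka>
    <!-- librdkafka setting "check.crcs" becomes <check_crcs> -->
    <check_crcs>true</check_crcs>
    <!-- global default, applied first -->
    <retry_backoff_ms>100</retry_backoff_ms>
    <kafka_topic>
        <!-- periods in topic names are supported by this syntax -->
        <name>sports.football</name>
        <!-- applied second: overrides the global value for this topic only -->
        <retry_backoff_ms>250</retry_backoff_ms>
    </kafka_topic>
</kafka>
```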
### Kerberos support {#kafka-kerberos-support}


@@ -175,22 +175,69 @@ namespace
const auto CLEANUP_TIMEOUT_MS = 3000;
const auto MAX_THREAD_WORK_DURATION_MS = 60000; // once per minute, leave and reschedule (we can't lock threads in the pool forever)
/// Configuration prefix
const String CONFIG_PREFIX = "kafka";
const String CONFIG_KAFKA_TAG = "kafka";
const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
const String CONFIG_NAME_TAG = "name";
void loadFromConfig(cppkafka::Configuration & conf, const Poco::Util::AbstractConfiguration & config, const std::string & path)
/// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(path, keys);
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & key : keys)
for (const auto & tag : tags)
{
const String key_path = path + "." + key;
// log_level has valid underscore, rest librdkafka setting use dot.separated.format
// which is not acceptable for XML.
// See also https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String key_name = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
conf.set(key_name, config.getString(key_path));
if (tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
continue; /// used by new per-topic configuration, ignore
const String setting_path = config_prefix + "." + tag;
const String setting_value = config.getString(setting_path);
/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_name_in_kafka_config = (tag == "log_level") ? tag : boost::replace_all_copy(tag, "_", ".");
kafka_config.set(setting_name_in_kafka_config, setting_value);
}
}
/// Read server configuration into cppkafka configuration, used by new per-topic configuration
void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const String & topic)
{
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
/// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
continue;
/// Read topic name between <name>...</name>
const String kafka_topic_path = config_prefix + "." + tag;
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
const String topic_name = config.getString(kafka_topic_name_path);
if (topic_name == topic)
{
/// Found it! Now read the per-topic configuration into cppkafka.
Poco::Util::AbstractConfiguration::Keys inner_tags;
config.keys(kafka_topic_path, inner_tags);
for (const auto & inner_tag : inner_tags)
{
if (inner_tag == CONFIG_NAME_TAG)
continue; // ignore <name>
/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_path = kafka_topic_path + "." + inner_tag;
const String setting_value = config.getString(setting_path);
const String setting_name_in_kafka_config = (inner_tag == "log_level") ? inner_tag : boost::replace_all_copy(inner_tag, "_", ".");
kafka_config.set(setting_name_in_kafka_config, setting_value);
}
}
}
}
}
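Both helpers rely on Poco's key enumeration, where repeated sibling tags are disambiguated with bracketed indices — hence the `starts_with(CONFIG_KAFKA_TOPIC_TAG)` checks above. A minimal standalone sketch of that behavior (not part of this patch, for illustration only):

``` cpp
#include <iostream>
#include <sstream>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>

int main()
{
    // Two <kafka_topic> siblings under the root <kafka> element.
    std::istringstream xml(
        "<kafka>"
        "  <debug>cgrp</debug>"
        "  <kafka_topic><name>logs</name></kafka_topic>"
        "  <kafka_topic><name>stats</name></kafka_topic>"
        "</kafka>");
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml));

    Poco::Util::AbstractConfiguration::Keys tags;
    config->keys(tags); // tags one level below the root element <kafka>
    for (const auto & tag : tags)
        std::cout << tag << '\n'; // prints: debug, kafka_topic, kafka_topic[1]

    // The bracketed form is also a valid lookup path:
    std::cout << config->getString("kafka_topic[1].name") << '\n'; // prints: stats
}
```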
@@ -509,29 +556,33 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
String StorageKafka::getConfigPrefix() const
{
if (!collection_name.empty())
return "named_collections." + collection_name + "." + CONFIG_PREFIX; /// Add one more level to separate librdkafka configuration.
return CONFIG_PREFIX;
return "named_collections." + collection_name + "." + CONFIG_KAFKA_TAG; /// Add one more level to separate librdkafka configuration.
return CONFIG_KAFKA_TAG;
}
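For the named-collection branch, the prefix resolves to a path like `named_collections.my_kafka_cluster.kafka` — i.e. a server configuration shaped roughly like the sketch below (the collection name `my_kafka_cluster` is hypothetical; the extra `<kafka>` level is the separation the comment above refers to):

``` xml
<clickhouse>
    <named_collections>
        <my_kafka_cluster>
            <!-- extra level separating librdkafka settings from the collection's own keys -->
            <kafka>
                <debug>cgrp</debug>
                <auto_offset_reset>smallest</auto_offset_reset>
            </kafka>
        </my_kafka_cluster>
    </named_collections>
</clickhouse>
```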
void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
{
// Update consumer configuration from the configuration
// Update consumer configuration from the configuration. Example:
// <kafka>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka>
const auto & config = getContext()->getConfigRef();
auto config_prefix = getConfigPrefix();
if (config.has(config_prefix))
loadFromConfig(conf, config, config_prefix);
loadFromConfig(kafka_config, config, config_prefix);
#if USE_KRB5
if (conf.has_property("sasl.kerberos.kinit.cmd"))
#if USE_KRB5
if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
LOG_WARNING(log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
conf.set("sasl.kerberos.kinit.cmd","");
conf.set("sasl.kerberos.min.time.before.relogin","0");
kafka_config.set("sasl.kerberos.kinit.cmd","");
kafka_config.set("sasl.kerberos.min.time.before.relogin","0");
if (conf.has_property("sasl.kerberos.keytab") && conf.has_property("sasl.kerberos.principal"))
if (kafka_config.has_property("sasl.kerberos.keytab") && kafka_config.has_property("sasl.kerberos.principal"))
{
String keytab = conf.get("sasl.kerberos.keytab");
String principal = conf.get("sasl.kerberos.principal");
String keytab = kafka_config.get("sasl.kerberos.keytab");
String principal = kafka_config.get("sasl.kerberos.principal");
LOG_DEBUG(log, "Running KerberosInit");
try
{
@@ -543,21 +594,47 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
}
LOG_DEBUG(log, "Finished KerberosInit");
}
#else // USE_KRB5
if (conf.has_property("sasl.kerberos.keytab") || conf.has_property("sasl.kerberos.principal"))
LOG_WARNING(log, "Kerberos-related parameters are ignored because ClickHouse was built without support of krb5 library.");
#endif // USE_KRB5
#else // USE_KRB5
if (kafka_config.has_property("sasl.kerberos.keytab") || kafka_config.has_property("sasl.kerberos.principal"))
LOG_WARNING(log, "Ignoring Kerberos-related parameters because ClickHouse was built without krb5 library support.");
#endif // USE_KRB5
// Update consumer topic-specific configuration
// Update consumer topic-specific configuration (legacy syntax, retained for compatibility). Example with topic "football":
// <kafka_football>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka_football>
// The legacy syntax has the problem that periods in topic names (e.g. "sports.football") are not supported because the Poco
// configuration framework hierarchy is based on periods as level separators. Besides that, per-topic tags at the same level
// as <kafka> are ugly.
for (const auto & topic : topics)
{
const auto topic_config_key = config_prefix + "_" + topic;
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
loadFromConfig(kafka_config, config, topic_config_key);
}
// Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
// <kafka>
// <kafka_topic>
// <name>football</name>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>5000</fetch_min_bytes>
// </kafka_topic>
// <kafka_topic>
// <name>baseball</name>
// <retry_backoff_ms>300</retry_backoff_ms>
// <fetch_min_bytes>2000</fetch_min_bytes>
// </kafka_topic>
// </kafka>
// Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), and everything
// Kafka-related is below <kafka>.
for (const auto & topic : topics)
if (config.has(config_prefix))
loadTopicConfig(kafka_config, config, config_prefix, topic);
// No need to add any prefix; messages can be distinguished
conf.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
kafka_config.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
{
auto [poco_level, client_logs_level] = parseSyslogLevel(level);
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
@@ -573,13 +650,13 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
int status;
status = rd_kafka_conf_interceptor_add_on_new(conf.get_handle(),
status = rd_kafka_conf_interceptor_add_on_new(kafka_config.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnNew, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set new interceptor due to {} error", status);
// cppkafka always copies the configuration
status = rd_kafka_conf_interceptor_add_on_conf_dup(conf.get_handle(),
status = rd_kafka_conf_interceptor_add_on_conf_dup(kafka_config.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnConfDup, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set dup conf interceptor due to {} error", status);


@@ -126,7 +126,7 @@ private:
std::atomic<bool> shutdown_called = false;
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & conf);
void updateConfiguration(cppkafka::Configuration & kafka_config);
String getConfigPrefix() const;
void threadFunc(size_t idx);


@@ -9,12 +9,14 @@
XXX: for now these messages will appear in stderr.
-->
<debug>cgrp,consumer,topic,protocol</debug>
<kafka_topic>
<name>consumer_hang</name>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_topic>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</clickhouse>


@@ -15,12 +15,13 @@
<sasl_kerberos_principal>kafkauser/instance@TEST.CLICKHOUSE.TECH</sasl_kerberos_principal>
<debug>security</debug>
<api_version_request>false</api_version_request>
<kafka_topic>
<name>consumer_hang</name>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_topic>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</clickhouse>