Allow configuration of Kafka topics with periods

The Kafka table engine allows global configuration and per-Kafka-topic
configuration. The latter uses syntax <kafka_TOPIC>, e.g. for topic
"football":

  <kafka_football>
      <retry_backoff_ms>250</retry_backoff_ms>
      <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_football>

Some users had to find out the hard way that such configuration doesn't
take effect if the topic name contains a period, e.g. "sports.football".
The reason is that the ClickHouse configuration framework already uses
periods as level separators to descend the configuration hierarchy.
(Besides that, per-topic configuration at the same level as global
configuration could be considered ugly.)
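
To illustrate the problem, here is a minimal sketch in plain C++. It is
not the actual Poco or ClickHouse code; splitConfigKey and demoSplit are
hypothetical names. It only shows what splitting a key at periods does
to the legacy per-topic key:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper, not ClickHouse or Poco code: splits a configuration
// key into hierarchy levels at every period.
std::vector<std::string> splitConfigKey(const std::string & key)
{
    std::vector<std::string> levels;
    std::string::size_type start = 0;
    while (true)
    {
        const auto pos = key.find('.', start);
        if (pos == std::string::npos)
        {
            levels.push_back(key.substr(start));
            break;
        }
        levels.push_back(key.substr(start, pos - start));
        start = pos + 1;
    }
    return levels;
}

// Demonstrates the mismatch for a topic name that contains a period.
void demoSplit()
{
    // Topic "football": a single level, as the legacy syntax intends.
    assert(splitConfigKey("kafka_football").size() == 1);

    // Topic "sports.football": two levels, so a lookup would descend into
    // "kafka_sports" and then look for a child named "football".
    const auto levels = splitConfigKey("kafka_sports.football");
    assert(levels.size() == 2);
    assert(levels[0] == "kafka_sports");
    assert(levels[1] == "football");
}
```

With such a split, a tag named <kafka_sports.football> is never matched
as one level, which is consistent with the behavior users observed.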

Note that Kafka topics may contain characters "a-zA-Z0-9._-" (*) and
a tree-like topic organization using periods is quite common in
practice.

This PR deprecates the existing per-topic configuration syntax (but
continues to support it for backward compatibility) and introduces a new
per-topic configuration syntax below the global Kafka configuration of
the form:

<kafka>
   <topic name="football">
       <retry_backoff_ms>250</retry_backoff_ms>
       <fetch_min_bytes>100000</fetch_min_bytes>
   </topic>
</kafka>

The period restriction doesn't apply to XML attributes, so <topic
name="sports.football"> will work. Also, everything Kafka-related is
below <kafka>.
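
The two lookup keys can be sketched as follows. The helper names
(legacyTopicKey, newTopicKey, demoKeys) are hypothetical; the string
concatenation mirrors what StorageKafka::updateConfiguration does in
this PR. How the configuration framework resolves the attribute
selector is not shown here.

```cpp
#include <cassert>
#include <string>

// Hypothetical helpers that mirror the key construction in this PR.

// Legacy syntax: <kafka_TOPIC> at the same level as <kafka>.
std::string legacyTopicKey(const std::string & config_prefix, const std::string & topic)
{
    return config_prefix + "_" + topic;
}

// New syntax: <topic name="TOPIC"> below <kafka>, addressed through an
// attribute selector instead of a tag name.
std::string newTopicKey(const std::string & config_prefix, const std::string & topic)
{
    return config_prefix + ".topic[@name=" + topic + "]";
}

void demoKeys()
{
    assert(legacyTopicKey("kafka", "football") == "kafka_football");
    assert(newTopicKey("kafka", "football") == "kafka.topic[@name=football]");

    // With the new syntax the topic name appears only inside the selector.
    assert(newTopicKey("kafka", "sports.football") == "kafka.topic[@name=sports.football]");
}
```

In the new form the topic name is never used as a tag name, so it does
not have to be a valid single-level key.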

Considered but rejected alternatives:
- Extending Poco ConfigurationView with custom separators (e.g. "/"
  instead of "."). Won't work easily because ConfigurationView only
  builds a path but defers descending the configuration tree to the
  normal configuration classes.
- Reloading the configuration file in StorageKafka (instead of reading
  the loaded file) but with a custom separator. This mode is supported
  by XML configuration. Too ugly and error-prone since the true
  configuration is composed from multiple configuration files.

(*) https://stackoverflow.com/a/37067544
Robert Schulze 2023-02-22 19:58:48 +00:00
parent 8e7533fa57
commit 81bf43157f
5 changed files with 92 additions and 38 deletions

@@ -162,7 +162,25 @@ If you want to change the target table by using `ALTER`, we recommend disabling
## Configuration {#configuration}
Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (`kafka`) and topic-level (`kafka_*`). The global configuration is applied first, and then the topic-level configuration is applied (if it exists).
Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (`kafka`) and topic-level (`topic name="*"`, nested in `kafka`). The global configuration is applied first, and then the topic-level configuration is applied (if it exists).
``` xml
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
<!-- Configuration specific for topic "logs" -->
<topic name="logs">
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</topic>
</kafka>
```
<details markdown="1">
<summary>Example in deprecated syntax</summary>
``` xml
<!-- Global configuration options for all tables of Kafka engine type -->
@@ -172,12 +190,16 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
</kafka>
<!-- Configuration specific for topic "logs" -->
<!-- Does NOT support periods in topic names, e.g. "logs.security" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
```
</details>
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
### Kerberos support {#kafka-kerberos-support}

@@ -175,22 +175,26 @@ namespace
const auto CLEANUP_TIMEOUT_MS = 3000;
const auto MAX_THREAD_WORK_DURATION_MS = 60000; // once per minute leave do reschedule (we can't lock threads in pool forever)
/// Configuration prefix
const String CONFIG_PREFIX = "kafka";
const String CONFIG_TOPIC_TAG = "topic";
void loadFromConfig(cppkafka::Configuration & conf, const Poco::Util::AbstractConfiguration & config, const std::string & path)
/// Read server configuration into cppkafka configuration
void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(path, keys);
config.keys(config_prefix, keys);
for (const auto & key : keys)
{
const String key_path = path + "." + key;
if (key == CONFIG_TOPIC_TAG)
continue; // special case: when loading the global config, cppkafka rightfully complains that "topic" isn't a valid kafka setting
const String key_path = config_prefix + "." + key;
// log_level has valid underscore, rest librdkafka setting use dot.separated.format
// which is not acceptable for XML.
// See also https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String key_name = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
conf.set(key_name, config.getString(key_path));
String value = config.getString(key_path);
kafka_config.set(key_name, value);
}
}
}
@@ -513,25 +517,29 @@ String StorageKafka::getConfigPrefix() const
return CONFIG_PREFIX;
}
void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
{
// Update consumer configuration from the configuration
// Update consumer configuration from the configuration. Example:
// <kafka>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka>
const auto & config = getContext()->getConfigRef();
auto config_prefix = getConfigPrefix();
if (config.has(config_prefix))
loadFromConfig(conf, config, config_prefix);
loadFromConfig(kafka_config, config, config_prefix);
#if USE_KRB5
if (conf.has_property("sasl.kerberos.kinit.cmd"))
#if USE_KRB5
if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
LOG_WARNING(log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
conf.set("sasl.kerberos.kinit.cmd","");
conf.set("sasl.kerberos.min.time.before.relogin","0");
kafka_config.set("sasl.kerberos.kinit.cmd","");
kafka_config.set("sasl.kerberos.min.time.before.relogin","0");
if (conf.has_property("sasl.kerberos.keytab") && conf.has_property("sasl.kerberos.principal"))
if (kafka_config.has_property("sasl.kerberos.keytab") && kafka_config.has_property("sasl.kerberos.principal"))
{
String keytab = conf.get("sasl.kerberos.keytab");
String principal = conf.get("sasl.kerberos.principal");
String keytab = kafka_config.get("sasl.kerberos.keytab");
String principal = kafka_config.get("sasl.kerberos.principal");
LOG_DEBUG(log, "Running KerberosInit");
try
{
@@ -543,21 +551,44 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
}
LOG_DEBUG(log, "Finished KerberosInit");
}
#else // USE_KRB5
if (conf.has_property("sasl.kerberos.keytab") || conf.has_property("sasl.kerberos.principal"))
LOG_WARNING(log, "Kerberos-related parameters are ignored because ClickHouse was built without support of krb5 library.");
#endif // USE_KRB5
#else // USE_KRB5
if (kafka_config.has_property("sasl.kerberos.keytab") || kafka_config.has_property("sasl.kerberos.principal"))
LOG_WARNING(log, "Ignoring Kerberos-related parameters because ClickHouse was built without krb5 library support.");
#endif // USE_KRB5
// Update consumer topic-specific configuration
// Update consumer topic-specific configuration (legacy syntax, retained for compatibility). Example with topic "football":
// <kafka_football>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka_football>
// The legacy syntax has the problem that periods in topic names (e.g. "sports.football") are not supported because the Poco
// configuration framework hierarchy is based on periods as level separators. Besides that, per-topic tags at the same level
// as <kafka> are ugly.
for (const auto & topic : topics)
{
const auto topic_config_key = config_prefix + "_" + topic;
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
loadFromConfig(kafka_config, config, topic_config_key);
}
// Update consumer topic-specific configuration (new syntax). Example with topic "football":
// <kafka>
// <topic name="football">
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </topic>
// </kafka>
// Advantages: The period restriction doesn't apply to XML attributes (e.g. <topic name="sports.football"> works) and everything
// kafka-related is below <kafka>.
for (const auto & topic : topics)
{
const auto topic_config_key = config_prefix + ".topic[@name=" + topic + "]";
if (config.has(topic_config_key))
loadFromConfig(kafka_config, config, topic_config_key);
}
// No need to add any prefix, messages can be distinguished
conf.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
kafka_config.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
{
auto [poco_level, client_logs_level] = parseSyslogLevel(level);
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
@@ -573,13 +604,13 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
int status;
status = rd_kafka_conf_interceptor_add_on_new(conf.get_handle(),
status = rd_kafka_conf_interceptor_add_on_new(kafka_config.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnNew, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set new interceptor due to {} error", status);
// cppkafka always copy the configuration
status = rd_kafka_conf_interceptor_add_on_conf_dup(conf.get_handle(),
status = rd_kafka_conf_interceptor_add_on_conf_dup(kafka_config.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnConfDup, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set dup conf interceptor due to {} error", status);

@@ -126,7 +126,7 @@ private:
std::atomic<bool> shutdown_called = false;
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & conf);
void updateConfiguration(cppkafka::Configuration & kafka_config);
String getConfigPrefix() const;
void threadFunc(size_t idx);

@@ -9,12 +9,13 @@
XXX: for now this messages will appears in stderr.
-->
<debug>cgrp,consumer,topic,protocol</debug>
<topic name="consumer_hang">
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</topic>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</clickhouse>

@@ -15,12 +15,12 @@
<sasl_kerberos_principal>kafkauser/instance@TEST.CLICKHOUSE.TECH</sasl_kerberos_principal>
<debug>security</debug>
<api_version_request>false</api_version_request>
<topic name="consumer_hang">
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</topic>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</clickhouse>