Merge pull request #59710 from GrigoryPervakov/master

Support sql created named collections in Kafka Storage for librdkafka settings
Nikolai Kochetov, 2024-02-28 16:38:56 +01:00, committed by GitHub
commit 8dd08e9844
5 changed files with 121 additions and 60 deletions

src/Storages/Kafka/StorageKafka.cpp

@@ -246,64 +246,83 @@ namespace
     const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
     const String CONFIG_NAME_TAG = "name";

+    void setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value)
+    {
+        if (key.starts_with(CONFIG_KAFKA_TOPIC_TAG) || key == CONFIG_NAME_TAG) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+            return; /// used by new per-topic configuration, ignore
+
+        /// "log_level" has a valid underscore; the remaining librdkafka settings use dot.separated.format which isn't acceptable for XML.
+        /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+        const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
+        kafka_config.set(setting_name_in_kafka_config, value);
+    }
+
     /// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
-    void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
+    void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix)
     {
+        if (!collection_name.empty())
+        {
+            const auto & collection = NamedCollectionFactory::instance().get(collection_name);
+            for (const auto & key : collection->getKeys(-1, config_prefix))
+            {
+                // Cut prefix with '.' before actual config tag.
+                const auto param_name = key.substr(config_prefix.size() + 1);
+                setKafkaConfigValue(kafka_config, param_name, collection->get<String>(key));
+            }
+            return;
+        }
+
         /// Read all tags one level below <kafka>
         Poco::Util::AbstractConfiguration::Keys tags;
         config.keys(config_prefix, tags);

         for (const auto & tag : tags)
         {
-            if (tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
-                continue; /// used by new per-topic configuration, ignore
-
-            const String setting_path = config_prefix + "." + tag;
-            const String setting_value = config.getString(setting_path);
-
-            /// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
-            /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-            const String setting_name_in_kafka_config = (tag == "log_level") ? tag : boost::replace_all_copy(tag, "_", ".");
-            kafka_config.set(setting_name_in_kafka_config, setting_value);
+            const String setting_path = fmt::format("{}.{}", config_prefix, tag);
+            setKafkaConfigValue(kafka_config, tag, config.getString(setting_path));
         }
     }

     /// Read server configuration into cppkafka configuration, used by new per-topic configuration
-    void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const String & topic)
+    void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const String & topic)
     {
-        /// Read all tags one level below <kafka>
-        Poco::Util::AbstractConfiguration::Keys tags;
-        config.keys(config_prefix, tags);
-
-        for (const auto & tag : tags)
+        if (!collection_name.empty())
         {
-            /// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
-            if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
-                continue;
-
-            /// Read topic name between <name>...</name>
-            const String kafka_topic_path = config_prefix + "." + tag;
-            const String kafpa_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
-
-            const String topic_name = config.getString(kafpa_topic_name_path);
-            if (topic_name == topic)
+            const auto topic_prefix = fmt::format("{}.{}", config_prefix, CONFIG_KAFKA_TOPIC_TAG);
+            const auto & collection = NamedCollectionFactory::instance().get(collection_name);
+            for (const auto & key : collection->getKeys(1, config_prefix))
             {
-                /// Found it! Now read the per-topic configuration into cppkafka.
-                Poco::Util::AbstractConfiguration::Keys inner_tags;
-                config.keys(kafka_topic_path, inner_tags);
-                for (const auto & inner_tag : inner_tags)
-                {
-                    if (inner_tag == CONFIG_NAME_TAG)
-                        continue; // ignore <name>
+                /// Only consider key <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+                if (!key.starts_with(topic_prefix))
+                    continue;

-                    /// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
-                    /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
-                    const String setting_path = kafka_topic_path + "." + inner_tag;
-                    const String setting_value = config.getString(setting_path);
+                const String kafka_topic_path = config_prefix + "." + key;
+                const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
+                if (topic == collection->get<String>(kafka_topic_name_path))
+                    /// Found it! Now read the per-topic configuration into cppkafka.
+                    loadFromConfig(kafka_config, config, collection_name, kafka_topic_path);
+            }
+        }
+        else
+        {
+            /// Read all tags one level below <kafka>
+            Poco::Util::AbstractConfiguration::Keys tags;
+            config.keys(config_prefix, tags);

-                    const String setting_name_in_kafka_config = (inner_tag == "log_level") ? inner_tag : boost::replace_all_copy(inner_tag, "_", ".");
-                    kafka_config.set(setting_name_in_kafka_config, setting_value);
-                }
+            for (const auto & tag : tags)
+            {
+                /// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+                if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
+                    continue;

+                /// Read topic name between <name>...</name>
+                const String kafka_topic_path = fmt::format("{}.{}", config_prefix, tag);
+                const String kafka_topic_name_path = fmt::format("{}.{}", kafka_topic_path, CONFIG_NAME_TAG);

+                const String topic_name = config.getString(kafka_topic_name_path);
+                if (topic_name == topic)
+                    /// Found it! Now read the per-topic configuration into cppkafka.
+                    loadFromConfig(kafka_config, config, collection_name, kafka_topic_path);
+            }
         }
     }
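
The underscore-to-dot rewrite above is easy to check in isolation. Below is a minimal standalone sketch of the mapping; toLibrdkafkaName and the sample keys are illustrative, only the log_level exception and the boost::replace_all_copy call come from the patch:

#include <boost/algorithm/string/replace.hpp>
#include <iostream>
#include <string>

// Mirrors the rewrite in setKafkaConfigValue: librdkafka wants dot.separated
// property names, while XML tags and named-collection keys use underscores;
// "log_level" is the one setting whose underscore is part of the real name.
std::string toLibrdkafkaName(const std::string & key)
{
    return (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
}

int main()
{
    // Sample keys as they would arrive from a named collection after the
    // "kafka." prefix is cut off via key.substr(config_prefix.size() + 1).
    for (const std::string key : {"security_protocol", "sasl_kerberos_service_name", "log_level"})
        std::cout << key << " -> " << toLibrdkafkaName(key) << '\n';
    // security_protocol -> security.protocol
    // sasl_kerberos_service_name -> sasl.kerberos.service.name
    // log_level -> log_level
}
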
@@ -728,13 +747,6 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
         : getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
 }

-String StorageKafka::getConfigPrefix() const
-{
-    if (!collection_name.empty())
-        return "named_collections." + collection_name + "." + CONFIG_KAFKA_TAG; /// Add one more level to separate librdkafka configuration.
-    return CONFIG_KAFKA_TAG;
-}
-
 void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
 {
     // Update consumer configuration from the configuration. Example:
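
The deleted getConfigPrefix() shows what the patch fixes: the old code resolved a named collection through a path in the server's Poco configuration, which only exists for collections declared in config.xml; collections created with CREATE NAMED COLLECTION live only in the NamedCollectionFactory. A sketch under that assumption (buildOldPrefix is hypothetical and restates the removed helper):

#include <iostream>
#include <string>

// buildOldPrefix is hypothetical; it restates the removed helper to show the limitation.
std::string buildOldPrefix(const std::string & collection_name)
{
    // Old behaviour: read librdkafka settings from the server's Poco configuration at
    // <named_collections><NAME><kafka>. The extra "kafka" level separates librdkafka
    // settings from the engine-level kafka_* settings.
    if (!collection_name.empty())
        return "named_collections." + collection_name + ".kafka";
    return "kafka";
}

int main()
{
    std::cout << buildOldPrefix("kafka_config") << '\n'; // named_collections.kafka_config.kafka
    // That path exists only for collections declared in the server XML config. The new code
    // instead calls NamedCollectionFactory::instance().get(collection_name), which also
    // knows collections created with CREATE NAMED COLLECTION.
}
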
@@ -743,9 +755,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
     //     <fetch_min_bytes>100000</fetch_min_bytes>
     // </kafka>
     const auto & config = getContext()->getConfigRef();
-    auto config_prefix = getConfigPrefix();
-    if (config.has(config_prefix))
-        loadFromConfig(kafka_config, config, config_prefix);
+    loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG);

 #if USE_KRB5
     if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
@@ -784,9 +794,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
     // as <kafka> are ugly.
     for (const auto & topic : topics)
     {
-        const auto topic_config_key = config_prefix + "_" + topic;
-        if (config.has(topic_config_key))
-            loadFromConfig(kafka_config, config, topic_config_key);
+        loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG + "_" + topic);
     }

     // Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
@@ -805,8 +813,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
     // Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
     // Kafka-related is below <kafka>.
     for (const auto & topic : topics)
-        if (config.has(config_prefix))
-            loadTopicConfig(kafka_config, config, config_prefix, topic);
+        loadTopicConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topic);

     // No need to add any prefix, messages can be distinguished
     kafka_config.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
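
Both call sites now pass collection_name down, and for a non-empty name loadTopicConfig walks the collection's flat dotted keys instead of XML tags. A rough standalone simulation of the per-topic lookup follows; the std::map stands in for the named collection, keysOneLevelBelow approximates NamedCollection::getKeys(1, prefix), and the helper name and sample keys are all invented for illustration:

#include <iostream>
#include <map>
#include <set>
#include <string>

using Collection = std::map<std::string, std::string>; // flat dotted keys -> values

// Approximates NamedCollection::getKeys(1, prefix): distinct keys truncated one
// level below the prefix, e.g. "kafka.kafka_topic" and "kafka.kafka_topic[1]".
std::set<std::string> keysOneLevelBelow(const Collection & collection, const std::string & prefix)
{
    std::set<std::string> out;
    for (const auto & entry : collection)
    {
        const std::string & key = entry.first;
        if (!key.starts_with(prefix + "."))
            continue;
        const auto next_dot = key.find('.', prefix.size() + 1);
        out.insert(key.substr(0, next_dot)); // npos -> keep the whole key
    }
    return out;
}

int main()
{
    // A collection configuring two topics, using the kafka_topic / kafka_topic[1] convention.
    const Collection collection = {
        {"kafka.kafka_topic.name", "football"},
        {"kafka.kafka_topic.retry_backoff_ms", "250"},
        {"kafka.kafka_topic[1].name", "baseball"},
        {"kafka.kafka_topic[1].fetch_min_bytes", "50000"},
    };

    const std::string config_prefix = "kafka";
    const std::string topic_prefix = config_prefix + ".kafka_topic";

    for (const std::string topic : {"football", "baseball"})
        for (const auto & key : keysOneLevelBelow(collection, config_prefix))
        {
            if (!key.starts_with(topic_prefix))
                continue; // not a <kafka_topic> entry
            const auto name = collection.find(key + ".name");
            if (name != collection.end() && name->second == topic)
                std::cout << topic << " -> settings under " << key << '\n';
        }
    // football -> settings under kafka.kafka_topic
    // baseball -> settings under kafka.kafka_topic[1]
}
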
@@ -817,7 +824,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
     /// NOTE: statistics should be consumed, otherwise it creates too many
     /// entries in the queue, which leads to memory leak and slow shutdown.
-    if (!config.has(config_prefix + "." + "statistics_interval_ms"))
+    if (!kafka_config.has_property("statistics.interval.ms"))
     {
         // every 3 seconds by default. set to 0 to disable.
         kafka_config.set("statistics.interval.ms", "3000");

src/Storages/Kafka/StorageKafka.h

@@ -145,7 +145,6 @@ private:
     // Update Kafka configuration with values from CH user configuration.
     void updateConfiguration(cppkafka::Configuration & kafka_config);

-    String getConfigPrefix() const;
     void threadFunc(size_t idx);

     size_t getPollMaxBatchSize() const;

tests/integration/test_storage_kerberized_kafka/configs/kafka.xml

@@ -11,8 +11,8 @@
     <debug>cgrp,consumer,topic,protocol</debug>
     <!-- librdkafka stat in system.kafka_consumers -->
     <!-- default 3000 (every three second) seems too long for test -->
    <statistics_interval_ms>600</statistics_interval_ms>
    <kafka_topic>
        <name>consumer_hang</name>

tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh

@@ -105,6 +105,9 @@ create_keytabs() {
     kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
     kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"
+    kadmin.local -q "addprinc -randkey anotherkafkauser/instance@${REALM}"
+    kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab anotherkafkauser/instance@${REALM}"

     chmod g+r /tmp/keytab/clickhouse.keytab
 }

tests/integration/test_storage_kerberized_kafka/test.py

@@ -227,6 +227,58 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):

     assert instance.contains_in_log("KerberosInit failure:")

+
+def test_kafka_config_from_sql_named_collection(kafka_cluster):
+    kafka_produce(
+        kafka_cluster,
+        "kafka_json_as_string",
+        [
+            '{"t": 123, "e": {"x": "woof"} }',
+            "",
+            '{"t": 124, "e": {"x": "test"} }',
+            '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
+        ],
+    )
+
+    instance.query(
+        """
+        CREATE NAMED COLLECTION kafka_config AS
+            kafka.security_protocol = 'SASL_PLAINTEXT',
+            kafka.sasl_mechanism = 'GSSAPI',
+            kafka.sasl_kerberos_service_name = 'kafka',
+            kafka.sasl_kerberos_keytab = '/tmp/keytab/clickhouse.keytab',
+            kafka.sasl_kerberos_principal = 'anotherkafkauser/instance@TEST.CLICKHOUSE.TECH',
+            kafka.debug = 'security',
+            kafka.api_version_request = 'false',
+            kafka_broker_list = 'kerberized_kafka1:19092',
+            kafka_topic_list = 'kafka_json_as_string',
+            kafka_commit_on_select = 1,
+            kafka_group_name = 'kafka_json_as_string',
+            kafka_format = 'JSONAsString',
+            kafka_flush_interval_ms = 1000;
+        """
+    )
+    instance.query(
+        """
+        CREATE TABLE test.kafka (field String)
+            ENGINE = Kafka(kafka_config);
+        """
+    )
+
+    time.sleep(3)
+
+    result = instance.query("SELECT * FROM test.kafka;")
+    expected = """\
+{"t": 123, "e": {"x": "woof"} }
+{"t": 124, "e": {"x": "test"} }
+{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
+"""
+    assert TSV(result) == TSV(expected)
+    assert instance.contains_in_log(
+        "Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
+    )
+
 if __name__ == "__main__":
     cluster.start()
     input("Cluster created, press any key to destroy...")
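
For readers of the test above: the collection mixes two kinds of keys. Bare kafka_* entries (broker list, format, group name) are Kafka engine settings handled by StorageKafka itself, while keys under the kafka. prefix are what this PR forwards to librdkafka after the underscore-to-dot rewrite. A hedged sketch of the split; the key list is copied from the test, the classification logic is illustrative:

#include <boost/algorithm/string/replace.hpp>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    // Key names copied from the CREATE NAMED COLLECTION statement above.
    const std::vector<std::string> collection_keys = {
        "kafka.security_protocol",    // -> librdkafka security.protocol
        "kafka.sasl_mechanism",       // -> librdkafka sasl.mechanism
        "kafka.sasl_kerberos_keytab", // -> librdkafka sasl.kerberos.keytab
        "kafka.api_version_request",  // -> librdkafka api.version.request
        "kafka_broker_list",          // engine setting, handled by StorageKafka itself
        "kafka_format",               // engine setting
        "kafka_group_name",           // engine setting
    };

    const std::string librdkafka_prefix = "kafka.";
    for (const auto & key : collection_keys)
    {
        if (key.starts_with(librdkafka_prefix))
        {
            const auto name = key.substr(librdkafka_prefix.size());
            std::cout << key << " -> librdkafka "
                      << ((name == "log_level") ? name : boost::replace_all_copy(name, "_", "."))
                      << '\n';
        }
        else
            std::cout << key << " -> Kafka engine setting\n";
    }
}
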