diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md index 141d87fed20..5bfa5e4159a 100644 --- a/docs/en/engines/table-engines/integrations/kafka.md +++ b/docs/en/engines/table-engines/integrations/kafka.md @@ -170,53 +170,37 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u cgrp - smallest - 600 + 3000 - + + + smallest + + logs + 100000 + - - logs - 250 - 100000 - + + stats + 50000 + + - - stats - 400 - 50000 - + + + + logs + 250 + + + + stats + 400 + + - ``` -
- -Example in deprecated syntax - -``` xml - - - cgrp - smallest - - - - - - - 250 - 100000 - - - - 400 - 50000 - -``` - -
- For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `true`. diff --git a/docs/ru/engines/table-engines/integrations/kafka.md b/docs/ru/engines/table-engines/integrations/kafka.md index 18f6c7cd1f9..6b02f501043 100644 --- a/docs/ru/engines/table-engines/integrations/kafka.md +++ b/docs/ru/engines/table-engines/integrations/kafka.md @@ -166,17 +166,38 @@ Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла ClickHouse. Существует два конфигурационных ключа, которые можно использовать: глобальный (`kafka`) и по топикам (`kafka_topic_*`). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует). ``` xml - - + + cgrp - smallest - + 3000 - - - 250 - 100000 - + + + smallest + + logs + 100000 + + + + stats + 50000 + + + + + + + logs + 250 + + + + stats + 400 + + + ``` В документе [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) можно увидеть список возможных опций конфигурации. Используйте подчеркивание (`_`) вместо точки в конфигурации ClickHouse. Например, `check.crcs=true` будет соответствовать `true`. diff --git a/docs/zh/engines/table-engines/integrations/kafka.md b/docs/zh/engines/table-engines/integrations/kafka.md index fd4e5e9c10a..f28abf112ef 100644 --- a/docs/zh/engines/table-engines/integrations/kafka.md +++ b/docs/zh/engines/table-engines/integrations/kafka.md @@ -120,17 +120,38 @@ Kafka 特性: 与 `GraphiteMergeTree` 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (`kafka`) 和 主题级别 (`kafka_*`)。首先应用全局配置,然后应用主题级配置(如果存在)。 ``` xml - - + + cgrp - smallest - + 3000 - - - 250 - 100000 - + + + smallest + + logs + 100000 + + + + stats + 50000 + + + + + + + logs + 250 + + + + stats + 400 + + + ``` 有关详细配置选项列表,请参阅 [librdkafka配置参考](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)。在 ClickHouse 配置中使用下划线 (`_`) ,并不是使用点 (`.`)。例如,`check.crcs=true` 将是 `true`。 diff --git a/programs/server/config.xml b/programs/server/config.xml index ea3ead47c32..9b8b8961b49 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -1579,6 +1579,42 @@ --> + + + + + + + + + + + + + + 1073741824 diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index a6c15149e15..7845ccb02b0 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -244,47 +244,56 @@ namespace { const String CONFIG_KAFKA_TAG = "kafka"; const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic"; + const String CONFIG_KAFKA_CONSUMER_TAG = "consumer"; + const String CONFIG_KAFKA_PRODUCER_TAG = "producer"; const String CONFIG_NAME_TAG = "name"; void setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value) { - if (key.starts_with(CONFIG_KAFKA_TOPIC_TAG) || key == CONFIG_NAME_TAG) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc. - return; /// used by new per-topic configuration, ignore - /// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML. /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", "."); kafka_config.set(setting_name_in_kafka_config, value); } - /// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration - void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String& collection_name, const String & config_prefix) + void loadConfigProperty(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const String & tag) + { + const String property_path = config_prefix + "." + tag; + const String property_value = config.getString(property_path); + + setKafkaConfigValue(kafka_config, tag, property_value); + } + + void loadNamedCollectionConfig(cppkafka::Configuration & kafka_config, const String & collection_name, const String & config_prefix) + { + const auto & collection = NamedCollectionFactory::instance().get(collection_name); + for (const auto & key : collection->getKeys(-1, config_prefix)) + { + // Cut prefix with '.' before actual config tag. + const auto param_name = key.substr(config_prefix.size() + 1); + setKafkaConfigValue(kafka_config, param_name, collection->get(key)); + } + } + + void loadLegacyTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix) { if (!collection_name.empty()) { - const auto & collection = NamedCollectionFactory::instance().get(collection_name); - for (const auto & key : collection->getKeys(-1, config_prefix)) - { - // Cut prefix with '.' before actual config tag. - const auto param_name = key.substr(config_prefix.size() + 1); - setKafkaConfigValue(kafka_config, param_name, collection->get(key)); - } + loadNamedCollectionConfig(kafka_config, collection_name, config_prefix); return; } - /// Read all tags one level below Poco::Util::AbstractConfiguration::Keys tags; config.keys(config_prefix, tags); for (const auto & tag : tags) { - const String setting_path = fmt::format("{}.{}", config_prefix, tag); - setKafkaConfigValue(kafka_config, tag, config.getString(setting_path)); + loadConfigProperty(kafka_config, config, config_prefix, tag); } } /// Read server configuration into cppkafa configuration, used by new per-topic configuration - void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String& collection_name, const String& config_prefix, const String& topic) + void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const String & topic) { if (!collection_name.empty()) { @@ -300,7 +309,7 @@ namespace const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG; if (topic == collection->get(kafka_topic_name_path)) /// Found it! Now read the per-topic configuration into cppkafka. - loadFromConfig(kafka_config, config, collection_name, kafka_topic_path); + loadNamedCollectionConfig(kafka_config, collection_name, kafka_topic_path); } } else @@ -311,21 +320,100 @@ namespace for (const auto & tag : tags) { - /// Only consider tag . Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc. - if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) - continue; - - /// Read topic name between ... - const String kafka_topic_path = fmt::format("{}.{}", config_prefix, tag); - const String kafka_topic_name_path = fmt::format("{}.{}", kafka_topic_path, CONFIG_NAME_TAG); - - const String topic_name = config.getString(kafka_topic_name_path); - if (topic_name == topic) - /// Found it! Now read the per-topic configuration into cppkafka. - loadFromConfig(kafka_config, config, collection_name, kafka_topic_path); + if (tag == CONFIG_NAME_TAG) + continue; // ignore , it is used to match topic configurations + loadConfigProperty(kafka_config, config, config_prefix, tag); } } } + + /// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration + void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const Names & topics) + { + if (!collection_name.empty()) + { + loadNamedCollectionConfig(kafka_config, collection_name, config_prefix); + return; + } + + /// Read all tags one level below + Poco::Util::AbstractConfiguration::Keys tags; + config.keys(config_prefix, tags); + + for (const auto & tag : tags) + { + if (tag == CONFIG_KAFKA_PRODUCER_TAG || tag == CONFIG_KAFKA_CONSUMER_TAG) + /// Do not load consumer/producer properties, since they should be separated by different configuration objects. + continue; + + if (tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc. + { + // Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball": + // + // + // football + // 250 + // 5000 + // + // + // baseball + // 300 + // 2000 + // + // + // Advantages: The period restriction no longer applies (e.g. sports.football will work), everything + // Kafka-related is below . + for (const auto & topic : topics) + { + /// Read topic name between ... + const String kafka_topic_path = config_prefix + "." + tag; + const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG; + const String topic_name = config.getString(kafka_topic_name_path); + + if (topic_name != topic) + continue; + loadTopicConfig(kafka_config, config, collection_name, kafka_topic_path, topic); + } + continue; + } + if (tag.starts_with(CONFIG_KAFKA_TAG)) + /// skip legacy configuration per topic e.g. . + /// it will be processed is a separate function + continue; + // Update configuration from the configuration. Example: + // + // 250 + // 100000 + // + loadConfigProperty(kafka_config, config, config_prefix, tag); + } + } + + void loadLegacyConfigSyntax(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics) + { + for (const auto & topic : topics) + { + const String kafka_topic_path = prefix + "." + CONFIG_KAFKA_TAG + "_" + topic; + loadLegacyTopicConfig(kafka_config, config, collection_name, kafka_topic_path); + } + } + + void loadConsumerConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics) + { + const String consumer_path = prefix + "." + CONFIG_KAFKA_CONSUMER_TAG; + loadLegacyConfigSyntax(kafka_config, config, collection_name, prefix, topics); + // A new syntax has higher priority + loadFromConfig(kafka_config, config, collection_name, consumer_path, topics); + } + + void loadProducerConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics) + { + const String producer_path = prefix + "." + CONFIG_KAFKA_PRODUCER_TAG; + loadLegacyConfigSyntax(kafka_config, config, collection_name, prefix, topics); + // A new syntax has higher priority + loadFromConfig(kafka_config, config, collection_name, producer_path, topics); + + } } StorageKafka::StorageKafka( @@ -484,13 +572,7 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & if (topics.size() > 1) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't write to Kafka table with multiple topics!"); - cppkafka::Configuration conf; - conf.set("metadata.broker.list", brokers); - conf.set("client.id", client_id); - conf.set("client.software.name", VERSION_NAME); - conf.set("client.software.version", VERSION_DESCRIBE); - // TODO: fill required settings - updateConfiguration(conf); + cppkafka::Configuration conf = getProducerConfiguration(); const Settings & settings = getContext()->getSettingsRef(); size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds(); @@ -499,6 +581,8 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & auto producer = std::make_unique( std::make_shared(conf), topics[0], std::chrono::milliseconds(poll_timeout), shutdown_called, header); + LOG_TRACE(log, "Kafka producer created"); + size_t max_rows = max_rows_per_message; /// Need for backward compatibility. if (format_name == "Avro" && local_context->getSettingsRef().output_format_avro_rows_in_file.changed) @@ -688,13 +772,38 @@ cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_n size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages)); - updateConfiguration(conf); + updateGlobalConfiguration(conf); + updateConsumerConfiguration(conf); // those settings should not be changed by users. conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once. conf.set("enable.partition.eof", "false"); // Ignore EOF messages + for (auto & property : conf.get_all()) + { + LOG_TRACE(log, "Consumer set property {}:{}", property.first, property.second); + } + + return conf; +} + +cppkafka::Configuration StorageKafka::getProducerConfiguration() +{ + cppkafka::Configuration conf; + conf.set("metadata.broker.list", brokers); + conf.set("client.id", client_id); + conf.set("client.software.name", VERSION_NAME); + conf.set("client.software.version", VERSION_DESCRIBE); + + updateGlobalConfiguration(conf); + updateProducerConfiguration(conf); + + for (auto & property : conf.get_all()) + { + LOG_TRACE(log, "Producer set property {}:{}", property.first, property.second); + } + return conf; } @@ -773,15 +882,10 @@ size_t StorageKafka::getPollTimeoutMillisecond() const : getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds(); } -void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config) +void StorageKafka::updateGlobalConfiguration(cppkafka::Configuration & kafka_config) { - // Update consumer configuration from the configuration. Example: - // - // 250 - // 100000 - // const auto & config = getContext()->getConfigRef(); - loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG); + loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics); #if USE_KRB5 if (kafka_config.has_property("sasl.kerberos.kinit.cmd")) @@ -810,37 +914,6 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config) LOG_WARNING(log, "Ignoring Kerberos-related parameters because ClickHouse was built without krb5 library support."); #endif // USE_KRB5 - // Update consumer topic-specific configuration (legacy syntax, retained for compatibility). Example with topic "football": - // - // 250 - // 100000 - // - // The legacy syntax has the problem that periods in topic names (e.g. "sports.football") are not supported because the Poco - // configuration framework hierarchy is based on periods as level separators. Besides that, per-topic tags at the same level - // as are ugly. - for (const auto & topic : topics) - { - loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG + "_" + topic); - } - - // Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball": - // - // - // football - // 250 - // 5000 - // - // - // baseball - // 300 - // 2000 - // - // - // Advantages: The period restriction no longer applies (e.g. sports.football will work), everything - // Kafka-related is below . - for (const auto & topic : topics) - loadTopicConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topic); - // No need to add any prefix, messages can be distinguished kafka_config.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message) { @@ -879,6 +952,18 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config) } } +void StorageKafka::updateConsumerConfiguration(cppkafka::Configuration & kafka_config) +{ + const auto & config = getContext()->getConfigRef(); + loadConsumerConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics); +} + +void StorageKafka::updateProducerConfiguration(cppkafka::Configuration & kafka_config) +{ + const auto & config = getContext()->getConfigRef(); + loadProducerConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics); +} + bool StorageKafka::checkDependencies(const StorageID & table_id) { // Check if all dependencies are attached diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 829e23faf77..cc74a86b79f 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -134,16 +134,29 @@ private: SettingsChanges createSettingsAdjustments(); /// Creates KafkaConsumer object without real consumer (cppkafka::Consumer) KafkaConsumerPtr createKafkaConsumer(size_t consumer_number); - /// Returns consumer configuration with all changes that had been overwritten in config + /// Returns full consumer related configuration, also the configuration + /// contains global kafka properties. cppkafka::Configuration getConsumerConfiguration(size_t consumer_number); + /// Returns full producer related configuration, also the configuration + /// contains global kafka properties. + cppkafka::Configuration getProducerConfiguration(); /// If named_collection is specified. String collection_name; std::atomic shutdown_called = false; - // Update Kafka configuration with values from CH user configuration. - void updateConfiguration(cppkafka::Configuration & kafka_config); + // Load Kafka global configuration + // https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties + void updateGlobalConfiguration(cppkafka::Configuration & kafka_config); + // Load Kafka properties from consumer configuration + // NOTE: librdkafka allow to set a consumer property to a producer and vice versa, + // but a warning will be generated e.g: + // "Configuration property session.timeout.ms is a consumer property and + // will be ignored by this producer instance" + void updateConsumerConfiguration(cppkafka::Configuration & kafka_config); + // Load Kafka properties from producer configuration + void updateProducerConfiguration(cppkafka::Configuration & kafka_config); void threadFunc(size_t idx); diff --git a/tests/integration/test_storage_kafka/configs/kafka.xml b/tests/integration/test_storage_kafka/configs/kafka.xml index 3bd1b681c9c..b10db879b72 100644 --- a/tests/integration/test_storage_kafka/configs/kafka.xml +++ b/tests/integration/test_storage_kafka/configs/kafka.xml @@ -1,6 +1,5 @@ - earliest - 300 - - 6000 + 301 + + + + 302 + + + earliest + + + consumer_hang + + 6000 + + + separate_settings + + 6001 + + + + + + + 300001 + + + separate_settings + + + 30001 + + + 60001 + diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 081b15520a1..8393e88db88 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2369,6 +2369,83 @@ def test_kafka_virtual_columns2(kafka_cluster): instance.rotate_logs() +def test_kafka_producer_consumer_separate_settings(kafka_cluster): + instance.query( + """ + DROP TABLE IF EXISTS test.test_kafka; + CREATE TABLE test.test_kafka (key UInt64) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'separate_settings', + kafka_group_name = 'test', + kafka_format = 'JSONEachRow', + kafka_row_delimiter = '\\n'; + """ + ) + + instance.query("SELECT * FROM test.test_kafka") + instance.query("INSERT INTO test.test_kafka VALUES (1)") + + assert instance.contains_in_log("Kafka producer created") + assert instance.contains_in_log("Created #0 consumer") + + kafka_conf_warnings = instance.grep_in_log("rdk:CONFWARN") + + assert kafka_conf_warnings is not None + + for warn in kafka_conf_warnings.strip().split("\n"): + # this setting was applied via old syntax and applied on both consumer + # and producer configurations + assert "heartbeat.interval.ms" in warn + + kafka_consumer_applyed_properties = instance.grep_in_log("Consumer set property") + kafka_producer_applyed_properties = instance.grep_in_log("Producer set property") + + assert kafka_consumer_applyed_properties is not None + assert kafka_producer_applyed_properties is not None + + # global settings should be applied for consumer and producer + global_settings = { + "debug": "topic,protocol,cgrp,consumer", + "statistics.interval.ms": "600", + } + + for name, value in global_settings.items(): + property_in_log = f"{name}:{value}" + assert property_in_log in kafka_consumer_applyed_properties + assert property_in_log in kafka_producer_applyed_properties + + settings_topic__separate_settings__consumer = {"session.timeout.ms": "6001"} + + for name, value in settings_topic__separate_settings__consumer.items(): + property_in_log = f"{name}:{value}" + assert property_in_log in kafka_consumer_applyed_properties + assert property_in_log not in kafka_producer_applyed_properties + + producer_settings = {"transaction.timeout.ms": "60001"} + + for name, value in producer_settings.items(): + property_in_log = f"{name}:{value}" + assert property_in_log not in kafka_consumer_applyed_properties + assert property_in_log in kafka_producer_applyed_properties + + # Should be ignored, because it is inside producer tag + producer_legacy_syntax__topic_separate_settings = {"message.timeout.ms": "300001"} + + for name, value in producer_legacy_syntax__topic_separate_settings.items(): + property_in_log = f"{name}:{value}" + assert property_in_log not in kafka_consumer_applyed_properties + assert property_in_log not in kafka_producer_applyed_properties + + # Old syntax, applied on consumer and producer + legacy_syntax__topic_separated_settings = {"heartbeat.interval.ms": "302"} + + for name, value in legacy_syntax__topic_separated_settings.items(): + property_in_log = f"{name}:{value}" + assert property_in_log in kafka_consumer_applyed_properties + assert property_in_log in kafka_producer_applyed_properties + + def test_kafka_produce_key_timestamp(kafka_cluster): admin_client = KafkaAdminClient( bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)