refactor: improve reading of separate configurations for Kafka

Simplify and refactor the reading of Kafka client settings.

Allows setting up separate settings for the consumer and the producer, for example:

```
<consumer>
    ...
</consumer>

<producer>
    <kafka_topic>
        <name>topic_name</name>
        ...
    </kafka_topic>
</producer>
```

Moreover, this fixes warnings from the Kafka client such as:
`Configuration property session.timeout.ms is a consumer property and
will be ignored by this producer instance`
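
For instance, a consumer-only property such as `session.timeout.ms` can now be scoped so that it is never passed to the producer instance. A minimal sketch (property values are illustrative):

```
<kafka>
    <!-- global properties, applied to both consumer and producer -->
    <statistics_interval_ms>3000</statistics_interval_ms>

    <consumer>
        <!-- consumer-only property; no longer passed to the producer,
             so the warning above is not emitted -->
        <session_timeout_ms>6000</session_timeout_ms>
    </consumer>

    <producer>
        <!-- producer-only property -->
        <transaction_timeout_ms>60001</transaction_timeout_ms>
    </producer>
</kafka>
```
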
Aleksandr Musorin 2024-03-27 12:19:26 +01:00
parent 20a45b4073
commit f9d81bc774
8 changed files with 410 additions and 141 deletions


@@ -170,53 +170,37 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
<statistics_interval_ms>600</statistics_interval_ms>
<statistics_interval_ms>3000</statistics_interval_ms>
<!-- Configuration specific to topics "logs" and "stats" -->
<!-- Settings for consumer -->
<consumer>
<auto_offset_reset>smallest</auto_offset_reset>
<kafka_topic>
<name>logs</name>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</consumer>
<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
<!-- Settings for producer -->
<producer>
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
</kafka_topic>
</producer>
</kafka>
```
<details markdown="1">
<summary>Example in deprecated syntax</summary>
``` xml
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific to topics "logs" and "stats" -->
<!-- Does NOT support periods in topic names, e.g. "logs.security"> -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
<kafka_stats>
<retry_backoff_ms>400</retry_backoff_ms>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_stats>
```
</details>
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
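
For example, the `check.crcs=true` setting mentioned above could be scoped to consumers only. A minimal sketch:

``` xml
<kafka>
    <consumer>
        <!-- check.crcs=true in librdkafka notation -->
        <check_crcs>true</check_crcs>
    </consumer>
</kafka>
```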


@@ -166,17 +166,38 @@ Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse configuration file. There are two configuration keys that can be used: global (`kafka`) and per-topic (`kafka_topic_*`). The global configuration is applied first, then the per-topic configuration (if it exists).
``` xml
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<statistics_interval_ms>3000</statistics_interval_ms>
<!-- Configuration specific for topic "logs" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
<!-- Settings for consumer -->
<consumer>
<auto_offset_reset>smallest</auto_offset_reset>
<kafka_topic>
<name>logs</name>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</consumer>
<!-- Settings for producer -->
<producer>
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
</kafka_topic>
</producer>
</kafka>
```
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will correspond to `<check_crcs>true</check_crcs>`.


@@ -120,17 +120,38 @@ Kafka features:
Similar to `GraphiteMergeTree`, the Kafka engine supports extended configuration using the ClickHouse configuration file. Two configuration keys can be used: global (`kafka`) and topic-level (`kafka_*`). The global configuration is applied first, then the topic-level configuration (if it exists).
``` xml
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<statistics_interval_ms>3000</statistics_interval_ms>
<!-- Configuration specific for topic "logs" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
<!-- Settings for consumer -->
<consumer>
<auto_offset_reset>smallest</auto_offset_reset>
<kafka_topic>
<name>logs</name>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</consumer>
<!-- Settings for producer -->
<producer>
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
</kafka_topic>
</producer>
</kafka>
```
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot (`.`) in the ClickHouse configuration. For example, `check.crcs=true` becomes `<check_crcs>true</check_crcs>`.


@@ -1579,6 +1579,42 @@
</rocksdb>
-->
<!-- <kafka> -->
<!-- Global configuration properties -->
<!--
NOTE: statistics should be consumed; otherwise it creates too many
entries in the queue, which leads to a memory leak and slow shutdown.
default value: 0
<statistics_interval_ms>3000</statistics_interval_ms>
-->
<!-- Topic configuration properties -->
<!--
<kafka_topic>
<name>football</name>
<request_timeout_ms>6000</request_timeout_ms>
</kafka_topic>
-->
<!-- Producer configuration -->
<!--
<producer>
<compression_codec>gzip</compression_codec>
<kafka_topic>
<name>football</name>
<request_timeout_ms>6000</request_timeout_ms>
</kafka_topic>
</producer>
-->
<!-- Consumer configuration -->
<!--
<consumer>
<enable_auto_commit>true</enable_auto_commit>
</consumer>
-->
<!-- </kafka> -->
<!-- Configuration for the query cache -->
<query_cache>
<max_size_in_bytes>1073741824</max_size_in_bytes>


@@ -244,47 +244,56 @@ namespace
{
const String CONFIG_KAFKA_TAG = "kafka";
const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
const String CONFIG_KAFKA_CONSUMER_TAG = "consumer";
const String CONFIG_KAFKA_PRODUCER_TAG = "producer";
const String CONFIG_NAME_TAG = "name";
void setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value)
{
if (key.starts_with(CONFIG_KAFKA_TOPIC_TAG) || key == CONFIG_NAME_TAG) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
return; /// used by new per-topic configuration, ignore
/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
kafka_config.set(setting_name_in_kafka_config, value);
}
/// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String& collection_name, const String & config_prefix)
void loadConfigProperty(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const String & tag)
{
const String property_path = config_prefix + "." + tag;
const String property_value = config.getString(property_path);
setKafkaConfigValue(kafka_config, tag, property_value);
}
void loadNamedCollectionConfig(cppkafka::Configuration & kafka_config, const String & collection_name, const String & config_prefix)
{
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(-1, config_prefix))
{
// Cut prefix with '.' before actual config tag.
const auto param_name = key.substr(config_prefix.size() + 1);
setKafkaConfigValue(kafka_config, param_name, collection->get<String>(key));
}
}
void loadLegacyTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix)
{
if (!collection_name.empty())
{
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(-1, config_prefix))
{
// Cut prefix with '.' before actual config tag.
const auto param_name = key.substr(config_prefix.size() + 1);
setKafkaConfigValue(kafka_config, param_name, collection->get<String>(key));
}
loadNamedCollectionConfig(kafka_config, collection_name, config_prefix);
return;
}
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
const String setting_path = fmt::format("{}.{}", config_prefix, tag);
setKafkaConfigValue(kafka_config, tag, config.getString(setting_path));
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
/// Read server configuration into cppkafka configuration, used by new per-topic configuration
void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String& collection_name, const String& config_prefix, const String& topic)
void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const String & topic)
{
if (!collection_name.empty())
{
@@ -300,7 +309,7 @@ namespace
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
if (topic == collection->get<String>(kafka_topic_name_path))
/// Found it! Now read the per-topic configuration into cppkafka.
loadFromConfig(kafka_config, config, collection_name, kafka_topic_path);
loadNamedCollectionConfig(kafka_config, collection_name, kafka_topic_path);
}
}
else
@@ -311,21 +320,100 @@ namespace
for (const auto & tag : tags)
{
/// Only consider tag <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!tag.starts_with(CONFIG_KAFKA_TOPIC_TAG))
continue;
/// Read topic name between <name>...</name>
const String kafka_topic_path = fmt::format("{}.{}", config_prefix, tag);
const String kafka_topic_name_path = fmt::format("{}.{}", kafka_topic_path, CONFIG_NAME_TAG);
const String topic_name = config.getString(kafka_topic_name_path);
if (topic_name == topic)
/// Found it! Now read the per-topic configuration into cppkafka.
loadFromConfig(kafka_config, config, collection_name, kafka_topic_path);
if (tag == CONFIG_NAME_TAG)
continue; // ignore <name>, it is used to match topic configurations
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
}
/// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const Names & topics)
{
if (!collection_name.empty())
{
loadNamedCollectionConfig(kafka_config, collection_name, config_prefix);
return;
}
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
if (tag == CONFIG_KAFKA_PRODUCER_TAG || tag == CONFIG_KAFKA_CONSUMER_TAG)
/// Do not load consumer/producer properties here, since they are loaded into separate configuration objects.
continue;
if (tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
{
// Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
// <kafka>
// <kafka_topic>
// <name>football</name>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>5000</fetch_min_bytes>
// </kafka_topic>
// <kafka_topic>
// <name>baseball</name>
// <retry_backoff_ms>300</retry_backoff_ms>
// <fetch_min_bytes>2000</fetch_min_bytes>
// </kafka_topic>
// </kafka>
// Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
// Kafka-related is below <kafka>.
for (const auto & topic : topics)
{
/// Read topic name between <name>...</name>
const String kafka_topic_path = config_prefix + "." + tag;
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
const String topic_name = config.getString(kafka_topic_name_path);
if (topic_name != topic)
continue;
loadTopicConfig(kafka_config, config, collection_name, kafka_topic_path, topic);
}
continue;
}
if (tag.starts_with(CONFIG_KAFKA_TAG))
/// skip legacy configuration per topic e.g. <kafka_TOPIC_NAME>.
/// it will be processed in a separate function
continue;
// Update the configuration from properties directly under the current prefix. Example:
// <kafka>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka>
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
void loadLegacyConfigSyntax(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics)
{
for (const auto & topic : topics)
{
const String kafka_topic_path = prefix + "." + CONFIG_KAFKA_TAG + "_" + topic;
loadLegacyTopicConfig(kafka_config, config, collection_name, kafka_topic_path);
}
}
void loadConsumerConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics)
{
const String consumer_path = prefix + "." + CONFIG_KAFKA_CONSUMER_TAG;
loadLegacyConfigSyntax(kafka_config, config, collection_name, prefix, topics);
// The new syntax has higher priority
loadFromConfig(kafka_config, config, collection_name, consumer_path, topics);
}
void loadProducerConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics)
{
const String producer_path = prefix + "." + CONFIG_KAFKA_PRODUCER_TAG;
loadLegacyConfigSyntax(kafka_config, config, collection_name, prefix, topics);
// The new syntax has higher priority
loadFromConfig(kafka_config, config, collection_name, producer_path, topics);
}
}
StorageKafka::StorageKafka(
@@ -484,13 +572,7 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
if (topics.size() > 1)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't write to Kafka table with multiple topics!");
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("client.id", client_id);
conf.set("client.software.name", VERSION_NAME);
conf.set("client.software.version", VERSION_DESCRIBE);
// TODO: fill required settings
updateConfiguration(conf);
cppkafka::Configuration conf = getProducerConfiguration();
const Settings & settings = getContext()->getSettingsRef();
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
@@ -499,6 +581,8 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
auto producer = std::make_unique<KafkaProducer>(
std::make_shared<cppkafka::Producer>(conf), topics[0], std::chrono::milliseconds(poll_timeout), shutdown_called, header);
LOG_TRACE(log, "Kafka producer created");
size_t max_rows = max_rows_per_message;
/// Need for backward compatibility.
if (format_name == "Avro" && local_context->getSettingsRef().output_format_avro_rows_in_file.changed)
@@ -688,13 +772,38 @@ cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_n
size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages));
updateConfiguration(conf);
updateGlobalConfiguration(conf);
updateConsumerConfiguration(conf);
// those settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
for (auto & property : conf.get_all())
{
LOG_TRACE(log, "Consumer set property {}:{}", property.first, property.second);
}
return conf;
}
cppkafka::Configuration StorageKafka::getProducerConfiguration()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("client.id", client_id);
conf.set("client.software.name", VERSION_NAME);
conf.set("client.software.version", VERSION_DESCRIBE);
updateGlobalConfiguration(conf);
updateProducerConfiguration(conf);
for (auto & property : conf.get_all())
{
LOG_TRACE(log, "Producer set property {}:{}", property.first, property.second);
}
return conf;
}
@@ -773,15 +882,10 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
: getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
}
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
void StorageKafka::updateGlobalConfiguration(cppkafka::Configuration & kafka_config)
{
// Update consumer configuration from the configuration. Example:
// <kafka>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka>
const auto & config = getContext()->getConfigRef();
loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG);
loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics);
#if USE_KRB5
if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
@@ -810,37 +914,6 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
LOG_WARNING(log, "Ignoring Kerberos-related parameters because ClickHouse was built without krb5 library support.");
#endif // USE_KRB5
// Update consumer topic-specific configuration (legacy syntax, retained for compatibility). Example with topic "football":
// <kafka_football>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka_football>
// The legacy syntax has the problem that periods in topic names (e.g. "sports.football") are not supported because the Poco
// configuration framework hierarchy is based on periods as level separators. Besides that, per-topic tags at the same level
// as <kafka> are ugly.
for (const auto & topic : topics)
{
loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG + "_" + topic);
}
// Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
// <kafka>
// <kafka_topic>
// <name>football</name>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>5000</fetch_min_bytes>
// </kafka_topic>
// <kafka_topic>
// <name>baseball</name>
// <retry_backoff_ms>300</retry_backoff_ms>
// <fetch_min_bytes>2000</fetch_min_bytes>
// </kafka_topic>
// </kafka>
// Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
// Kafka-related is below <kafka>.
for (const auto & topic : topics)
loadTopicConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topic);
// No need to add any prefix, messages can be distinguished
kafka_config.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
{
@@ -879,6 +952,18 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
}
}
void StorageKafka::updateConsumerConfiguration(cppkafka::Configuration & kafka_config)
{
const auto & config = getContext()->getConfigRef();
loadConsumerConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics);
}
void StorageKafka::updateProducerConfiguration(cppkafka::Configuration & kafka_config)
{
const auto & config = getContext()->getConfigRef();
loadProducerConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics);
}
bool StorageKafka::checkDependencies(const StorageID & table_id)
{
// Check if all dependencies are attached


@@ -134,16 +134,29 @@ private:
SettingsChanges createSettingsAdjustments();
/// Creates KafkaConsumer object without real consumer (cppkafka::Consumer)
KafkaConsumerPtr createKafkaConsumer(size_t consumer_number);
/// Returns consumer configuration with all changes that had been overwritten in config
/// Returns the full consumer-related configuration; it also contains
/// the global Kafka properties.
cppkafka::Configuration getConsumerConfiguration(size_t consumer_number);
/// Returns the full producer-related configuration; it also contains
/// the global Kafka properties.
cppkafka::Configuration getProducerConfiguration();
/// If named_collection is specified.
String collection_name;
std::atomic<bool> shutdown_called = false;
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & kafka_config);
// Load Kafka global configuration
// https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties
void updateGlobalConfiguration(cppkafka::Configuration & kafka_config);
// Load Kafka properties from consumer configuration
// NOTE: librdkafka allows setting a consumer property on a producer and vice versa,
// but a warning will be generated, e.g.:
// "Configuration property session.timeout.ms is a consumer property and
// will be ignored by this producer instance"
void updateConsumerConfiguration(cppkafka::Configuration & kafka_config);
// Load Kafka properties from producer configuration
void updateProducerConfiguration(cppkafka::Configuration & kafka_config);
void threadFunc(size_t idx);


@ -1,6 +1,5 @@
<clickhouse>
<kafka>
<auto_offset_reset>earliest</auto_offset_reset>
<!-- Debugging of possible issues, like:
- https://github.com/edenhill/librdkafka/issues/2077
- https://github.com/edenhill/librdkafka/issues/1778
@@ -17,10 +16,43 @@
<kafka_topic>
<name>consumer_hang</name>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
<heartbeat_interval_ms>301</heartbeat_interval_ms>
</kafka_topic>
<kafka_separate_settings>
<!-- This is the old syntax. This setting will be applied to both the consumer and the producer. -->
<!-- default 3000 -->
<heartbeat_interval_ms>302</heartbeat_interval_ms>
</kafka_separate_settings>
<consumer>
<auto_offset_reset>earliest</auto_offset_reset>
<kafka_topic>
<!-- The setting for this topic will be applied only to the consumer -->
<name>consumer_hang</name>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_topic>
<kafka_topic>
<name>separate_settings</name>
<!-- default: 10000 -->
<session_timeout_ms>6001</session_timeout_ms>
</kafka_topic>
</consumer>
<producer>
<kafka_separate_settings>
<!-- This is the old syntax; it does not work inside the producer and consumer tags. -->
<!-- default 300000 -->
<message_timeout_ms>300001</message_timeout_ms>
</kafka_separate_settings>
<kafka_topic>
<name>separate_settings</name>
<!-- This is a producer-only setting; if it were applied to the consumer,
it would produce a warning message in the logs. -->
<!-- default 30000 -->
<request_timeout_ms>30001</request_timeout_ms>
</kafka_topic>
<!-- default 60000 -->
<transaction_timeout_ms>60001</transaction_timeout_ms>
</producer>
</kafka>
</clickhouse>


@@ -2369,6 +2369,83 @@ def test_kafka_virtual_columns2(kafka_cluster):
instance.rotate_logs()
def test_kafka_producer_consumer_separate_settings(kafka_cluster):
instance.query(
"""
DROP TABLE IF EXISTS test.test_kafka;
CREATE TABLE test.test_kafka (key UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'separate_settings',
kafka_group_name = 'test',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
"""
)
instance.query("SELECT * FROM test.test_kafka")
instance.query("INSERT INTO test.test_kafka VALUES (1)")
assert instance.contains_in_log("Kafka producer created")
assert instance.contains_in_log("Created #0 consumer")
kafka_conf_warnings = instance.grep_in_log("rdk:CONFWARN")
assert kafka_conf_warnings is not None
for warn in kafka_conf_warnings.strip().split("\n"):
# This setting was applied via the old syntax, so it applies to both the consumer
# and the producer configurations
assert "heartbeat.interval.ms" in warn
kafka_consumer_applyed_properties = instance.grep_in_log("Consumer set property")
kafka_producer_applyed_properties = instance.grep_in_log("Producer set property")
assert kafka_consumer_applyed_properties is not None
assert kafka_producer_applyed_properties is not None
# global settings should be applied for consumer and producer
global_settings = {
"debug": "topic,protocol,cgrp,consumer",
"statistics.interval.ms": "600",
}
for name, value in global_settings.items():
property_in_log = f"{name}:{value}"
assert property_in_log in kafka_consumer_applyed_properties
assert property_in_log in kafka_producer_applyed_properties
settings_topic__separate_settings__consumer = {"session.timeout.ms": "6001"}
for name, value in settings_topic__separate_settings__consumer.items():
property_in_log = f"{name}:{value}"
assert property_in_log in kafka_consumer_applyed_properties
assert property_in_log not in kafka_producer_applyed_properties
producer_settings = {"transaction.timeout.ms": "60001"}
for name, value in producer_settings.items():
property_in_log = f"{name}:{value}"
assert property_in_log not in kafka_consumer_applyed_properties
assert property_in_log in kafka_producer_applyed_properties
# Should be ignored, because it is inside producer tag
producer_legacy_syntax__topic_separate_settings = {"message.timeout.ms": "300001"}
for name, value in producer_legacy_syntax__topic_separate_settings.items():
property_in_log = f"{name}:{value}"
assert property_in_log not in kafka_consumer_applyed_properties
assert property_in_log not in kafka_producer_applyed_properties
# Old syntax, applied on consumer and producer
legacy_syntax__topic_separated_settings = {"heartbeat.interval.ms": "302"}
for name, value in legacy_syntax__topic_separated_settings.items():
property_in_log = f"{name}:{value}"
assert property_in_log in kafka_consumer_applyed_properties
assert property_in_log in kafka_producer_applyed_properties
def test_kafka_produce_key_timestamp(kafka_cluster):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)