Avoid problem with configuration queued.min.messages automatically for Kafka storage

This commit is contained in:
Stas Morozov 2023-10-30 13:53:52 +03:00
parent 3631e476eb
commit 5a922014f7
No known key found for this signature in database
GPG Key ID: 2FF7608EFCF35E7B

View File

@ -513,8 +513,9 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
// that allows to prevent fast draining of the librdkafka queue // that allows to prevent fast draining of the librdkafka queue
// during building of single insert block. Improves performance // during building of single insert block. Improves performance
// significantly, but may lead to bigger memory consumption. // significantly, but may lead to bigger memory consumption.
size_t default_queued_min_messages = 100000; // we don't want to decrease the default size_t default_queued_min_messages = 100000; // must be greater than or equal to default
conf.set("queued.min.messages", std::max(getMaxBlockSize(),default_queued_min_messages)); size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages));
/// a reference to the consumer is needed in statistic callback /// a reference to the consumer is needed in statistic callback
/// although the consumer does not exist when callback is being registered /// although the consumer does not exist when callback is being registered