diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 43a3bedfb74..f2517db8aae 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -513,8 +513,9 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number) // that allows to prevent fast draining of the librdkafka queue // during building of single insert block. Improves performance // significantly, but may lead to bigger memory consumption. - size_t default_queued_min_messages = 100000; // we don't want to decrease the default - conf.set("queued.min.messages", std::max(getMaxBlockSize(),default_queued_min_messages)); + size_t default_queued_min_messages = 100000; // must be greater than or equal to default + size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value + conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages)); /// a reference to the consumer is needed in statistic callback /// although the consumer does not exist when callback is being registered