diff --git a/dbms/src/Storages/StorageKafka.cpp b/dbms/src/Storages/StorageKafka.cpp index 8dbffad301d..822e7f63558 100644 --- a/dbms/src/Storages/StorageKafka.cpp +++ b/dbms/src/Storages/StorageKafka.cpp @@ -233,13 +233,15 @@ BlockInputStreams StorageKafka::read( if (num_consumers == 0) return BlockInputStreams(); + const size_t stream_count = std::min(num_streams, num_consumers); + BlockInputStreams streams; - streams.reserve(std::min(num_streams, num_consumers)); + streams.reserve(stream_count); // Claim as many consumers as requested, but don't block - for (size_t i = 0; i < streams.capacity(); ++i) + for (size_t i = 0; i < stream_count; ++i) { - auto consumer = claimConsumer(0); + auto consumer = tryClaimConsumer(0); if (consumer == nullptr) break; @@ -291,7 +293,7 @@ void StorageKafka::shutdown() LOG_TRACE(log, "Unsubscribing from assignments"); for (size_t i = 0; i < num_consumers; ++i) { - auto consumer = claimConsumer(-1); + auto consumer = claimConsumer(); consumer->unsubscribe(); } @@ -342,8 +344,12 @@ void StorageKafka::consumerConfiguration(struct rd_kafka_conf_s * conf) } } +StorageKafka::ConsumerPtr StorageKafka::claimConsumer() +{ + return tryClaimConsumer(-1L); +} -StorageKafka::ConsumerPtr StorageKafka::claimConsumer(long wait_ms) +StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms) { // Wait for the first free consumer if (wait_ms >= 0) @@ -423,7 +429,7 @@ void StorageKafka::streamToViews() streams.reserve(num_consumers); for (size_t i = 0; i < num_consumers; ++i) { - auto consumer = claimConsumer(-1); + auto consumer = claimConsumer(); streams.push_back(std::make_shared(*this, consumer, context, schema_name, block_size)); } diff --git a/dbms/src/Storages/StorageKafka.h b/dbms/src/Storages/StorageKafka.h index 8da6eb0e990..83e6bb97eaf 100644 --- a/dbms/src/Storages/StorageKafka.h +++ b/dbms/src/Storages/StorageKafka.h @@ -93,7 +93,8 @@ private: std::atomic stream_cancelled{false}; void consumerConfiguration(struct rd_kafka_conf_s * conf); - ConsumerPtr claimConsumer(long wait_ms); + ConsumerPtr claimConsumer(); + ConsumerPtr tryClaimConsumer(long wait_ms); void pushConsumer(ConsumerPtr c); void streamThread();