StorageKafka: nicer interface for claiming consumers

This commit is contained in:
Marek Vavruša 2017-12-20 11:09:13 -08:00 committed by alexey-milovidov
parent 9c57def8df
commit 3e1ce9bf26
2 changed files with 14 additions and 7 deletions

View File

@ -233,13 +233,15 @@ BlockInputStreams StorageKafka::read(
if (num_consumers == 0)
return BlockInputStreams();
const size_t stream_count = std::min(num_streams, num_consumers);
BlockInputStreams streams;
streams.reserve(std::min(num_streams, num_consumers));
streams.reserve(stream_count);
// Claim as many consumers as requested, but don't block
for (size_t i = 0; i < streams.capacity(); ++i)
for (size_t i = 0; i < stream_count; ++i)
{
auto consumer = claimConsumer(0);
auto consumer = tryClaimConsumer(0);
if (consumer == nullptr)
break;
@ -291,7 +293,7 @@ void StorageKafka::shutdown()
LOG_TRACE(log, "Unsubscribing from assignments");
for (size_t i = 0; i < num_consumers; ++i)
{
auto consumer = claimConsumer(-1);
auto consumer = claimConsumer();
consumer->unsubscribe();
}
@ -342,8 +344,12 @@ void StorageKafka::consumerConfiguration(struct rd_kafka_conf_s * conf)
}
}
StorageKafka::ConsumerPtr StorageKafka::claimConsumer()
{
return tryClaimConsumer(-1L);
}
StorageKafka::ConsumerPtr StorageKafka::claimConsumer(long wait_ms)
StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
{
// Wait for the first free consumer
if (wait_ms >= 0)
@ -423,7 +429,7 @@ void StorageKafka::streamToViews()
streams.reserve(num_consumers);
for (size_t i = 0; i < num_consumers; ++i)
{
auto consumer = claimConsumer(-1);
auto consumer = claimConsumer();
streams.push_back(std::make_shared<KafkaBlockInputStream>(*this, consumer, context, schema_name, block_size));
}

View File

@ -93,7 +93,8 @@ private:
std::atomic<bool> stream_cancelled{false};
void consumerConfiguration(struct rd_kafka_conf_s * conf);
ConsumerPtr claimConsumer(long wait_ms);
ConsumerPtr claimConsumer();
ConsumerPtr tryClaimConsumer(long wait_ms);
void pushConsumer(ConsumerPtr c);
void streamThread();