Remove StorageKafka::num_created_consumers (in favor of all_consumers.size())

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
(cherry picked from commit 123d63e824)
This commit is contained in:
Azat Khuzhin 2023-12-13 14:57:22 +01:00
parent d58b76ce06
commit 3541d9a05f
2 changed files with 6 additions and 11 deletions

View File

@ -343,7 +343,7 @@ Pipe StorageKafka::read(
size_t /* max_block_size */, size_t /* max_block_size */,
size_t /* num_streams */) size_t /* num_streams */)
{ {
if (num_created_consumers == 0) if (all_consumers.empty())
return {}; return {};
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select) if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
@ -357,12 +357,12 @@ Pipe StorageKafka::read(
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions. /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
Pipes pipes; Pipes pipes;
pipes.reserve(num_created_consumers); pipes.reserve(all_consumers.size());
auto modified_context = Context::createCopy(local_context); auto modified_context = Context::createCopy(local_context);
modified_context->applySettingsChanges(settings_adjustments); modified_context->applySettingsChanges(settings_adjustments);
// Claim as many consumers as requested, but don't block // Claim as many consumers as requested, but don't block
for (size_t i = 0; i < num_created_consumers; ++i) for (size_t i = 0; i < all_consumers.size(); ++i)
{ {
/// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
/// TODO: probably that leads to awful performance. /// TODO: probably that leads to awful performance.
@ -419,7 +419,6 @@ void StorageKafka::startup()
auto consumer = createConsumer(i); auto consumer = createConsumer(i);
pushConsumer(consumer); pushConsumer(consumer);
all_consumers.push_back(consumer); all_consumers.push_back(consumer);
++num_created_consumers;
} }
catch (const cppkafka::Exception &) catch (const cppkafka::Exception &)
{ {
@ -447,7 +446,7 @@ void StorageKafka::shutdown(bool)
} }
LOG_TRACE(log, "Closing consumers"); LOG_TRACE(log, "Closing consumers");
for (size_t i = 0; i < num_created_consumers; ++i) for (size_t i = 0; i < all_consumers.size(); ++i)
auto consumer = popConsumer(); auto consumer = popConsumer();
LOG_TRACE(log, "Consumers closed"); LOG_TRACE(log, "Consumers closed");
@ -756,7 +755,7 @@ void StorageKafka::threadFunc(size_t idx)
mv_attached.store(true); mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled // Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled && num_created_consumers > 0) while (!task->stream_cancelled && !all_consumers.empty())
{ {
if (!checkDependencies(table_id)) if (!checkDependencies(table_id))
break; break;
@ -844,7 +843,7 @@ bool StorageKafka::streamToViews()
std::vector<std::shared_ptr<KafkaSource>> sources; std::vector<std::shared_ptr<KafkaSource>> sources;
Pipes pipes; Pipes pipes;
auto stream_count = thread_per_consumer ? 1 : num_created_consumers; auto stream_count = thread_per_consumer ? 1 : all_consumers.size();
sources.reserve(stream_count); sources.reserve(stream_count);
pipes.reserve(stream_count); pipes.reserve(stream_count);
for (size_t i = 0; i < stream_count; ++i) for (size_t i = 0; i < stream_count; ++i)

View File

@ -108,10 +108,6 @@ private:
std::atomic<bool> mv_attached = false; std::atomic<bool> mv_attached = false;
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
size_t num_created_consumers = 0; /// number of actually created consumers.
std::vector<KafkaConsumerPtr> consumers; /// available consumers std::vector<KafkaConsumerPtr> consumers; /// available consumers
std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers