Fix crash in case of consumer receives no consumer groups on assignment

This commit is contained in:
János Benjamin Antal 2024-06-11 09:21:12 +00:00
parent b40c931651
commit 0691c01427

View File

@ -289,9 +289,13 @@ void StorageKafka2::shutdown(bool)
shutdown_called = true;
for (auto & task : tasks)
{
LOG_TRACE(log, "Cancelling streams");
// Interrupt streaming thread
task->stream_cancelled = true;
}
for (auto & task : tasks)
{
LOG_TRACE(log, "Waiting for cleanup");
task->holder->deactivate();
}
@ -1108,16 +1112,8 @@ void StorageKafka2::threadFunc(size_t idx)
bool StorageKafka2::streamToViews(size_t idx)
{
// What to do?
// 1. Select a topic partition to consume from
// 2. Do a casual poll for every other consumer to keep them alive
// 3. Get the necessary data from Keeper
// 4. Get the corresponding consumer
// 5. Pull messages
// 6. Create a BlockList from it
// 7. Execute the pipeline
// 8. Write the offset to Keeper
// This function is written assuming that each consumer has their own thread. This means once this is changed, this function should be revisited.
// The return values should be revisited, as stalling all consumers because of a single one stalled is not a good idea.
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
@ -1189,6 +1185,11 @@ bool StorageKafka2::streamToViews(size_t idx)
consumer_info.consumer->updateOffsets(consumer_info.topic_partitions);
}
if (consumer_info.topic_partitions.empty())
{
LOG_TRACE(log, "Consumer {} has assignment, but has no partitions, probably because there are more consumers in the consumer group than partitions.", idx);
return true;
}
LOG_TRACE(log, "Trying to consume from consumer {}", idx);
const auto maybe_rows = streamFromConsumer(consumer_info);
if (maybe_rows.has_value())