Create consumers in startup

This commit is contained in:
János Benjamin Antal 2024-06-12 18:23:50 +00:00
parent 0691c01427
commit 11bda3f5f7

View File

@ -278,6 +278,18 @@ StorageKafka2::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapsho
void StorageKafka2::startup()
{
for (size_t i = 0; i < num_consumers; ++i)
{
try
{
consumers.emplace_back(ConsumerAndAssignmentInfo{.consumer = createConsumer(i), .keeper = keeper});
++num_created_consumers;
}
catch (const cppkafka::Exception &)
{
tryLogCurrentException(log);
}
}
// Start the reader thread
for (auto & task : tasks)
task->holder->activateAndSchedule();