Multithreading implementation of Kafka engine

Peng Jian 2020-08-21 15:52:44 +08:00
parent 69630972d4
commit 61ac52cf29
2 changed files with 38 additions and 40 deletions
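The commit message carries no description, so a brief summary of the change: instead of a single background task (and a single stream_cancelled flag) driving all Kafka consumers, StorageKafka now creates one scheduled task per consumer, each bundled with its own cancellation flag in a TaskContext. The sketch below is a minimal standalone analogue of that pattern; it uses std::thread and std::atomic in place of ClickHouse's BackgroundSchedulePool, and the polling loop body is purely illustrative.

#include <atomic>
#include <chrono>
#include <cstdio>
#include <memory>
#include <thread>
#include <vector>

// Stand-in for the TaskContext introduced by this commit: each context
// owns its worker and its own cancellation flag.
struct TaskContext
{
    std::thread holder;
    std::atomic<bool> stream_cancelled{false};
};

int main()
{
    const size_t num_consumers = 3;
    std::vector<std::shared_ptr<TaskContext>> tasks;

    // One polling loop per consumer, as the constructor hunk does with
    // getSchedulePool().createTask(...).
    for (size_t i = 0; i < num_consumers; ++i)
    {
        auto task = std::make_shared<TaskContext>();
        task->holder = std::thread([task, i]
        {
            while (!task->stream_cancelled)
            {
                std::printf("consumer %zu polling\n", i);
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        });
        tasks.emplace_back(std::move(task));
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(300));

    // Shutdown mirrors the new StorageKafka::shutdown(): raise each flag,
    // then wait for the corresponding worker.
    for (auto & task : tasks)
    {
        task->stream_cancelled = true;
        task->holder.join();
    }
}

The per-task flag is what lets shutdown() stop each consumer independently and lets threadFunc(idx) consult only its own flag, as the hunks below show.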

src/Storages/Kafka/StorageKafka.cpp

@@ -142,8 +142,11 @@ StorageKafka::StorageKafka(
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    setInMemoryMetadata(storage_metadata);
-    task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
-    task->deactivate();
+    for (size_t i = 0; i < num_consumers; i++) {
+        auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
+        task->deactivate();
+        tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
+    }
}
SettingsChanges StorageKafka::createSettingsAdjustments()
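One detail in the constructor hunk above is worth a note: the task lambda captures the loop index i by value, so each task permanently owns its consumer index and later passes it to threadFunc(i). A minimal compilable illustration of why by-value capture matters here (all names hypothetical):

#include <cstdio>
#include <functional>
#include <vector>

int main()
{
    std::vector<std::function<void()>> callbacks;
    for (size_t i = 0; i < 3; ++i)
        callbacks.push_back([i] { std::printf("%zu\n", i); });  // by-value: each copy keeps its own i

    for (auto & callback : callbacks)
        callback();  // prints 0, 1, 2; a by-reference capture would dangle after the loop
}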
@@ -254,20 +257,22 @@ void StorageKafka::startup()
        {
            tryLogCurrentException(log);
        }
+        // Start the reader thread
+        tasks[i]->holder->activateAndSchedule();
    }
-    // Start the reader thread
-    task->activateAndSchedule();
}

void StorageKafka::shutdown()
{
    // Interrupt streaming thread
-    stream_cancelled = true;

    LOG_TRACE(log, "Waiting for cleanup");
-    task->deactivate();
+    for (size_t i = 0; i < num_consumers; i++)
+    {
+        tasks[i]->stream_cancelled = true;
+        tasks[i]->holder->deactivate();
+    }

    LOG_TRACE(log, "Closing consumers");
    for (size_t i = 0; i < num_created_consumers; ++i)
@@ -368,6 +373,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
    consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

    /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
+    auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
}
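The NOTE in the hunk above points at a real lifetime constraint: the buffer stores a reference into a TaskContext owned by the storage. A minimal sketch of what goes wrong if a buffer outlives its storage (Buffer and Storage here are hypothetical stand-ins, not the ClickHouse classes):

#include <atomic>
#include <memory>

struct Buffer
{
    std::atomic<bool> & cancelled;  // non-owning reference into the storage
    explicit Buffer(std::atomic<bool> & cancelled_) : cancelled(cancelled_) {}
};

struct Storage
{
    std::atomic<bool> stream_cancelled{false};
    std::shared_ptr<Buffer> createReadBuffer() { return std::make_shared<Buffer>(stream_cancelled); }
};

int main()
{
    std::shared_ptr<Buffer> buffer;
    {
        Storage storage;
        buffer = storage.createReadBuffer();
    }  // storage destroyed here
    // buffer->cancelled now dangles; reading it would be undefined behaviour.
    // This is exactly why the buffers must not outlive the storage.
}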
@@ -464,8 +470,9 @@ bool StorageKafka::checkDependencies(const StorageID & table_id)
    return true;
}

-void StorageKafka::threadFunc()
+void StorageKafka::threadFunc(size_t idx)
{
+    auto& stream_cancelled = tasks[idx]->stream_cancelled;
    try
    {
        auto table_id = getStorageID();
@@ -508,7 +515,9 @@ void StorageKafka::threadFunc()
    // Wait for attached views
    if (!stream_cancelled)
-        task->scheduleAfter(RESCHEDULE_MS);
+    {
+        tasks[idx]->holder->scheduleAfter(RESCHEDULE_MS);
+    }
}
@@ -535,44 +544,27 @@ bool StorageKafka::streamToViews()
    InterpreterInsertQuery interpreter(insert, *kafka_context, false, true, true);
    auto block_io = interpreter.execute();

-    // Create a stream for each consumer and join them in a union stream
-    BlockInputStreams streams;
-    streams.reserve(num_created_consumers);
-    for (size_t i = 0; i < num_created_consumers; ++i)
-    {
-        auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
-        streams.emplace_back(stream);
-
-        // Limit read batch to maximum block size to allow DDL
-        IBlockInputStream::LocalLimits limits;
-
-        limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
-            ? kafka_settings->kafka_flush_interval_ms
-            : global_context.getSettingsRef().stream_flush_interval_ms;
-
-        limits.timeout_overflow_mode = OverflowMode::BREAK;
-        stream->setLimits(limits);
-    }
-
-    // Join multiple streams if necessary
-    BlockInputStreamPtr in;
-    if (streams.size() > 1)
-        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
-    else
-        in = streams[0];
+    auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
+
+    // Limit read batch to maximum block size to allow DDL
+    IBlockInputStream::LocalLimits limits;
+
+    limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
+        ? kafka_settings->kafka_flush_interval_ms
+        : global_context.getSettingsRef().stream_flush_interval_ms;
+
+    limits.timeout_overflow_mode = OverflowMode::BREAK;
+    stream->setLimits(limits);

    // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
    // It will be cancelled on underlying layer (kafka buffer)
    std::atomic<bool> stub = {false};
-    copyData(*in, *block_io.out, &stub);
+    copyData(*stream, *block_io.out, &stub);

    bool some_stream_is_stalled = false;
-    for (auto & stream : streams)
-    {
-        some_stream_is_stalled = some_stream_is_stalled || stream->as<KafkaBlockInputStream>()->isStalled();
-        stream->as<KafkaBlockInputStream>()->commit();
-    }
+    some_stream_is_stalled = some_stream_is_stalled || stream->isStalled();
+    stream->commit();

    return some_stream_is_stalled;
}
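Taken together with the scheduling changes above, the streamToViews() hunk follows directly: each background task now drives exactly one consumer, so the function builds a single KafkaBlockInputStream instead of collecting one stream per consumer and merging them through UnionBlockInputStream, and the limits, the stalled check, and the offset commit all apply to that one stream.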

src/Storages/Kafka/StorageKafka.h

@@ -94,8 +94,14 @@ private:
    std::mutex mutex;

    // Stream thread
-    BackgroundSchedulePool::TaskHolder task;
-    std::atomic<bool> stream_cancelled{false};
+    struct TaskContext {
+        BackgroundSchedulePool::TaskHolder holder;
+        std::atomic<bool> stream_cancelled {false};
+        explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))
+        {
+        }
+    };
+    std::vector<std::shared_ptr<TaskContext>> tasks;

    SettingsChanges createSettingsAdjustments();
    ConsumerBufferPtr createReadBuffer(const size_t consumer_number);
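A plausible reason for the explicit constructor in the TaskContext hunk above, shown as a minimal compilable sketch (TaskHolder faked here with a move-only unique_ptr; the real BackgroundSchedulePool::TaskHolder is treated as move-only): the std::atomic member makes TaskContext itself neither copyable nor movable, so the contexts are held behind shared_ptr and the holder is moved in through the constructor.

#include <atomic>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for BackgroundSchedulePool::TaskHolder.
using TaskHolder = std::unique_ptr<int>;

struct TaskContext
{
    TaskHolder holder;
    std::atomic<bool> stream_cancelled{false};

    // std::atomic is neither copyable nor movable, so TaskContext cannot be
    // stored in a std::vector directly; wrapping it in shared_ptr avoids that,
    // and the rvalue-reference parameter lets the move-only holder be passed in.
    explicit TaskContext(TaskHolder && task_) : holder(std::move(task_)) {}
};

int main()
{
    std::vector<std::shared_ptr<TaskContext>> tasks;
    tasks.emplace_back(std::make_shared<TaskContext>(std::make_unique<int>(42)));
}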
@@ -103,7 +109,7 @@ private:
    // Update Kafka configuration with values from CH user configuration.
    void updateConfiguration(cppkafka::Configuration & conf);

-    void threadFunc();
+    void threadFunc(size_t idx);

    size_t getPollMaxBatchSize() const;
    size_t getMaxBlockSize() const;