Delay reading from StorageKafka by creating a query plan step

János Benjamin Antal 2024-01-03 17:13:41 +00:00
parent 570d1c013b
commit 81b5d8fddd
2 changed files with 107 additions and 40 deletions
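For context: the read path this commit reroutes is the one used by direct SELECTs on a Kafka engine table, which are only allowed when the `stream_like_engine_allow_direct_select` setting (checked in the code below) is enabled. A minimal, hypothetical usage sketch — the table name is invented; only the setting and the LIMIT-related behaviour come from the diff:

    -- Direct reads from a Kafka engine table are rejected unless this setting is enabled.
    SET stream_like_engine_allow_direct_select = 1;
    -- Hypothetical table; with this change the SELECT is planned through a ReadFromStorageKafka
    -- query plan step instead of a Pipe built directly in StorageKafka::read().
    SELECT * FROM kafka_queue LIMIT 10;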

View File

@@ -8,8 +8,10 @@
 #include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypeString.h>
 #include <DataTypes/DataTypesNumber.h>
+#include <Formats/FormatFactory.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/InterpreterInsertQuery.h>
+#include <Interpreters/InterpreterSelectQuery.h>
 #include <Interpreters/evaluateConstantExpression.h>
 #include <Parsers/ASTCreateQuery.h>
 #include <Parsers/ASTExpressionList.h>
@@ -17,18 +19,19 @@
 #include <Parsers/ASTInsertQuery.h>
 #include <Parsers/ASTLiteral.h>
 #include <Processors/Executors/CompletedPipelineExecutor.h>
-#include <QueryPipeline/QueryPipeline.h>
+#include <Processors/QueryPlan/ISourceStep.h>
+#include <Processors/QueryPlan/QueryPlan.h>
 #include <QueryPipeline/Pipe.h>
-#include <Storages/MessageQueueSink.h>
+#include <QueryPipeline/QueryPipeline.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Storages/Kafka/KafkaProducer.h>
 #include <Storages/Kafka/KafkaSettings.h>
 #include <Storages/Kafka/KafkaSource.h>
+#include <Storages/MessageQueueSink.h>
+#include <Storages/NamedCollectionsHelpers.h>
 #include <Storages/StorageFactory.h>
 #include <Storages/StorageMaterializedView.h>
-#include <Storages/NamedCollectionsHelpers.h>
 #include <base/getFQDNOrHostName.h>
-#include <Common/Stopwatch.h>
-#include <Common/logger_useful.h>
 #include <boost/algorithm/string/replace.hpp>
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string/trim.hpp>
@@ -37,11 +40,12 @@
 #include <Poco/Util/AbstractConfiguration.h>
 #include <Common/Exception.h>
 #include <Common/Macros.h>
+#include <Common/Stopwatch.h>
 #include <Common/formatReadable.h>
 #include <Common/getNumberOfPhysicalCPUCores.h>
+#include <Common/logger_useful.h>
 #include <Common/quoteString.h>
 #include <Common/setThreadName.h>
-#include <Formats/FormatFactory.h>
 #include <Storages/ColumnDefault.h>
 #include <Common/config_version.h>
@@ -174,6 +178,92 @@ struct StorageKafkaInterceptors
     }
 };
 
+class ReadFromStorageKafkaStep final : public ISourceStep
+{
+public:
+    ReadFromStorageKafkaStep(
+        const Names & column_names_,
+        StoragePtr storage_,
+        const StorageSnapshotPtr & storage_snapshot_,
+        SelectQueryInfo & query_info,
+        ContextPtr context_)
+        : ISourceStep{DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)}}
+        , column_names{column_names_}
+        , storage{storage_}
+        , storage_snapshot{storage_snapshot_}
+        , storage_limits{query_info.storage_limits}
+        , context{context_}
+    {
+    }
+
+    String getName() const override { return "ReadFromStorageKafka"; }
+
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
+    {
+        auto pipe = makePipe();
+
+        /// Add storage limits.
+        for (const auto & processor : pipe.getProcessors())
+            processor->setStorageLimits(storage_limits);
+
+        /// Add to processors to get processor info through explain pipeline statement.
+        for (const auto & processor : pipe.getProcessors())
+            processors.emplace_back(processor);
+
+        pipeline.init(std::move(pipe));
+    }
+
+private:
+    Pipe makePipe()
+    {
+        auto & kafka_storage = storage->as<StorageKafka &>();
+        if (kafka_storage.shutdown_called)
+            throw Exception(ErrorCodes::ABORTED, "Table is detached");
+
+        if (!context->getSettingsRef().stream_like_engine_allow_direct_select)
+            throw Exception(
+                ErrorCodes::QUERY_NOT_ALLOWED,
+                "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
+
+        if (kafka_storage.mv_attached)
+            throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
+
+        ProfileEvents::increment(ProfileEvents::KafkaDirectReads);
+
+        /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
+        Pipes pipes;
+        pipes.reserve(kafka_storage.num_consumers);
+        auto modified_context = Context::createCopy(context);
+        modified_context->applySettingsChanges(kafka_storage.settings_adjustments);
+
+        // Claim as many consumers as requested, but don't block
+        for (size_t i = 0; i < kafka_storage.num_consumers; ++i)
+        {
+            /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
+            /// TODO: probably that leads to awful performance.
+            /// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
+            pipes.emplace_back(std::make_shared<KafkaSource>(
+                kafka_storage,
+                storage_snapshot,
+                modified_context,
+                column_names,
+                kafka_storage.log,
+                1,
+                kafka_storage.kafka_settings->kafka_commit_on_select));
+        }
+
+        LOG_DEBUG(kafka_storage.log, "Starting reading {} streams", pipes.size());
+        return Pipe::unitePipes(std::move(pipes));
+    }
+
+    ActionsDAGPtr buildFilterDAG();
+
+    const Names column_names;
+    StoragePtr storage;
+    StorageSnapshotPtr storage_snapshot;
+    std::shared_ptr<const StorageLimitsList> storage_limits;
+    ContextPtr context;
+};
+
 namespace
 {
     const String CONFIG_KAFKA_TAG = "kafka";
@@ -347,45 +437,18 @@ String StorageKafka::getDefaultClientId(const StorageID & table_id_)
     return fmt::format("{}-{}-{}-{}", VERSION_NAME, getFQDNOrHostName(), table_id_.database_name, table_id_.table_name);
 }
 
-Pipe StorageKafka::read(
+void StorageKafka::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
-    SelectQueryInfo & /* query_info */,
-    ContextPtr local_context,
+    SelectQueryInfo & query_info,
+    ContextPtr query_context,
     QueryProcessingStage::Enum /* processed_stage */,
     size_t /* max_block_size */,
     size_t /* num_streams */)
 {
-    if (shutdown_called)
-        throw Exception(ErrorCodes::ABORTED, "Table is detached");
-
-    if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
-        throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
-                        "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
-
-    if (mv_attached)
-        throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
-
-    ProfileEvents::increment(ProfileEvents::KafkaDirectReads);
-
-    /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
-    Pipes pipes;
-    pipes.reserve(num_consumers);
-    auto modified_context = Context::createCopy(local_context);
-    modified_context->applySettingsChanges(settings_adjustments);
-
-    // Claim as many consumers as requested, but don't block
-    for (size_t i = 0; i < num_consumers; ++i)
-    {
-        /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
-        /// TODO: probably that leads to awful performance.
-        /// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
-        pipes.emplace_back(std::make_shared<KafkaSource>(*this, storage_snapshot, modified_context, column_names, log, 1, kafka_settings->kafka_commit_on_select));
-    }
-
-    LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
-    return Pipe::unitePipes(std::move(pipes));
+    query_plan.addStep(std::make_unique<ReadFromStorageKafkaStep>(
+        column_names, shared_from_this(), storage_snapshot, query_info, std::move(query_context)));
 }

View File

@@ -20,6 +20,7 @@ namespace DB
 {
 class StorageSystemKafkaConsumers;
+class ReadFromStorageKafkaStep;
 struct StorageKafkaInterceptors;
@@ -48,7 +49,8 @@ public:
    void startup() override;
    void shutdown(bool is_drop) override;
 
-    Pipe read(
+    void read(
+        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
@@ -86,6 +88,8 @@ public:
    SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), consumers}; }
 
private:
+    friend class ReadFromStorageKafkaStep;
+
    // Configuration and state
    std::unique_ptr<KafkaSettings> kafka_settings;
    Macros::MacroExpansionInfo macros_info;