diff --git a/src/Processors/QueryPlan/ReadFromStreamLikeEngine.cpp b/src/Processors/QueryPlan/ReadFromStreamLikeEngine.cpp new file mode 100644 index 00000000000..d4e55ca97d4 --- /dev/null +++ b/src/Processors/QueryPlan/ReadFromStreamLikeEngine.cpp @@ -0,0 +1,43 @@ +#include + +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int QUERY_NOT_ALLOWED; +} + +ReadFromStreamLikeEngine::ReadFromStreamLikeEngine( + const Names & column_names_, + const StorageSnapshotPtr & storage_snapshot_, + std::shared_ptr storage_limits_, + ContextPtr context_) + : ISourceStep{DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)}} + , storage_limits{std::move(storage_limits_)} + , context{context_} +{ +} + +void ReadFromStreamLikeEngine::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) +{ + if (!context->getSettingsRef().stream_like_engine_allow_direct_select) + throw Exception( + ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`"); + + auto pipe = makePipe(); + + /// Add storage limits. + for (const auto & processor : pipe.getProcessors()) + processor->setStorageLimits(storage_limits); + + /// Add to processors to get processor info through explain pipeline statement. + for (const auto & processor : pipe.getProcessors()) + processors.emplace_back(processor); + + pipeline.init(std::move(pipe)); +} +} diff --git a/src/Processors/QueryPlan/ReadFromStreamLikeEngine.h b/src/Processors/QueryPlan/ReadFromStreamLikeEngine.h new file mode 100644 index 00000000000..7273bd5e247 --- /dev/null +++ b/src/Processors/QueryPlan/ReadFromStreamLikeEngine.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ +class ReadFromStreamLikeEngine : public ISourceStep +{ +public: + ReadFromStreamLikeEngine( + const Names & column_names_, + const StorageSnapshotPtr & storage_snapshot_, + std::shared_ptr storage_limits_, + ContextPtr context_); + + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/) final; + +protected: + virtual Pipe makePipe() = 0; + + std::shared_ptr storage_limits; + ContextPtr context; +}; +} diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index 2eea619d654..ad501c89b35 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -1,7 +1,7 @@ #include +#include #include #include -#include #include #include #include @@ -13,9 +13,12 @@ #include #include #include +#include +#include #include #include #include +#include #include #include #include @@ -50,6 +53,76 @@ namespace static constexpr auto TMP_SUFFIX = ".tmp"; + +class ReadFromStorageFileLogStep final : public ReadFromStreamLikeEngine +{ +public: + ReadFromStorageFileLogStep( + const Names & column_names_, + StoragePtr storage_, + const StorageSnapshotPtr & storage_snapshot_, + SelectQueryInfo & query_info, + ContextPtr context_) + : ReadFromStreamLikeEngine{column_names_, storage_snapshot_, query_info.storage_limits, context_} + , column_names{column_names_} + , storage{storage_} + , storage_snapshot{storage_snapshot_} + { + } + + String getName() const override { return "ReadFromStorageFileLog"; } + +private: + Pipe makePipe() final + { + auto & file_log = storage->as(); + if (file_log.mv_attached) + throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageFileLog with attached materialized views"); + + std::lock_guard lock(file_log.file_infos_mutex); + if (file_log.running_streams) + throw Exception(ErrorCodes::CANNOT_SELECT, "Another select query is running on this table, need to wait it finish."); + + file_log.updateFileInfos(); + + /// No files to parse + if (file_log.file_infos.file_names.empty()) + { + LOG_WARNING(file_log.log, "There is a idle table named {}, no files need to parse.", getName()); + return Pipe{}; + } + + auto modified_context = Context::createCopy(context); + + auto max_streams_number = std::min(file_log.filelog_settings->max_threads, file_log.file_infos.file_names.size()); + + /// Each stream responsible for closing it's files and store meta + file_log.openFilesAndSetPos(); + + Pipes pipes; + pipes.reserve(max_streams_number); + for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number) + { + pipes.emplace_back(std::make_shared( + file_log, + storage_snapshot, + modified_context, + column_names, + file_log.getMaxBlockSize(), + file_log.getPollTimeoutMillisecond(), + stream_number, + max_streams_number, + file_log.filelog_settings->handle_error_mode)); + } + + return Pipe::unitePipes(std::move(pipes)); + } + + const Names column_names; + StoragePtr storage; + StorageSnapshotPtr storage_snapshot; +}; + StorageFileLog::StorageFileLog( const StorageID & table_id_, ContextPtr context_, @@ -296,62 +369,19 @@ UInt64 StorageFileLog::getInode(const String & file_name) return file_stat.st_ino; } -Pipe StorageFileLog::read( +void StorageFileLog::read( + QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & /* query_info */, - ContextPtr local_context, + SelectQueryInfo & query_info, + ContextPtr query_context, QueryProcessingStage::Enum /* processed_stage */, size_t /* max_block_size */, size_t /* num_streams */) + { - /// If there are MVs depended on this table, we just forbid reading - if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select) - throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, - "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`"); - - if (mv_attached) - throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageFileLog with attached materialized views"); - - std::lock_guard lock(file_infos_mutex); - if (running_streams) - { - throw Exception(ErrorCodes::CANNOT_SELECT, "Another select query is running on this table, need to wait it finish."); - } - - updateFileInfos(); - - /// No files to parse - if (file_infos.file_names.empty()) - { - LOG_WARNING(log, "There is a idle table named {}, no files need to parse.", getName()); - return Pipe{}; - } - - auto modified_context = Context::createCopy(local_context); - - auto max_streams_number = std::min(filelog_settings->max_threads, file_infos.file_names.size()); - - /// Each stream responsible for closing it's files and store meta - openFilesAndSetPos(); - - Pipes pipes; - pipes.reserve(max_streams_number); - for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number) - { - pipes.emplace_back(std::make_shared( - *this, - storage_snapshot, - modified_context, - column_names, - getMaxBlockSize(), - getPollTimeoutMillisecond(), - stream_number, - max_streams_number, - filelog_settings->handle_error_mode)); - } - - return Pipe::unitePipes(std::move(pipes)); + query_plan.addStep(std::make_unique( + column_names, shared_from_this(), storage_snapshot, query_info, std::move(query_context))); } void StorageFileLog::increaseStreams() diff --git a/src/Storages/FileLog/StorageFileLog.h b/src/Storages/FileLog/StorageFileLog.h index 3cb6ac1fbbf..e8bc60475a2 100644 --- a/src/Storages/FileLog/StorageFileLog.h +++ b/src/Storages/FileLog/StorageFileLog.h @@ -49,7 +49,8 @@ public: void startup() override; void shutdown(bool is_drop) override; - Pipe read( + void read( + QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, @@ -133,6 +134,8 @@ public: const auto & getFileLogSettings() const { return filelog_settings; } private: + friend class ReadFromStorageFileLogStep; + std::unique_ptr filelog_settings; const String path; diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index d8386839a6f..dfe9b2d4821 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -178,7 +179,7 @@ struct StorageKafkaInterceptors } }; -class ReadFromStorageKafkaStep final : public ISourceStep +class ReadFromStorageKafkaStep final : public ReadFromStreamLikeEngine { public: ReadFromStorageKafkaStep( @@ -187,44 +188,22 @@ public: const StorageSnapshotPtr & storage_snapshot_, SelectQueryInfo & query_info, ContextPtr context_) - : ISourceStep{DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)}} + : ReadFromStreamLikeEngine{column_names_, storage_snapshot_, query_info.storage_limits, context_} , column_names{column_names_} , storage{storage_} , storage_snapshot{storage_snapshot_} - , storage_limits{query_info.storage_limits} - , context{context_} { } String getName() const override { return "ReadFromStorageKafka"; } - void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override - { - auto pipe = makePipe(); - - /// Add storage limits. - for (const auto & processor : pipe.getProcessors()) - processor->setStorageLimits(storage_limits); - - /// Add to processors to get processor info through explain pipeline statement. - for (const auto & processor : pipe.getProcessors()) - processors.emplace_back(processor); - - pipeline.init(std::move(pipe)); - } - private: - Pipe makePipe() + Pipe makePipe() final { auto & kafka_storage = storage->as(); if (kafka_storage.shutdown_called) throw Exception(ErrorCodes::ABORTED, "Table is detached"); - if (!context->getSettingsRef().stream_like_engine_allow_direct_select) - throw Exception( - ErrorCodes::QUERY_NOT_ALLOWED, - "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`"); - if (kafka_storage.mv_attached) throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views"); @@ -255,13 +234,10 @@ private: LOG_DEBUG(kafka_storage.log, "Starting reading {} streams", pipes.size()); return Pipe::unitePipes(std::move(pipes)); } - ActionsDAGPtr buildFilterDAG(); const Names column_names; StoragePtr storage; StorageSnapshotPtr storage_snapshot; - std::shared_ptr storage_limits; - ContextPtr context; }; namespace