From 50674a320fc083122c58a1023a11bc50dadfe570 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 18 Sep 2020 14:39:07 +0300 Subject: [PATCH] Refactor IStorage::read with query plan. --- src/Interpreters/InterpreterSelectQuery.cpp | 18 +++++- src/Processors/QueryPipeline.h | 1 + .../QueryPlan/ReadFromStorageStep.cpp | 55 +++-------------- .../QueryPlan/ReadFromStorageStep.h | 21 +------ .../QueryPlan/SettingQuotaAndLimitsStep.cpp | 17 +++++- .../QueryPlan/SettingQuotaAndLimitsStep.h | 4 +- src/Storages/IStorage.cpp | 11 +--- src/Storages/IStorage.h | 18 +++--- src/Storages/StorageView.cpp | 60 +++---------------- src/Storages/StorageView.h | 7 +-- 10 files changed, 66 insertions(+), 146 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index cd2c16813b4..073198c3a27 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -35,7 +35,7 @@ #include #include #include -#include +#include #include #include #include @@ -1456,8 +1456,20 @@ void InterpreterSelectQuery::executeFetchColumns( if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete)) quota = context->getQuota(); - storage->read(query_plan, table_lock, metadata_snapshot, limits, leaf_limits, std::move(quota), - required_columns, query_info, context, processing_stage, max_block_size, max_streams); + storage->read(query_plan, required_columns, metadata_snapshot, + query_info, *context, processing_stage, max_block_size, max_streams); + + /// Extend lifetime of context, table lock, storage. Set limits and quota. + auto adding_limits_and_quota = std::make_unique( + query_plan.getCurrentDataStream(), + storage, + std::move(table_lock), + limits, + leaf_limits, + std::move(quota), + std::move(context)); + adding_limits_and_quota->setStepDescription("Set limits and quota after reading from storage"); + query_plan.addStep(std::move(adding_limits_and_quota)); } else throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR); diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 03547170286..f923df43d76 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -100,6 +100,7 @@ public: void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } void addQueryPlan(std::unique_ptr plan) { pipe.addQueryPlan(std::move(plan)); } void setLimits(const StreamLocalLimits & limits) { pipe.setLimits(limits); } + void setLeafLimits(const SizeLimits & limits) { pipe.setLeafLimits(limits); } void setQuota(const std::shared_ptr & quota) { pipe.setQuota(quota); } /// For compatibility with IBlockInputStream. diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.cpp b/src/Processors/QueryPlan/ReadFromStorageStep.cpp index b085c177ad4..a88a396193b 100644 --- a/src/Processors/QueryPlan/ReadFromStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromStorageStep.cpp @@ -12,35 +12,19 @@ namespace DB { ReadFromStorageStep::ReadFromStorageStep( - TableLockHolder table_lock_, - StorageMetadataPtr metadata_snapshot_, - StreamLocalLimits & limits_, - SizeLimits & leaf_limits_, - std::shared_ptr quota_, - StoragePtr storage_, - const Names & required_columns_, - const SelectQueryInfo & query_info_, - std::shared_ptr context_, - QueryProcessingStage::Enum processing_stage_, - size_t max_block_size_, - size_t max_streams_) - : table_lock(std::move(table_lock_)) - , metadata_snapshot(std::move(metadata_snapshot_)) - , limits(limits_) - , leaf_limits(leaf_limits_) - , quota(std::move(quota_)) - , storage(std::move(storage_)) - , required_columns(required_columns_) - , query_info(query_info_) - , context(std::move(context_)) - , processing_stage(processing_stage_) - , max_block_size(max_block_size_) - , max_streams(max_streams_) + StoragePtr storage, + const Names & required_columns, + const StorageMetadataPtr & metadata_snapshot, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum processing_stage, + size_t max_block_size, + size_t max_streams) { /// Note: we read from storage in constructor of step because we don't know real header before reading. /// It will be fixed when storage return QueryPlanStep itself. - Pipe pipe = storage->read(required_columns, metadata_snapshot, query_info, *context, processing_stage, max_block_size, max_streams); + Pipe pipe = storage->read(required_columns, metadata_snapshot, query_info, context, processing_stage, max_block_size, max_streams); if (pipe.empty()) { @@ -83,29 +67,8 @@ ReadFromStorageStep::ReadFromStorageStep( pipeline = std::make_unique(); QueryPipelineProcessorsCollector collector(*pipeline, this); - /// Table lock is stored inside pipeline here. - pipeline->addTableLock(table_lock); - - pipe.setLimits(limits); - - /** - * Leaf size limits should be applied only for local processing of distributed queries. - * Such limits allow to control the read stage on leaf nodes and exclude the merging stage. - * Consider the case when distributed query needs to read from multiple shards. Then leaf - * limits will be applied on the shards only (including the root node) but will be ignored - * on the results merging stage. - */ - if (!storage->isRemote()) - pipe.setLeafLimits(leaf_limits); - - if (quota) - pipe.setQuota(quota); - pipeline->init(std::move(pipe)); - pipeline->addInterpreterContext(std::move(context)); - pipeline->addStorageHolder(std::move(storage)); - processors = collector.detachProcessors(); output_stream = DataStream{.header = pipeline->getHeader(), .has_single_port = pipeline->getNumStreams() == 1}; diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.h b/src/Processors/QueryPlan/ReadFromStorageStep.h index 98cde63a863..59276d13081 100644 --- a/src/Processors/QueryPlan/ReadFromStorageStep.h +++ b/src/Processors/QueryPlan/ReadFromStorageStep.h @@ -23,15 +23,11 @@ class ReadFromStorageStep : public IQueryPlanStep { public: ReadFromStorageStep( - TableLockHolder table_lock, - StorageMetadataPtr metadata_snapshot, - StreamLocalLimits & limits, - SizeLimits & leaf_limits, - std::shared_ptr quota, StoragePtr storage, const Names & required_columns, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, - std::shared_ptr context, + const Context & context, QueryProcessingStage::Enum processing_stage, size_t max_block_size, size_t max_streams); @@ -45,19 +41,6 @@ public: void describePipeline(FormatSettings & settings) const override; private: - TableLockHolder table_lock; - StorageMetadataPtr metadata_snapshot; - StreamLocalLimits limits; - SizeLimits leaf_limits; - std::shared_ptr quota; - - StoragePtr storage; - const Names & required_columns; - const SelectQueryInfo & query_info; - std::shared_ptr context; - QueryProcessingStage::Enum processing_stage; - size_t max_block_size; - size_t max_streams; QueryPipelinePtr pipeline; Processors processors; diff --git a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp index 73cd459fa5d..2a03d1fd82f 100644 --- a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp +++ b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB { @@ -24,13 +25,15 @@ SettingQuotaAndLimitsStep::SettingQuotaAndLimitsStep( const DataStream & input_stream_, StoragePtr storage_, TableLockHolder table_lock_, - StreamLocalLimits limits_, + StreamLocalLimits & limits_, + SizeLimits & leaf_limits_, std::shared_ptr quota_, std::shared_ptr context_) : ITransformingStep(input_stream_, input_stream_.header, getTraits()) , storage(std::move(storage_)) , table_lock(std::move(table_lock_)) - , limits(std::move(limits_)) + , limits(limits_) + , leaf_limits(leaf_limits_) , quota(std::move(quota_)) , context(std::move(context_)) { @@ -43,6 +46,16 @@ void SettingQuotaAndLimitsStep::transformPipeline(QueryPipeline & pipeline) pipeline.setLimits(limits); + /** + * Leaf size limits should be applied only for local processing of distributed queries. + * Such limits allow to control the read stage on leaf nodes and exclude the merging stage. + * Consider the case when distributed query needs to read from multiple shards. Then leaf + * limits will be applied on the shards only (including the root node) but will be ignored + * on the results merging stage. + */ + if (!storage->isRemote()) + pipeline.setLeafLimits(leaf_limits); + if (quota) pipeline.setQuota(quota); diff --git a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h index 538d3c35b9d..7ec4cfa91c6 100644 --- a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h +++ b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h @@ -23,7 +23,8 @@ public: const DataStream & input_stream_, StoragePtr storage_, TableLockHolder table_lock_, - StreamLocalLimits limits_, + StreamLocalLimits & limits_, + SizeLimits & leaf_limits_, std::shared_ptr quota_, std::shared_ptr context_); @@ -35,6 +36,7 @@ private: StoragePtr storage; TableLockHolder table_lock; StreamLocalLimits limits; + SizeLimits leaf_limits; std::shared_ptr quota; std::shared_ptr context; }; diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 50b36ced19c..73dabca0871 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -94,21 +94,16 @@ Pipe IStorage::read( void IStorage::read( QueryPlan & query_plan, - TableLockHolder table_lock, - StorageMetadataPtr metadata_snapshot, - StreamLocalLimits & limits, - SizeLimits & leaf_limits, - std::shared_ptr quota, const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, - std::shared_ptr context, + const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) { auto read_step = std::make_unique( - std::move(table_lock), std::move(metadata_snapshot), limits, leaf_limits, std::move(quota), shared_from_this(), - column_names, query_info, std::move(context), processed_stage, max_block_size, num_streams); + shared_from_this(), column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); read_step->setStepDescription("Read from " + getName()); query_plan.addStep(std::move(read_step)); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index dbd18c9558e..58a39497e38 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -285,17 +285,13 @@ public: /// Default implementation creates ReadFromStorageStep and uses usual read. virtual void read( QueryPlan & query_plan, - TableLockHolder table_lock, - StorageMetadataPtr metadata_snapshot, - StreamLocalLimits & limits, - SizeLimits & leaf_limits, - std::shared_ptr quota, - const Names & column_names, - const SelectQueryInfo & query_info, - std::shared_ptr context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - unsigned num_streams); + const Names & /*column_names*/, + const StorageMetadataPtr & /*metadata_snapshot*/, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum /*processed_stage*/, + size_t /*max_block_size*/, + unsigned /*num_streams*/); /** Writes the data to a table. * Receives a description of the query, which can contain information about the data write method. diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp index a7cba22bebf..e71228f2a23 100644 --- a/src/Storages/StorageView.cpp +++ b/src/Storages/StorageView.cpp @@ -55,52 +55,21 @@ Pipe StorageView::read( const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, const Context & context, - QueryProcessingStage::Enum /*processed_stage*/, - const size_t /*max_block_size*/, - const unsigned /*num_streams*/) + QueryProcessingStage::Enum processed_stage, + const size_t max_block_size, + const unsigned num_streams) { - Pipes pipes; - - ASTPtr current_inner_query = metadata_snapshot->getSelectQuery().inner_query; - - if (query_info.view_query) - { - if (!query_info.view_query->as()) - throw Exception("Unexpected optimized VIEW query", ErrorCodes::LOGICAL_ERROR); - current_inner_query = query_info.view_query->clone(); - } - - InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names); - - auto pipeline = interpreter.execute().pipeline; - - /// It's expected that the columns read from storage are not constant. - /// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery. - pipeline.addSimpleTransform([](const Block & header) - { - return std::make_shared(header); - }); - - /// And also convert to expected structure. - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared( - header, metadata_snapshot->getSampleBlockForColumns( - column_names, getVirtuals(), getStorageID()), ConvertingTransform::MatchColumnsMode::Name); - }); - - return QueryPipeline::getPipe(std::move(pipeline)); + QueryPlan plan; + read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); + return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline())); } void StorageView::read( QueryPlan & query_plan, - TableLockHolder table_lock, - StorageMetadataPtr metadata_snapshot, - StreamLocalLimits & limits, - std::shared_ptr quota, const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, - std::shared_ptr context, + const Context & context, QueryProcessingStage::Enum /*processed_stage*/, const size_t /*max_block_size*/, const unsigned /*num_streams*/) @@ -114,7 +83,7 @@ void StorageView::read( current_inner_query = query_info.view_query->clone(); } - InterpreterSelectWithUnionQuery interpreter(current_inner_query, *context, {}, column_names); + InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names); interpreter.buildQueryPlan(query_plan); /// It's expected that the columns read from storage are not constant. @@ -128,17 +97,6 @@ void StorageView::read( auto converting = std::make_unique(query_plan.getCurrentDataStream(), header); converting->setStepDescription("Convert VIEW subquery result to VIEW table structure"); query_plan.addStep(std::move(converting)); - - /// Extend lifetime of context, table lock, storage. Set limits and quota. - auto adding_limits_and_quota = std::make_unique( - query_plan.getCurrentDataStream(), - shared_from_this(), - std::move(table_lock), - limits, - std::move(quota), - std::move(context)); - adding_limits_and_quota->setStepDescription("Set limits and quota for VIEW subquery"); - query_plan.addStep(std::move(adding_limits_and_quota)); } static ASTTableExpression * getFirstTableExpression(ASTSelectQuery & select_query) diff --git a/src/Storages/StorageView.h b/src/Storages/StorageView.h index 79155209ff8..1b43888baf3 100644 --- a/src/Storages/StorageView.h +++ b/src/Storages/StorageView.h @@ -32,13 +32,10 @@ public: void read( QueryPlan & query_plan, - TableLockHolder table_lock, - StorageMetadataPtr metadata_snapshot, - StreamLocalLimits & limits, - std::shared_ptr quota, const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, - std::shared_ptr context, + const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;