From 0bf4e8e6e912eda3f9d7131cf4581020e69c16e9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 17 Sep 2020 16:22:24 +0300 Subject: [PATCH] Support reading with QueryPlan for StorageView. --- src/Processors/QueryPipeline.h | 2 + src/Processors/QueryPlan/MaterializingStep.h | 2 +- .../QueryPlan/SettingQuotaAndLimitsStep.cpp | 53 +++++++++++++++++++ .../QueryPlan/SettingQuotaAndLimitsStep.h | 42 +++++++++++++++ src/Processors/ya.make | 1 + src/Storages/StorageView.cpp | 52 ++++++++++++++++++ src/Storages/StorageView.h | 13 +++++ 7 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp create mode 100644 src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 45b410ab323..40aabf43ecb 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -99,6 +99,8 @@ public: void addInterpreterContext(std::shared_ptr context) { pipe.addInterpreterContext(std::move(context)); } void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } void addQueryPlan(std::unique_ptr plan) { pipe.addQueryPlan(std::move(plan)); } + void setLimits(const StreamLocalLimits & limits) { pipe.setLimits(limits); } + void setQuota(const std::shared_ptr & quota) { pipe.setQuota(quota); }; /// For compatibility with IBlockInputStream. void setProgressCallback(const ProgressCallback & callback); diff --git a/src/Processors/QueryPlan/MaterializingStep.h b/src/Processors/QueryPlan/MaterializingStep.h index c1ffcaeb775..72b3133dfe4 100644 --- a/src/Processors/QueryPlan/MaterializingStep.h +++ b/src/Processors/QueryPlan/MaterializingStep.h @@ -4,7 +4,7 @@ namespace DB { -/// Convert one block structure to another. See ConvertingTransform. +/// Materialize constants. See MaterializingTransform. class MaterializingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp new file mode 100644 index 00000000000..73cd459fa5d --- /dev/null +++ b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp @@ -0,0 +1,53 @@ +#include +#include + +namespace DB +{ + +static ITransformingStep::Traits getTraits() +{ + return ITransformingStep::Traits + { + { + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, + .preserves_sorting = true, + }, + { + .preserves_number_of_rows = true, + } + }; +} + +SettingQuotaAndLimitsStep::SettingQuotaAndLimitsStep( + const DataStream & input_stream_, + StoragePtr storage_, + TableLockHolder table_lock_, + StreamLocalLimits limits_, + std::shared_ptr quota_, + std::shared_ptr context_) + : ITransformingStep(input_stream_, input_stream_.header, getTraits()) + , storage(std::move(storage_)) + , table_lock(std::move(table_lock_)) + , limits(std::move(limits_)) + , quota(std::move(quota_)) + , context(std::move(context_)) +{ +} + +void SettingQuotaAndLimitsStep::transformPipeline(QueryPipeline & pipeline) +{ + /// Table lock is stored inside pipeline here. + pipeline.addTableLock(table_lock); + + pipeline.setLimits(limits); + + if (quota) + pipeline.setQuota(quota); + + pipeline.addInterpreterContext(std::move(context)); + pipeline.addStorageHolder(std::move(storage)); +} + +} diff --git a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h new file mode 100644 index 00000000000..538d3c35b9d --- /dev/null +++ b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.h @@ -0,0 +1,42 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +class IStorage; +using StoragePtr = std::shared_ptr; + +struct StorageInMemoryMetadata; +using StorageMetadataPtr = std::shared_ptr; + +class EnabledQuota; + +/// Add limits, quota, table_lock and other stuff to pipeline. +/// Doesn't change DataStream. +class SettingQuotaAndLimitsStep : public ITransformingStep +{ +public: + SettingQuotaAndLimitsStep( + const DataStream & input_stream_, + StoragePtr storage_, + TableLockHolder table_lock_, + StreamLocalLimits limits_, + std::shared_ptr quota_, + std::shared_ptr context_); + + String getName() const override { return "SettingQuotaAndLimits"; } + + void transformPipeline(QueryPipeline & pipeline) override; + +private: + StoragePtr storage; + TableLockHolder table_lock; + StreamLocalLimits limits; + std::shared_ptr quota; + std::shared_ptr context; +}; + +} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index cd8857926bb..08de142479b 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -116,6 +116,7 @@ SRCS( QueryPlan/ReadFromStorageStep.cpp QueryPlan/ReadNothingStep.cpp QueryPlan/RollupStep.cpp + QueryPlan/SettingQuotaAndLimitsStep.cpp QueryPlan/TotalsHavingStep.cpp QueryPlan/UnionStep.cpp ResizeProcessor.cpp diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp index 4b7733c1cd2..a7cba22bebf 100644 --- a/src/Storages/StorageView.cpp +++ b/src/Storages/StorageView.cpp @@ -16,6 +16,9 @@ #include #include #include +#include +#include +#include namespace DB { @@ -89,6 +92,55 @@ Pipe StorageView::read( return QueryPipeline::getPipe(std::move(pipeline)); } +void StorageView::read( + QueryPlan & query_plan, + TableLockHolder table_lock, + StorageMetadataPtr metadata_snapshot, + StreamLocalLimits & limits, + std::shared_ptr quota, + const Names & column_names, + const SelectQueryInfo & query_info, + std::shared_ptr context, + QueryProcessingStage::Enum /*processed_stage*/, + const size_t /*max_block_size*/, + const unsigned /*num_streams*/) +{ + ASTPtr current_inner_query = metadata_snapshot->getSelectQuery().inner_query; + + if (query_info.view_query) + { + if (!query_info.view_query->as()) + throw Exception("Unexpected optimized VIEW query", ErrorCodes::LOGICAL_ERROR); + current_inner_query = query_info.view_query->clone(); + } + + InterpreterSelectWithUnionQuery interpreter(current_inner_query, *context, {}, column_names); + interpreter.buildQueryPlan(query_plan); + + /// It's expected that the columns read from storage are not constant. + /// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery. + auto materializing = std::make_unique(query_plan.getCurrentDataStream()); + materializing->setStepDescription("Materialize constants after VIEW subquery"); + query_plan.addStep(std::move(materializing)); + + /// And also convert to expected structure. + auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()); + auto converting = std::make_unique(query_plan.getCurrentDataStream(), header); + converting->setStepDescription("Convert VIEW subquery result to VIEW table structure"); + query_plan.addStep(std::move(converting)); + + /// Extend lifetime of context, table lock, storage. Set limits and quota. + auto adding_limits_and_quota = std::make_unique( + query_plan.getCurrentDataStream(), + shared_from_this(), + std::move(table_lock), + limits, + std::move(quota), + std::move(context)); + adding_limits_and_quota->setStepDescription("Set limits and quota for VIEW subquery"); + query_plan.addStep(std::move(adding_limits_and_quota)); +} + static ASTTableExpression * getFirstTableExpression(ASTSelectQuery & select_query) { auto * select_element = select_query.tables()->children[0]->as(); diff --git a/src/Storages/StorageView.h b/src/Storages/StorageView.h index 682c7424b98..79155209ff8 100644 --- a/src/Storages/StorageView.h +++ b/src/Storages/StorageView.h @@ -30,6 +30,19 @@ public: size_t max_block_size, unsigned num_streams) override; + void read( + QueryPlan & query_plan, + TableLockHolder table_lock, + StorageMetadataPtr metadata_snapshot, + StreamLocalLimits & limits, + std::shared_ptr quota, + const Names & column_names, + const SelectQueryInfo & query_info, + std::shared_ptr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) override; + void replaceWithSubquery(ASTSelectQuery & select_query, ASTPtr & view_name, const StorageMetadataPtr & metadata_snapshot) const { replaceWithSubquery(select_query, metadata_snapshot->getSelectQuery().inner_query->clone(), view_name);