From cd6414639ee0c08f92d43b96974c7d91517b7b89 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Thu, 22 Apr 2021 21:32:17 +0800 Subject: [PATCH] add metadata_snapshot to getQueryProcessingStage --- src/Interpreters/InterpreterSelectQuery.cpp | 4 ++++ .../Transforms/ProjectionPartTransform.h | 1 + src/Storages/IStorage.h | 3 ++- src/Storages/LiveView/StorageBlocks.h | 6 +++++- src/Storages/SelectQueryInfo.h | 11 +++++++++++ src/Storages/StorageBuffer.cpp | 8 ++++++-- src/Storages/StorageBuffer.h | 3 ++- src/Storages/StorageDistributed.cpp | 6 ++++-- src/Storages/StorageDistributed.h | 3 ++- src/Storages/StorageMaterializedView.cpp | 7 +++++-- src/Storages/StorageMaterializedView.h | 3 ++- src/Storages/StorageMerge.cpp | 14 ++++++++++---- src/Storages/StorageMerge.h | 3 ++- src/Storages/StorageProxy.h | 7 +++++-- src/Storages/StorageS3Cluster.cpp | 2 +- src/Storages/StorageS3Cluster.h | 3 ++- src/Storages/tests/gtest_storage_log.cpp | 2 +- 17 files changed, 65 insertions(+), 21 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 1a1b5ac914a..d695284d7f9 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -432,6 +432,10 @@ InterpreterSelectQuery::InterpreterSelectQuery( // ugly but works metadata_snapshot->selected_projection = query_info.aggregate_projection; + // TODO In expression analyzer we need to check if storage mayBenefitFromIndexForIn. In case + // we use some projection to execute the query, we need to provide what projection we are + // going to use here. + // It analyzes aggregation query_analyzer = std::make_unique( query_ptr, syntax_analyzer_result, context, metadata_snapshot, NameSet(required_result_column_names.begin(), required_result_column_names.end()), diff --git a/src/Processors/Transforms/ProjectionPartTransform.h b/src/Processors/Transforms/ProjectionPartTransform.h index de51455f710..ac2438dae0f 100644 --- a/src/Processors/Transforms/ProjectionPartTransform.h +++ b/src/Processors/Transforms/ProjectionPartTransform.h @@ -39,6 +39,7 @@ protected: private: Block projection; + // Hold projection's parent parts duration execution MergeTreeData::DataPartsVector parent_parts; }; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 007bb6b6710..77561615e15 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -235,7 +235,8 @@ public: * QueryProcessingStage::Enum required for Distributed over Distributed, * since it cannot return Complete for intermediate queries never. */ - virtual QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const + virtual QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const { return QueryProcessingStage::FetchColumns; } diff --git a/src/Storages/LiveView/StorageBlocks.h b/src/Storages/LiveView/StorageBlocks.h index f4ba8d7b09c..6cf7ce59fa2 100644 --- a/src/Storages/LiveView/StorageBlocks.h +++ b/src/Storages/LiveView/StorageBlocks.h @@ -33,7 +33,11 @@ public: bool supportsSampling() const override { return true; } bool supportsFinal() const override { return true; } - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override { return to_stage; } + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override + { + return to_stage; + } Pipe read( const Names & /*column_names*/, diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 1321c4ec31d..0ebdda6932e 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -113,6 +113,8 @@ struct InputOrderInfo bool operator !=(const InputOrderInfo & other) const { return !(*this == other); } }; +class IMergeTreeDataPart; + /** Query along with some additional data, * that can be used during query processing * inside storage engines. @@ -144,10 +146,19 @@ struct SelectQueryInfo ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; } + /// If not null, it means we choose an aggregate projection to execute current query. const ProjectionDescription * aggregate_projection{}; ProjectionKeyActions key_actions; Names projection_names; Block projection_block; + + /// Store to-be-scanned data parts if some aggregate projection is used + using DataPart = IMergeTreeDataPart; + using DataPartPtr = std::shared_ptr; + using DataPartsVector = std::vector; + DataPartsVector projection_parts; + DataPartsVector parent_parts; + DataPartsVector normal_parts; }; } diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 08c55c79775..bfc282ed4eb 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -178,7 +178,11 @@ private: }; -QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const +QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage( + ContextPtr local_context, + QueryProcessingStage::Enum to_stage, + const StorageMetadataPtr &, + SelectQueryInfo & query_info) const { if (destination_id) { @@ -187,7 +191,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(ContextPtr loc if (destination.get() == this) throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP); - return destination->getQueryProcessingStage(local_context, to_stage, query_info); + return destination->getQueryProcessingStage(local_context, to_stage, destination->getInMemoryMetadataPtr(), query_info); } return QueryProcessingStage::FetchColumns; diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 1747c024a74..2224bce14b9 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -58,7 +58,8 @@ public: std::string getName() const override { return "Buffer"; } - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override; + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override; Pipe read( const Names & column_names, diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index a402c3e0218..fa2f77d49a7 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -459,10 +459,12 @@ StorageDistributed::StorageDistributed( } QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( - ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const + ContextPtr local_context, + QueryProcessingStage::Enum to_stage, + const StorageMetadataPtr & metadata_snapshot, + SelectQueryInfo & query_info) const { const auto & settings = local_context->getSettingsRef(); - auto metadata_snapshot = getInMemoryMetadataPtr(); ClusterPtr cluster = getCluster(); query_info.cluster = cluster; diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 886a8e032de..241c6ddb1aa 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -55,7 +55,8 @@ public: bool isRemote() const override { return true; } - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override; + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override; Pipe read( const Names & column_names, diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 89b8bc72526..27cd649aae4 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -119,9 +119,12 @@ StorageMaterializedView::StorageMaterializedView( } QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage( - ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const + ContextPtr local_context, + QueryProcessingStage::Enum to_stage, + const StorageMetadataPtr &, + SelectQueryInfo & query_info) const { - return getTargetTable()->getQueryProcessingStage(local_context, to_stage, query_info); + return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getInMemoryMetadataPtr(), query_info); } Pipe StorageMaterializedView::read( diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index cda8112a8c3..8f9c8a9d3f1 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -66,7 +66,8 @@ public: void shutdown() override; - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override; + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override; StoragePtr getTargetTable() const; StoragePtr tryGetTargetTable() const; diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 1110b850ba9..3ab084086b1 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -179,8 +179,11 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, Cont } -QueryProcessingStage::Enum -StorageMerge::getQueryProcessingStage(ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const +QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage( + ContextPtr local_context, + QueryProcessingStage::Enum to_stage, + const StorageMetadataPtr &, + SelectQueryInfo & query_info) const { /// In case of JOIN the first stage (which includes JOIN) /// should be done on the initiator always. @@ -204,7 +207,9 @@ StorageMerge::getQueryProcessingStage(ContextPtr local_context, QueryProcessingS if (table && table.get() != this) { ++selected_table_size; - stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(local_context, to_stage, query_info)); + stage_in_source_tables = std::max( + stage_in_source_tables, + table->getQueryProcessingStage(local_context, to_stage, table->getInMemoryMetadataPtr(), query_info)); } iterator->next(); @@ -352,7 +357,8 @@ Pipe StorageMerge::createSources( return pipe; } - auto storage_stage = storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, modified_query_info); + auto storage_stage + = storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, false, metadata_snapshot, modified_query_info); if (processed_stage <= storage_stage) { /// If there are only virtual columns in query, you must request at least one other column. diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index ff016952686..f7381bf5193 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -27,7 +27,8 @@ public: bool supportsIndexForIn() const override { return true; } bool supportsSubcolumns() const override { return true; } - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override; + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override; Pipe read( const Names & column_names, diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index 2c3e9d610b0..205440261b8 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -32,9 +32,12 @@ public: NamesAndTypesList getVirtuals() const override { return getNested()->getVirtuals(); } QueryProcessingStage::Enum getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & ast) const override + ContextPtr context, + QueryProcessingStage::Enum to_stage, + const StorageMetadataPtr &, + SelectQueryInfo & info) const override { - return getNested()->getQueryProcessingStage(context, to_stage, ast); + return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getInMemoryMetadataPtr(), info); } BlockInputStreams watch( diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 8afc0e44023..df5b655bf4e 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -140,7 +140,7 @@ Pipe StorageS3Cluster::read( } QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, SelectQueryInfo &) const + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageMetadataPtr &, SelectQueryInfo &) const { /// Initiator executes query on remote node. if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) diff --git a/src/Storages/StorageS3Cluster.h b/src/Storages/StorageS3Cluster.h index c98840d62fc..b2c8d4a086c 100644 --- a/src/Storages/StorageS3Cluster.h +++ b/src/Storages/StorageS3Cluster.h @@ -36,7 +36,8 @@ public: Pipe read(const Names &, const StorageMetadataPtr &, SelectQueryInfo &, ContextPtr, QueryProcessingStage::Enum, size_t /*max_block_size*/, unsigned /*num_streams*/) override; - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, SelectQueryInfo &) const override; + QueryProcessingStage::Enum + getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override; NamesAndTypesList getVirtuals() const override; diff --git a/src/Storages/tests/gtest_storage_log.cpp b/src/Storages/tests/gtest_storage_log.cpp index 41c1b6ac75a..a4886de434c 100644 --- a/src/Storages/tests/gtest_storage_log.cpp +++ b/src/Storages/tests/gtest_storage_log.cpp @@ -118,7 +118,7 @@ std::string readData(DB::StoragePtr & table, const DB::ContextPtr context) SelectQueryInfo query_info; QueryProcessingStage::Enum stage = table->getQueryProcessingStage( - context, QueryProcessingStage::Complete, query_info); + context, QueryProcessingStage::Complete, metadata_snapshot, query_info); QueryPipeline pipeline; pipeline.init(table->read(column_names, metadata_snapshot, query_info, context, stage, 8192, 1));