From e2c32ccbca9d1ee13af2f3febee9891ce9692fbb Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 31 Jan 2023 20:33:01 +0000 Subject: [PATCH] Aggregate Projections analysis using query plan [In progress] --- src/Processors/QueryPlan/AggregatingStep.cpp | 23 +++ src/Processors/QueryPlan/AggregatingStep.h | 3 + .../Optimizations/optimizeUseProjections.cpp | 131 +++++++++++++++++- src/Processors/QueryPlan/ReadFromMergeTree.h | 11 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 22 ++- .../MergeTree/MergeTreeDataSelectExecutor.h | 2 +- src/Storages/StorageReplicatedMergeTree.h | 5 +- 7 files changed, 179 insertions(+), 18 deletions(-) diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 4fd6e7c11dd..b52982b0425 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -406,6 +406,11 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B return; } + if (input_streams.size() > 1) + { + + } + /// If there are several sources, then we perform parallel aggregation if (pipeline.getNumStreams() > 1) { @@ -465,6 +470,24 @@ void AggregatingStep::describePipeline(FormatSettings & settings) const } } +void AggregatingStep::requestOnlyMergeForAggregateProjection(const DataStream & input_stream) +{ + auto cur_header = getOutputStream().header; + input_streams.front() = input_stream; + params.only_merge = true; + updateOutputStream(); + assertBlocksHaveEqualStructure(cur_header, getOutputStream().header, "AggregatingStep"); +} + +void AggregatingStep::appendAggregateProjection(const DataStream & input_stream) +{ + input_streams.emplace_back(input_stream); + params.only_merge = true; + auto added_header = appendGroupingColumn(params.getHeader(input_streams.front().header, final), params.keys, !grouping_sets_params.empty(), group_by_use_nulls); + assertBlocksHaveEqualStructure(getOutputStream().header, added_header, "AggregatingStep"); + params.only_merge = false; +} + void AggregatingStep::updateOutputStream() { output_stream = createOutputStream( diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index 0dc06649d2d..2ceca926684 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -60,6 +60,9 @@ public: void applyOrder(SortDescription sort_description_for_merging_, SortDescription group_by_sort_description_); bool memoryBoundMergingWillBeUsed() const; + void requestOnlyMergeForAggregateProjection(const DataStream & input_stream); + void appendAggregateProjection(const DataStream & input_stream); + private: void updateOutputStream() override; diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp index 5795687fbb3..2c9ad818ed4 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include namespace DB::QueryPlanOptimizations @@ -16,7 +18,7 @@ namespace DB::QueryPlanOptimizations QueryPlan::Node * findReadingStep(QueryPlan::Node & node) { - IQueryPlanStep * step = node.step.get();\ + IQueryPlanStep * step = node.step.get(); if (auto * reading = typeid_cast(step)) { /// Already read-in-order, skip. @@ -150,6 +152,9 @@ struct AggregateProjectionCandidate AggregateProjectionInfo info; const ProjectionDescription * projection; ActionsDAGPtr dag; + + MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr; + MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr; }; ActionsDAGPtr analyzeAggregateProjection( @@ -360,7 +365,7 @@ ActionsDAGPtr analyzeAggregateProjection( return query_dag.foldActionsByProjection(new_inputs); } -void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &) +void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) { if (node.children.size() != 1) return; @@ -432,6 +437,128 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &) if (candidates.empty()) return; + AggregateProjectionCandidate * best_candidate = nullptr; + size_t best_candidate_marks = 0; + + const auto & parts = reading->getParts(); + const auto & query_info = reading->getQueryInfo(); + + MergeTreeDataSelectExecutor reader(reading->getMergeTreeData()); + + std::shared_ptr max_added_blocks; + if (context->getSettingsRef().select_sequential_consistency) + { + if (const StorageReplicatedMergeTree * replicated = dynamic_cast(&reading->getMergeTreeData())) + max_added_blocks = std::make_shared(replicated->getMaxAddedBlocks()); + } + + for (auto & candidate : candidates) + { + MergeTreeData::DataPartsVector projection_parts; + MergeTreeData::DataPartsVector normal_parts; + for (const auto & part : parts) + { + const auto & created_projections = part->getProjectionParts(); + auto it = created_projections.find(candidate.projection->name); + if (it != created_projections.end()) + projection_parts.push_back(it->second); + else + normal_parts.push_back(part); + } + + if (projection_parts.empty()) + continue; + + ActionDAGNodes added_filter_nodes; + if (filter_node) + added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front()); + + auto projection_result_ptr = reader.estimateNumMarksToRead( + projection_parts, + nullptr, + candidate.dag->getRequiredColumnsNames(), + metadata, + candidate.projection->metadata, + query_info, /// How it is actually used? I hope that for index we need only added_filter_nodes + added_filter_nodes, + context, + context->getSettingsRef().max_threads, + max_added_blocks); + + if (projection_result_ptr->error()) + continue; + + size_t sum_marks = projection_result_ptr->marks(); + + if (!normal_parts.empty()) + { + auto normal_result_ptr = reading->selectRangesToRead(std::move(normal_parts)); + + if (normal_result_ptr->error()) + continue; + + if (normal_result_ptr->marks() != 0) + { + sum_marks += normal_result_ptr->marks(); + candidate.merge_tree_normal_select_result_ptr = std::move(normal_result_ptr); + } + } + + candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr); + + if (best_candidate == nullptr || best_candidate_marks > sum_marks) + { + best_candidate = &candidate; + best_candidate_marks = sum_marks; + } + } + + if (!best_candidate) + return; + + auto projection_reading = reader.readFromParts( + {}, + best_candidate->dag->getRequiredColumnsNames(), + reading->getStorageSnapshot(), + query_info, + context, + reading->getMaxBlockSize(), + reading->getNumStreams(), + max_added_blocks, + best_candidate->merge_tree_projection_select_result_ptr, + reading->isParallelReadingEnabled()); + + projection_reading->setStepDescription(best_candidate->projection->name); + + if (!best_candidate->merge_tree_normal_select_result_ptr) + { + /// All parts are taken from projection + + auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)}); + auto & expr_or_filter_node = nodes.emplace_back(); + + if (filter_node) + { + expr_or_filter_node.step = std::make_unique( + projection_reading_node.step->getOutputStream(), + best_candidate->dag, + best_candidate->dag->getOutputs().front()->result_name, + true); + } + else + expr_or_filter_node.step = std::make_unique( + projection_reading_node.step->getOutputStream(), + best_candidate->dag); + + expr_or_filter_node.children.push_back(&projection_reading_node); + aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream()); + node.children.front() = &expr_or_filter_node; + + optimizeAggregationInOrder(node, nodes); + + return; + } + } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index a3cea2a8afe..1c614ed09f1 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -147,13 +147,23 @@ public: bool sample_factor_column_queried, Poco::Logger * log); + MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const; + ContextPtr getContext() const { return context; } const SelectQueryInfo & getQueryInfo() const { return query_info; } StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; } + StorageSnapshotPtr getStorageSnapshot() const { return storage_snapshot; } const PrewhereInfo * getPrewhereInfo() const { return prewhere_info.get(); } void requestReadingInOrder(size_t prefix_size, int direction, size_t limit); + const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; } + const MergeTreeData & getMergeTreeData() const { return data; } + const Names & getRealColumnNames() const { return real_column_names; } + size_t getMaxBlockSize() const { return max_block_size; } + size_t getNumStreams() const { return requested_num_streams; } + bool isParallelReadingEnabled() const { return read_task_callback != std::nullopt; } + private: static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl( MergeTreeData::DataPartsVector parts, @@ -232,7 +242,6 @@ private: const Names & column_names, ActionsDAGPtr & out_projection); - MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const; ReadFromMergeTree::AnalysisResult getAnalysisResult() const; MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 1ca1779e4b0..242f86e171c 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -156,7 +156,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( if (!query_info.projection) { - auto plan = readFromParts( + auto step = readFromParts( query_info.merge_tree_select_result_ptr ? MergeTreeData::DataPartsVector{} : parts, column_names_to_return, storage_snapshot, @@ -168,12 +168,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( query_info.merge_tree_select_result_ptr, enable_parallel_reading); - if (plan->isInitialized() && settings.allow_experimental_projection_optimization && settings.force_optimize_projection + if (!step && settings.allow_experimental_projection_optimization && settings.force_optimize_projection && !metadata_for_reading->projections.empty()) throw Exception( "No projection is used when allow_experimental_projection_optimization = 1 and force_optimize_projection = 1", ErrorCodes::PROJECTION_NOT_USED); + auto plan = std::make_unique(); + plan->addStep(std::move(step)); return plan; } @@ -197,7 +199,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( else if (query_info.projection->merge_tree_projection_select_result_ptr) { LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", ")); - projection_plan = readFromParts( + projection_plan->addStep(readFromParts( {}, query_info.projection->required_columns, storage_snapshot, @@ -207,7 +209,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( num_streams, max_block_numbers_to_read, query_info.projection->merge_tree_projection_select_result_ptr, - enable_parallel_reading); + enable_parallel_reading)); } if (projection_plan->isInitialized()) @@ -1337,7 +1339,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar log); } -QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( +QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts( MergeTreeData::DataPartsVector parts, const Names & column_names_to_return, const StorageSnapshotPtr & storage_snapshot, @@ -1353,10 +1355,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( if (merge_tree_select_result_ptr) { if (merge_tree_select_result_ptr->marks() == 0) - return std::make_unique(); + return {}; } else if (parts.empty()) - return std::make_unique(); + return {}; Names real_column_names; Names virt_column_names; @@ -1366,7 +1368,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried); - auto read_from_merge_tree = std::make_unique( + return std::make_unique( std::move(parts), real_column_names, virt_column_names, @@ -1382,10 +1384,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( merge_tree_select_result_ptr, enable_parallel_reading ); - - QueryPlanPtr plan = std::make_unique(); - plan->addStep(std::move(read_from_merge_tree)); - return plan; } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 30d09312245..a337574bb64 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -39,7 +39,7 @@ public: bool enable_parallel_reading = false) const; /// The same as read, but with specified set of parts. - QueryPlanPtr readFromParts( + QueryPlanStepPtr readFromParts( MergeTreeData::DataPartsVector parts, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c3bd682a29c..44f34adf3c0 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -322,12 +322,13 @@ public: const String & replica_name, const String & zookeeper_path, const ContextPtr & local_context, const zkutil::ZooKeeperPtr & zookeeper); bool canUseZeroCopyReplication() const; -private: - std::atomic_bool are_restoring_replica {false}; /// Get a sequential consistent view of current parts. ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; +private: + std::atomic_bool are_restoring_replica {false}; + /// Delete old parts from disk and from ZooKeeper. void clearOldPartsAndRemoveFromZK();