From 53b006dd5cef3501d01e781025e99f097dfa7c36 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 24 Feb 2023 19:59:13 +0000 Subject: [PATCH] Refactor a bit more. --- .../Optimizations/optimizeUseProjections.cpp | 122 ++++++++++-------- 1 file changed, 67 insertions(+), 55 deletions(-) diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp index 18b6140686d..8d21539a252 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp @@ -124,7 +124,7 @@ bool QueryDAG::build(QueryPlan::Node & node) return false; } -bool canUseProjectionForReadingStep(ReadFromMergeTree * reading) +static bool canUseProjectionForReadingStep(ReadFromMergeTree * reading) { /// Probably some projection already was applied. if (reading->hasAnalyzedResult()) @@ -615,8 +615,8 @@ ActionsDAGPtr analyzeAggregateProjection( struct MinMaxProjectionCandidate { AggregateProjectionCandidate candidate; - Block minmax_count_projection_block; - MergeTreeData::DataPartsVector minmax_projection_normal_parts; + Block block; + MergeTreeData::DataPartsVector normal_parts; }; struct AggregateProjectionCandidates @@ -693,8 +693,8 @@ AggregateProjectionCandidates getAggregateProjectionCandidates( { MinMaxProjectionCandidate minmax; minmax.candidate = std::move(candidate); - minmax.minmax_count_projection_block = std::move(block); - minmax.minmax_projection_normal_parts = std::move(minmax_projection_normal_parts); + minmax.block = std::move(block); + minmax.normal_parts = std::move(minmax_projection_normal_parts); minmax.candidate.projection = projection; candidates.minmax_projection.emplace(std::move(minmax)); } @@ -722,6 +722,19 @@ AggregateProjectionCandidates getAggregateProjectionCandidates( return candidates; } +static std::shared_ptr getMaxAddedBlocks(ReadFromMergeTree * reading) +{ + ContextPtr context = reading->getContext(); + + if (context->getSettingsRef().select_sequential_consistency) + { + if (const auto * replicated = dynamic_cast(&reading->getMergeTreeData())) + return std::make_shared(replicated->getMaxAddedBlocks()); + } + + return {}; +} + bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) { if (node.children.size() != 1) @@ -745,15 +758,7 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) if (!canUseProjectionForReadingStep(reading)) return false; - const auto metadata = reading->getStorageMetadata(); - ContextPtr context = reading->getContext(); - - std::shared_ptr max_added_blocks; - if (context->getSettingsRef().select_sequential_consistency) - { - if (const StorageReplicatedMergeTree * replicated = dynamic_cast(&reading->getMergeTreeData())) - max_added_blocks = std::make_shared(replicated->getMaxAddedBlocks()); - } + std::shared_ptr max_added_blocks = getMaxAddedBlocks(reading); auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks); @@ -763,10 +768,13 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) else if (candidates.real.empty()) return false; - MergeTreeDataSelectExecutor reader(reading->getMergeTreeData()); const auto & parts = reading->getParts(); const auto & query_info = reading->getQueryInfo(); + const auto metadata = reading->getStorageMetadata(); + ContextPtr context = reading->getContext(); + MergeTreeDataSelectExecutor reader(reading->getMergeTreeData()); + /// Selecting best candidate. for (auto & candidate : candidates.real) { MergeTreeData::DataPartsVector projection_parts; @@ -830,25 +838,28 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) QueryPlanStepPtr projection_reading; bool has_nornal_parts; + /// Add reading from projection step. if (candidates.minmax_projection) { - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Minmax proj block {}", candidates.minmax_projection->minmax_count_projection_block.dumpStructure()); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Minmax proj block {}", + // candidates.minmax_projection->block.dumpStructure()); - Pipe pipe(std::make_shared(std::move(candidates.minmax_projection->minmax_count_projection_block))); + Pipe pipe(std::make_shared(std::move(candidates.minmax_projection->block))); projection_reading = std::make_unique(std::move(pipe)); - has_nornal_parts = !candidates.minmax_projection->minmax_projection_normal_parts.empty(); + has_nornal_parts = !candidates.minmax_projection->normal_parts.empty(); if (has_nornal_parts) - reading->resetParts(std::move(candidates.minmax_projection->minmax_projection_normal_parts)); + reading->resetParts(std::move(candidates.minmax_projection->normal_parts)); } else { auto storage_snapshot = reading->getStorageSnapshot(); auto proj_snapshot = std::make_shared( - storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data); + storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); proj_snapshot->addProjection(best_candidate->projection); - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString()); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", + // proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString()); auto query_info_copy = query_info; query_info_copy.prewhere_info = nullptr; @@ -867,7 +878,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) if (!projection_reading) { - Pipe pipe(std::make_shared(proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames()))); + auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames()); + Pipe pipe(std::make_shared(std::move(header))); projection_reading = std::make_unique(std::move(pipe)); } @@ -876,7 +888,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) reading->setAnalyzedResult(std::move(best_candidate->merge_tree_normal_select_result_ptr)); } - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure()); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", + // projection_reading->getOutputStream().header.dumpStructure()); projection_reading->setStepDescription(best_candidate->projection->name); @@ -901,12 +914,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) if (!has_nornal_parts) { /// All parts are taken from projection - - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Expr stream {}", expr_or_filter_node.step->getOutputStream().header.dumpStructure()); aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream()); node.children.front() = &expr_or_filter_node; - - //optimizeAggregationInOrder(node, nodes); } else { @@ -917,7 +926,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) return true; } -ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_header) + +static ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_header) { /// Materialize constants in case we don't have it in output header. /// This may happen e.g. if we have PREWHERE. @@ -951,6 +961,16 @@ ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_h return dag; } +static bool hasAllRequiredColumns(const ProjectionDescription * projection, const Names & required_columns) +{ + for (const auto & col : required_columns) + { + if (!projection->sample_block.has(col)) + return false; + } + + return true; +} bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) { @@ -1009,28 +1029,14 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) auto ordinary_reading_select_result = reading->selectRangesToRead(parts); size_t ordinary_reading_marks = ordinary_reading_select_result->marks(); - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Marks for ordinary reading {}", ordinary_reading_marks); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), + // "Marks for ordinary reading {}", ordinary_reading_marks); - std::shared_ptr max_added_blocks; - if (context->getSettingsRef().select_sequential_consistency) - { - if (const StorageReplicatedMergeTree * replicated = dynamic_cast(&reading->getMergeTreeData())) - max_added_blocks = std::make_shared(replicated->getMaxAddedBlocks()); - } + std::shared_ptr max_added_blocks = getMaxAddedBlocks(reading); for (const auto * projection : normal_projections) { - bool has_all_columns = true; - for (const auto & col : required_columns) - { - if (!projection->sample_block.has(col)) - { - has_all_columns = false; - break; - } - } - - if (!has_all_columns) + if (!hasAllRequiredColumns(projection, required_columns)) continue; MergeTreeData::DataPartsVector projection_parts; @@ -1086,7 +1092,15 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) } } - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Marks for projection {} {}", projection->name ,candidate.sum_marks); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), + // "Marks for projection {} {}", projection->name ,candidate.sum_marks); + + // if (candidate.sum_marks > ordinary_reading_marks) + // continue; + + // if (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks) + // best_candidate = &candidate; + if (candidate.sum_marks < ordinary_reading_marks && (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks)) best_candidate = &candidate; } @@ -1102,7 +1116,8 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data); proj_snapshot->addProjection(best_candidate->projection); - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString()); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", + // proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString()); auto query_info_copy = query_info; query_info_copy.prewhere_info = nullptr; @@ -1129,7 +1144,8 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) if (has_nornal_parts) reading->setAnalyzedResult(std::move(best_candidate->merge_tree_normal_select_result_ptr)); - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure()); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", + // projection_reading->getOutputStream().header.dumpStructure()); projection_reading->setStepDescription(best_candidate->projection->name); @@ -1169,12 +1185,6 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) if (auto materializing = makeMaterializingDAG(proj_stream->header, main_stream.header)) { - // auto convert_actions_dag = ActionsDAG::makeConvertingActions( - // proj_stream->header.getColumnsWithTypeAndName(), - // main_stream.header.getColumnsWithTypeAndName(), - // ActionsDAG::MatchColumnsMode::Name, - // true); - auto converting = std::make_unique(*proj_stream, materializing); proj_stream = &converting->getOutputStream(); auto & expr_node = nodes.emplace_back(); @@ -1189,6 +1199,8 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) union_node.children = {iter->node->children.front(), next_node}; iter->node->children.front() = &union_node; + /// Here we remove last steps from stack to be able to optimize again. + /// In theory, read-in-order can be applied to projection. iter->next_child = 0; stack.resize(iter.base() - stack.begin() + 1); }