diff --git a/src/Processors/QueryPlan/Optimizations/Optimizations.h b/src/Processors/QueryPlan/Optimizations/Optimizations.h
index 0ee2cecb4df..fb791a4bd09 100644
--- a/src/Processors/QueryPlan/Optimizations/Optimizations.h
+++ b/src/Processors/QueryPlan/Optimizations/Optimizations.h
@@ -93,7 +93,8 @@ using Stack = std::vector;
void optimizePrimaryKeyCondition(const Stack & stack);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
-bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &);
+bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
+bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
/// Enable memory bound merging of aggregation states for remote queries
/// in case it was enabled for local plan
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp b/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
index d0d634d931d..f8eb4b34316 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
@@ -122,7 +122,7 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
optimizeReadInOrder(*frame.node, nodes);
if (optimization_settings.optimize_projection)
- applied_projection |= optimizeUseProjections(*frame.node, nodes);
+ applied_projection |= optimizeUseAggProjections(*frame.node, nodes);
if (optimization_settings.aggregation_in_order)
optimizeAggregationInOrder(*frame.node, nodes);
@@ -140,6 +140,14 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
continue;
}
+ if (optimization_settings.optimize_projection)
+ {
+ bool applied = optimizeUseNormalProjections(stack, nodes);
+ applied_projection |= applied;
+ if (applied && stack.back().next_child == 0)
+ continue;
+ }
+
optimizePrimaryKeyCondition(stack);
enableMemoryBoundMerging(*frame.node, nodes);
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp
index 968dfa7631d..b16fa317ef7 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp
@@ -12,6 +12,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -167,6 +168,16 @@ struct AggregateProjectionCandidate
size_t sum_marks = 0;
};
+struct NormalProjectionCandidate
+{
+ const ProjectionDescription * projection;
+
+ MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
+ MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr;
+
+ size_t sum_marks = 0;
+};
+
ActionsDAGPtr analyzeAggregateProjection(
const AggregateProjectionInfo & info,
ActionsDAG & query_dag,
@@ -430,7 +441,7 @@ ActionsDAGPtr analyzeAggregateProjection(
return proj_dag;
}
-bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
+bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
if (node.children.size() != 1)
return false;
@@ -724,4 +735,226 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
return true;
}
+
+bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
+{
+ const auto & frame = stack.back();
+
+ auto * reading = typeid_cast(frame.node->step.get());
+ if (!reading)
+ return false;
+
+ auto iter = stack.rbegin();
+ while (iter != stack.rend())
+ {
+ auto next = std::next(iter);
+
+ if (!typeid_cast(next->node->step.get()) &&
+ !typeid_cast(next->node->step.get()))
+ break;
+
+ iter = next;
+ }
+
+ if (iter == stack.rbegin())
+ return false;
+
+ const auto metadata = reading->getStorageMetadata();
+ const auto & projections = metadata->projections;
+
+ std::vector normal_projections;
+ for (const auto & projection : projections)
+ if (projection.type == ProjectionDescription::Type::Normal)
+ normal_projections.push_back(&projection);
+
+ if (normal_projections.empty())
+ return false;
+
+ ActionsDAGPtr dag;
+ ActionsDAG::NodeRawConstPtrs filter_nodes;
+ if (!buildAggregatingDAG(*iter->node->children.front(), dag, filter_nodes))
+ return false;
+
+ LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Query DAG: {}", dag->dumpDAG());
+
+ const ActionsDAG::Node * filter_node = nullptr;
+ if (!filter_nodes.empty())
+ {
+ filter_node = filter_nodes.front();
+ if (filter_nodes.size() > 1)
+ {
+ FunctionOverloadResolverPtr func_builder_and =
+ std::make_unique(
+ std::make_shared());
+
+ filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
+ }
+
+ dag->getOutputs().push_back(filter_node);
+ }
+
+ std::list candidates;
+ NormalProjectionCandidate * best_candidate = nullptr;
+
+ const Block & header = frame.node->step->getOutputStream().header;
+ const Names & required_columns = reading->getRealColumnNames();
+ const auto & parts = reading->getParts();
+ const auto & query_info = reading->getQueryInfo();
+ ContextPtr context = reading->getContext();
+ MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
+
+ std::shared_ptr max_added_blocks;
+ if (context->getSettingsRef().select_sequential_consistency)
+ {
+ if (const StorageReplicatedMergeTree * replicated = dynamic_cast(&reading->getMergeTreeData()))
+ max_added_blocks = std::make_shared(replicated->getMaxAddedBlocks());
+ }
+
+ for (const auto * projection : normal_projections)
+ {
+ bool has_all_columns = true;
+ for (const auto & col : required_columns)
+ {
+ if (!projection->sample_block.has(col))
+ {
+ has_all_columns = false;
+ break;
+ }
+ }
+
+ if (!has_all_columns)
+ continue;
+
+ MergeTreeData::DataPartsVector projection_parts;
+ MergeTreeData::DataPartsVector normal_parts;
+ for (const auto & part : parts)
+ {
+ const auto & created_projections = part->getProjectionParts();
+ auto it = created_projections.find(projection->name);
+ if (it != created_projections.end())
+ projection_parts.push_back(it->second);
+ else
+ normal_parts.push_back(part);
+ }
+
+ if (projection_parts.empty())
+ continue;
+
+ ActionDAGNodes added_filter_nodes;
+ if (filter_node)
+ added_filter_nodes.nodes.push_back(filter_node);
+
+ auto projection_result_ptr = reader.estimateNumMarksToRead(
+ std::move(projection_parts),
+ nullptr,
+ header.getNames(),
+ metadata,
+ projection->metadata,
+ query_info, /// How it is actually used? I hope that for index we need only added_filter_nodes
+ added_filter_nodes,
+ context,
+ context->getSettingsRef().max_threads,
+ max_added_blocks);
+
+ if (projection_result_ptr->error())
+ continue;
+
+ auto & candidate = candidates.emplace_back();
+ candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
+ candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->marks();
+
+ if (!normal_parts.empty())
+ {
+ auto normal_result_ptr = reading->selectRangesToRead(std::move(normal_parts));
+
+ if (normal_result_ptr->error())
+ continue;
+
+ if (normal_result_ptr->marks() != 0)
+ {
+ candidate.sum_marks += normal_result_ptr->marks();
+ candidate.merge_tree_normal_select_result_ptr = std::move(normal_result_ptr);
+ }
+ }
+
+ if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
+ best_candidate = &candidate;
+ }
+
+ if (!best_candidate)
+ return false;
+
+ auto storage_snapshot = reading->getStorageSnapshot();
+ auto proj_snapshot = std::make_shared(
+ storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data);
+ proj_snapshot->addProjection(best_candidate->projection);
+
+ LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());
+
+ auto projection_reading = reader.readFromParts(
+ {},
+ header.getNames(),
+ proj_snapshot,
+ query_info,
+ context,
+ reading->getMaxBlockSize(),
+ reading->getNumStreams(),
+ max_added_blocks,
+ best_candidate->merge_tree_projection_select_result_ptr,
+ reading->isParallelReadingEnabled());
+
+ if (!projection_reading)
+ {
+ Pipe pipe(std::make_shared(proj_snapshot->getSampleBlockForColumns(header.getNames())));
+ projection_reading = std::make_unique(std::move(pipe));
+ }
+
+ bool has_nornal_parts = best_candidate->merge_tree_normal_select_result_ptr != nullptr;
+ if (has_nornal_parts)
+ reading->setAnalyzedResult(std::move(best_candidate->merge_tree_normal_select_result_ptr));
+
+ LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure());
+
+ projection_reading->setStepDescription(best_candidate->projection->name);
+
+ auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
+ auto & expr_or_filter_node = nodes.emplace_back();
+
+ if (filter_node)
+ {
+ expr_or_filter_node.step = std::make_unique(
+ projection_reading_node.step->getOutputStream(),
+ dag,
+ dag->getOutputs().front()->result_name,
+ true);
+ }
+ else
+ expr_or_filter_node.step = std::make_unique(
+ projection_reading_node.step->getOutputStream(),
+ dag);
+
+ expr_or_filter_node.children.push_back(&projection_reading_node);
+
+ if (!has_nornal_parts)
+ {
+ /// All parts are taken from projection
+ iter->node->children.front() = &expr_or_filter_node;
+
+ //optimizeAggregationInOrder(node, nodes);
+ }
+ else
+ {
+ auto & union_node = nodes.emplace_back();
+ DataStreams input_streams = {iter->node->children.front()->step->getOutputStream(), expr_or_filter_node.step->getOutputStream()};
+ union_node.step = std::make_unique(std::move(input_streams));
+ union_node.children = {iter->node->children.front(), &expr_or_filter_node};
+ iter->node->children.front() = &union_node;
+
+ iter->next_child = 0;
+ stack.resize(iter.base() - stack.begin() + 1);
+ }
+
+ return true;
+}
+
}