mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 20:02:05 +00:00
Normal Projections analysis using query plan [In progress]
This commit is contained in:
parent
e6e486c4a1
commit
522a39f93f
@ -93,7 +93,8 @@ using Stack = std::vector<Frame>;
|
|||||||
void optimizePrimaryKeyCondition(const Stack & stack);
|
void optimizePrimaryKeyCondition(const Stack & stack);
|
||||||
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
|
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
|
||||||
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
|
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
|
||||||
bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &);
|
bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
|
||||||
|
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
|
||||||
|
|
||||||
/// Enable memory bound merging of aggregation states for remote queries
|
/// Enable memory bound merging of aggregation states for remote queries
|
||||||
/// in case it was enabled for local plan
|
/// in case it was enabled for local plan
|
||||||
|
@ -122,7 +122,7 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
|
|||||||
optimizeReadInOrder(*frame.node, nodes);
|
optimizeReadInOrder(*frame.node, nodes);
|
||||||
|
|
||||||
if (optimization_settings.optimize_projection)
|
if (optimization_settings.optimize_projection)
|
||||||
applied_projection |= optimizeUseProjections(*frame.node, nodes);
|
applied_projection |= optimizeUseAggProjections(*frame.node, nodes);
|
||||||
|
|
||||||
if (optimization_settings.aggregation_in_order)
|
if (optimization_settings.aggregation_in_order)
|
||||||
optimizeAggregationInOrder(*frame.node, nodes);
|
optimizeAggregationInOrder(*frame.node, nodes);
|
||||||
@ -140,6 +140,14 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (optimization_settings.optimize_projection)
|
||||||
|
{
|
||||||
|
bool applied = optimizeUseNormalProjections(stack, nodes);
|
||||||
|
applied_projection |= applied;
|
||||||
|
if (applied && stack.back().next_child == 0)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
optimizePrimaryKeyCondition(stack);
|
optimizePrimaryKeyCondition(stack);
|
||||||
enableMemoryBoundMerging(*frame.node, nodes);
|
enableMemoryBoundMerging(*frame.node, nodes);
|
||||||
|
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
||||||
#include <Storages/StorageReplicatedMergeTree.h>
|
#include <Storages/StorageReplicatedMergeTree.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
|
#include <Processors/QueryPlan/UnionStep.h>
|
||||||
#include <AggregateFunctions/AggregateFunctionCount.h>
|
#include <AggregateFunctions/AggregateFunctionCount.h>
|
||||||
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
|
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
|
||||||
#include <Processors/Sources/NullSource.h>
|
#include <Processors/Sources/NullSource.h>
|
||||||
@ -167,6 +168,16 @@ struct AggregateProjectionCandidate
|
|||||||
size_t sum_marks = 0;
|
size_t sum_marks = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct NormalProjectionCandidate
|
||||||
|
{
|
||||||
|
const ProjectionDescription * projection;
|
||||||
|
|
||||||
|
MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
|
||||||
|
MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr;
|
||||||
|
|
||||||
|
size_t sum_marks = 0;
|
||||||
|
};
|
||||||
|
|
||||||
ActionsDAGPtr analyzeAggregateProjection(
|
ActionsDAGPtr analyzeAggregateProjection(
|
||||||
const AggregateProjectionInfo & info,
|
const AggregateProjectionInfo & info,
|
||||||
ActionsDAG & query_dag,
|
ActionsDAG & query_dag,
|
||||||
@ -430,7 +441,7 @@ ActionsDAGPtr analyzeAggregateProjection(
|
|||||||
return proj_dag;
|
return proj_dag;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
|
bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
|
||||||
{
|
{
|
||||||
if (node.children.size() != 1)
|
if (node.children.size() != 1)
|
||||||
return false;
|
return false;
|
||||||
@ -724,4 +735,226 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Try to read (a part of) the data from a normal projection instead of the main table parts.
///
/// The function acts only when the top frame of `stack` is a ReadFromMergeTree step which has at least one
/// Filter or Expression step directly above it. Every normal projection containing all columns required by the
/// reading step is analyzed, and the candidate with the smallest number of marks to read is chosen.
///
/// On success new nodes are appended to `nodes`: a step reading from the projection, a Filter/Expression step
/// on top of it and, if some parts do not have the projection, a Union step which merges the projection branch
/// with the original reading branch. In the latter case `stack` is truncated as well.
///
/// Returns true if the plan was changed and false otherwise.
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
    const auto & frame = stack.back();

    auto * reading = typeid_cast<ReadFromMergeTree *>(frame.node->step.get());
    if (!reading)
        return false;

    /// Walk up from the reading step while the parent is a Filter or Expression step.
    /// The parent has to exist before it is inspected: std::next(iter) is stack.rend() once the bottom of the
    /// stack is reached, and it must not be dereferenced.
    auto iter = stack.rbegin();
    while (std::next(iter) != stack.rend())
    {
        auto next = std::next(iter);

        if (!typeid_cast<FilterStep *>(next->node->step.get()) &&
            !typeid_cast<ExpressionStep *>(next->node->step.get()))
            break;

        iter = next;
    }

    /// There is no Filter or Expression step above the reading step.
    if (iter == stack.rbegin())
        return false;

    const auto metadata = reading->getStorageMetadata();
    const auto & projections = metadata->projections;

    std::vector<const ProjectionDescription *> normal_projections;
    for (const auto & projection : projections)
        if (projection.type == ProjectionDescription::Type::Normal)
            normal_projections.push_back(&projection);

    if (normal_projections.empty())
        return false;

    ActionsDAGPtr dag;
    ActionsDAG::NodeRawConstPtrs filter_nodes;
    if (!buildAggregatingDAG(*iter->node->children.front(), dag, filter_nodes))
        return false;

    /// buildAggregatingDAG is defined elsewhere. Bail out if it reported success without producing a DAG,
    /// because the DAG is dereferenced unconditionally below.
    if (!dag)
        return false;

    LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Query DAG: {}", dag->dumpDAG());

    /// Combine all collected filters into a single node (a conjunction if there are several)
    /// and expose it as the last output of the DAG.
    const ActionsDAG::Node * filter_node = nullptr;
    if (!filter_nodes.empty())
    {
        filter_node = filter_nodes.front();
        if (filter_nodes.size() > 1)
        {
            FunctionOverloadResolverPtr func_builder_and =
                std::make_unique<FunctionToOverloadResolverAdaptor>(
                    std::make_shared<FunctionAnd>());

            filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
        }

        dag->getOutputs().push_back(filter_node);
    }

    std::list<NormalProjectionCandidate> candidates;
    NormalProjectionCandidate * best_candidate = nullptr;

    const Block & header = frame.node->step->getOutputStream().header;
    const Names & required_columns = reading->getRealColumnNames();
    const auto & parts = reading->getParts();
    const auto & query_info = reading->getQueryInfo();
    ContextPtr context = reading->getContext();
    MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());

    std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
    if (context->getSettingsRef().select_sequential_consistency)
    {
        if (const StorageReplicatedMergeTree * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(&reading->getMergeTreeData()))
            max_added_blocks = std::make_shared<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
    }

    for (const auto * projection : normal_projections)
    {
        /// The projection is usable only if it contains every column the reading step needs.
        bool has_all_columns = true;
        for (const auto & col : required_columns)
        {
            if (!projection->sample_block.has(col))
            {
                has_all_columns = false;
                break;
            }
        }

        if (!has_all_columns)
            continue;

        /// Split parts into those which have this projection materialized and those which do not.
        MergeTreeData::DataPartsVector projection_parts;
        MergeTreeData::DataPartsVector normal_parts;
        for (const auto & part : parts)
        {
            const auto & created_projections = part->getProjectionParts();
            auto it = created_projections.find(projection->name);
            if (it != created_projections.end())
                projection_parts.push_back(it->second);
            else
                normal_parts.push_back(part);
        }

        if (projection_parts.empty())
            continue;

        ActionDAGNodes added_filter_nodes;
        if (filter_node)
            added_filter_nodes.nodes.push_back(filter_node);

        auto projection_result_ptr = reader.estimateNumMarksToRead(
            std::move(projection_parts),
            nullptr,
            header.getNames(),
            metadata,
            projection->metadata,
            query_info, /// TODO: how is it actually used? Hopefully only added_filter_nodes are needed for index analysis.
            added_filter_nodes,
            context,
            context->getSettingsRef().max_threads,
            max_added_blocks);

        if (projection_result_ptr->error())
            continue;

        auto & candidate = candidates.emplace_back();
        /// Remember which projection the candidate belongs to: it is dereferenced below
        /// when the projection snapshot and the step description are built.
        candidate.projection = projection;
        candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
        candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->marks();

        if (!normal_parts.empty())
        {
            auto normal_result_ptr = reading->selectRangesToRead(std::move(normal_parts));

            /// The candidate stays in the list but is never selected as the best one.
            if (normal_result_ptr->error())
                continue;

            if (normal_result_ptr->marks() != 0)
            {
                candidate.sum_marks += normal_result_ptr->marks();
                candidate.merge_tree_normal_select_result_ptr = std::move(normal_result_ptr);
            }
        }

        if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
            best_candidate = &candidate;
    }

    if (!best_candidate)
        return false;

    auto storage_snapshot = reading->getStorageSnapshot();
    auto proj_snapshot = std::make_shared<StorageSnapshot>(
        storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data);
    proj_snapshot->addProjection(best_candidate->projection);

    LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());

    auto projection_reading = reader.readFromParts(
        {},
        header.getNames(),
        proj_snapshot,
        query_info,
        context,
        reading->getMaxBlockSize(),
        reading->getNumStreams(),
        max_added_blocks,
        best_candidate->merge_tree_projection_select_result_ptr,
        reading->isParallelReadingEnabled());

    /// No step was produced for the projection: substitute an empty source with the same header.
    if (!projection_reading)
    {
        Pipe pipe(std::make_shared<NullSource>(proj_snapshot->getSampleBlockForColumns(header.getNames())));
        projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
    }

    /// The original reading step continues to read only the parts which do not have the projection.
    const bool has_normal_parts = best_candidate->merge_tree_normal_select_result_ptr != nullptr;
    if (has_normal_parts)
        reading->setAnalyzedResult(std::move(best_candidate->merge_tree_normal_select_result_ptr));

    LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure());

    projection_reading->setStepDescription(best_candidate->projection->name);

    auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
    auto & expr_or_filter_node = nodes.emplace_back();

    if (filter_node)
    {
        /// The filter node was appended to the end of the DAG outputs above, so the filter column
        /// must be taken from the node itself and not from the first output.
        expr_or_filter_node.step = std::make_unique<FilterStep>(
            projection_reading_node.step->getOutputStream(),
            dag,
            filter_node->result_name,
            true);
    }
    else
        expr_or_filter_node.step = std::make_unique<ExpressionStep>(
            projection_reading_node.step->getOutputStream(),
            dag);

    expr_or_filter_node.children.push_back(&projection_reading_node);

    if (!has_normal_parts)
    {
        /// All parts are taken from projection
        iter->node->children.front() = &expr_or_filter_node;

        //optimizeAggregationInOrder(node, nodes);
    }
    else
    {
        /// Some parts do not have the projection: unite the original branch with the projection branch.
        auto & union_node = nodes.emplace_back();
        DataStreams input_streams = {iter->node->children.front()->step->getOutputStream(), expr_or_filter_node.step->getOutputStream()};
        union_node.step = std::make_unique<UnionStep>(std::move(input_streams));
        union_node.children = {iter->node->children.front(), &expr_or_filter_node};
        iter->node->children.front() = &union_node;

        iter->next_child = 0;
        /// iter.base() refers to the element right after the frame `iter` points to, so the stack keeps
        /// that frame and one more frame below it.
        /// NOTE(review): confirm that keeping the extra frame is intended, it still refers to the node
        /// which has just become a child of the union node.
        stack.resize(iter.base() - stack.begin() + 1);
    }

    return true;
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user