WIP on StorageMerge

This commit is contained in:
Dmitry Novik 2024-01-31 13:50:15 +00:00
parent 722f3db738
commit 0d21004218
6 changed files with 30 additions and 19 deletions

View File

@ -115,7 +115,7 @@ private:
}
void OptimizeGroupByInjectiveFunctionsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void OptimizeGroupByInjectiveFunctionsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
OptimizeGroupByInjectiveFunctionsVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -14,7 +14,7 @@ public:
String getDescription() override { return "Replaces injective functions by it's arguments in GROUP BY section."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -120,7 +120,7 @@ private:
}
void RewriteSumFunctionWithSumAndCountPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void RewriteSumFunctionWithSumAndCountPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
RewriteSumFunctionWithSumAndCountVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -20,7 +20,7 @@ public:
String getDescription() override { return "Rewrite sum(column +/- literal) into sum(column) and literal * count(column)"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -54,6 +54,7 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <base/defines.h>
#include <base/range.h>
#include "Common/logger_useful.h"
#include <Common/Exception.h>
#include <Common/assert_cast.h>
#include <Common/checkStackSize.h>
@ -388,7 +389,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
Names column_names_as_aliases;
Aliases aliases;
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, column_names_as_aliases, aliases);
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, column_names, column_names_as_aliases, aliases);
auto source_pipeline = createSources(
child_plan.plan,
@ -524,8 +525,6 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
Names column_names_as_aliases;
Names real_column_names = column_names;
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, column_names_as_aliases, aliases);
const auto & database_name = std::get<0>(table);
const auto & table_name = std::get<3>(table);
auto row_policy_filter_ptr = context->getRowPolicyFilter(
@ -538,6 +537,8 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
row_policy_data_opt->extendNames(real_column_names);
}
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
if (!context->getSettingsRef().allow_experimental_analyzer)
{
auto storage_columns = storage_metadata_snapshot->getColumns();
@ -628,15 +629,14 @@ public:
}
};
bool hasUnknownColumn(const QueryTreeNodePtr & node,
QueryTreeNodePtr original_table_expression,
QueryTreeNodePtr replacement_table_expression)
bool hasUnknownColumn(const QueryTreeNodePtr & node, QueryTreeNodePtr replacement_table_expression)
{
QueryTreeNodes stack = { node };
while (!stack.empty())
{
auto current = stack.back();
stack.pop_back();
LOG_DEBUG(&Poco::Logger::get("hasUnknownColumn"), "Expression: {}", current->formatASTForErrorMessage());
switch (current->getNodeType())
{
@ -646,15 +646,13 @@ bool hasUnknownColumn(const QueryTreeNodePtr & node,
{
auto * column_node = current->as<ColumnNode>();
auto source = column_node->getColumnSourceOrNull();
if (source != original_table_expression)
if (source != replacement_table_expression)
return true;
else
column_node->setColumnSource(replacement_table_expression);
break;
}
default:
{
for (const auto & child : node->getChildren())
for (const auto & child : current->getChildren())
{
if (child)
stack.push_back(child);
@ -670,9 +668,16 @@ QueryTreeNodePtr removeJoin(
QueryTreeNodePtr original_table_expression,
QueryTreeNodePtr replacement_table_expression)
{
LOG_DEBUG(&Poco::Logger::get("removeJoin"), "Entered the function");
auto * query_node = query->as<QueryNode>();
auto join_tree = query_node->getJoinTree();
auto modified_query = query_node->cloneAndReplace(join_tree, replacement_table_expression);
auto modified_query = query_node->cloneAndReplace(original_table_expression, replacement_table_expression);
auto * modified_query_node = modified_query->as<QueryNode>();
modified_query = modified_query->cloneAndReplace(modified_query_node->getJoinTree(), replacement_table_expression);
modified_query_node = modified_query->as<QueryNode>();
query_node = modified_query->as<QueryNode>();
@ -685,11 +690,12 @@ QueryTreeNodePtr removeJoin(
if (join_tree->as<TableNode>() == nullptr && join_tree->as<TableFunctionNode>() == nullptr)
{
auto & projection = query_node->getProjection().getNodes();
auto projection_columns = query_node->getProjectionColumns();
auto & projection = modified_query_node->getProjection().getNodes();
auto projection_columns = modified_query_node->getProjectionColumns();
for (size_t i = 0; i < projection.size();)
{
if (hasUnknownColumn(projection[i], original_table_expression, replacement_table_expression))
LOG_DEBUG(&Poco::Logger::get("removeJoin"), "Processing: {}", i);
if (hasUnknownColumn(projection[i], replacement_table_expression))
{
projection.erase(projection.begin() + i);
projection_columns.erase(projection_columns.begin() + i);
@ -701,6 +707,8 @@ QueryTreeNodePtr removeJoin(
query_node->resolveProjectionColumns(std::move(projection_columns));
}
LOG_DEBUG(&Poco::Logger::get("removeJoin"), "Result:\n{}", modified_query->dumpTree());
return modified_query;
}
@ -709,9 +717,11 @@ QueryTreeNodePtr removeJoin(
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot,
Names real_column_names,
Names & column_names_as_aliases,
Aliases & aliases) const
{
LOG_DEBUG(&Poco::Logger::get("getModifiedQueryInfo"), "Procesing query");
const auto & [database_name, storage, storage_lock, table_name] = storage_with_lock_and_name;
const StorageID current_storage_id = storage->getStorageID();
@ -753,7 +763,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_
if (with_aliases)
{
auto filter_actions_dag = std::make_shared<ActionsDAG>();
for (const auto & column : column_names)
for (const auto & column : real_column_names)
{
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;

View File

@ -192,6 +192,7 @@ private:
SelectQueryInfo getModifiedQueryInfo(const ContextPtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot,
Names real_column_names,
Names & column_names_as_aliases,
Aliases & aliases) const;