Merge branch 'master' of github.com:ClickHouse/ClickHouse into enable-coverage-for-debug-build

Author: Alexey Milovidov
Date: 2024-02-08 22:08:15 +01:00
Commit: cd0acc512d
118 changed files with 1390 additions and 2575 deletions

View File

@ -401,6 +401,22 @@ jobs:
test_name: Stateless tests (release, s3 storage)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestS3Debug:
needs: [RunConfig, BuilderDebDebug]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateless tests (debug, s3 storage)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestS3Tsan:
needs: [RunConfig, BuilderDebTsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateless tests (tsan, s3 storage)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestAarch64:
needs: [RunConfig, BuilderDebAarch64]
if: ${{ !failure() && !cancelled() }}
@ -517,6 +533,55 @@ jobs:
test_name: Stateful tests (debug)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
# Parallel replicas
FunctionalStatefulTestDebugParallelReplicas:
needs: [RunConfig, BuilderDebDebug]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (debug, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestUBsanParallelReplicas:
needs: [RunConfig, BuilderDebUBsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (ubsan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestMsanParallelReplicas:
needs: [RunConfig, BuilderDebMsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (msan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestTsanParallelReplicas:
needs: [RunConfig, BuilderDebTsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (tsan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestAsanParallelReplicas:
needs: [RunConfig, BuilderDebAsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (asan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestReleaseParallelReplicas:
needs: [RunConfig, BuilderDebRelease]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (release, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
########################### ClickBench #######################################################
##############################################################################################
@ -724,6 +789,28 @@ jobs:
runner_type: func-tester-aarch64
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
############################ SQLLOGIC TEST ###################################################
##############################################################################################
SQLLogicTestRelease:
needs: [RunConfig, BuilderDebRelease]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Sqllogic test (release)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
##################################### SQL TEST ###############################################
##############################################################################################
SQLTest:
needs: [RunConfig, BuilderDebRelease]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: SQLTest
runner_type: fuzzer-unit-tester
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
###################################### SQLANCER FUZZERS ######################################
##############################################################################################
SQLancerTestRelease:
@ -756,6 +843,8 @@ jobs:
- FunctionalStatelessTestTsan
- FunctionalStatelessTestMsan
- FunctionalStatelessTestUBsan
- FunctionalStatelessTestS3Debug
- FunctionalStatelessTestS3Tsan
- FunctionalStatefulTestDebug
- FunctionalStatefulTestRelease
- FunctionalStatefulTestAarch64
@ -763,6 +852,12 @@ jobs:
- FunctionalStatefulTestTsan
- FunctionalStatefulTestMsan
- FunctionalStatefulTestUBsan
- FunctionalStatefulTestDebugParallelReplicas
- FunctionalStatefulTestUBsanParallelReplicas
- FunctionalStatefulTestMsanParallelReplicas
- FunctionalStatefulTestTsanParallelReplicas
- FunctionalStatefulTestAsanParallelReplicas
- FunctionalStatefulTestReleaseParallelReplicas
- StressTestDebug
- StressTestAsan
- StressTestTsan
@ -788,6 +883,8 @@ jobs:
- UnitTestsReleaseClang
- SQLancerTestRelease
- SQLancerTestDebug
- SQLLogicTestRelease
- SQLTest
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code

View File

@ -1026,7 +1026,7 @@ jobs:
####################################### libFuzzer ###########################################
#############################################################################################
libFuzzer:
if: ${{ !failure() && !cancelled() && contains(github.event.pull_request.labels.*.name, 'libFuzzer') }}
if: ${{ !failure() && !cancelled() }}
needs: [RunConfig, StyleCheck]
uses: ./.github/workflows/libfuzzer.yml
with:

View File

@ -85,6 +85,7 @@ jobs:
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --post --job-name '${{inputs.build_name}}'
- name: Mark as done
if: ${{ !cancelled() }}
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --mark-success --job-name '${{inputs.build_name}}'
- name: Clean

View File

@ -107,6 +107,7 @@ jobs:
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --post --job-name '${{inputs.test_name}}'
- name: Mark as done
if: ${{ !cancelled() }}
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --mark-success --job-name '${{inputs.test_name}}' --batch ${{matrix.batch}}
- name: Clean

View File

@ -112,7 +112,7 @@ Note that:
For the query to run successfully, the following conditions must be met:
- Both tables must have the same structure.
- Both tables must have the same order by key and the same primary key.
- Both tables must have the same partition key, the same order by key and the same primary key.
- Both tables must have the same indices and projections.
- Both tables must have the same storage policy.

View File

@ -108,6 +108,11 @@ public:
*/
QueryTreeNodePtr getColumnSourceOrNull() const;
void setColumnSource(const QueryTreeNodePtr & source)
{
getSourceWeakPointer() = source;
}
QueryTreeNodeType getNodeType() const override
{
return QueryTreeNodeType::COLUMN;
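
The new setter writes through `getSourceWeakPointer()`, i.e. the column node keeps only a weak reference to its source table expression. Holding it strongly could create a shared_ptr ownership cycle (the source owns its columns, the columns own the source). A small self-contained sketch of that pattern, with hypothetical simplified types rather than the real analyzer classes:

#include <cassert>
#include <memory>

struct Node;
using NodePtr = std::shared_ptr<Node>;

struct Node
{
    std::weak_ptr<Node> source;  // weak: no ownership cycle with the source
};

int main()
{
    auto table = std::make_shared<Node>();
    auto column = std::make_shared<Node>();
    column->source = table;                    // setColumnSource analogue
    assert(column->source.lock() == table);    // getColumnSourceOrNull analogue
    table.reset();                             // the source goes away
    assert(column->source.lock() == nullptr);  // lock() degrades to null
}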

View File

@ -31,7 +31,7 @@ public:
virtual String getDescription() = 0;
/// Run pass over query tree
virtual void run(QueryTreeNodePtr query_tree_node, ContextPtr context) = 0;
virtual void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) = 0;
};
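
The only change here is the parameter type: `run` now receives the query tree root by reference. That matters because a pass may need to replace the root node itself, not just mutate nodes underneath it. A minimal sketch of the difference, using hypothetical stand-in types rather than the real analyzer hierarchy:

#include <memory>

struct IQueryTreeNode { virtual ~IQueryTreeNode() = default; };
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;

struct RootReplacingPass
{
    void run(QueryTreeNodePtr & node)
    {
        // By reference: the caller's root pointer is actually swapped.
        // With the old by-value signature this assignment would only
        // change a local copy.
        node = std::make_shared<IQueryTreeNode>();
    }
};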

View File

@ -194,7 +194,7 @@ private:
}
void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
AggregateFunctionsArithmericOperationsVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Extract arithmeric operations from aggregate functions."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -92,7 +92,7 @@ public:
}
void RewriteArrayExistsToHasPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void RewriteArrayExistsToHasPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
RewriteArrayExistsToHasVisitor visitor(context);
visitor.visit(query_tree_node);

View File

@ -20,7 +20,7 @@ public:
String getDescription() override { return "Rewrite arrayExists(func, arr) functions to has(arr, elem) when logically equivalent"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -67,7 +67,7 @@ private:
}
void AutoFinalOnQueryPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void AutoFinalOnQueryPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
auto visitor = AutoFinalOnQueryPassVisitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -25,7 +25,7 @@ public:
return "Automatically applies final modifier to table expressions in queries if it is supported and if user level final setting is set";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -213,7 +213,7 @@ private:
}
void ComparisonTupleEliminationPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void ComparisonTupleEliminationPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
ComparisonTupleEliminationPassVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Rewrite tuples comparison into equivalent comparison of tuples arguments"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -132,7 +132,7 @@ private:
}
void ConvertOrLikeChainPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void ConvertOrLikeChainPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
auto or_function_resolver = FunctionFactory::instance().get("or", context);
auto match_function_resolver = FunctionFactory::instance().get("multiMatchAny", context);

View File

@ -14,7 +14,7 @@ public:
String getDescription() override { return "Replaces all the 'or's with {i}like to multiMatchAny"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -718,7 +718,7 @@ public:
}
void ConvertLogicalExpressionToCNFPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void ConvertLogicalExpressionToCNFPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
const auto & settings = context->getSettingsRef();
if (!settings.convert_query_to_cnf)

View File

@ -12,7 +12,7 @@ public:
String getDescription() override { return "Convert logical expression to CNF and apply optimizations using constraints"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -87,7 +87,7 @@ public:
}
void CountDistinctPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void CountDistinctPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
CountDistinctVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -20,7 +20,7 @@ public:
return "Optimize single countDistinct into count over subquery";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -264,7 +264,7 @@ private:
}
void CrossToInnerJoinPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void CrossToInnerJoinPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
CrossToInnerJoinVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -22,7 +22,7 @@ public:
return "Replace CROSS JOIN with INNER JOIN";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -224,7 +224,7 @@ private:
}
void FunctionToSubcolumnsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void FunctionToSubcolumnsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
FunctionToSubcolumnsVisitor visitor(context);
visitor.visit(query_tree_node);

View File

@ -24,7 +24,7 @@ public:
String getDescription() override { return "Rewrite function to subcolumns, for example tupleElement(column, subcolumn) into column.subcolumn"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -256,7 +256,7 @@ void tryFuseQuantiles(QueryTreeNodePtr query_tree_node, ContextPtr context)
}
void FuseFunctionsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void FuseFunctionsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
tryFuseSumCountAvg(query_tree_node, context);
tryFuseQuantiles(query_tree_node, context);

View File

@ -20,7 +20,7 @@ public:
String getDescription() override { return "Replaces several calls of aggregate functions of the same family into one call"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -249,7 +249,7 @@ private:
}
void GroupingFunctionsResolvePass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void GroupingFunctionsResolvePass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
GroupingFunctionsResolveVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -24,7 +24,7 @@ public:
String getDescription() override { return "Resolve GROUPING functions based on GROUP BY modifiers"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -73,7 +73,7 @@ private:
}
void IfChainToMultiIfPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void IfChainToMultiIfPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
auto multi_if_function_ptr = FunctionFactory::instance().get("multiIf", context);
IfChainToMultiIfPassVisitor visitor(std::move(multi_if_function_ptr), std::move(context));

View File

@ -18,7 +18,7 @@ public:
String getDescription() override { return "Optimize if chain to multiIf"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -57,7 +57,7 @@ public:
}
void IfConstantConditionPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void IfConstantConditionPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
IfConstantConditionVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -21,7 +21,7 @@ public:
String getDescription() override { return "Optimize if, multiIf for constant condition."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -190,7 +190,7 @@ public:
}
void IfTransformStringsToEnumPass::run(QueryTreeNodePtr query, ContextPtr context)
void IfTransformStringsToEnumPass::run(QueryTreeNodePtr & query, ContextPtr context)
{
ConvertStringsToEnumVisitor visitor(std::move(context));
visitor.visit(query);

View File

@ -33,7 +33,7 @@ public:
String getDescription() override { return "Replaces string-type arguments in If and Transform to enum"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -554,7 +554,7 @@ private:
}
};
void LogicalExpressionOptimizerPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void LogicalExpressionOptimizerPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
LogicalExpressionOptimizerVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -109,7 +109,7 @@ public:
"replace chains of equality functions inside an OR with a single IN operator";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -52,7 +52,7 @@ private:
}
void MultiIfToIfPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void MultiIfToIfPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
auto if_function_ptr = FunctionFactory::instance().get("if", context);
MultiIfToIfVisitor visitor(std::move(if_function_ptr), std::move(context));

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Optimize multiIf with single condition to if."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -64,7 +64,7 @@ private:
}
void NormalizeCountVariantsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void NormalizeCountVariantsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
NormalizeCountVariantsVisitor visitor(context);
visitor.visit(query_tree_node);

View File

@ -20,7 +20,7 @@ public:
String getDescription() override { return "Optimize count(literal), sum(1) into count()."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -215,7 +215,7 @@ private:
}
void OptimizeDateOrDateTimeConverterWithPreimagePass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void OptimizeDateOrDateTimeConverterWithPreimagePass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
OptimizeDateOrDateTimeConverterWithPreimageVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Replace predicate having Date/DateTime converters with their preimages"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -130,7 +130,7 @@ private:
}
};
void OptimizeGroupByFunctionKeysPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void OptimizeGroupByFunctionKeysPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
OptimizeGroupByFunctionKeysVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -16,7 +16,7 @@ public:
String getDescription() override { return "Eliminates functions of other keys in GROUP BY section."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -115,7 +115,7 @@ private:
}
void OptimizeGroupByInjectiveFunctionsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void OptimizeGroupByInjectiveFunctionsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
OptimizeGroupByInjectiveFunctionsVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -14,7 +14,7 @@ public:
String getDescription() override { return "Replaces injective functions by it's arguments in GROUP BY section."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -124,7 +124,7 @@ private:
}
void OptimizeRedundantFunctionsInOrderByPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void OptimizeRedundantFunctionsInOrderByPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
OptimizeRedundantFunctionsInOrderByVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "If ORDER BY has argument x followed by f(x) transforms it to ORDER BY x."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -70,7 +70,7 @@ private:
}
void OrderByLimitByDuplicateEliminationPass::run(QueryTreeNodePtr query_tree_node, ContextPtr)
void OrderByLimitByDuplicateEliminationPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr)
{
OrderByLimitByDuplicateEliminationVisitor visitor;
visitor.visit(query_tree_node);

View File

@ -20,7 +20,7 @@ public:
String getDescription() override { return "Remove duplicate columns from ORDER BY, LIMIT BY."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -50,7 +50,7 @@ public:
}
void OrderByTupleEliminationPass::run(QueryTreeNodePtr query_tree_node, ContextPtr)
void OrderByTupleEliminationPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr)
{
OrderByTupleEliminationVisitor visitor;
visitor.visit(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Remove tuple from ORDER BY."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -79,6 +79,8 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/Identifier.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace ProfileEvents
{
@ -1066,7 +1068,7 @@ private:
class QueryAnalyzer
{
public:
void resolve(QueryTreeNodePtr node, const QueryTreeNodePtr & table_expression, ContextPtr context)
void resolve(QueryTreeNodePtr & node, const QueryTreeNodePtr & table_expression, ContextPtr context)
{
IdentifierResolveScope scope(node, nullptr /*parent_scope*/);
@ -7649,7 +7651,7 @@ QueryAnalysisPass::QueryAnalysisPass(QueryTreeNodePtr table_expression_)
: table_expression(std::move(table_expression_))
{}
void QueryAnalysisPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void QueryAnalysisPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
QueryAnalyzer analyzer;
analyzer.resolve(query_tree_node, table_expression, context);

View File

@ -89,7 +89,7 @@ public:
return "Resolve type for each query expression. Replace identifiers, matchers with query expressions. Perform constant folding. Evaluate scalar subqueries.";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
private:
QueryTreeNodePtr table_expression;

View File

@ -132,7 +132,7 @@ void updateUsedProjectionIndexes(const QueryTreeNodePtr & query_or_union_node, s
}
void RemoveUnusedProjectionColumnsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void RemoveUnusedProjectionColumnsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
std::vector<QueryTreeNodePtr> nodes_to_visit;
nodes_to_visit.push_back(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Remove unused projection columns in subqueries."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -109,7 +109,7 @@ private:
}
void RewriteAggregateFunctionWithIfPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void RewriteAggregateFunctionWithIfPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
RewriteAggregateFunctionWithIfVisitor visitor(context);
visitor.visit(query_tree_node);

View File

@ -20,7 +20,7 @@ public:
return "Rewrite aggregate functions with if expression as argument when logically equivalent";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -120,7 +120,7 @@ private:
}
void RewriteSumFunctionWithSumAndCountPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void RewriteSumFunctionWithSumAndCountPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
RewriteSumFunctionWithSumAndCountVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -20,7 +20,7 @@ public:
String getDescription() override { return "Rewrite sum(column +/- literal) into sum(column) and literal * count(column)"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -58,7 +58,7 @@ public:
}
void ShardNumColumnToFunctionPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void ShardNumColumnToFunctionPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
ShardNumColumnToFunctionVisitor visitor(context);
visitor.visit(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Rewrite _shard_num column into shardNum() function"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -178,7 +178,7 @@ private:
}
void SumIfToCountIfPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void SumIfToCountIfPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
SumIfToCountIfVisitor visitor(context);
visitor.visit(query_tree_node);

View File

@ -23,7 +23,7 @@ public:
String getDescription() override { return "Rewrite sum(if) and sumIf into countIf"; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -89,7 +89,7 @@ public:
}
void UniqInjectiveFunctionsEliminationPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void UniqInjectiveFunctionsEliminationPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
UniqInjectiveFunctionsEliminationVisitor visitor(std::move(context));
visitor.visit(query_tree_node);

View File

@ -17,7 +17,7 @@ public:
String getDescription() override { return "Remove injective functions from uniq functions arguments."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

View File

@ -185,7 +185,7 @@ public:
};
void UniqToCountPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
void UniqToCountPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
UniqToCountVisitor visitor(context);
visitor.visit(query_tree_node);

View File

@ -24,7 +24,7 @@ public:
return "Rewrite uniq and its variants(except uniqUpTo) to count if subquery has distinct or group by clause.";
}
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};
}

View File

@ -215,6 +215,8 @@ class IColumn;
M(UInt64, merge_tree_max_rows_to_use_cache, (128 * 8192), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
M(UInt64, merge_tree_max_bytes_to_use_cache, (192 * 10 * 1024 * 1024), "The maximum number of bytes per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \
M(Bool, split_parts_ranges_into_intersecting_and_non_intersecting_final, true, "Split parts ranges into intersecting and non intersecting during FINAL optimization", 0) \
M(Bool, split_intersecting_parts_ranges_into_layers_final, true, "Split intersecting parts ranges into layers during FINAL optimization", 0) \
M(Bool, allow_experimental_inverted_index, false, "If it is set to true, allow to use experimental inverted index.", 0) \
\
M(UInt64, mysql_max_rows_to_insert, 65536, "The maximum number of rows in MySQL batch insertion of the MySQL storage engine", 0) \
@ -855,7 +857,7 @@ class IColumn;
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(UInt64, extract_kvp_max_pairs_per_row, 1000, "Max number pairs that can be produced by extractKeyValuePairs function. Used to safeguard against consuming too much memory.", 0) \
M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \
M(Bool, allow_create_index_without_type, false, "Allow CREATE INDEX query without TYPE. Query will be ignored. Made for SQL compatibility tests.", 0) \
M(Bool, create_index_ignore_unique, false, "Ignore UNIQUE keyword in CREATE UNIQUE INDEX. Made for SQL compatibility tests.", 0) \

View File

@ -90,7 +90,9 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"async_insert_busy_timeout_min_ms", 50, 50, "The minimum value of the asynchronous insert timeout in milliseconds; it also serves as the initial value, which may be increased later by the adaptive algorithm"},
{"async_insert_busy_timeout_max_ms", 200, 200, "The minimum value of the asynchronous insert timeout in milliseconds; async_insert_busy_timeout_ms is aliased to async_insert_busy_timeout_max_ms"},
{"async_insert_busy_timeout_increase_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout increases"},
{"async_insert_busy_timeout_decrease_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases"}}},
{"async_insert_busy_timeout_decrease_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases"},
{"split_parts_ranges_into_intersecting_and_non_intersecting_final", true, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"},
{"split_intersecting_parts_ranges_into_layers_final", true, true, "Allow to split intersecting parts ranges into layers during FINAL optimization"}}},
{"24.1", {{"print_pretty_type_names", false, true, "Better user experience."},
{"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"},
{"output_format_arrow_use_signed_indexes_for_dictionary", false, true, "Use signed indexes type for Arrow dictionaries by default as it's recommended"},
@ -111,7 +113,9 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"iceberg_engine_ignore_schema_evolution", false, false, "Allow to ignore schema evolution in Iceberg table engine"},
{"optimize_injective_functions_in_group_by", false, true, "Replace injective functions by it's arguments in GROUP BY section in analyzer"},
{"update_insert_deduplication_token_in_dependent_materialized_views", false, false, "Allow to update insert deduplication token with table identifier during insert in dependent materialized views"},
{"azure_max_unexpected_write_error_retries", 4, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write"}}},
{"azure_max_unexpected_write_error_retries", 4, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write"},
{"split_parts_ranges_into_intersecting_and_non_intersecting_final", false, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"},
{"split_intersecting_parts_ranges_into_layers_final", true, true, "Allow to split intersecting parts ranges into layers during FINAL optimization"}}},
{"23.12", {{"allow_suspicious_ttl_expressions", true, false, "It is a new setting, and in previous versions the behavior was equivalent to allowing."},
{"input_format_parquet_allow_missing_columns", false, true, "Allow missing columns in Parquet files by default"},
{"input_format_orc_allow_missing_columns", false, true, "Allow missing columns in ORC files by default"},

View File

@ -43,11 +43,11 @@ class ExtractKeyValuePairs : public IFunction
builder.withQuotingCharacter(parsed_arguments.quoting_character.value());
}
bool is_number_of_pairs_unlimited = context->getSettingsRef().extract_key_value_pairs_max_pairs_per_row == 0;
bool is_number_of_pairs_unlimited = context->getSettingsRef().extract_kvp_max_pairs_per_row == 0;
if (!is_number_of_pairs_unlimited)
{
builder.withMaxNumberOfPairs(context->getSettingsRef().extract_key_value_pairs_max_pairs_per_row);
builder.withMaxNumberOfPairs(context->getSettingsRef().extract_kvp_max_pairs_per_row);
}
return builder.build();

View File

@ -1,5 +1,6 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/GatherUtils/Algorithms.h>
@ -188,7 +189,7 @@ namespace
arguments[2]->getName(),
getName());
return arguments[0];
return std::make_shared<DataTypeString>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override

View File

@ -1,17 +1,13 @@
#pragma once
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Core/Range.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/FieldToDataType.h>
#include <Functions/FunctionFactory.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/applyFunction.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST.h>
@ -37,8 +33,6 @@ public:
ASTIdentifier * identifier = nullptr;
DataTypePtr arg_data_type = {};
Range range = Range::createWholeUniverse();
void reject() { monotonicity.is_monotonic = false; }
bool isRejected() const { return !monotonicity.is_monotonic; }
@ -103,30 +97,13 @@ public:
if (data.isRejected())
return;
/// Monotonicity check only works for functions that contain at most two arguments and one of them must be a constant.
if (!ast_function.arguments)
/// TODO: monotonicity for functions of several arguments
if (!ast_function.arguments || ast_function.arguments->children.size() != 1)
{
data.reject();
return;
}
auto arguments_size = ast_function.arguments->children.size();
if (arguments_size == 0 || arguments_size > 2)
{
data.reject();
return;
}
else if (arguments_size == 2)
{
/// If the function has two arguments, then one of them must be a constant.
if (!ast_function.arguments->children[0]->as<ASTLiteral>() && !ast_function.arguments->children[1]->as<ASTLiteral>())
{
data.reject();
return;
}
}
if (!data.canOptimize(ast_function))
{
data.reject();
@ -147,33 +124,14 @@ public:
return;
}
auto function_arguments = getFunctionArguments(ast_function, data);
auto function_base = function->build(function_arguments);
ColumnsWithTypeAndName args;
args.emplace_back(data.arg_data_type, "tmp");
auto function_base = function->build(args);
if (function_base && function_base->hasInformationAboutMonotonicity())
{
bool is_positive = data.monotonicity.is_positive;
data.monotonicity = function_base->getMonotonicityForRange(*data.arg_data_type, data.range.left, data.range.right);
auto & key_range = data.range;
/// If we apply function to open interval, we can get empty intervals in result.
/// E.g. for ('2020-01-03', '2020-01-20') after applying 'toYYYYMM' we will get ('202001', '202001').
/// To avoid this we make range left and right included.
/// Any function that treats NULL specially is not monotonic.
/// Thus we can safely use isNull() as an -Inf/+Inf indicator here.
if (!key_range.left.isNull())
{
key_range.left = applyFunction(function_base, data.arg_data_type, key_range.left);
key_range.left_included = true;
}
if (!key_range.right.isNull())
{
key_range.right = applyFunction(function_base, data.arg_data_type, key_range.right);
key_range.right_included = true;
}
data.monotonicity = function_base->getMonotonicityForRange(*data.arg_data_type, Field(), Field());
if (!is_positive)
data.monotonicity.is_positive = !data.monotonicity.is_positive;
@ -185,53 +143,13 @@ public:
static bool needChildVisit(const ASTPtr & parent, const ASTPtr &)
{
/// Multi-argument functions with all but one constant arguments can be monotonic.
/// Currently we check monotonicity only for single-argument functions.
/// Although, multi-argument functions with all but one constant arguments can also be monotonic.
if (const auto * func = typeid_cast<const ASTFunction *>(parent.get()))
return func->arguments->children.size() <= 2;
return func->arguments->children.size() < 2;
return true;
}
static ColumnWithTypeAndName extractLiteralColumnAndTypeFromAstLiteral(const ASTLiteral * literal)
{
ColumnWithTypeAndName result;
result.type = applyVisitor(FieldToDataType(), literal->value);
result.column = result.type->createColumnConst(0, literal->value);
return result;
}
static ColumnsWithTypeAndName getFunctionArguments(const ASTFunction & ast_function, const Data & data)
{
ColumnsWithTypeAndName args;
auto arguments_size = ast_function.arguments->children.size();
chassert(arguments_size == 1 || arguments_size == 2);
if (arguments_size == 2)
{
if (ast_function.arguments->children[0]->as<ASTLiteral>())
{
const auto * literal = ast_function.arguments->children[0]->as<ASTLiteral>();
args.push_back(extractLiteralColumnAndTypeFromAstLiteral(literal));
args.emplace_back(data.arg_data_type, "tmp");
}
else
{
const auto * literal = ast_function.arguments->children[1]->as<ASTLiteral>();
args.emplace_back(data.arg_data_type, "tmp");
args.push_back(extractLiteralColumnAndTypeFromAstLiteral(literal));
}
}
else
{
args.emplace_back(data.arg_data_type, "tmp");
}
return args;
}
};
using MonotonicityCheckVisitor = ConstInDepthNodeVisitor<MonotonicityCheckMatcher, false>;
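
The `is_positive` flip above is the heart of the check: a chain of monotonic functions is monotonic, and its direction is the product of the individual directions. A small sketch of that bookkeeping (illustrative types, not the real visitor):

#include <vector>

struct Monotonicity
{
    bool is_monotonic = true;
    bool is_positive = true;  // +: increasing, -: decreasing
};

static Monotonicity composeChain(const std::vector<Monotonicity> & steps)
{
    Monotonicity result;
    for (const auto & step : steps)
    {
        if (!step.is_monotonic)
            return {false, true};  // one non-monotonic link rejects the chain
        if (!step.is_positive)
            result.is_positive = !result.is_positive;  // decreasing flips order
    }
    return result;
}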

View File

@ -1,43 +0,0 @@
#include <Interpreters/applyFunction.h>
#include <Core/Range.h>
#include <Functions/IFunction.h>
namespace DB
{
static Field applyFunctionForField(const FunctionBasePtr & func, const DataTypePtr & arg_type, const Field & arg_value)
{
ColumnsWithTypeAndName columns{
{arg_type->createColumnConst(1, arg_value), arg_type, "x"},
};
auto col = func->execute(columns, func->getResultType(), 1);
return (*col)[0];
}
FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
{
/// Fallback for fields without block reference.
if (field.isExplicit())
return applyFunctionForField(func, current_type, field);
String result_name = "_" + func->getName() + "_" + toString(field.column_idx);
const auto & columns = field.columns;
size_t result_idx = columns->size();
for (size_t i = 0; i < result_idx; ++i)
if ((*columns)[i].name == result_name)
result_idx = i;
if (result_idx == columns->size())
{
ColumnsWithTypeAndName args{(*columns)[field.column_idx]};
field.columns->emplace_back(ColumnWithTypeAndName{nullptr, func->getResultType(), result_name});
(*columns)[result_idx].column = func->execute(args, (*columns)[result_idx].type, columns->front().column->size());
}
return {field.columns, field.row_idx, result_idx};
}
}

View File

@ -1,16 +0,0 @@
#pragma once
#include <memory>
namespace DB
{
struct FieldRef;
class IFunctionBase;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field);
}

View File

@ -3,11 +3,6 @@
namespace DB
{
String queryToStringNullable(const ASTPtr & query)
{
return query ? queryToString(query) : "";
}
String queryToString(const ASTPtr & query)
{
return queryToString(*query);

View File

@ -6,5 +6,4 @@ namespace DB
{
String queryToString(const ASTPtr & query);
String queryToString(const IAST & query);
String queryToStringNullable(const ASTPtr & query);
}

View File

@ -495,8 +495,8 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
return visitFunction(node);
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Expected column, constant, function. Actual {}",
node->formatASTForErrorMessage());
"Expected column, constant, function. Actual {} with type: {}",
node->formatASTForErrorMessage(), node_type);
}
PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::visitColumn(const QueryTreeNodePtr & node)

View File

@ -228,7 +228,7 @@ struct SplitPartsRangesResult
RangesInDataParts intersecting_parts_ranges;
};
SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts)
SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts, const LoggerPtr & logger)
{
/** Split ranges in data parts into intersecting ranges in data parts and non intersecting ranges in data parts.
*
@ -483,10 +483,15 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts)
intersecting_ranges_in_data_parts.end(),
[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
LOG_TEST(logger, "Non intersecting ranges in data parts {}", non_intersecting_ranges_in_data_parts.getDescriptions().describe());
LOG_TEST(logger, "Intersecting ranges in data parts {}", intersecting_ranges_in_data_parts.getDescriptions().describe());
return {std::move(non_intersecting_ranges_in_data_parts), std::move(intersecting_ranges_in_data_parts)};
}
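
To make the intent of `splitPartsRanges` concrete: ranges that intersect no other range can bypass the FINAL merging transform entirely, while the rest continue to the merging path. A toy version of the split on plain integer intervals (deliberately quadratic; the real code does a sweep over primary-key mark ranges per data part):

#include <cstddef>
#include <utility>
#include <vector>

using Interval = std::pair<int, int>;  // closed [first, second]

struct SplitResult
{
    std::vector<Interval> non_intersecting;
    std::vector<Interval> intersecting;
};

static SplitResult splitIntervals(const std::vector<Interval> & intervals)
{
    SplitResult result;
    for (size_t i = 0; i < intervals.size(); ++i)
    {
        bool intersects_any = false;
        for (size_t j = 0; j < intervals.size() && !intersects_any; ++j)
            intersects_any = i != j
                && intervals[i].first <= intervals[j].second
                && intervals[j].first <= intervals[i].second;
        (intersects_any ? result.intersecting : result.non_intersecting)
            .push_back(intervals[i]);
    }
    return result;
}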
std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersectingPartsRangesIntoLayers(RangesInDataParts intersecting_ranges_in_data_parts, size_t max_layers)
std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersectingPartsRangesIntoLayers(RangesInDataParts intersecting_ranges_in_data_parts,
size_t max_layers,
const LoggerPtr & logger)
{
// We will advance the iterator pointing to the mark with the smallest PK value until
// there will be not less than rows_per_layer rows in the current layer (roughly speaking).
@ -591,8 +596,18 @@ std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersecting
result_layers.back() = std::move(current_layer_builder.getCurrentRangesInDataParts());
}
for (auto & layer : result_layers)
size_t result_layers_size = result_layers.size();
LOG_TEST(logger, "Split intersecting ranges into {} layers", result_layers_size);
for (size_t i = 0; i < result_layers_size; ++i)
{
auto & layer = result_layers[i];
LOG_TEST(logger, "Layer {} {} filter values in ({}, {}])",
i,
layer.getDescriptions().describe(),
i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
std::stable_sort(
layer.begin(),
layer.end(),
@ -712,23 +727,32 @@ SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && in_order_reading_step_getter,
bool force_process_all_ranges)
bool split_parts_ranges_into_intersecting_and_non_intersecting_final,
bool split_intersecting_parts_ranges_into_layers)
{
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1");
auto logger = getLogger("PartsSplitter");
SplitPartsWithRangesByPrimaryKeyResult result;
RangesInDataParts intersecting_parts_ranges = std::move(parts);
if (!force_process_all_ranges)
if (split_parts_ranges_into_intersecting_and_non_intersecting_final)
{
SplitPartsRangesResult split_result = splitPartsRanges(intersecting_parts_ranges);
SplitPartsRangesResult split_result = splitPartsRanges(intersecting_parts_ranges, logger);
result.non_intersecting_parts_ranges = std::move(split_result.non_intersecting_parts_ranges);
intersecting_parts_ranges = std::move(split_result.intersecting_parts_ranges);
}
auto && [layers, borders] = splitIntersectingPartsRangesIntoLayers(intersecting_parts_ranges, max_layers);
if (!split_intersecting_parts_ranges_into_layers)
{
result.merging_pipes.emplace_back(in_order_reading_step_getter(intersecting_parts_ranges));
return result;
}
auto && [layers, borders] = splitIntersectingPartsRangesIntoLayers(intersecting_parts_ranges, max_layers, logger);
auto filters = buildFilters(primary_key, borders);
result.merging_pipes.resize(layers.size());

View File

@ -34,5 +34,6 @@ SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && in_order_reading_step_getter,
bool force_process_all_ranges);
bool split_parts_ranges_into_intersecting_and_non_intersecting,
bool split_intersecting_parts_ranges_into_layers);
}

View File

@ -1175,7 +1175,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
/// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
bool force_process_all_ranges = !data.merging_params.is_deleted_column.empty();
bool split_parts_ranges_into_intersecting_and_non_intersecting_final = settings.split_parts_ranges_into_intersecting_and_non_intersecting_final &&
data.merging_params.is_deleted_column.empty();
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
metadata_for_reading->getPrimaryKey(),
@ -1184,7 +1185,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
num_streams,
context,
std::move(in_order_reading_step_getter),
force_process_all_ranges);
split_parts_ranges_into_intersecting_and_non_intersecting_final,
settings.split_intersecting_parts_ranges_into_layers_final);
for (auto && non_intersecting_parts_range : split_ranges_result.non_intersecting_parts_ranges)
non_intersecting_parts_by_primary_key.push_back(std::move(non_intersecting_parts_range));

View File

@ -7,7 +7,7 @@ namespace DB
{
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_)
: SourceStepWithFilter(DataStream{.header = pipe_.getHeader()})
: ISourceStep(DataStream{.header = pipe_.getHeader()})
, pipe(std::move(pipe_))
{
}
@ -35,11 +35,4 @@ ReadFromStorageStep::ReadFromStorageStep(
processor->setStorageLimits(query_info.storage_limits);
}
void ReadFromStorageStep::applyFilters()
{
for (const auto & processor : pipe.getProcessors())
if (auto * source = dynamic_cast<SourceWithKeyCondition *>(processor.get()))
source->setKeyCondition(filter_nodes.nodes, context);
}
}

View File

@ -2,7 +2,6 @@
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/SelectQueryInfo.h>
@ -10,7 +9,7 @@ namespace DB
{
/// Create source from prepared pipe.
class ReadFromPreparedSource : public SourceStepWithFilter
class ReadFromPreparedSource : public ISourceStep
{
public:
explicit ReadFromPreparedSource(Pipe pipe_);
@ -28,7 +27,6 @@ public:
ReadFromStorageStep(Pipe pipe_, String storage_name, ContextPtr context_, const SelectQueryInfo & query_info_);
String getName() const override { return "ReadFromStorage"; }
void applyFilters() override;
private:
ContextPtr context;

View File

@ -81,7 +81,6 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Par
auto minmax_column_types = data.getMinMaxColumnsTypes(partition_key);
size_t minmax_idx_size = minmax_column_types.size();
hyperrectangle.clear();
hyperrectangle.reserve(minmax_idx_size);
for (size_t i = 0; i < minmax_idx_size; ++i)
{
@ -105,39 +104,6 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Par
initialized = true;
}
Block IMergeTreeDataPart::MinMaxIndex::getBlock(const MergeTreeData & data) const
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to get block from uninitialized MinMax index.");
Block block;
const auto metadata_snapshot = data.getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
const auto minmax_column_names = data.getMinMaxColumnsNames(partition_key);
const auto minmax_column_types = data.getMinMaxColumnsTypes(partition_key);
const auto minmax_idx_size = minmax_column_types.size();
for (size_t i = 0; i < minmax_idx_size; ++i)
{
const auto & data_type = minmax_column_types[i];
const auto & column_name = minmax_column_names[i];
const auto column = data_type->createColumn();
const auto min_val = hyperrectangle.at(i).left;
const auto max_val = hyperrectangle.at(i).right;
column->insert(min_val);
column->insert(max_val);
block.insert(ColumnWithTypeAndName(column->getPtr(), data_type, column_name));
}
return block;
}
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
const MergeTreeData & data, IDataPartStorage & part_storage, Checksums & out_checksums) const
{
@ -219,7 +185,8 @@ void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
if (!initialized)
{
*this = other;
hyperrectangle = other.hyperrectangle;
initialized = true;
}
else
{

View File

@ -342,7 +342,6 @@ public:
}
void load(const MergeTreeData & data, const PartMetadataManagerPtr & manager);
Block getBlock(const MergeTreeData & data) const;
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;

View File

@ -1,37 +1,36 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/BoolMask.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <DataTypes/Utils.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Functions/indexHint.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Set.h>
#include <DataTypes/Utils.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/applyFunction.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/MergeTree/BoolMask.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/MergeTreeIndexUtils.h>
#include <Functions/FunctionFactory.h>
#include <Functions/indexHint.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/IFunction.h>
#include <Common/FieldVisitorToString.h>
#include <Common/MortonUtils.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/Set.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Storages/MergeTree/MergeTreeIndexUtils.h>
#include <algorithm>
#include <cassert>
@ -837,6 +836,21 @@ bool KeyCondition::getConstant(const ASTPtr & expr, Block & block_with_constants
return node.tryGetConstant(out_value, out_type);
}
static Field applyFunctionForField(
const FunctionBasePtr & func,
const DataTypePtr & arg_type,
const Field & arg_value)
{
ColumnsWithTypeAndName columns
{
{ arg_type->createColumnConst(1, arg_value), arg_type, "x" },
};
auto col = func->execute(columns, func->getResultType(), 1);
return (*col)[0];
}
/// The case when arguments may have types different than in the primary key.
static std::pair<Field, DataTypePtr> applyFunctionForFieldOfUnknownType(
const FunctionBasePtr & func,
@ -876,6 +890,33 @@ static std::pair<Field, DataTypePtr> applyBinaryFunctionForFieldOfUnknownType(
return {std::move(result), std::move(return_type)};
}
static FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
{
/// Fallback for fields without block reference.
if (field.isExplicit())
return applyFunctionForField(func, current_type, field);
String result_name = "_" + func->getName() + "_" + toString(field.column_idx);
const auto & columns = field.columns;
size_t result_idx = columns->size();
for (size_t i = 0; i < result_idx; ++i)
{
if ((*columns)[i].name == result_name)
result_idx = i;
}
if (result_idx == columns->size())
{
ColumnsWithTypeAndName args{(*columns)[field.column_idx]};
field.columns->emplace_back(ColumnWithTypeAndName {nullptr, func->getResultType(), result_name});
(*columns)[result_idx].column = func->execute(args, (*columns)[result_idx].type, columns->front().column->size());
}
return {field.columns, field.row_idx, result_idx};
}
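
The synthetic `result_name` lookup above is a memoization: the result of applying `func` to a block column is stored back into the same block, so later `FieldRef`s into that block reuse it instead of re-executing the function. The same pattern in miniature, with hypothetical simplified types:

#include <functional>
#include <string>
#include <vector>

struct NamedValue { std::string name; int value = 0; };
using Block = std::vector<NamedValue>;

static int applyCached(Block & block, size_t input_idx,
                       const std::string & func_name,
                       const std::function<int(int)> & func)
{
    std::string result_name = "_" + func_name + "_" + std::to_string(input_idx);
    for (const auto & col : block)                // computed earlier?
        if (col.name == result_name)
            return col.value;
    int result = func(block[input_idx].value);    // compute once
    block.push_back({result_name, result});       // cache next to the inputs
    return result;
}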
/** When table's key has expression with these functions from a column,
* and when a column in a query is compared with a constant, such as:
* CREATE TABLE (x String) ORDER BY toDate(x)

View File

@ -8,6 +8,21 @@
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/CurrentMetrics.h>
#include <Common/Increment.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ThreadFuzzer.h>
#include <Common/escapeForFileName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/noexcept_scope.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
@ -27,20 +42,19 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTHelpers.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTHelpers.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTPartition.h>
@ -48,37 +62,25 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTAlterQuery.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/AlterCommands.h>
#include <Storages/BlockNumberColumn.h>
#include <Storages/Freeze.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartCloner.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/Statistics/Estimator.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/CurrentMetrics.h>
#include <Common/Increment.h>
#include <Common/ProfileEventsScope.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ThreadFuzzer.h>
#include <Common/escapeForFileName.h>
#include <Common/noexcept_scope.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string/join.hpp>
@@ -190,50 +192,6 @@ namespace ErrorCodes
extern const int LIMIT_EXCEEDED;
}
static size_t getPartitionAstFieldsCount(const ASTPartition & partition_ast, ASTPtr partition_value_ast)
{
if (partition_ast.fields_count.has_value())
return *partition_ast.fields_count;
if (partition_value_ast->as<ASTLiteral>())
return 1;
const auto * tuple_ast = partition_value_ast->as<ASTFunction>();
if (!tuple_ast)
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected literal or tuple for partition key, got {}", partition_value_ast->getID());
}
if (tuple_ast->name != "tuple")
{
if (!isFunctionCast(tuple_ast))
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
if (tuple_ast->arguments->as<ASTExpressionList>()->children.empty())
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
auto first_arg = tuple_ast->arguments->as<ASTExpressionList>()->children.at(0);
if (const auto * inner_tuple = first_arg->as<ASTFunction>(); inner_tuple && inner_tuple->name == "tuple")
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
return arguments_ast ? arguments_ast->children.size() : 0;
}
else if (const auto * inner_literal_tuple = first_arg->as<ASTLiteral>(); inner_literal_tuple)
{
return inner_literal_tuple->value.getType() == Field::Types::Tuple ? inner_literal_tuple->value.safeGet<Tuple>().size() : 1;
}
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
else
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
return arguments_ast ? arguments_ast->children.size() : 0;
}
}
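// Worked examples for the branches above (illustrative):
//   PARTITION 202402                              -> literal, 1 field
//   PARTITION tuple(202402, 'eu')                 -> tuple(), its argument count = 2 fields
//   PARTITION CAST((202402, 'eu'), 'Tuple(...)')  -> CAST over a literal tuple,
//                                                    the tuple's element count = 2 fields
// Any function other than tuple() or a CAST throws INVALID_PARTITION_VALUE.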
static void checkSuspiciousIndices(const ASTFunction * index_function)
{
std::unordered_set<UInt64> unique_index_expression_hashes;
@@ -4902,7 +4860,7 @@ void MergeTreeData::removePartContributionToColumnAndSecondaryIndexSizes(const D
}
void MergeTreeData::checkAlterPartitionIsPossible(
const PartitionCommands & commands, const StorageMetadataPtr & /*metadata_snapshot*/, const Settings & settings, ContextPtr) const
const PartitionCommands & commands, const StorageMetadataPtr & /*metadata_snapshot*/, const Settings & settings, ContextPtr local_context) const
{
for (const auto & command : commands)
{
@@ -4930,15 +4888,7 @@ void MergeTreeData::checkAlterPartitionIsPossible(
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH PARTITION ALL currently");
}
else
{
// The below `getPartitionIDFromQuery` call will not work for attach / replace because it assumes the partition expressions
// are the same and deliberately uses this storage. Later on, `MergeTreeData::replaceFrom` is called, and it makes the right
// call to `getPartitionIDFromQuery` using source storage.
// Note: `PartitionCommand::REPLACE_PARTITION` is used both for `REPLACE PARTITION` and `ATTACH PARTITION FROM` queries.
// But not for `ATTACH PARTITION` queries.
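// For illustration, both of these arrive here as PartitionCommand::REPLACE_PARTITION
// and therefore resolve the partition ID later against the source storage:
//     ALTER TABLE dst REPLACE PARTITION expr FROM src;
//     ALTER TABLE dst ATTACH PARTITION expr FROM src;
// while a plain `ALTER TABLE dst ATTACH PARTITION expr;` is a different command
// type and is validated against this storage right away.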
if (command.type != PartitionCommand::REPLACE_PARTITION)
getPartitionIDFromQuery(command.partition, getContext());
}
getPartitionIDFromQuery(command.partition, local_context);
}
}
}
@@ -5675,8 +5625,69 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
MergeTreePartInfo::validatePartitionID(partition_ast.id->clone(), format_version);
return partition_ast.id->as<ASTLiteral>()->value.safeGet<String>();
}
size_t partition_ast_fields_count = 0;
ASTPtr partition_value_ast = partition_ast.value->clone();
auto partition_ast_fields_count = getPartitionAstFieldsCount(partition_ast, partition_value_ast);
if (!partition_ast.fields_count.has_value())
{
if (partition_value_ast->as<ASTLiteral>())
{
partition_ast_fields_count = 1;
}
else if (const auto * tuple_ast = partition_value_ast->as<ASTFunction>())
{
if (tuple_ast->name != "tuple")
{
if (isFunctionCast(tuple_ast))
{
if (tuple_ast->arguments->as<ASTExpressionList>()->children.empty())
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
auto first_arg = tuple_ast->arguments->as<ASTExpressionList>()->children.at(0);
if (const auto * inner_tuple = first_arg->as<ASTFunction>(); inner_tuple && inner_tuple->name == "tuple")
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
if (arguments_ast)
partition_ast_fields_count = arguments_ast->children.size();
else
partition_ast_fields_count = 0;
}
else if (const auto * inner_literal_tuple = first_arg->as<ASTLiteral>(); inner_literal_tuple)
{
if (inner_literal_tuple->value.getType() == Field::Types::Tuple)
partition_ast_fields_count = inner_literal_tuple->value.safeGet<Tuple>().size();
else
partition_ast_fields_count = 1;
}
else
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
}
else
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Expected tuple for complex partition key, got {}", tuple_ast->name);
}
else
{
const auto * arguments_ast = tuple_ast->arguments->as<ASTExpressionList>();
if (arguments_ast)
partition_ast_fields_count = arguments_ast->children.size();
else
partition_ast_fields_count = 0;
}
}
else
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Expected literal or tuple for partition key, got {}", partition_value_ast->getID());
}
}
else
{
partition_ast_fields_count = *partition_ast.fields_count;
}
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
@@ -7012,35 +7023,23 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & sour
if (my_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
if (queryToStringNullable(my_snapshot->getSortingKeyAST()) != queryToStringNullable(src_snapshot->getSortingKeyAST()))
auto query_to_string = [] (const ASTPtr & ast)
{
return ast ? queryToString(ast) : "";
};
if (query_to_string(my_snapshot->getSortingKeyAST()) != query_to_string(src_snapshot->getSortingKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different ordering");
if (query_to_string(my_snapshot->getPartitionKeyAST()) != query_to_string(src_snapshot->getPartitionKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");
if (format_version != src_data->format_version)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different format_version");
if (queryToStringNullable(my_snapshot->getPrimaryKeyAST()) != queryToStringNullable(src_snapshot->getPrimaryKeyAST()))
if (query_to_string(my_snapshot->getPrimaryKeyAST()) != query_to_string(src_snapshot->getPrimaryKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different primary key");
const auto is_a_subset_of = [](const auto & lhs, const auto & rhs)
{
if (lhs.size() > rhs.size())
return false;
const auto rhs_set = NameSet(rhs.begin(), rhs.end());
for (const auto & lhs_element : lhs)
if (!rhs_set.contains(lhs_element))
return false;
return true;
};
if (!is_a_subset_of(my_snapshot->getColumnsRequiredForPartitionKey(), src_snapshot->getColumnsRequiredForPartitionKey()))
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Destination table partition expression columns must be a subset of source table partition expression columns");
}
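// Illustration of the subset rule checked here: with
//   source:      PARTITION BY (toYYYYMM(date), region)
//   destination: PARTITION BY region
// the destination key's columns {region} are a subset of the source's
// {date, region}, so the check passes; a destination key over a column absent
// from the source partition key throws BAD_ARGUMENTS.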
const auto check_definitions = [](const auto & my_descriptions, const auto & src_descriptions)
{
if (my_descriptions.size() != src_descriptions.size())
@@ -7081,56 +7080,130 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
const ReadSettings & read_settings,
const WriteSettings & write_settings)
{
return MergeTreeDataPartCloner::clone(
this, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, require_part_metadata, params, read_settings, write_settings);
}
chassert(!isStaticStorage());
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadPartOnSameDiskWithDifferentPartitionKey(
const MergeTreeData::DataPartPtr & src_part,
const MergeTreePartition & new_partition,
const String & partition_id,
const IMergeTreeDataPart::MinMaxIndex & min_max_index,
const String & tmp_part_prefix,
const StorageMetadataPtr & my_metadata_snapshot,
const IDataPartStorage::ClonePartParams & clone_params,
ContextPtr local_context,
Int64 min_block,
Int64 max_block
)
{
MergeTreePartInfo dst_part_info(partition_id, min_block, max_block, src_part->info.level);
/// Check that the storage policy contains the disk where the src_part is located.
bool does_storage_policy_allow_same_disk = false;
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
if (disk->getName() == src_part->getDataPartStorage().getDiskName())
{
does_storage_policy_allow_same_disk = true;
break;
}
}
if (!does_storage_policy_allow_same_disk)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Could not clone and load part {} because disk does not belong to storage policy",
quoteString(src_part->getDataPartStorage().getFullPath()));
return MergeTreeDataPartCloner::cloneWithDistinctPartitionExpression(
this,
src_part,
my_metadata_snapshot,
dst_part_info,
tmp_part_prefix,
local_context->getReadSettings(),
local_context->getWriteSettings(),
new_partition,
min_max_index,
false,
clone_params);
}
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto temporary_directory_lock = getTemporaryPartDirectoryHolder(tmp_dst_part_name);
std::pair<MergeTreePartition, IMergeTreeDataPart::MinMaxIndex> MergeTreeData::createPartitionAndMinMaxIndexFromSourcePart(
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr local_context)
{
const auto & src_data = src_part->storage;
/// Why is this needed if we only hardlink files?
auto reservation = src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
auto src_part_storage = src_part->getDataPartStoragePtr();
auto metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(src_part.get());
IMergeTreeDataPart::MinMaxIndex min_max_index;
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
min_max_index.load(src_data, metadata_manager);
/// If the source part is in memory, flush it to disk and clone it already in the on-disk format.
/// Protect the tmp dir from removal by the cleanup thread with src_flushed_tmp_dir_lock.
/// Construct src_flushed_tmp_part so that the part and its directory are deleted in the destructor.
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
auto flushed_part_path = *src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix);
MergeTreePartition new_partition;
auto tmp_src_part_file_name = fs::path(tmp_dst_part_name).filename();
src_flushed_tmp_dir_lock = src_part->storage.getTemporaryPartDirectoryHolder(tmp_src_part_file_name);
new_partition.create(metadata_snapshot, min_max_index.getBlock(src_data), 0u, local_context);
auto flushed_part_storage = src_part_in_memory->flushToDisk(flushed_part_path, metadata_snapshot);
return {new_partition, min_max_index};
src_flushed_tmp_part = MergeTreeDataPartBuilder(*this, src_part->name, flushed_part_storage)
.withPartInfo(src_part->info)
.withPartFormatFromDisk()
.build();
src_flushed_tmp_part->is_temp = true;
src_part_storage = flushed_part_storage;
}
String with_copy;
if (params.copy_instead_of_hardlink)
with_copy = " (copying data)";
auto dst_part_storage = src_part_storage->freeze(
relative_data_path,
tmp_dst_part_name,
read_settings,
write_settings,
/* save_metadata_callback= */ {},
params);
if (params.metadata_version_to_write.has_value())
{
chassert(!params.keep_metadata_version);
auto out_metadata = dst_part_storage->writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, getContext()->getWriteSettings());
writeText(metadata_snapshot->getMetadataVersion(), *out_metadata);
out_metadata->finalize();
if (getSettings()->fsync_after_insert)
out_metadata->sync();
}
LOG_DEBUG(log, "Clone{} part {} to {}{}",
src_flushed_tmp_part ? " flushed" : "",
src_part_storage->getFullPath(),
std::string(fs::path(dst_part_storage->getFullRootPath()) / tmp_dst_part_name),
with_copy);
auto dst_data_part = MergeTreeDataPartBuilder(*this, dst_part_name, dst_part_storage)
.withPartFormatFromDisk()
.build();
if (!params.copy_instead_of_hardlink && params.hardlinked_files)
{
params.hardlinked_files->source_part_name = src_part->name;
params.hardlinked_files->source_table_shared_id = src_part->storage.getTableSharedID();
for (auto it = src_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (!params.files_to_copy_instead_of_hardlinks.contains(it->name())
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
params.hardlinked_files->hardlinks_from_source_part.insert(it->name());
}
}
auto projections = src_part->getProjectionParts();
for (const auto & [name, projection_part] : projections)
{
const auto & projection_storage = projection_part->getDataPartStorage();
for (auto it = projection_storage.iterate(); it->isValid(); it->next())
{
auto file_name_with_projection_prefix = fs::path(projection_storage.getPartDirectory()) / it->name();
if (!params.files_to_copy_instead_of_hardlinks.contains(file_name_with_projection_prefix)
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
params.hardlinked_files->hardlinks_from_source_part.insert(file_name_with_projection_prefix);
}
}
}
}
/// We should write version metadata on part creation to distinguish it from parts that were created without a transaction.
TransactionID tid = params.txn ? params.txn->tid : Tx::PrehistoricTID;
dst_data_part->version.setCreationTID(tid, nullptr);
dst_data_part->storeVersionMetadata();
dst_data_part->is_temp = true;
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
}
String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const

@@ -232,7 +232,6 @@ public:
}
};
using DataParts = std::set<DataPartPtr, LessDataPart>;
using MutableDataParts = std::set<MutableDataPartPtr, LessDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
@@ -855,23 +854,6 @@ public:
const ReadSettings & read_settings,
const WriteSettings & write_settings);
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadPartOnSameDiskWithDifferentPartitionKey(
const MergeTreeData::DataPartPtr & src_part,
const MergeTreePartition & new_partition,
const String & partition_id,
const IMergeTreeDataPart::MinMaxIndex & min_max_index,
const String & tmp_part_prefix,
const StorageMetadataPtr & my_metadata_snapshot,
const IDataPartStorage::ClonePartParams & clone_params,
ContextPtr local_context,
Int64 min_block,
Int64 max_block);
static std::pair<MergeTreePartition, IMergeTreeDataPart::MinMaxIndex> createPartitionAndMinMaxIndexFromSourcePart(
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr local_context);
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
/// Returns true if table can create new parts with adaptive granularity

@@ -1,319 +0,0 @@
#include <Interpreters/MergeTreeTransaction.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartCloner.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace DistinctPartitionExpression
{
std::unique_ptr<WriteBufferFromFileBase> updatePartitionFile(
const MergeTreeData & merge_tree_data,
const MergeTreePartition & partition,
const MergeTreeData::MutableDataPartPtr & dst_part,
IDataPartStorage & storage)
{
storage.removeFile("partition.dat");
// Leverage already implemented MergeTreePartition::store to create & store partition.dat.
// Checksum is re-calculated later.
return partition.store(merge_tree_data, storage, dst_part->checksums);
}
IMergeTreeDataPart::MinMaxIndex::WrittenFiles updateMinMaxFiles(
const MergeTreeData & merge_tree_data,
const MergeTreeData::MutableDataPartPtr & dst_part,
IDataPartStorage & storage,
const StorageMetadataPtr & metadata_snapshot)
{
for (const auto & column_name : MergeTreeData::getMinMaxColumnsNames(metadata_snapshot->partition_key))
{
auto file = "minmax_" + escapeForFileName(column_name) + ".idx";
storage.removeFile(file);
}
return dst_part->minmax_idx->store(merge_tree_data, storage, dst_part->checksums);
}
void finalizeNewFiles(const std::vector<std::unique_ptr<WriteBufferFromFileBase>> & files, bool sync_new_files)
{
for (const auto & file : files)
{
file->finalize();
if (sync_new_files)
file->sync();
}
}
void updateNewPartFiles(
const MergeTreeData & merge_tree_data,
const MergeTreeData::MutableDataPartPtr & dst_part,
const MergeTreePartition & new_partition,
const IMergeTreeDataPart::MinMaxIndex & new_min_max_index,
const StorageMetadataPtr & src_metadata_snapshot,
bool sync_new_files)
{
auto & storage = dst_part->getDataPartStorage();
*dst_part->minmax_idx = new_min_max_index;
auto partition_file = updatePartitionFile(merge_tree_data, new_partition, dst_part, storage);
auto min_max_files = updateMinMaxFiles(merge_tree_data, dst_part, storage, src_metadata_snapshot);
IMergeTreeDataPart::MinMaxIndex::WrittenFiles written_files;
if (partition_file)
written_files.emplace_back(std::move(partition_file));
written_files.insert(written_files.end(), std::make_move_iterator(min_max_files.begin()), std::make_move_iterator(min_max_files.end()));
finalizeNewFiles(written_files, sync_new_files);
// MergeTreeDataPartCloner::finalize_part calls IMergeTreeDataPart::loadColumnsChecksumsIndexes, which will re-create
// the checksum file if it doesn't exist. Relying on that is cumbersome, but this refactoring is simply a code extraction
// with small improvements. It can be further improved in the future.
storage.removeFile("checksums.txt");
}
}
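// Net effect of updateNewPartFiles, step by step:
//   1. dst_part->minmax_idx is overwritten with the destination's new index;
//   2. partition.dat and every minmax_*.idx file are removed and re-stored;
//   3. the rewritten files are finalized (and fsync'ed when sync_new_files is set);
//   4. checksums.txt is removed so that loadColumnsChecksumsIndexes re-creates
//      it against the new files during finalization.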
namespace
{
bool doesStoragePolicyAllowSameDisk(MergeTreeData * merge_tree_data, const MergeTreeData::DataPartPtr & src_part)
{
for (const DiskPtr & disk : merge_tree_data->getStoragePolicy()->getDisks())
if (disk->getName() == src_part->getDataPartStorage().getDiskName())
return true;
return false;
}
DataPartStoragePtr flushPartStorageToDiskIfInMemory(
MergeTreeData * merge_tree_data,
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const String & tmp_part_prefix,
const String & tmp_dst_part_name,
scope_guard & src_flushed_tmp_dir_lock,
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part)
{
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
auto flushed_part_path = src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix);
auto tmp_src_part_file_name = fs::path(tmp_dst_part_name).filename();
src_flushed_tmp_dir_lock = src_part->storage.getTemporaryPartDirectoryHolder(tmp_src_part_file_name);
auto flushed_part_storage = src_part_in_memory->flushToDisk(*flushed_part_path, metadata_snapshot);
src_flushed_tmp_part = MergeTreeDataPartBuilder(*merge_tree_data, src_part->name, flushed_part_storage)
.withPartInfo(src_part->info)
.withPartFormatFromDisk()
.build();
src_flushed_tmp_part->is_temp = true;
return flushed_part_storage;
}
return src_part->getDataPartStoragePtr();
}
std::shared_ptr<IDataPartStorage> hardlinkAllFiles(
MergeTreeData * merge_tree_data,
const DB::ReadSettings & read_settings,
const DB::WriteSettings & write_settings,
const DataPartStoragePtr & storage,
const String & path,
const DB::IDataPartStorage::ClonePartParams & params)
{
return storage->freeze(
merge_tree_data->getRelativeDataPath(),
path,
read_settings,
write_settings,
/*save_metadata_callback=*/{},
params);
}
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneSourcePart(
MergeTreeData * merge_tree_data,
const MergeTreeData::DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const DB::IDataPartStorage::ClonePartParams & params)
{
const auto dst_part_name = src_part->getNewName(dst_part_info);
const auto tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto temporary_directory_lock = merge_tree_data->getTemporaryPartDirectoryHolder(tmp_dst_part_name);
src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
auto src_part_storage = flushPartStorageToDiskIfInMemory(
merge_tree_data, src_part, metadata_snapshot, tmp_part_prefix, tmp_dst_part_name, src_flushed_tmp_dir_lock, src_flushed_tmp_part);
auto dst_part_storage = hardlinkAllFiles(merge_tree_data, read_settings, write_settings, src_part_storage, tmp_dst_part_name, params);
if (params.metadata_version_to_write.has_value())
{
chassert(!params.keep_metadata_version);
auto out_metadata = dst_part_storage->writeFile(
IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, merge_tree_data->getContext()->getWriteSettings());
writeText(metadata_snapshot->getMetadataVersion(), *out_metadata);
out_metadata->finalize();
if (merge_tree_data->getSettings()->fsync_after_insert)
out_metadata->sync();
}
LOG_DEBUG(
&Poco::Logger::get("MergeTreeDataPartCloner"),
"Clone {} part {} to {}{}",
src_flushed_tmp_part ? "flushed" : "",
src_part_storage->getFullPath(),
std::string(fs::path(dst_part_storage->getFullRootPath()) / tmp_dst_part_name),
false);
auto part = MergeTreeDataPartBuilder(*merge_tree_data, dst_part_name, dst_part_storage).withPartFormatFromDisk().build();
return std::make_pair(part, std::move(temporary_directory_lock));
}
void handleHardLinkedParameterFiles(const MergeTreeData::DataPartPtr & src_part, const DB::IDataPartStorage::ClonePartParams & params)
{
const auto & hardlinked_files = params.hardlinked_files;
hardlinked_files->source_part_name = src_part->name;
hardlinked_files->source_table_shared_id = src_part->storage.getTableSharedID();
for (auto it = src_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (!params.files_to_copy_instead_of_hardlinks.contains(it->name())
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
hardlinked_files->hardlinks_from_source_part.insert(it->name());
}
}
}
void handleProjections(const MergeTreeData::DataPartPtr & src_part, const DB::IDataPartStorage::ClonePartParams & params)
{
auto projections = src_part->getProjectionParts();
for (const auto & [name, projection_part] : projections)
{
const auto & projection_storage = projection_part->getDataPartStorage();
for (auto it = projection_storage.iterate(); it->isValid(); it->next())
{
auto file_name_with_projection_prefix = fs::path(projection_storage.getPartDirectory()) / it->name();
if (!params.files_to_copy_instead_of_hardlinks.contains(file_name_with_projection_prefix)
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
{
params.hardlinked_files->hardlinks_from_source_part.insert(file_name_with_projection_prefix);
}
}
}
}
MergeTreeData::MutableDataPartPtr finalizePart(
const MergeTreeData::MutableDataPartPtr & dst_part, const DB::IDataPartStorage::ClonePartParams & params, bool require_part_metadata)
{
/// We should write version metadata on part creation to distinguish it from parts that were created without a transaction.
TransactionID tid = params.txn ? params.txn->tid : Tx::PrehistoricTID;
dst_part->version.setCreationTID(tid, nullptr);
dst_part->storeVersionMetadata();
dst_part->is_temp = true;
dst_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
dst_part->modification_time = dst_part->getDataPartStorage().getLastModified().epochTime();
return dst_part;
}
std::pair<MergeTreeDataPartCloner::MutableDataPartPtr, scope_guard> cloneAndHandleHardlinksAndProjections(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const IDataPartStorage::ClonePartParams & params)
{
chassert(!merge_tree_data->isStaticStorage());
if (!doesStoragePolicyAllowSameDisk(merge_tree_data, src_part))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Could not clone and load part {} because disk does not belong to storage policy",
quoteString(src_part->getDataPartStorage().getFullPath()));
auto [destination_part, temporary_directory_lock] = cloneSourcePart(
merge_tree_data, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, read_settings, write_settings, params);
if (!params.copy_instead_of_hardlink && params.hardlinked_files)
{
handleHardLinkedParameterFiles(src_part, params);
handleProjections(src_part, params);
}
return std::make_pair(destination_part, std::move(temporary_directory_lock));
}
}
std::pair<MergeTreeDataPartCloner::MutableDataPartPtr, scope_guard> MergeTreeDataPartCloner::clone(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
bool require_part_metadata,
const IDataPartStorage::ClonePartParams & params,
const ReadSettings & read_settings,
const WriteSettings & write_settings)
{
auto [destination_part, temporary_directory_lock] = cloneAndHandleHardlinksAndProjections(
merge_tree_data, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, read_settings, write_settings, params);
return std::make_pair(finalizePart(destination_part, params, require_part_metadata), std::move(temporary_directory_lock));
}
std::pair<MergeTreeDataPartCloner::MutableDataPartPtr, scope_guard> MergeTreeDataPartCloner::cloneWithDistinctPartitionExpression(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const MergeTreePartition & new_partition,
const IMergeTreeDataPart::MinMaxIndex & new_min_max_index,
bool sync_new_files,
const IDataPartStorage::ClonePartParams & params)
{
auto [destination_part, temporary_directory_lock] = cloneAndHandleHardlinksAndProjections(
merge_tree_data, src_part, metadata_snapshot, dst_part_info, tmp_part_prefix, read_settings, write_settings, params);
DistinctPartitionExpression::updateNewPartFiles(
*merge_tree_data, destination_part, new_partition, new_min_max_index, src_part->storage.getInMemoryMetadataPtr(), sync_new_files);
return std::make_pair(finalizePart(destination_part, params, false), std::move(temporary_directory_lock));
}
}

@@ -1,43 +0,0 @@
#pragma once
namespace DB
{
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct MergeTreePartition;
class IMergeTreeDataPart;
class MergeTreeDataPartCloner
{
public:
using DataPart = IMergeTreeDataPart;
using MutableDataPartPtr = std::shared_ptr<DataPart>;
using DataPartPtr = std::shared_ptr<const DataPart>;
static std::pair<MutableDataPartPtr, scope_guard> clone(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
bool require_part_metadata,
const IDataPartStorage::ClonePartParams & params,
const ReadSettings & read_settings,
const WriteSettings & write_settings);
static std::pair<MutableDataPartPtr, scope_guard> cloneWithDistinctPartitionExpression(
MergeTreeData * merge_tree_data,
const DataPartPtr & src_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreePartInfo & dst_part_info,
const String & tmp_part_prefix,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
const MergeTreePartition & new_partition,
const IMergeTreeDataPart::MinMaxIndex & new_min_max_index,
bool sync_new_files,
const IDataPartStorage::ClonePartParams & params);
};
}

@@ -467,45 +467,6 @@ void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Bl
}
}
void MergeTreePartition::createAndValidateMinMaxPartitionIds(
const StorageMetadataPtr & metadata_snapshot, Block block_with_min_max_partition_ids, ContextPtr context)
{
if (!metadata_snapshot->hasPartitionKey())
return;
auto partition_key_names_and_types = executePartitionByExpression(metadata_snapshot, block_with_min_max_partition_ids, context);
value.resize(partition_key_names_and_types.size());
/// Executing partition_by expression adds new columns to passed block according to partition functions.
/// The block is passed by reference and is used afterwards. `moduloLegacy` needs to be substituted back
/// with just `modulo`, because it was a temporary substitution.
static constexpr std::string_view modulo_legacy_function_name = "moduloLegacy";
size_t i = 0;
for (const auto & element : partition_key_names_and_types)
{
auto & partition_column = block_with_min_max_partition_ids.getByName(element.name);
if (element.name.starts_with(modulo_legacy_function_name))
partition_column.name.replace(0, modulo_legacy_function_name.size(), "modulo");
Field extracted_min_partition_id_field;
Field extracted_max_partition_id_field;
partition_column.column->get(0, extracted_min_partition_id_field);
partition_column.column->get(1, extracted_max_partition_id_field);
if (extracted_min_partition_id_field != extracted_max_partition_id_field)
{
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE,
"Can not create the partition. A partition can not contain values that have different partition ids");
}
partition_column.column->get(0u, value[i++]);
}
}
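/// A standalone sketch of the name substitution above; the helper name and the
/// sample input are illustrative. A column named with the temporary
/// "moduloLegacy" prefix is renamed back to "modulo" (starts_with is C++20).
#include <string>
#include <string_view>

std::string restoreModuloName(std::string name)
{
    static constexpr std::string_view modulo_legacy = "moduloLegacy";
    if (name.starts_with(modulo_legacy))
        name.replace(0, modulo_legacy.size(), "modulo");
    return name;
}

/// restoreModuloName("moduloLegacy(id, 100)") returns "modulo(id, 100)".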
NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context)
{
auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context);

@@ -1,12 +1,11 @@
#pragma once
#include <Core/Field.h>
#include <base/types.h>
#include <Disks/IDisk.h>
#include <IO/WriteBuffer.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <base/types.h>
#include <Core/Field.h>
namespace DB
{
@@ -52,11 +51,6 @@ public:
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context);
/// Copy of MergeTreePartition::create, but also validates that the min and max partition keys are equal. If they are different,
/// it means the partition can't be created because the data doesn't belong to the same partition.
void createAndValidateMinMaxPartitionIds(
const StorageMetadataPtr & metadata_snapshot, Block block_with_min_max_partition_ids, ContextPtr context);
static void appendFiles(const MergeTreeData & storage, Strings & files);
/// Adjust partition key and execute its expression on block. Return sample block according to used expression.

@@ -1,91 +0,0 @@
#include <Interpreters/MonotonicityCheckVisitor.h>
#include <Interpreters/getTableExpressions.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreePartitionCompatibilityVerifier.h>
#include <Storages/MergeTree/MergeTreePartitionGlobalMinMaxIdxCalculator.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
bool isDestinationPartitionExpressionMonotonicallyIncreasing(
const std::vector<Range> & hyperrectangle, const MergeTreeData & destination_storage)
{
auto destination_table_metadata = destination_storage.getInMemoryMetadataPtr();
auto key_description = destination_table_metadata->getPartitionKey();
auto definition_ast = key_description.definition_ast->clone();
auto table_identifier = std::make_shared<ASTIdentifier>(destination_storage.getStorageID().getTableName());
auto table_with_columns
= TableWithColumnNamesAndTypes{DatabaseAndTableWithAlias(table_identifier), destination_table_metadata->getColumns().getOrdinary()};
auto expression_list = extractKeyExpressionList(definition_ast);
MonotonicityCheckVisitor::Data data{{table_with_columns}, destination_storage.getContext(), /*group_by_function_hashes*/ {}};
for (auto i = 0u; i < expression_list->children.size(); i++)
{
data.range = hyperrectangle[i];
MonotonicityCheckVisitor(data).visit(expression_list->children[i]);
if (!data.monotonicity.is_monotonic || !data.monotonicity.is_positive)
return false;
}
return true;
}
bool isExpressionDirectSubsetOf(const ASTPtr source, const ASTPtr destination)
{
auto source_expression_list = extractKeyExpressionList(source);
auto destination_expression_list = extractKeyExpressionList(destination);
std::unordered_set<std::string> source_columns;
for (auto i = 0u; i < source_expression_list->children.size(); ++i)
source_columns.insert(source_expression_list->children[i]->getColumnName());
for (auto i = 0u; i < destination_expression_list->children.size(); ++i)
if (!source_columns.contains(destination_expression_list->children[i]->getColumnName()))
return false;
return true;
}
}
void MergeTreePartitionCompatibilityVerifier::verify(
const MergeTreeData & source_storage, const MergeTreeData & destination_storage, const DataPartsVector & source_parts)
{
const auto source_metadata = source_storage.getInMemoryMetadataPtr();
const auto destination_metadata = destination_storage.getInMemoryMetadataPtr();
const auto source_partition_key_ast = source_metadata->getPartitionKeyAST();
const auto destination_partition_key_ast = destination_metadata->getPartitionKeyAST();
// If destination partition expression columns are a subset of source partition expression columns,
// there is no need to check for monotonicity.
if (isExpressionDirectSubsetOf(source_partition_key_ast, destination_partition_key_ast))
return;
const auto src_global_min_max_indexes = MergeTreePartitionGlobalMinMaxIdxCalculator::calculate(source_parts, destination_storage);
assert(!src_global_min_max_indexes.hyperrectangle.empty());
if (!isDestinationPartitionExpressionMonotonicallyIncreasing(src_global_min_max_indexes.hyperrectangle, destination_storage))
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Destination table partition expression is not monotonically increasing");
MergeTreePartition().createAndValidateMinMaxPartitionIds(
destination_storage.getInMemoryMetadataPtr(),
src_global_min_max_indexes.getBlock(destination_storage),
destination_storage.getContext());
}
}

@@ -1,30 +0,0 @@
#pragma once
#include <Core/Field.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
/*
* Verifies that source and destination partitions are compatible.
* To be compatible, one of the following criteria must be met:
* 1. Destination partition expression columns are a subset of source partition columns; or
* 2. Destination partition expression is monotonic on the source global min_max idx Range AND the computed partition id for
* the source global min_max idx range is the same.
*
* If not, an exception is thrown.
* */
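/*
 * Illustrative examples (hypothetical `date` column):
 *   source PARTITION BY toYYYYMMDD(date), destination PARTITION BY toYYYYMM(date):
 *     toYYYYMM is monotonically increasing over the source min/max range, so the
 *     attach is allowed when both range bounds map to the same destination partition id.
 *   source PARTITION BY toYYYYMM(date), destination PARTITION BY cityHash64(date):
 *     not monotonic, so BAD_ARGUMENTS is thrown.
 * */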
class MergeTreePartitionCompatibilityVerifier
{
public:
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
static void
verify(const MergeTreeData & source_storage, const MergeTreeData & destination_storage, const DataPartsVector & source_parts);
};
}

@@ -1,25 +0,0 @@
#include <Storages/MergeTree/MergeTreePartitionGlobalMinMaxIdxCalculator.h>
namespace DB
{
IMergeTreeDataPart::MinMaxIndex
MergeTreePartitionGlobalMinMaxIdxCalculator::calculate(const DataPartsVector & parts, const MergeTreeData & storage)
{
IMergeTreeDataPart::MinMaxIndex global_min_max_indexes;
for (const auto & part : parts)
{
auto metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(part.get());
auto local_min_max_index = MergeTreeData::DataPart::MinMaxIndex();
local_min_max_index.load(storage, metadata_manager);
global_min_max_indexes.merge(local_min_max_index);
}
return global_min_max_indexes;
}
}
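/// A simplified model of the fold above: per-part [min, max] ranges are merged
/// into one global range per partition-key column. Range and mergeMinMax are
/// illustrative, not the real Field-based types.
#include <algorithm>
#include <utility>
#include <vector>

using Range = std::pair<int, int>; // [min, max] of one column within one part

std::vector<Range> mergeMinMax(const std::vector<std::vector<Range>> & per_part)
{
    std::vector<Range> global;
    for (const auto & part_ranges : per_part)
    {
        if (global.empty())
        {
            global = part_ranges; // the first part seeds the global index
            continue;
        }
        for (size_t i = 0; i < global.size(); ++i)
        {
            global[i].first = std::min(global[i].first, part_ranges[i].first);
            global[i].second = std::max(global[i].second, part_ranges[i].second);
        }
    }
    return global;
}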

@@ -1,24 +0,0 @@
#pragma once
#include <utility>
#include <Core/Field.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
/*
* Calculates global min/max indexes for a given set of parts on a given storage.
* */
class MergeTreePartitionGlobalMinMaxIdxCalculator
{
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
public:
static IMergeTreeDataPart::MinMaxIndex calculate(const DataPartsVector & parts, const MergeTreeData & storage);
};
}

@@ -773,7 +773,8 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
table_function_node->setTableExpressionModifiers(*table_expression_modifiers);
QueryAnalysisPass query_analysis_pass;
query_analysis_pass.run(table_function_node, query_context);
QueryTreeNodePtr node = table_function_node;
query_analysis_pass.run(node, query_context);
replacement_table_expression = std::move(table_function_node);
}

@@ -1,6 +1,13 @@
#include <algorithm>
#include <functional>
#include <iterator>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/IdentifierNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/Utils.h>
#include <Columns/ColumnSet.h>
@@ -25,6 +32,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Planner/Utils.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@@ -40,6 +48,7 @@
#include <QueryPipeline/narrowPipe.h>
#include <Storages/AlterCommands.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageView.h>
@@ -51,6 +60,8 @@
#include <Common/assert_cast.h>
#include <Common/checkStackSize.h>
#include <Common/typeid_cast.h>
#include <Core/NamesAndTypes.h>
#include <Functions/FunctionFactory.h>
namespace
{
@@ -78,13 +89,13 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SAMPLING_NOT_SUPPORTED;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int LOGICAL_ERROR;
}
StorageMerge::DatabaseNameOrRegexp::DatabaseNameOrRegexp(
@@ -379,7 +390,14 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
const auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
const auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot);
Names column_names_as_aliases;
Aliases aliases;
Names real_column_names = column_names;
if (child_plan.row_policy_data_opt)
child_plan.row_policy_data_opt->extendNames(real_column_names);
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
auto source_pipeline = createSources(
child_plan.plan,
@@ -512,7 +530,6 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot);
Names column_names_as_aliases;
Names real_column_names = column_names;
@@ -528,6 +545,8 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
row_policy_data_opt->extendNames(real_column_names);
}
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
if (!context->getSettingsRef().allow_experimental_analyzer)
{
auto storage_columns = storage_metadata_snapshot->getColumns();
@@ -580,6 +599,10 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name);
}
}
else
{
}
res.back().plan = createPlanForTable(
nested_storage_snaphsot,
@@ -596,10 +619,198 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
return res;
}
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & query_info,
const ContextPtr & modified_context,
namespace
{
class ApplyAliasColumnExpressionsVisitor : public InDepthQueryTreeVisitor<ApplyAliasColumnExpressionsVisitor>
{
public:
explicit ApplyAliasColumnExpressionsVisitor(QueryTreeNodePtr replacement_table_expression_)
: replacement_table_expression(replacement_table_expression_)
{}
void visitImpl(QueryTreeNodePtr & node)
{
if (auto * column = node->as<ColumnNode>(); column != nullptr)
{
if (column->hasExpression())
{
node = column->getExpressionOrThrow();
node->setAlias(column->getColumnName());
}
else
column->setColumnSource(replacement_table_expression);
}
}
private:
QueryTreeNodePtr replacement_table_expression;
};
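// Illustrative effect of this visitor (hypothetical schema): for a column
// declared as `b UInt64 ALIAS a + 1`, a reference to `b` in the tree becomes
// the expression `a + 1` re-aliased as `b`; a plain physical column keeps its
// node but has its source re-pointed at replacement_table_expression.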
bool hasUnknownColumn(const QueryTreeNodePtr & node, QueryTreeNodePtr replacement_table_expression)
{
QueryTreeNodes stack = { node };
while (!stack.empty())
{
auto current = stack.back();
stack.pop_back();
switch (current->getNodeType())
{
case QueryTreeNodeType::CONSTANT:
break;
case QueryTreeNodeType::COLUMN:
{
auto * column_node = current->as<ColumnNode>();
auto source = column_node->getColumnSourceOrNull();
if (source != replacement_table_expression)
return true;
break;
}
default:
{
for (const auto & child : current->getChildren())
{
if (child)
stack.push_back(child);
}
}
}
}
return false;
}
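// For example, once `t1 JOIN t2` has been collapsed to a single replacement
// table below, a conjunct like `t2.b = 2` still has its column source pointing
// at the old t2 expression, so this walk reports it as unknown and
// replaceFilterExpression drops it.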
void replaceFilterExpression(
QueryTreeNodePtr & expression,
const QueryTreeNodePtr & replacement_table_expression,
const ContextPtr & context)
{
auto * function = expression->as<FunctionNode>();
if (!function)
return;
if (function->getFunctionName() != "and")
{
if (hasUnknownColumn(expression, replacement_table_expression))
expression = nullptr;
return;
}
QueryTreeNodes conjunctions;
QueryTreeNodes processing{ expression };
while (!processing.empty())
{
auto node = std::move(processing.back());
processing.pop_back();
if (auto * function_node = node->as<FunctionNode>())
{
if (function_node->getFunctionName() == "and")
std::copy(
function_node->getArguments().begin(),
function_node->getArguments().end(),
std::back_inserter(processing)
);
else
conjunctions.push_back(node);
}
else
{
conjunctions.push_back(node);
}
}
std::swap(processing, conjunctions);
for (const auto & node : processing)
{
if (!hasUnknownColumn(node, replacement_table_expression))
conjunctions.push_back(node);
}
if (conjunctions.empty())
{
expression = {};
return;
}
if (conjunctions.size() == 1)
{
expression = conjunctions[0];
return;
}
function->getArguments().getNodes() = std::move(conjunctions);
const auto function_impl = FunctionFactory::instance().get("and", context);
function->resolveAsFunction(function_impl->build(function->getArgumentColumns()));
}
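/// A standard-library model of the rewrite above: flatten nested and() nodes,
/// keep only the conjuncts accepted by a predicate, and rebuild. Node, keep and
/// rewriteConjunction are illustrative stand-ins for the query-tree types.
#include <memory>
#include <string>
#include <vector>

struct Node
{
    std::string name;                        // "and" or a leaf predicate
    std::vector<std::shared_ptr<Node>> args; // children of an "and" node
};

std::shared_ptr<Node> rewriteConjunction(const std::shared_ptr<Node> & expr,
                                         bool (*keep)(const Node &))
{
    std::vector<std::shared_ptr<Node>> leaves;
    std::vector<std::shared_ptr<Node>> stack{expr};
    while (!stack.empty())
    {
        auto node = stack.back();
        stack.pop_back();
        if (node->name == "and")
            stack.insert(stack.end(), node->args.begin(), node->args.end()); // flatten
        else if (keep(*node))
            leaves.push_back(node); // this conjunct survives the rewrite
    }
    if (leaves.empty())
        return nullptr; // the whole filter is dropped
    if (leaves.size() == 1)
        return leaves.front(); // a single conjunct replaces the and()
    auto result = std::make_shared<Node>();
    result->name = "and";
    result->args = std::move(leaves);
    return result;
}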
QueryTreeNodePtr replaceTableExpressionAndRemoveJoin(
QueryTreeNodePtr query,
QueryTreeNodePtr original_table_expression,
QueryTreeNodePtr replacement_table_expression,
const ContextPtr & context,
const Names & required_column_names)
{
auto * query_node = query->as<QueryNode>();
auto join_tree_type = query_node->getJoinTree()->getNodeType();
auto modified_query = query_node->cloneAndReplace(original_table_expression, replacement_table_expression);
if (join_tree_type == QueryTreeNodeType::TABLE || join_tree_type == QueryTreeNodeType::TABLE_FUNCTION)
return modified_query;
auto * modified_query_node = modified_query->as<QueryNode>();
modified_query = modified_query->cloneAndReplace(modified_query_node->getJoinTree(), replacement_table_expression);
modified_query_node = modified_query->as<QueryNode>();
query_node = modified_query->as<QueryNode>();
if (query_node->hasPrewhere())
replaceFilterExpression(query_node->getPrewhere(), replacement_table_expression, context);
if (query_node->hasWhere())
replaceFilterExpression(query_node->getWhere(), replacement_table_expression, context);
query_node->getGroupBy().getNodes().clear();
query_node->getHaving() = {};
query_node->getOrderBy().getNodes().clear();
auto & projection = modified_query_node->getProjection().getNodes();
projection.clear();
NamesAndTypes projection_columns;
for (auto const & column_name : required_column_names)
{
QueryTreeNodePtr fake_node = std::make_shared<IdentifierNode>(Identifier{column_name});
QueryAnalysisPass query_analysis_pass(original_table_expression);
query_analysis_pass.run(fake_node, context);
auto * resolved_column = fake_node->as<ColumnNode>();
if (!resolved_column)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required column '{}' is not resolved", column_name);
auto fake_column = resolved_column->getColumn();
ApplyAliasColumnExpressionsVisitor visitor(replacement_table_expression);
visitor.visit(fake_node);
projection.push_back(fake_node);
projection_columns.push_back(fake_column);
}
query_node->resolveProjectionColumns(std::move(projection_columns));
return modified_query;
}
}
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot)
const StorageSnapshotPtr & storage_snapshot,
Names required_column_names,
Names & column_names_as_aliases,
Aliases & aliases) const
{
const auto & [database_name, storage, storage_lock, table_name] = storage_with_lock_and_name;
const StorageID current_storage_id = storage->getStorageID();
@@ -612,8 +823,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer
if (query_info.table_expression_modifiers)
replacement_table_expression->setTableExpressionModifiers(*query_info.table_expression_modifiers);
modified_query_info.query_tree = modified_query_info.query_tree->cloneAndReplace(modified_query_info.table_expression,
replacement_table_expression);
modified_query_info.query_tree = replaceTableExpressionAndRemoveJoin(modified_query_info.query_tree, modified_query_info.table_expression, replacement_table_expression, modified_context, required_column_names);
modified_query_info.table_expression = replacement_table_expression;
modified_query_info.planner_context->getOrCreateTableExpressionData(replacement_table_expression);
@@ -624,10 +834,65 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
if (!storage_snapshot->tryGetColumn(get_column_options, "_table"))
column_name_to_node.emplace("_table", std::make_shared<ConstantNode>(current_storage_id.table_name));
{
auto table_name_node = std::make_shared<ConstantNode>(current_storage_id.table_name);
table_name_node->setAlias("_table");
column_name_to_node.emplace("_table", table_name_node);
}
if (!storage_snapshot->tryGetColumn(get_column_options, "_database"))
column_name_to_node.emplace("_database", std::make_shared<ConstantNode>(current_storage_id.database_name));
{
auto database_name_node = std::make_shared<ConstantNode>(current_storage_id.database_name);
database_name_node->setAlias("_database");
column_name_to_node.emplace("_database", database_name_node);
}
auto storage_columns = storage_snapshot->metadata->getColumns();
bool with_aliases = /* common_processed_stage == QueryProcessingStage::FetchColumns && */ !storage_columns.getAliases().empty();
if (with_aliases)
{
auto filter_actions_dag = std::make_shared<ActionsDAG>();
for (const auto & column : required_column_names)
{
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;
QueryTreeNodePtr column_node;
if (is_alias)
{
QueryTreeNodePtr fake_node = std::make_shared<IdentifierNode>(Identifier{column});
QueryAnalysisPass query_analysis_pass(modified_query_info.table_expression);
query_analysis_pass.run(fake_node, modified_context);
auto * resolved_column = fake_node->as<ColumnNode>();
column_node = fake_node;
ApplyAliasColumnExpressionsVisitor visitor(replacement_table_expression);
visitor.visit(column_node);
if (!resolved_column || !resolved_column->getExpression())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Alias column is not resolved");
column_name_to_node.emplace(column, column_node);
aliases.push_back({ .name = column, .type = resolved_column->getResultType(), .expression = column_node->toAST() });
}
else
{
column_node = std::make_shared<ColumnNode>(NameAndTypePair{column, storage_columns.getColumn(get_column_options, column).type }, modified_query_info.table_expression);
}
PlannerActionsVisitor actions_visitor(modified_query_info.planner_context, false /*use_column_identifier_as_action_node_name*/);
actions_visitor.visit(filter_actions_dag, column_node);
}
column_names_as_aliases = filter_actions_dag->getRequiredColumnsNames();
if (column_names_as_aliases.empty())
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
}
if (!column_name_to_node.empty())
{
@@ -756,7 +1021,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
/// Subordinate tables could have different but convertible types, like numeric types of different width.
/// We must return streams with a structure equal to the structure of the Merge table.
convertAndFilterSourceStream(header, storage_snapshot->metadata, aliases, row_policy_data_opt, modified_context, *builder, processed_stage);
convertAndFilterSourceStream(header, modified_query_info, storage_snapshot, aliases, row_policy_data_opt, modified_context, *builder, processed_stage);
}
return builder;
@@ -1107,18 +1372,51 @@ void StorageMerge::alter(
void ReadFromMerge::convertAndFilterSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & modified_query_info,
const StorageSnapshotPtr & snapshot,
const Aliases & aliases,
const RowPolicyDataOpt & row_policy_data_opt,
ContextPtr local_context,
ContextMutablePtr local_context,
QueryPipelineBuilder & builder,
QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = builder.getHeader();
auto storage_sample_block = metadata_snapshot->getSampleBlock();
auto storage_sample_block = snapshot->metadata->getSampleBlock();
auto pipe_columns = builder.getHeader().getNamesAndTypesList();
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
for (const auto & alias : aliases)
{
pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type));
auto actions_dag = std::make_shared<ActionsDAG>(pipe_columns);
QueryTreeNodePtr query_tree = buildQueryTree(alias.expression, local_context);
query_tree->setAlias(alias.name);
QueryAnalysisPass query_analysis_pass(modified_query_info.table_expression);
query_analysis_pass.run(query_tree, local_context);
PlannerActionsVisitor actions_visitor(modified_query_info.planner_context, false /*use_column_identifier_as_action_node_name*/);
const auto & nodes = actions_visitor.visit(actions_dag, query_tree);
if (nodes.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected to have 1 output but got {}", nodes.size());
actions_dag->addOrReplaceInOutputs(actions_dag->addAlias(*nodes.front(), alias.name));
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
}
else
{
for (const auto & alias : aliases)
{
pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type));
@@ -1135,10 +1433,12 @@ void ReadFromMerge::convertAndFilterSourceStream(
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
}
ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name;
if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns)
if (local_context->getSettingsRef().allow_experimental_analyzer
&& (processed_stage != QueryProcessingStage::FetchColumns || dynamic_cast<const StorageDistributed *>(&snapshot->storage) != nullptr))
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;
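/// Note: per the condition above, with the analyzer the source header's column
/// names can differ from the expected ones past FetchColumns (and, for
/// StorageDistributed, even at FetchColumns), so the conversion matches
/// columns by position rather than by name.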
if (row_policy_data_opt)

@@ -189,6 +189,13 @@ private:
using Aliases = std::vector<AliasData>;
SelectQueryInfo getModifiedQueryInfo(const ContextPtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot,
Names required_column_names,
Names & column_names_as_aliases,
Aliases & aliases) const;
/// An object of this helper class is created
/// when processing a Merge table data source (subordinate table)
/// that has row policies
@@ -261,17 +268,13 @@ private:
ContextMutablePtr modified_context,
bool concat_streams = false) const;
static SelectQueryInfo getModifiedQueryInfo(const SelectQueryInfo & query_info,
const ContextPtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot);
static void convertAndFilterSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & modified_query_info,
const StorageSnapshotPtr & snapshot,
const Aliases & aliases,
const RowPolicyDataOpt & row_policy_data_opt,
ContextPtr context,
ContextMutablePtr context,
QueryPipelineBuilder & builder,
QueryProcessingStage::Enum processed_stage);
