Merge pull request #16253 from amosbird/pf

Prune partition in verbatim way.
This commit is contained in:
alexey-milovidov 2020-11-08 18:58:02 +03:00 committed by GitHub
commit 0e6ae4aff7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 262 additions and 63 deletions

View File

@ -53,6 +53,7 @@ using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
*/
struct ExpressionAction
{
friend class KeyCondition;
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:

View File

@ -5,6 +5,7 @@
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/misc.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
@ -486,6 +487,29 @@ static std::pair<Field, DataTypePtr> applyFunctionForFieldOfUnknownType(
}
/// Same as above but for binary operators
static std::pair<Field, DataTypePtr> applyBinaryFunctionForFieldOfUnknownType(
const FunctionOverloadResolverPtr & func,
const DataTypePtr & arg_type,
const Field & arg_value,
const DataTypePtr & arg_type2,
const Field & arg_value2)
{
ColumnsWithTypeAndName arguments{
{arg_type->createColumnConst(1, arg_value), arg_type, "x"}, {arg_type2->createColumnConst(1, arg_value2), arg_type2, "y"}};
FunctionBasePtr func_base = func->build(arguments);
DataTypePtr return_type = func_base->getResultType();
auto col = func_base->execute(arguments, return_type, 1);
Field result = (*col)[0];
return {std::move(result), std::move(return_type)};
}
static FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
{
/// Fallback for fields without block reference.
@ -569,7 +593,15 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
if (!sample_block.has(expr_name))
return false;
/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;
bool found_transformation = false;
auto input_column = sample_block.getByName(expr_name);
auto const_column = out_type->createColumnConst(1, out_value);
out_value = (*castColumn({const_column, out_type, "c"}, input_column.type))[0];
out_type = input_column.type;
for (const ExpressionAction & action : key_expr->getActions())
{
/** The key functional expression constraint may be inferred from a plain column in the expression.
@ -617,6 +649,77 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
return found_transformation;
}
/// Looking for possible transformation of `column = constant` into `partition_expr = function(constant)`
bool KeyCondition::canConstantBeWrappedByFunctions(
const ASTPtr & node, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type)
{
if (strict)
return false;
String expr_name = node->getColumnName();
const auto & sample_block = key_expr->getSampleBlock();
if (!sample_block.has(expr_name))
return false;
/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;
bool found_transformation = false;
auto input_column = sample_block.getByName(expr_name);
auto const_column = out_type->createColumnConst(1, out_value);
out_value = (*castColumn({const_column, out_type, "c"}, input_column.type))[0];
out_type = input_column.type;
Block transform({{input_column.type->createColumn(), input_column.type, input_column.name}});
for (const ExpressionAction & action : key_expr->getActions())
{
const auto & argument_names = action.argument_names;
if (action.type == ExpressionAction::Type::APPLY_FUNCTION)
{
if (!action.function_base->isDeterministic())
return false;
if (argument_names.size() == 1 && argument_names[0] == expr_name)
{
std::tie(out_value, out_type) = applyFunctionForFieldOfUnknownType(action.function_builder, out_type, out_value);
}
else if (argument_names.size() == 2)
{
if (!transform.has(argument_names[0]) || !transform.has(argument_names[1]))
return false;
auto left = transform.getByName(argument_names[0]);
auto right = transform.getByName(argument_names[1]);
if (isColumnConst(*left.column))
{
auto left_arg_type = left.type;
auto left_arg_value = (*left.column)[0];
std::tie(out_value, out_type) = applyBinaryFunctionForFieldOfUnknownType(
action.function_builder, left_arg_type, left_arg_value, out_type, out_value);
}
else if (isColumnConst(*right.column))
{
auto right_arg_type = right.type;
auto right_arg_value = (*right.column)[0];
std::tie(out_value, out_type) = applyBinaryFunctionForFieldOfUnknownType(
action.function_builder, out_type, out_value, right_arg_type, right_arg_value);
}
}
expr_name = action.result_name;
auto it = key_columns.find(expr_name);
if (key_columns.end() != it)
{
out_key_column_num = it->second;
out_key_column_type = sample_block.getByName(it->first).type;
found_transformation = true;
break;
}
}
action.execute(transform, true);
}
return found_transformation;
}
bool KeyCondition::tryPrepareSetIndex(
const ASTs & args,
const Context & context,
@ -863,33 +966,57 @@ bool KeyCondition::tryParseAtomFromAST(const ASTPtr & node, const Context & cont
bool is_set_const = false;
bool is_constant_transformed = false;
if (functionIsInOrGlobalInOperator(func_name)
&& tryPrepareSetIndex(args, context, out, key_column_num))
if (functionIsInOrGlobalInOperator(func_name))
{
key_arg_pos = 0;
is_set_const = true;
if (tryPrepareSetIndex(args, context, out, key_column_num))
{
key_arg_pos = 0;
is_set_const = true;
}
else
return false;
}
else if (getConstant(args[1], block_with_constants, const_value, const_type)
&& isKeyPossiblyWrappedByMonotonicFunctions(args[0], context, key_column_num, key_expr_type, chain))
else if (getConstant(args[1], block_with_constants, const_value, const_type))
{
key_arg_pos = 0;
if (isKeyPossiblyWrappedByMonotonicFunctions(args[0], context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 0;
}
else if (canConstantBeWrappedByMonotonicFunctions(args[0], key_column_num, key_expr_type, const_value, const_type))
{
key_arg_pos = 0;
is_constant_transformed = true;
}
else if (
single_point && func_name == "equals"
&& canConstantBeWrappedByFunctions(args[0], key_column_num, key_expr_type, const_value, const_type))
{
key_arg_pos = 0;
is_constant_transformed = true;
}
else
return false;
}
else if (getConstant(args[1], block_with_constants, const_value, const_type)
&& canConstantBeWrappedByMonotonicFunctions(args[0], key_column_num, key_expr_type, const_value, const_type))
else if (getConstant(args[0], block_with_constants, const_value, const_type))
{
key_arg_pos = 0;
is_constant_transformed = true;
}
else if (getConstant(args[0], block_with_constants, const_value, const_type)
&& isKeyPossiblyWrappedByMonotonicFunctions(args[1], context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 1;
}
else if (getConstant(args[0], block_with_constants, const_value, const_type)
&& canConstantBeWrappedByMonotonicFunctions(args[1], key_column_num, key_expr_type, const_value, const_type))
{
key_arg_pos = 1;
is_constant_transformed = true;
if (isKeyPossiblyWrappedByMonotonicFunctions(args[1], context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 1;
}
else if (canConstantBeWrappedByMonotonicFunctions(args[1], key_column_num, key_expr_type, const_value, const_type))
{
key_arg_pos = 1;
is_constant_transformed = true;
}
else if (
single_point && func_name == "equals"
&& canConstantBeWrappedByFunctions(args[1], key_column_num, key_expr_type, const_value, const_type))
{
key_arg_pos = 0;
is_constant_transformed = true;
}
else
return false;
}
else
return false;

View File

@ -402,6 +402,9 @@ private:
Field & out_value,
DataTypePtr & out_type);
bool canConstantBeWrappedByFunctions(
const ASTPtr & node, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type);
/// If it's possible to make an RPNElement
/// that will filter values (possibly tuples) by the content of 'prepared_set',
/// do it and return true.

View File

@ -6,6 +6,7 @@
#include <Poco/File.h>
#include <Common/FieldVisitors.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
@ -238,13 +239,15 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
}
std::optional<KeyCondition> minmax_idx_condition;
std::optional<PartitionPruner> partition_pruner;
if (data.minmax_idx_expr)
{
minmax_idx_condition.emplace(query_info, context, data.minmax_idx_columns, data.minmax_idx_expr);
partition_pruner.emplace(metadata_snapshot->getPartitionKey(), query_info, context, false /* strict */);
if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue())
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
{
String msg = "MinMax index by columns (";
String msg = "Neither MinMax index by columns (";
bool first = true;
for (const String & col : data.minmax_idx_columns)
{
@ -254,7 +257,7 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
msg += ", ";
msg += col;
}
msg += ") is not used and setting 'force_index_by_date' is set";
msg += ") nor partition expr is used and setting 'force_index_by_date' is set";
throw Exception(msg, ErrorCodes::INDEX_NOT_USED);
}
@ -278,6 +281,12 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
part->minmax_idx.hyperrectangle, data.minmax_idx_column_types).can_be_true)
continue;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);

View File

@ -0,0 +1,25 @@
#include <Storages/MergeTree/PartitionPruner.h>
namespace DB
{
bool PartitionPruner::canBePruned(const DataPartPtr & part)
{
if (part->isEmpty())
return true;
const auto & partition_id = part->info.partition_id;
bool is_valid;
if (auto it = partition_filter_map.find(partition_id); it != partition_filter_map.end())
is_valid = it->second;
else
{
const auto & partition_value = part->partition.value;
std::vector<FieldRef> index_value(partition_value.begin(), partition_value.end());
is_valid = partition_condition.mayBeTrueInRange(
partition_value.size(), index_value.data(), index_value.data(), partition_key.data_types);
partition_filter_map.emplace(partition_id, is_valid);
}
return !is_valid;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <unordered_map>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/KeyCondition.h>
namespace DB
{
/// Pruning partitions in verbatim way using KeyCondition
class PartitionPruner
{
private:
std::unordered_map<String, bool> partition_filter_map;
const KeyDescription & partition_key;
KeyCondition partition_condition;
bool useless;
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
public:
PartitionPruner(const KeyDescription & partition_key_, const SelectQueryInfo & query_info, const Context & context, bool strict)
: partition_key(partition_key_)
, partition_condition(
query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
, useless(partition_condition.alwaysUnknownOrTrue())
{
}
bool canBePruned(const DataPartPtr & part);
bool isUseless() const { return useless; }
};
}

View File

@ -21,6 +21,7 @@
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/checkDataPart.h>
@ -192,31 +193,14 @@ std::optional<UInt64> StorageMergeTree::totalRows() const
std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
Names partition_key_columns = partition_key.column_names;
KeyCondition key_condition(
query_info, context, partition_key_columns, partition_key.expression, true /* single_point */, true /* strict */);
if (key_condition.alwaysUnknownOrTrue())
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
std::unordered_map<String, bool> partition_filter_map;
size_t res = 0;
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (part->isEmpty())
continue;
const auto & partition_id = part->info.partition_id;
bool is_valid;
if (auto it = partition_filter_map.find(partition_id); it != partition_filter_map.end())
is_valid = it->second;
else
{
const auto & partition_value = part->partition.value;
std::vector<FieldRef> index_value(partition_value.begin(), partition_value.end());
is_valid = key_condition.mayBeTrueInRange(partition_value.size(), index_value.data(), index_value.data(), partition_key.data_types);
partition_filter_map.emplace(partition_id, is_valid);
}
if (is_valid)
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;

View File

@ -17,6 +17,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
@ -3666,28 +3667,13 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
std::optional<UInt64> StorageReplicatedMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
Names partition_key_columns = partition_key.column_names;
KeyCondition key_condition(
query_info, context, partition_key_columns, partition_key.expression, true /* single_point */, true /* strict */);
if (key_condition.alwaysUnknownOrTrue())
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
std::unordered_map<String, bool> partition_filter_map;
size_t res = 0;
foreachCommittedParts([&](auto & part)
{
const auto & partition_id = part->info.partition_id;
bool is_valid;
if (auto it = partition_filter_map.find(partition_id); it != partition_filter_map.end())
is_valid = it->second;
else
{
const auto & partition_value = part->partition.value;
std::vector<FieldRef> index_value(partition_value.begin(), partition_value.end());
is_valid = key_condition.mayBeTrueInRange(partition_value.size(), index_value.data(), index_value.data(), partition_key.data_types);
partition_filter_map.emplace(partition_id, is_valid);
}
if (is_valid)
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
});
return res;

View File

@ -85,6 +85,7 @@ SRCS(
MergeTree/MergeType.cpp
MergeTree/MergedBlockOutputStream.cpp
MergeTree/MergedColumnOnlyOutputStream.cpp
MergeTree/PartitionPruner.cpp
MergeTree/ReplicatedFetchList.cpp
MergeTree/ReplicatedMergeTreeAddress.cpp
MergeTree/ReplicatedMergeTreeAltersSequence.cpp

View File

@ -0,0 +1,3 @@
2 3
9 5
8 4

View File

@ -0,0 +1,23 @@
drop table if exists xy;
create table xy(x int, y int) engine MergeTree partition by intHash64(x) % 2 order by y settings index_granularity = 1;
-- intHash64(0) % 2 = 0
-- intHash64(2) % 2 = 1
-- intHash64(8) % 2 = 0
-- intHash64(9) % 2 = 1
insert into xy values (0, 2), (2, 3), (8, 4), (9, 5);
-- Now we have two partitions: 0 and 1, each of which contains 2 values.
-- minmax index for the first partition is 0 <= x <= 8
-- minmax index for the second partition is 2 <= x <= 9
SET max_rows_to_read = 2;
select * from xy where intHash64(x) % 2 = intHash64(2) % 2;
-- Equality is another special operator that can be treated as an always monotonic indicator for deterministic functions.
-- minmax index is not enough.
select * from xy where x = 8;
drop table if exists xy;