partition id pruning

This commit is contained in:
Amos Bird 2021-03-03 16:36:20 +08:00
parent 218542589a
commit 93b661ad5a
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
16 changed files with 278 additions and 81 deletions

View File

@ -0,0 +1,75 @@
#include <memory>
#include <Columns/ColumnString.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunctionImpl.h>
#include <Storages/MergeTree/MergeTreePartition.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/** partitionID(x, y, ...) produces, for every input row, the partition id string that
  * MergeTreePartition derives from the tuple of argument values in that row.
  */
class FunctionPartitionID : public IFunction
{
public:
    static constexpr auto name = "partitionID";

    static FunctionPtr create(const Context &) { return std::make_shared<FunctionPartitionID>(); }

    String getName() const override { return name; }

    /// Variadic: the argument count is not fixed here; an empty argument list is rejected in getReturnTypeImpl.
    bool isVariadic() const override { return true; }
    size_t getNumberOfArguments() const override { return 0; }

    bool isInjective(const ColumnsWithTypeAndName & /*sample_columns*/) const override { return true; }

    bool useDefaultImplementationForNulls() const override { return true; }
    bool useDefaultImplementationForConstants() const override { return true; }

    /// The result type is always String; throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH when called without arguments.
    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
    {
        if (!arguments.empty())
            return std::make_shared<DataTypeString>();

        throw Exception("Function " + getName() + " requires at least one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
    }

    /// Builds one partition id per row: the row's argument values are collected into a Row,
    /// wrapped into a MergeTreePartition and converted to an id using a block made of the arguments.
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
    {
        const size_t num_arguments = arguments.size();

        /// The block of arguments is what getID receives alongside the row values.
        Block header;
        Columns argument_columns;
        for (const auto & argument : arguments)
        {
            header.insert(argument);
            argument_columns.push_back(argument.column);
        }

        auto partition_ids = ColumnString::create();
        for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx)
        {
            Row values(num_arguments);
            size_t pos = 0;
            for (const auto & column : argument_columns)
                column->get(row_idx, values[pos++]);

            MergeTreePartition partition(std::move(values));
            partition_ids->insert(partition.getID(header));
        }

        return partition_ids;
    }
};
/// Registers FunctionPartitionID in the given function factory.
void registerFunctionPartitionID(FunctionFactory & factory)
{
factory.registerFunction<FunctionPartitionID>();
}
}

View File

@ -70,6 +70,7 @@ void registerFunctionTcpPort(FunctionFactory &);
void registerFunctionByteSize(FunctionFactory &);
void registerFunctionFile(FunctionFactory & factory);
void registerFunctionConnectionID(FunctionFactory & factory);
void registerFunctionPartitionID(FunctionFactory & factory);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -140,6 +141,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionByteSize(factory);
registerFunctionFile(factory);
registerFunctionConnectionID(factory);
registerFunctionPartitionID(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -373,6 +373,7 @@ SRCS(
now.cpp
now64.cpp
nullIf.cpp
partitionID.cpp
pi.cpp
plus.cpp
pointInEllipses.cpp

View File

@ -305,7 +305,7 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
}
void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options)
{
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
@ -341,7 +341,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
prepared_sets[set_key] = std::move(set);
}
SetPtr SelectQueryExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
SetPtr ExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
{
const auto * table = subquery_or_table_name->as<ASTIdentifier>();
if (!table)
@ -387,7 +387,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
if (settings.use_index_for_in_with_subqueries)
tryMakeSetForIndexFromSubquery(arg);
tryMakeSetForIndexFromSubquery(arg, query_options);
}
else
{

View File

@ -127,6 +127,19 @@ public:
void makeWindowDescriptions(ActionsDAGPtr actions);
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options = {});
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
protected:
ExpressionAnalyzer(
const ASTPtr & query_,
@ -297,19 +310,6 @@ private:
NameSet required_result_columns;
SelectQueryOptions query_options;
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);

View File

@ -656,7 +656,10 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
const auto & partition_desc = metadata_snapshot->getPartitionKey();
if (partition_desc.expression)
{
const auto & partition_source_columns = partition_desc.expression->getRequiredColumns();
auto partition_source_columns = partition_desc.expression->getRequiredColumns();
partition_source_columns.push_back("_part");
partition_source_columns.push_back("_partition_id");
partition_source_columns.push_back("_part_uuid");
optimize_trivial_count = true;
for (const auto & required_column : required)
{

View File

@ -42,6 +42,7 @@
#include <Storages/MergeTree/localBackup.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Increment.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
@ -675,6 +676,38 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
}
std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
Block part_block = MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns();
ASTPtr expression_ast;
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, part_block, expression_ast);
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless() && !valid)
return {};
std::unordered_set<String> part_values;
if (valid)
{
MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(parts, part_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, part_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(part_block, "_part");
if (part_values.empty())
return 0;
}
// At this point, empty `part_values` means all parts.
size_t res = 0;
for (const auto & part : parts)
{
if ((part_values.empty() || part_values.find(part->name) != part_values.end()) && !partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
}
String MergeTreeData::MergingParams::getModeName() const
{
switch (mode)

View File

@ -825,6 +825,9 @@ protected:
return {begin, end};
}
std::optional<UInt64> totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const;
static decltype(auto) getStateModifier(DataPartState state)
{
return [state] (const DataPartPtr & part) { part->setState(state); };

View File

@ -71,28 +71,30 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
}
/// Construct a block consisting only of possible values of virtual columns
static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool with_uuid)
Block MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns()
{
auto part_column = ColumnString::create();
auto part_uuid_column = ColumnUUID::create();
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid")});
}
void MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block)
{
MutableColumns columns = block.mutateColumns();
auto & part_column = columns[0];
auto & partition_id_column = columns[1];
auto & part_uuid_column = columns[2];
for (const auto & part : parts)
{
part_column->insert(part->name);
if (with_uuid)
part_uuid_column->insert(part->uuid);
partition_id_column->insert(part->info.partition_id);
part_uuid_column->insert(part->uuid);
}
if (with_uuid)
{
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(std::move(part_uuid_column), std::make_shared<DataTypeUUID>(), "_part_uuid"),
});
}
return Block{ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part")};
block.setColumns(std::move(columns));
}
@ -176,8 +178,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
Names real_column_names;
size_t total_parts = parts.size();
bool part_column_queried = false;
bool part_uuid_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
@ -186,7 +186,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
if (name == "_part")
{
part_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_part_index")
@ -199,7 +198,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
}
else if (name == "_part_uuid")
{
part_uuid_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
@ -219,12 +217,21 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
/// If `_part` or `_part_uuid` virtual columns are requested, we try to filter out data by them.
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, part_uuid_column_queried);
if (part_column_queried || part_uuid_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
std::unordered_set<String> part_values;
ASTPtr expression_ast;
Block virtual_columns_block = getSampleBlockWithVirtualPartColumns();
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, virtual_columns_block, expression_ast);
auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
// If there is still something left, fill the virtual block and do the filtering.
if (expression_ast)
{
fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
return {};
}
// At this point, empty `part_values` means all parts.
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
@ -1899,7 +1906,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())
@ -1950,7 +1957,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())

View File

@ -44,6 +44,12 @@ public:
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
/// Construct a sample block consisting only of possible virtual columns for part pruning.
static Block getSampleBlockWithVirtualPartColumns();
/// Fill in values of possible virtual columns for part pruning.
static void fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block);
private:
const MergeTreeData & data;

View File

@ -208,18 +208,8 @@ std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
auto parts = getDataPartsVector({DataPartState::Committed});
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const

View File

@ -3890,17 +3890,9 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRows(const Settings & set
std::optional<UInt64> StorageReplicatedMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
foreachCommittedParts([&](auto & part)
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}, context.getSettingsRef().select_sequential_consistency);
return res;
DataPartsVector parts;
foreachCommittedParts([&](auto & part) { parts.push_back(part); }, context.getSettingsRef().select_sequential_consistency);
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & settings) const

View File

@ -5,12 +5,14 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
@ -30,29 +32,52 @@ namespace
/// Verifying that the function depends only on the specified columns
bool isValidFunction(const ASTPtr & expression, const NameSet & columns)
{
for (const auto & child : expression->children)
if (!isValidFunction(child, columns))
return false;
if (auto opt_name = IdentifierSemantic::getColumnName(expression))
return columns.count(*opt_name);
const auto * function = expression->as<ASTFunction>();
if (function)
{
if (functionIsInOrGlobalInOperator(function->name))
{
// Second argument of IN can be a scalar subquery
if (!isValidFunction(function->arguments->children[0], columns))
return false;
}
else
{
if (function->arguments)
{
for (const auto & child : function->arguments->children)
if (!isValidFunction(child, columns))
return false;
}
}
}
else
{
if (auto opt_name = IdentifierSemantic::getColumnName(expression))
return columns.count(*opt_name);
}
return true;
}
/// Extract all subfunctions of the main conjunction, but depending only on the specified columns
void extractFunctions(const ASTPtr & expression, const NameSet & columns, std::vector<ASTPtr> & result)
bool extractFunctions(const ASTPtr & expression, const NameSet & columns, std::vector<ASTPtr> & result)
{
const auto * function = expression->as<ASTFunction>();
if (function && function->name == "and")
if (function && (function->name == "and" || function->name == "indexHint"))
{
bool ret = true;
for (const auto & child : function->arguments->children)
extractFunctions(child, columns, result);
ret &= extractFunctions(child, columns, result);
return ret;
}
else if (isValidFunction(expression, columns))
{
result.push_back(expression->clone());
return true;
}
else
return false;
}
/// Construct a conjunction from given functions
@ -65,6 +90,25 @@ ASTPtr buildWhereExpression(const ASTs & functions)
return makeASTFunction("and", functions);
}
void buildSets(const ASTPtr & expression, ExpressionAnalyzer & analyzer)
{
const auto * func = expression->as<ASTFunction>();
if (func && functionIsInOrGlobalInOperator(func->name))
{
const IAST & args = *func->arguments;
const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
analyzer.tryMakeSetForIndexFromSubquery(arg);
}
}
else
{
for (const auto & child : expression->children)
buildSets(child, analyzer);
}
}
}
namespace VirtualColumnUtils
@ -96,11 +140,12 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
}
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context)
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Block & block, ASTPtr & expression_ast)
{
bool ret = true;
const auto & select = query->as<ASTSelectQuery &>();
if (!select.where() && !select.prewhere())
return;
return ret;
NameSet columns;
for (const auto & it : block.getNamesAndTypesList())
@ -109,17 +154,26 @@ void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & c
/// We will create an expression that evaluates the expressions in WHERE and PREWHERE, depending only on the existing columns.
std::vector<ASTPtr> functions;
if (select.where())
extractFunctions(select.where(), columns, functions);
ret &= extractFunctions(select.where(), columns, functions);
if (select.prewhere())
extractFunctions(select.prewhere(), columns, functions);
ret &= extractFunctions(select.prewhere(), columns, functions);
expression_ast = buildWhereExpression(functions);
return ret;
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast)
{
if (!expression_ast)
prepareFilterBlockWithQuery(query, block, expression_ast);
ASTPtr expression_ast = buildWhereExpression(functions);
if (!expression_ast)
return;
/// Let's analyze and calculate the expression.
auto syntax_result = TreeRewriter(context).analyze(expression_ast, block.getNamesAndTypesList());
ExpressionAnalyzer analyzer(expression_ast, syntax_result, context);
buildSets(expression_ast, analyzer);
ExpressionActionsPtr actions = analyzer.getActions(false);
Block block_with_filter = block;

View File

@ -24,9 +24,14 @@ namespace VirtualColumnUtils
/// - `WITH toUInt16(9000) as _port`.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value, const String & func = "");
/// Prepare `expression_ast` for filtering the block. Returns true if `expression_ast` is not trimmed, that is,
/// the sample block provides all the columns it needs; otherwise returns false.
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Block & sample_block, ASTPtr & expression_ast);
/// Leave in the block only the rows that fit under the WHERE clause and the PREWHERE clause of the query.
/// Only elements of the outer conjunction are considered, depending only on the columns present in the block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context);
/// If `expression_ast` is passed, use it to filter block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast = {});
/// Extract from the input stream a set of `name` column values
template <typename T>

View File

@ -0,0 +1,7 @@
1 1
1 2
1 3
1 1
1 2
1 3
3

View File

@ -0,0 +1,19 @@
-- Checks that filters on the `_partition_id` virtual column keep reads within max_rows_to_read.
drop table if exists x;
-- Partitioned by `i`: the insert below creates two partitions (i = 1 and i = 2) with three rows each.
create table x (i int, j int) engine MergeTree partition by i order by j settings index_granularity = 1;
insert into x values (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6);
-- Equality with a constant partition id: the limit equals the three rows of partition i = 1.
set max_rows_to_read = 3;
select * from x where _partition_id = partitionID(1);
-- The same filter expressed through an IN subquery.
set max_rows_to_read = 4; -- one row for subquery
select * from x where _partition_id in (select partitionID(number + 1) from numbers(1));
-- trivial count optimization test
set max_rows_to_read = 1; -- one row for subquery
select count() from x where _partition_id in (select partitionID(number + 1) from numbers(1));
drop table x;