Merge pull request #21401 from amosbird/virtualutil

Partition pruning via virtual columns (partition_id)
This commit is contained in:
alexey-milovidov 2021-03-31 08:20:08 +03:00 committed by GitHub
commit f8534acb13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 324 additions and 117 deletions

View File

@ -7,15 +7,15 @@
namespace DB
{
/// Get the connection ID. It's used for MySQL handler only.
class FunctionConnectionID : public IFunction
/// Get the connection Id. It's used for MySQL handler only.
class FunctionConnectionId : public IFunction
{
public:
static constexpr auto name = "connectionID";
static constexpr auto name = "connectionId";
explicit FunctionConnectionID(const Context & context_) : context(context_) {}
explicit FunctionConnectionId(const Context & context_) : context(context_) {}
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionConnectionID>(context); }
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionConnectionId>(context); }
String getName() const override { return name; }
@ -32,9 +32,9 @@ private:
const Context & context;
};
void registerFunctionConnectionID(FunctionFactory & factory)
void registerFunctionConnectionId(FunctionFactory & factory)
{
factory.registerFunction<FunctionConnectionID>(FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionConnectionId>(FunctionFactory::CaseInsensitive);
factory.registerAlias("connection_id", "connectionID", FunctionFactory::CaseInsensitive);
}

View File

@ -0,0 +1,70 @@
#include <memory>
#include <Columns/ColumnString.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunctionImpl.h>
#include <Storages/MergeTree/MergeTreePartition.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/** partitionId(x, y, ...) is a function that computes partition ids of arguments.
* The function is slow and should not be called for large amount of rows.
*/
class FunctionPartitionId : public IFunction
{
public:
static constexpr auto name = "partitionId";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionPartitionId>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isInjective(const ColumnsWithTypeAndName & /*sample_columns*/) const override { return true; }
bool useDefaultImplementationForNulls() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.empty())
throw Exception("Function " + getName() + " requires at least one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<DataTypeString>();
}
virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
Block sample_block(arguments);
size_t size = arguments.size();
auto result_column = ColumnString::create();
for (size_t j = 0; j < input_rows_count; ++j)
{
Row row(size);
for (size_t i = 0; i < size; ++i)
arguments[i].column->get(j, row[i]);
MergeTreePartition partition(std::move(row));
result_column->insert(partition.getID(sample_block));
}
return result_column;
}
};
void registerFunctionPartitionId(FunctionFactory & factory)
{
factory.registerFunction<FunctionPartitionId>();
}
}

View File

@ -70,7 +70,8 @@ void registerFunctionErrorCodeToName(FunctionFactory &);
void registerFunctionTcpPort(FunctionFactory &);
void registerFunctionByteSize(FunctionFactory &);
void registerFunctionFile(FunctionFactory & factory);
void registerFunctionConnectionID(FunctionFactory & factory);
void registerFunctionConnectionId(FunctionFactory & factory);
void registerFunctionPartitionId(FunctionFactory & factory);
void registerFunctionIsIPAddressContainedIn(FunctionFactory &);
#if USE_ICU
@ -142,7 +143,8 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionTcpPort(factory);
registerFunctionByteSize(factory);
registerFunctionFile(factory);
registerFunctionConnectionID(factory);
registerFunctionConnectionId(factory);
registerFunctionPartitionId(factory);
registerFunctionIsIPAddressContainedIn(factory);
#if USE_ICU

View File

@ -210,7 +210,7 @@ SRCS(
cbrt.cpp
coalesce.cpp
concat.cpp
connectionID.cpp
connectionId.cpp
convertCharset.cpp
cos.cpp
cosh.cpp
@ -374,6 +374,7 @@ SRCS(
now.cpp
now64.cpp
nullIf.cpp
partitionId.cpp
pi.cpp
plus.cpp
pointInEllipses.cpp

View File

@ -201,7 +201,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data)
{
/// Don't descend into subqueries in arguments of IN operator.
/// But if an argument is not subquery, than deeper may be scalar subqueries and we need to descend in them.
/// But if an argument is not subquery, then deeper may be scalar subqueries and we need to descend in them.
std::vector<ASTPtr *> out;
if (checkFunctionIsInOrGlobalInOperator(func))

View File

@ -299,7 +299,7 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
}
void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options)
{
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
@ -335,7 +335,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
prepared_sets[set_key] = std::move(set);
}
SetPtr SelectQueryExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
SetPtr ExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
{
const auto * table = subquery_or_table_name->as<ASTIdentifier>();
if (!table)
@ -381,7 +381,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
if (settings.use_index_for_in_with_subqueries)
tryMakeSetForIndexFromSubquery(arg);
tryMakeSetForIndexFromSubquery(arg, query_options);
}
else
{
@ -1334,9 +1334,9 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool proje
}
ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
ExpressionActionsPtr ExpressionAnalyzer::getConstActions(const ColumnsWithTypeAndName & constant_inputs)
{
auto actions = std::make_shared<ActionsDAG>(NamesAndTypesList());
auto actions = std::make_shared<ActionsDAG>(constant_inputs);
getRootActions(query, true, actions, true);
return std::make_shared<ExpressionActions>(actions, ExpressionActionsSettings::fromContext(context));

View File

@ -111,7 +111,7 @@ public:
/// Actions that can be performed on an empty block: adding constants and applying functions that depend only on constants.
/// Does not execute subqueries.
ExpressionActionsPtr getConstActions();
ExpressionActionsPtr getConstActions(const ColumnsWithTypeAndName & constant_inputs = {});
/** Sets that require a subquery to be create.
* Only the sets needed to perform actions returned from already executed `append*` or `getActions`.
@ -128,6 +128,19 @@ public:
void makeWindowDescriptions(ActionsDAGPtr actions);
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options = {});
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
protected:
ExpressionAnalyzer(
const ASTPtr & query_,
@ -299,19 +312,6 @@ private:
NameSet required_result_columns;
SelectQueryOptions query_options;
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);

View File

@ -662,7 +662,10 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
const auto & partition_desc = metadata_snapshot->getPartitionKey();
if (partition_desc.expression)
{
const auto & partition_source_columns = partition_desc.expression->getRequiredColumns();
auto partition_source_columns = partition_desc.expression->getRequiredColumns();
partition_source_columns.push_back("_part");
partition_source_columns.push_back("_partition_id");
partition_source_columns.push_back("_part_uuid");
optimize_trivial_count = true;
for (const auto & required_column : required)
{

View File

@ -42,6 +42,7 @@
#include <Storages/MergeTree/localBackup.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Increment.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
@ -681,6 +682,41 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
}
std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
ASTPtr expression_ast;
Block virtual_columns_block = MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns();
// Generate valid expressions for filtering
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless() && !valid)
return {};
std::unordered_set<String> part_values;
if (valid && expression_ast)
{
MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
return 0;
}
// At this point, empty `part_values` means all parts.
size_t res = 0;
for (const auto & part : parts)
{
if ((part_values.empty() || part_values.find(part->name) != part_values.end()) && !partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
}
String MergeTreeData::MergingParams::getModeName() const
{
switch (mode)

View File

@ -893,6 +893,9 @@ protected:
return {begin, end};
}
std::optional<UInt64> totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const;
static decltype(auto) getStateModifier(DataPartState state)
{
return [state] (const DataPartPtr & part) { part->setState(state); };

View File

@ -39,6 +39,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/VirtualColumnUtils.h>
#include <DataStreams/materializeBlock.h>
namespace ProfileEvents
{
@ -71,28 +72,30 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
}
/// Construct a block consisting only of possible values of virtual columns
static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool with_uuid)
Block MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns()
{
auto part_column = ColumnString::create();
auto part_uuid_column = ColumnUUID::create();
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid")});
}
void MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block)
{
MutableColumns columns = block.mutateColumns();
auto & part_column = columns[0];
auto & partition_id_column = columns[1];
auto & part_uuid_column = columns[2];
for (const auto & part : parts)
{
part_column->insert(part->name);
if (with_uuid)
part_uuid_column->insert(part->uuid);
partition_id_column->insert(part->info.partition_id);
part_uuid_column->insert(part->uuid);
}
if (with_uuid)
{
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(std::move(part_uuid_column), std::make_shared<DataTypeUUID>(), "_part_uuid"),
});
}
return Block{ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part")};
block.setColumns(std::move(columns));
}
@ -176,8 +179,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
Names real_column_names;
size_t total_parts = parts.size();
bool part_column_queried = false;
bool part_uuid_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
@ -186,7 +187,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
if (name == "_part")
{
part_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_part_index")
@ -199,7 +199,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
}
else if (name == "_part_uuid")
{
part_uuid_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
@ -219,12 +218,23 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
/// If `_part` or `_part_uuid` virtual columns are requested, we try to filter out data by them.
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, part_uuid_column_queried);
if (part_column_queried || part_uuid_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
std::unordered_set<String> part_values;
ASTPtr expression_ast;
auto virtual_columns_block = getSampleBlockWithVirtualPartColumns();
auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
// Generate valid expressions for filtering
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
// If there is still something left, fill the virtual block and do the filtering.
if (expression_ast)
{
fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
return std::make_unique<QueryPlan>();
}
// At this point, empty `part_values` means all parts.
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
@ -373,7 +383,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
LOG_DEBUG(log, "Will use no data on this replica because parallel replicas processing has been requested"
" (the setting 'max_parallel_replicas') but the table does not support sampling and this replica is not the first.");
return {};
return std::make_unique<QueryPlan>();
}
bool use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
@ -1894,7 +1904,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())
@ -1945,7 +1955,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())

View File

@ -44,6 +44,12 @@ public:
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
/// Construct a sample block consisting only of possible virtual columns for part pruning.
static Block getSampleBlockWithVirtualPartColumns();
/// Fill in values of possible virtual columns for part pruning.
static void fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block);
private:
const MergeTreeData & data;

View File

@ -219,8 +219,8 @@ Pipe StorageMerge::read(
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables(
query_info.query, has_table_virtual_column, context.getCurrentQueryId(), context.getSettingsRef());
StorageListWithLocks selected_tables
= getSelectedTables(query_info, has_table_virtual_column, context.getCurrentQueryId(), context.getSettingsRef());
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
@ -409,8 +409,9 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const
const SelectQueryInfo & query_info, bool has_virtual_column, const String & query_id, const Settings & settings) const
{
const ASTPtr & query = query_info.query;
StorageListWithLocks selected_tables;
DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context);
@ -438,7 +439,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
if (has_virtual_column)
{
Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, global_context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list

View File

@ -59,7 +59,7 @@ private:
StorageListWithLocks getSelectedTables(const String & query_id, const Settings & settings) const;
StorageMerge::StorageListWithLocks getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const;
const SelectQueryInfo & query_info, bool has_virtual_column, const String & query_id, const Settings & settings) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;

View File

@ -212,18 +212,8 @@ std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
auto parts = getDataPartsVector({DataPartState::Committed});
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const

View File

@ -4116,17 +4116,9 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRows(const Settings & set
std::optional<UInt64> StorageReplicatedMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
foreachCommittedParts([&](auto & part)
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}, context.getSettingsRef().select_sequential_consistency);
return res;
DataPartsVector parts;
foreachCommittedParts([&](auto & part) { parts.push_back(part); }, context.getSettingsRef().select_sequential_consistency);
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & settings) const

View File

@ -62,7 +62,7 @@ StorageSystemTables::StorageSystemTables(const StorageID & table_id_)
}
static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & context)
static ColumnPtr getFilteredDatabases(const SelectQueryInfo & query_info, const Context & context)
{
MutableColumnPtr column = ColumnString::create();
@ -76,7 +76,7 @@ static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & cont
}
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block, context);
return block.getByPosition(0).column;
}
@ -525,7 +525,7 @@ Pipe StorageSystemTables::read(
}
}
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info.query, context);
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info, context);
return Pipe(std::make_shared<TablesBlockSource>(
std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_databases_column), context));

View File

@ -5,12 +5,14 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
@ -19,6 +21,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/typeid_cast.h>
#include <Interpreters/ActionsVisitor.h>
namespace DB
@ -28,31 +31,36 @@ namespace
{
/// Verifying that the function depends only on the specified columns
bool isValidFunction(const ASTPtr & expression, const NameSet & columns)
bool isValidFunction(const ASTPtr & expression, const std::function<bool(const ASTPtr &)> & is_constant)
{
for (const auto & child : expression->children)
if (!isValidFunction(child, columns))
return false;
if (auto opt_name = IdentifierSemantic::getColumnName(expression))
return columns.count(*opt_name);
return true;
const auto * function = expression->as<ASTFunction>();
if (function && functionIsInOrGlobalInOperator(function->name))
{
// Second argument of IN can be a scalar subquery
return isValidFunction(function->arguments->children[0], is_constant);
}
else
return is_constant(expression);
}
/// Extract all subfunctions of the main conjunction, but depending only on the specified columns
void extractFunctions(const ASTPtr & expression, const NameSet & columns, std::vector<ASTPtr> & result)
bool extractFunctions(const ASTPtr & expression, const std::function<bool(const ASTPtr &)> & is_constant, std::vector<ASTPtr> & result)
{
const auto * function = expression->as<ASTFunction>();
if (function && function->name == "and")
if (function && (function->name == "and" || function->name == "indexHint"))
{
bool ret = true;
for (const auto & child : function->arguments->children)
extractFunctions(child, columns, result);
ret &= extractFunctions(child, is_constant, result);
return ret;
}
else if (isValidFunction(expression, columns))
else if (isValidFunction(expression, is_constant))
{
result.push_back(expression->clone());
return true;
}
else
return false;
}
/// Construct a conjunction from given functions
@ -65,6 +73,25 @@ ASTPtr buildWhereExpression(const ASTs & functions)
return makeASTFunction("and", functions);
}
void buildSets(const ASTPtr & expression, ExpressionAnalyzer & analyzer)
{
const auto * func = expression->as<ASTFunction>();
if (func && functionIsInOrGlobalInOperator(func->name))
{
const IAST & args = *func->arguments;
const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
analyzer.tryMakeSetForIndexFromSubquery(arg);
}
}
else
{
for (const auto & child : expression->children)
buildSets(child, analyzer);
}
}
}
namespace VirtualColumnUtils
@ -76,7 +103,6 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
if (!select.with())
select.setExpression(ASTSelectQuery::Expression::WITH, std::make_shared<ASTExpressionList>());
if (func.empty())
{
auto literal = std::make_shared<ASTLiteral>(value);
@ -96,30 +122,63 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
}
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context)
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Context & context, Block block, ASTPtr & expression_ast)
{
bool unmodified = true;
const auto & select = query->as<ASTSelectQuery &>();
if (!select.where() && !select.prewhere())
return;
return unmodified;
NameSet columns;
for (const auto & it : block.getNamesAndTypesList())
columns.insert(it.name);
ASTPtr condition_ast;
if (select.prewhere() && select.where())
condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
else
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
/// We will create an expression that evaluates the expressions in WHERE and PREWHERE, depending only on the existing columns.
// Prepare a constant block with valid expressions
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).type->createColumnConstWithDefaultValue(1);
// Provide input columns as constant columns to check if an expression is constant.
std::function<bool(const ASTPtr &)> is_constant = [&block, &context](const ASTPtr & node)
{
auto actions = std::make_shared<ActionsDAG>(block.getColumnsWithTypeAndName());
PreparedSets prepared_sets;
SubqueriesForSets subqueries_for_sets;
ActionsVisitor::Data visitor_data(
context, SizeLimits{}, 1, {}, std::move(actions), prepared_sets, subqueries_for_sets, true, true, true, false);
ActionsVisitor(visitor_data).visit(node);
actions = visitor_data.getActions();
auto expression_actions = std::make_shared<ExpressionActions>(actions);
auto block_with_constants = block;
expression_actions->execute(block_with_constants);
auto column_name = node->getColumnName();
return block_with_constants.has(column_name) && isColumnConst(*block_with_constants.getByName(column_name).column);
};
/// Create an expression that evaluates the expressions in WHERE and PREWHERE, depending only on the existing columns.
std::vector<ASTPtr> functions;
if (select.where())
extractFunctions(select.where(), columns, functions);
unmodified &= extractFunctions(select.where(), is_constant, functions);
if (select.prewhere())
extractFunctions(select.prewhere(), columns, functions);
unmodified &= extractFunctions(select.prewhere(), is_constant, functions);
expression_ast = buildWhereExpression(functions);
return unmodified;
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast)
{
if (!expression_ast)
prepareFilterBlockWithQuery(query, context, block, expression_ast);
ASTPtr expression_ast = buildWhereExpression(functions);
if (!expression_ast)
return;
/// Let's analyze and calculate the expression.
/// Let's analyze and calculate the prepared expression.
auto syntax_result = TreeRewriter(context).analyze(expression_ast, block.getNamesAndTypesList());
ExpressionAnalyzer analyzer(expression_ast, syntax_result, context);
buildSets(expression_ast, analyzer);
ExpressionActionsPtr actions = analyzer.getActions(false);
Block block_with_filter = block;

View File

@ -4,6 +4,7 @@
#include <Core/Block.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
@ -24,9 +25,14 @@ namespace VirtualColumnUtils
/// - `WITH toUInt16(9000) as _port`.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value, const String & func = "");
/// Prepare `expression_ast` to filter block. Returns true if `expression_ast` is not trimmed, that is,
/// `block` provides all needed columns for `expression_ast`, else return false.
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Context & context, Block block, ASTPtr & expression_ast);
/// Leave in the block only the rows that fit under the WHERE clause and the PREWHERE clause of the query.
/// Only elements of the outer conjunction are considered, depending only on the columns present in the block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context);
/// If `expression_ast` is passed, use it to filter block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast = {});
/// Extract from the input stream a set of `name` column values
template <typename T>

View File

@ -10,7 +10,8 @@ SELECT count() FROM merge_tree;
SET max_rows_to_read = 900000;
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
-- constant ignore will be pruned by part pruner. ignore(*) is used.
SELECT count() FROM merge_tree WHERE not ignore(*); -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(*); -- { serverError 158 }
DROP TABLE merge_tree;

View File

@ -8,7 +8,7 @@ INSERT INTO xp SELECT '2020-01-01', number, '' FROM numbers(100000);
CREATE TABLE xp_d AS xp ENGINE = Distributed(test_shard_localhost, currentDatabase(), xp);
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- { serverError 20 }
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- B > NULL is evaluated to 0 and this works
SELECT count() FROM xp_d WHERE A GLOBAL IN (SELECT NULL); -- { serverError 53 }

View File

@ -0,0 +1,7 @@
1 1
1 2
1 3
1 1
1 2
1 3
3

View File

@ -0,0 +1,19 @@
drop table if exists x;
create table x (i int, j int) engine MergeTree partition by i order by j settings index_granularity = 1;
insert into x values (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6);
set max_rows_to_read = 3;
select * from x where _partition_id = partitionId(1);
set max_rows_to_read = 4; -- one row for subquery
select * from x where _partition_id in (select partitionId(number + 1) from numbers(1));
-- trivial count optimization test
set max_rows_to_read = 1; -- one row for subquery
select count() from x where _partition_id in (select partitionId(number + 1) from numbers(1));
drop table x;