Merge pull request #38715 from CurtizJ/fix-read-in-order-fixed-prefix

Better support of `optimize_read_in_order` in case of fixed prefix of sorting key
This commit is contained in:
Anton Popov 2022-09-01 12:59:18 +02:00 committed by GitHub
commit f7bdf07adc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 269 additions and 61 deletions

View File

@ -2592,7 +2592,7 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input
auto finish_sorting_step = std::make_unique<SortingStep>( auto finish_sorting_step = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(), query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr, input_sorting_info->sort_description_for_merging,
output_order_descr, output_order_descr,
settings.max_block_size, settings.max_block_size,
limit); limit);

View File

@ -104,7 +104,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
if (order_info) if (order_info)
{ {
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info); read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
sorting->convertToFinishSorting(order_info->order_key_prefix_descr); sorting->convertToFinishSorting(order_info->sort_description_for_merging);
} }
return 0; return 0;

View File

@ -548,9 +548,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (need_preliminary_merge) if (need_preliminary_merge)
{ {
size_t fixed_prefix_size = input_order_info->order_key_fixed_prefix_descr.size(); size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size;
size_t prefix_size = fixed_prefix_size + input_order_info->order_key_prefix_descr.size();
auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone(); auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone();
order_key_prefix_ast->children.resize(prefix_size); order_key_prefix_ast->children.resize(prefix_size);

View File

@ -41,13 +41,13 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
/// We won't finalize states in order to merge same states (generated due to multi-thread execution) in AggregatingSortedTransform /// We won't finalize states in order to merge same states (generated due to multi-thread execution) in AggregatingSortedTransform
res_header = params->getCustomHeader(/* final_= */ false); res_header = params->getCustomHeader(/* final_= */ false);
for (size_t i = 0; i < group_by_info->order_key_prefix_descr.size(); ++i) for (size_t i = 0; i < group_by_info->sort_description_for_merging.size(); ++i)
{ {
const auto & column_description = group_by_description_[i]; const auto & column_description = group_by_description_[i];
group_by_description.emplace_back(column_description, res_header.getPositionByName(column_description.column_name)); group_by_description.emplace_back(column_description, res_header.getPositionByName(column_description.column_name));
} }
if (group_by_info->order_key_prefix_descr.size() < group_by_description_.size()) if (group_by_info->sort_description_for_merging.size() < group_by_description_.size())
{ {
group_by_key = true; group_by_key = true;
/// group_by_description may contains duplicates, so we use keys_size from Aggregator::params /// group_by_description may contains duplicates, so we use keys_size from Aggregator::params

View File

@ -242,7 +242,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto sorting_step = std::make_unique<SortingStep>( auto sorting_step = std::make_unique<SortingStep>(
projection_plan->getCurrentDataStream(), projection_plan->getCurrentDataStream(),
query_info.projection->input_order_info->order_key_prefix_descr, query_info.projection->input_order_info->sort_description_for_merging,
output_order_descr, output_order_descr,
settings.max_block_size, settings.max_block_size,
limit); limit);

View File

@ -5,10 +5,12 @@
#include <Interpreters/TreeRewriter.h> #include <Interpreters/TreeRewriter.h>
#include <Interpreters/replaceAliasColumnsInQuery.h> #include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/TableJoin.h> #include <Interpreters/TableJoin.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
namespace DB namespace DB
{ {
@ -21,26 +23,46 @@ namespace ErrorCodes
namespace namespace
{ {
ASTPtr getFixedPoint(const ASTPtr & ast) /// Finds expression like x = 'y' or f(x) = 'y',
/// where `x` is identifier, 'y' is literal and `f` is injective functions.
ASTPtr getFixedPoint(const ASTPtr & ast, const ContextPtr & context)
{ {
const auto * func = ast->as<ASTFunction>(); const auto * func = ast->as<ASTFunction>();
if (!func || func->name != "equals") if (!func || func->name != "equals")
return nullptr; return nullptr;
if (!func->arguments || func->arguments->children.size() != 2)
return nullptr;
const auto & lhs = func->arguments->children[0]; const auto & lhs = func->arguments->children[0];
const auto & rhs = func->arguments->children[1]; const auto & rhs = func->arguments->children[1];
if (lhs->as<ASTLiteral>()) if (!lhs->as<ASTLiteral>() && !rhs->as<ASTLiteral>())
return rhs; return nullptr;
if (rhs->as<ASTLiteral>()) /// Case of two literals doesn't make sense.
return lhs; if (lhs->as<ASTLiteral>() && rhs->as<ASTLiteral>())
return nullptr;
return nullptr; /// If indetifier is wrapped into injective functions, remove them.
auto argument = lhs->as<ASTLiteral>() ? rhs : lhs;
while (const auto * arg_func = argument->as<ASTFunction>())
{
if (!arg_func->arguments || arg_func->arguments->children.size() != 1)
return nullptr;
auto func_resolver = FunctionFactory::instance().tryGet(arg_func->name, context);
if (!func_resolver || !func_resolver->isInjective({}))
return nullptr;
argument = arg_func->arguments->children[0];
}
return argument->as<ASTIdentifier>() ? argument : nullptr;
} }
size_t calculateFixedPrefixSize( NameSet getFixedSortingColumns(
const ASTSelectQuery & query, const Names & sorting_key_columns) const ASTSelectQuery & query, const Names & sorting_key_columns, const ContextPtr & context)
{ {
ASTPtr condition; ASTPtr condition;
if (query.where() && query.prewhere()) if (query.where() && query.prewhere())
@ -51,14 +73,15 @@ size_t calculateFixedPrefixSize(
condition = query.prewhere(); condition = query.prewhere();
if (!condition) if (!condition)
return 0; return {};
/// Convert condition to CNF for more convenient analysis. /// Convert condition to CNF for more convenient analysis.
auto cnf = TreeCNFConverter::tryConvertToCNF(condition); auto cnf = TreeCNFConverter::tryConvertToCNF(condition);
if (!cnf) if (!cnf)
return 0; return {};
NameSet fixed_points; NameSet fixed_points;
NameSet sorting_key_columns_set(sorting_key_columns.begin(), sorting_key_columns.end());
/// If we met expression like 'column = x', where 'x' is literal, /// If we met expression like 'column = x', where 'x' is literal,
/// in clause of size 1 in CNF, then we can guarantee /// in clause of size 1 in CNF, then we can guarantee
@ -67,22 +90,17 @@ size_t calculateFixedPrefixSize(
{ {
if (group.size() == 1 && !group.begin()->negative) if (group.size() == 1 && !group.begin()->negative)
{ {
auto fixed_point = getFixedPoint(group.begin()->ast); auto fixed_point = getFixedPoint(group.begin()->ast, context);
if (fixed_point) if (fixed_point)
fixed_points.insert(fixed_point->getColumnName()); {
auto column_name = fixed_point->getColumnName();
if (sorting_key_columns_set.contains(column_name))
fixed_points.insert(column_name);
}
} }
}); });
size_t prefix_size = 0; return fixed_points;
for (const auto & column_name : sorting_key_columns)
{
if (!fixed_points.contains(column_name))
break;
++prefix_size;
}
return prefix_size;
} }
/// Optimize in case of exact match with order key element /// Optimize in case of exact match with order key element
@ -181,46 +199,54 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrderImpl(
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const SortDescription & description, const SortDescription & description,
const ManyExpressionActions & actions, const ManyExpressionActions & actions,
const ContextPtr & context,
UInt64 limit) const UInt64 limit) const
{ {
auto sorting_key_columns = metadata_snapshot->getSortingKeyColumns(); auto sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
int read_direction = description.at(0).direction; int read_direction = description.at(0).direction;
size_t fixed_prefix_size = calculateFixedPrefixSize(query, sorting_key_columns); auto fixed_sorting_columns = getFixedSortingColumns(query, sorting_key_columns, context);
size_t descr_prefix_size = std::min(description.size(), sorting_key_columns.size() - fixed_prefix_size);
SortDescription order_key_prefix_descr; SortDescription sort_description_for_merging;
order_key_prefix_descr.reserve(descr_prefix_size); sort_description_for_merging.reserve(description.size());
for (size_t i = 0; i < descr_prefix_size; ++i) size_t desc_pos = 0;
size_t key_pos = 0;
while (desc_pos < description.size() && key_pos < sorting_key_columns.size())
{ {
if (forbidden_columns.contains(description[i].column_name)) if (forbidden_columns.contains(description[desc_pos].column_name))
break; break;
int current_direction = matchSortDescriptionAndKey( int current_direction = matchSortDescriptionAndKey(actions[desc_pos]->getActions(), description[desc_pos], sorting_key_columns[key_pos]);
actions[i]->getActions(), description[i], sorting_key_columns[i + fixed_prefix_size]); bool is_matched = current_direction && (desc_pos == 0 || current_direction == read_direction);
if (!is_matched)
{
/// If one of the sorting columns is constant after filtering,
/// skip it, because it won't affect order anymore.
if (fixed_sorting_columns.contains(sorting_key_columns[key_pos]))
{
++key_pos;
continue;
}
if (!current_direction || (i > 0 && current_direction != read_direction))
break; break;
}
if (i == 0) if (desc_pos == 0)
read_direction = current_direction; read_direction = current_direction;
order_key_prefix_descr.push_back(required_sort_description[i]); sort_description_for_merging.push_back(description[desc_pos]);
++desc_pos;
++key_pos;
} }
if (order_key_prefix_descr.empty()) if (sort_description_for_merging.empty())
return {}; return {};
SortDescription order_key_fixed_prefix_descr; return std::make_shared<InputOrderInfo>(std::move(sort_description_for_merging), key_pos, read_direction, limit);
order_key_fixed_prefix_descr.reserve(fixed_prefix_size);
for (size_t i = 0; i < fixed_prefix_size; ++i)
order_key_fixed_prefix_descr.emplace_back(sorting_key_columns[i], read_direction);
return std::make_shared<InputOrderInfo>(
std::move(order_key_fixed_prefix_descr),
std::move(order_key_prefix_descr),
read_direction, limit);
} }
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder( InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(
@ -255,10 +281,10 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(
aliases_actions[i] = expression_analyzer.getActions(true); aliases_actions[i] = expression_analyzer.getActions(true);
} }
return getInputOrderImpl(metadata_snapshot, aliases_sort_description, aliases_actions, limit); return getInputOrderImpl(metadata_snapshot, aliases_sort_description, aliases_actions, context, limit);
} }
return getInputOrderImpl(metadata_snapshot, required_sort_description, elements_actions, limit); return getInputOrderImpl(metadata_snapshot, required_sort_description, elements_actions, context, limit);
} }
} }

View File

@ -12,8 +12,6 @@ namespace DB
* common prefix, which is needed for * common prefix, which is needed for
* performing reading in order of PK. * performing reading in order of PK.
*/ */
class Context;
class ReadInOrderOptimizer class ReadInOrderOptimizer
{ {
public: public:
@ -30,6 +28,7 @@ private:
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const SortDescription & description, const SortDescription & description,
const ManyExpressionActions & actions, const ManyExpressionActions & actions,
const ContextPtr & context,
UInt64 limit) const; UInt64 limit) const;
/// Actions for every element of order expression to analyze functions for monotonicity /// Actions for every element of order expression to analyze functions for monotonicity

View File

@ -101,17 +101,33 @@ struct FilterDAGInfo
struct InputOrderInfo struct InputOrderInfo
{ {
SortDescription order_key_fixed_prefix_descr; /// Sort description for merging of already sorted streams.
SortDescription order_key_prefix_descr; /// Always a prefix of ORDER BY or GROUP BY description specified in query.
SortDescription sort_description_for_merging;
/** Size of prefix of sorting key that is already
* sorted before execution of sorting or aggreagation.
*
* Contains both columns that scpecified in
* ORDER BY or GROUP BY clause of query
* and columns that turned out to be already sorted.
*
* E.g. if we have sorting key ORDER BY (a, b, c, d)
* and query with `WHERE a = 'x' AND b = 'y' ORDER BY c, d` clauses.
* sort_description_for_merging will be equal to (c, d) and
* used_prefix_of_sorting_key_size will be equal to 4.
*/
size_t used_prefix_of_sorting_key_size;
int direction; int direction;
UInt64 limit; UInt64 limit;
InputOrderInfo( InputOrderInfo(
const SortDescription & order_key_fixed_prefix_descr_, const SortDescription & sort_description_for_merging_,
const SortDescription & order_key_prefix_descr_, size_t used_prefix_of_sorting_key_size_,
int direction_, UInt64 limit_) int direction_, UInt64 limit_)
: order_key_fixed_prefix_descr(order_key_fixed_prefix_descr_) : sort_description_for_merging(sort_description_for_merging_)
, order_key_prefix_descr(order_key_prefix_descr_) , used_prefix_of_sorting_key_size(used_prefix_of_sorting_key_size_)
, direction(direction_), limit(limit_) , direction(direction_), limit(limit_)
{ {
} }

View File

@ -0,0 +1,8 @@
1 100000 1
1 100001 1
1 100002 1
ReadType: InOrder
100000 1
100001 1
100002 1
ReadType: InOrder

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_agg"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_agg ( A Int64, B Int64 ) Engine=MergeTree() ORDER BY (A, B)"
$CLICKHOUSE_CLIENT -q "INSERT INTO test_agg SELECT intDiv(number, 1e5), number FROM numbers(1e6)"
$CLICKHOUSE_CLIENT --optimize_aggregation_in_order 1 -q "SELECT A, B, count() FROM test_agg where A = 1 GROUP BY A, B ORDER BY A, B LIMIT 3"
$CLICKHOUSE_CLIENT --optimize_aggregation_in_order 1 -q "EXPLAIN actions = 1 SELECT A, B, count() FROM test_agg where A = 1 GROUP BY A, B ORDER BY A, B LIMIT 3" | grep -o "ReadType: InOrder"
$CLICKHOUSE_CLIENT --optimize_aggregation_in_order 1 -q "SELECT B, count() FROM test_agg where A = 1 GROUP BY B ORDER BY B LIMIT 3"
$CLICKHOUSE_CLIENT --optimize_aggregation_in_order 1 -q "EXPLAIN actions = 1 SELECT B, count() FROM test_agg where A = 1 GROUP BY B ORDER BY B LIMIT 3" | grep -o "ReadType: InOrder"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_agg"

View File

@ -0,0 +1,132 @@
#!/usr/bin/env python3
import os
import sys
from itertools import chain, combinations, permutations
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from pure_http_client import ClickHouseClient
client = ClickHouseClient()
def powerset(iterable):
s = list(iterable)
return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))
queries = [
{"optimize": True, "where": [], "order_by": ["a"]},
{"optimize": True, "where": [], "order_by": ["a", "b"]},
{"optimize": True, "where": [], "order_by": ["a", "b", "c"]},
{"optimize": True, "where": [], "order_by": ["a", "b", "c", "d"]},
{"optimize": True, "where": ["a"], "order_by": ["a"]},
{"optimize": True, "where": ["a"], "order_by": ["a", "b"]},
{"optimize": True, "where": ["a"], "order_by": ["b"]},
{"optimize": True, "where": ["a"], "order_by": ["b", "c"]},
{"optimize": True, "where": ["b"], "order_by": ["a"]},
{"optimize": True, "where": ["b"], "order_by": ["a", "c"]},
{"optimize": False, "where": ["b"], "order_by": ["b", "c"]},
{"optimize": True, "where": ["c"], "order_by": ["a"]},
{"optimize": True, "where": ["c"], "order_by": ["a", "b"]},
{"optimize": True, "where": ["a", "b"], "order_by": ["a"]},
{"optimize": True, "where": ["a", "b"], "order_by": ["a", "b"]},
{"optimize": True, "where": ["a", "b"], "order_by": ["a", "c"]},
{"optimize": True, "where": ["a", "b"], "order_by": ["a", "b", "c"]},
{"optimize": True, "where": ["a", "b"], "order_by": ["a", "b", "c", "d"]},
{"optimize": True, "where": ["a", "b"], "order_by": ["b", "c"]},
{"optimize": True, "where": ["a", "b"], "order_by": ["c", "d"]},
{"optimize": True, "where": ["a", "c"], "order_by": ["a"]},
{"optimize": True, "where": ["a", "c"], "order_by": ["a", "b"]},
{"optimize": True, "where": ["a", "c"], "order_by": ["b", "d"]},
{"optimize": True, "where": ["a", "c"], "order_by": ["a", "b", "c"]},
{"optimize": True, "where": ["a", "c"], "order_by": ["b", "c", "d"]},
{"optimize": True, "where": ["a", "c"], "order_by": ["a", "b", "c", "d"]},
{"optimize": False, "where": [], "order_by": ["b"]},
{"optimize": False, "where": [], "order_by": ["b", "a"]},
{"optimize": False, "where": [], "order_by": ["b", "c"]},
{"optimize": False, "where": ["a"], "order_by": ["c"]},
{"optimize": False, "where": ["a"], "order_by": ["c", "b"]},
{"optimize": False, "where": ["a"], "order_by": ["c", "d"]},
{"optimize": False, "where": ["c"], "order_by": ["c", "d"]},
{"optimize": False, "where": ["c"], "order_by": ["b", "c"]},
]
client.query("DROP TABLE IF EXISTS t_fixed_prefix")
client.query(
"""
CREATE TABLE t_fixed_prefix (a UInt32, b UInt32, c UInt32, d UInt32, e UInt32)
ENGINE = MergeTree ORDER BY (a, b, c, d)"""
)
client.query("SYSTEM STOP MERGES t_fixed_prefix")
# create several parts
for _ in range(4):
client.query(
"INSERT INTO t_fixed_prefix SELECT number % 2, number % 10, number % 100, number % 1000, number FROM numbers(25000)"
)
def check_query(
fixed_columns, order_by_columns, should_be_optimized, should_use_finish_sorting
):
where_clause = " AND ".join([f"{c} = 1" for c in fixed_columns])
order_by_clause = ", ".join(order_by_columns)
query = "SELECT {} FROM t_fixed_prefix".format(order_by_clause)
if len(where_clause) != 0:
query += " WHERE " + where_clause
if len(order_by_clause) != 0:
query += " ORDER BY " + order_by_clause
query += " SETTINGS optimize_read_in_order = {}"
res_optimized = client.query(query.format(1))
res_not_optimized = client.query(query.format(0))
if res_optimized != res_not_optimized:
print("Failed query {}. Result of queries mismatched".format(query))
exit(1)
res_explain = client.query("EXPLAIN PIPELINE {}".format(query.format(1)))
is_optimized = "MergeSortingTransform" not in res_explain
uses_finish_sorting = "FinishSortingTransform" in res_explain
if (
is_optimized != should_be_optimized
or uses_finish_sorting != should_use_finish_sorting
):
print(
"""
Wrong query pipeline is built for query {}:
{}
Should be optimized: {}.
Is optimized: {}.
Should use FinishSortingTransform: {}.
Uses FinishSortingTransform: {}
""".format(
query.format(1),
res_explain,
should_be_optimized,
is_optimized,
should_use_finish_sorting,
uses_finish_sorting,
)
)
exit(1)
for query in queries:
check_query(query["where"], query["order_by"], query["optimize"], False)
check_query(query["where"], query["order_by"] + ["e"], query["optimize"], query["optimize"])
where_columns = [f"bitNot({col})" for col in query["where"]]
check_query(where_columns, query["order_by"], query["optimize"], False)
check_query(where_columns, query["order_by"] + ["e"], query["optimize"], query["optimize"])
print("OK")

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
# Tag no-fasttest: Require python libraries like scipy, pandas and numpy
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/02346_read_in_order_fixed_prefix.python