Review fixes.

This commit is contained in:
Nikolai Kochetov 2022-11-09 16:07:38 +00:00
parent 5a3d4cd72e
commit 997881c7f7
9 changed files with 77 additions and 90 deletions

View File

@ -1447,14 +1447,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
for (const auto & key_name : key_names)
order_descr.emplace_back(key_name);
SortingStep::Settings sort_settings;
sort_settings.max_block_size = settings.max_block_size;
sort_settings.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sort_settings.max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
sort_settings.remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
sort_settings.max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
sort_settings.tmp_data = context->getTempDataOnDisk();
sort_settings.min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
SortingStep::Settings sort_settings(*context);
auto sorting_step = std::make_unique<SortingStep>(
plan.getCurrentDataStream(),
@ -2619,14 +2612,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
// happens in case of `over ()`.
if (!window.full_sort_description.empty() && (i == 0 || !sortIsPrefix(window, *windows_sorted[i - 1])))
{
SortingStep::Settings sort_settings;
sort_settings.max_block_size = settings.max_block_size;
sort_settings.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sort_settings.max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
sort_settings.remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
sort_settings.max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
sort_settings.tmp_data = context->getTempDataOnDisk();
sort_settings.min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
SortingStep::Settings sort_settings(*context);
auto sorting_step = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(),
@ -2680,14 +2666,7 @@ void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfo
const Settings & settings = context->getSettingsRef();
SortingStep::Settings sort_settings;
sort_settings.max_block_size = settings.max_block_size;
sort_settings.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sort_settings.max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
sort_settings.remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
sort_settings.max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
sort_settings.tmp_data = context->getTempDataOnDisk();
sort_settings.min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
SortingStep::Settings sort_settings(*context);
/// Merge the sorted blocks.
auto sorting_step = std::make_unique<SortingStep>(

View File

@ -571,14 +571,7 @@ void Planner::buildQueryPlanIfNeeded()
if (!window_description.full_sort_description.empty() &&
(i == 0 || !sortDescriptionIsPrefix(window_description.full_sort_description, window_descriptions[i - 1].full_sort_description)))
{
SortingStep::Settings sort_settings;
sort_settings.max_block_size = settings.max_block_size;
sort_settings.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sort_settings.max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
sort_settings.remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
sort_settings.max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
sort_settings.tmp_data = query_context->getTempDataOnDisk();
sort_settings.min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
SortingStep::Settings sort_settings(*query_context);
auto sorting_step = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(),
@ -676,14 +669,7 @@ void Planner::buildQueryPlanIfNeeded()
const Settings & settings = query_context->getSettingsRef();
SortingStep::Settings sort_settings;
sort_settings.max_block_size = settings.max_block_size;
sort_settings.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sort_settings.max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
sort_settings.remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
sort_settings.max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
sort_settings.tmp_data = query_context->getTempDataOnDisk();
sort_settings.min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
SortingStep::Settings sort_settings(*query_context);
/// Merge the sorted blocks
auto sorting_step = std::make_unique<SortingStep>(

View File

@ -528,14 +528,7 @@ QueryPlan buildQueryPlanForJoinNode(QueryTreeNodePtr join_tree_node,
for (const auto & key_name : key_names)
sort_description.emplace_back(key_name);
SortingStep::Settings sort_settings;
sort_settings.max_block_size = settings.max_block_size;
sort_settings.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sort_settings.max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
sort_settings.remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
sort_settings.max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
sort_settings.tmp_data = query_context->getTempDataOnDisk();
sort_settings.min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
SortingStep::Settings sort_settings(*query_context);
auto sorting_step = std::make_unique<SortingStep>(
plan.getCurrentDataStream(),

View File

@ -30,10 +30,10 @@ struct Optimization
const bool QueryPlanOptimizationSettings::* const is_enabled{};
};
/// Move ARRAY JOIN up if possible.
/// Move ARRAY JOIN up if possible
size_t tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// Move LimitStep down if possible.
/// Move LimitStep down if possible
size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &);
/// Split FilterStep into chain `ExpressionStep -> FilterStep`, where FilterStep contains minimal number of nodes.
@ -59,6 +59,10 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
size_t tryDistinctReadInOrder(QueryPlan::Node * node);
/// Put some steps under union, so that plan optimisation could be applied to union parts separately.
/// For example, the plan can be rewritten like:
/// - Something - - Expression - Something -
/// - Expression - Union - Something - => - Union - Expression - Something -
/// - Something - - Expression - Something -
size_t tryLiftUpUnion(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
@ -85,7 +89,7 @@ struct Frame
using Stack = std::vector<Frame>;
/// Second pass optimizations.
/// Second pass optimizations
void optimizePrimaryKeyCondition(const Stack & stack);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);

View File

@ -22,7 +22,7 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
/// walk through the plan
/// (1) check if nodes below preliminary distinct preserve sorting
/// (2) gather transforming steps to update their sorting properties later
std::vector<ITransformingStep *> steps2update;
std::vector<ITransformingStep *> steps_to_update;
QueryPlan::Node * node = parent_node;
while (!node->children.empty())
{
@ -34,7 +34,7 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
if (!traits.preserves_sorting)
return 0;
steps2update.push_back(step);
steps_to_update.push_back(step);
node = node->children.front();
}
@ -90,11 +90,11 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
/// update data stream's sorting properties for found transforms
const DataStream * input_stream = &read_from_merge_tree->getOutputStream();
while (!steps2update.empty())
while (!steps_to_update.empty())
{
steps2update.back()->updateInputStream(*input_stream);
input_stream = &steps2update.back()->getOutputStream();
steps2update.pop_back();
steps_to_update.back()->updateInputStream(*input_stream);
input_stream = &steps_to_update.back()->getOutputStream();
steps_to_update.pop_back();
}
return 0;

View File

@ -87,6 +87,7 @@ QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
using FixedColumns = std::unordered_set<const ActionsDAG::Node *>;
/// Right now we find only simple cases like 'and(..., and(..., and(column = value, ...), ...'
/// Injective functions are supported here. For a condition 'injectiveFunction(x) = 5' column 'x' is fixed.
void appendFixedColumnsFromFilterExpression(const ActionsDAG::Node & filter_expression, FixedColumns & fixed_columns)
{
std::stack<const ActionsDAG::Node *> stack;
@ -107,23 +108,21 @@ void appendFixedColumnsFromFilterExpression(const ActionsDAG::Node & filter_expr
else if (name == "equals")
{
const ActionsDAG::Node * maybe_fixed_column = nullptr;
bool is_single = true;
size_t num_constant_columns = 0;
for (const auto & child : node->children)
{
if (!child->column)
{
if (!maybe_fixed_column)
maybe_fixed_column = child;
else
is_single = false;
}
if (child->column)
++num_constant_columns;
else
maybe_fixed_column = child;
}
if (maybe_fixed_column && is_single)
if (maybe_fixed_column && num_constant_columns + 1 == node->children.size())
{
//std::cerr << "====== Added fixed column " << maybe_fixed_column->result_name << ' ' << static_cast<const void *>(maybe_fixed_column) << std::endl;
fixed_columns.insert(maybe_fixed_column);
/// Support injective functions chain.
const ActionsDAG::Node * maybe_injective = maybe_fixed_column;
while (maybe_injective->type == ActionsDAG::ActionType::FUNCTION
&& maybe_injective->children.size() == 1
@ -146,6 +145,8 @@ void appendExpression(ActionsDAGPtr & dag, const ActionsDAGPtr & expression)
dag = expression->clone();
}
/// This function builds a common DAG which is a merge of DAGs from Filter and Expression steps chain.
/// Additionally, build a set of fixed columns.
void buildSortingDAG(QueryPlan::Node & node, ActionsDAGPtr & dag, FixedColumns & fixed_columns)
{
IQueryPlanStep * step = node.step.get();
@ -274,7 +275,7 @@ void enreachFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
/// * Input nodes are mapped by name.
/// * Function is mapped to function if all children are mapped and function names are same.
/// * Alias is mapped to its children mapping.
/// * Monotonic function can be mapped to it's children mapping if dirrect mapping does not exist.
/// * Monotonic function can be mapped to its children mapping if direct mapping does not exist.
/// In this case, information about monotonicity is filled.
/// * Mapped node is nullptr if there is no mapping found.
///
@ -345,13 +346,13 @@ MatchedTrees::Matches matchTrees(const ActionsDAG & inner_dag, const ActionsDAG
};
MatchedTrees::Matches matches;
std::stack<Frame> stack;
for (const auto & node : outer_dag.getNodes())
{
if (matches.contains(&node))
continue;
std::stack<Frame> stack;
stack.push(Frame{&node, {}});
while (!stack.empty())
{
@ -410,8 +411,9 @@ MatchedTrees::Matches matchTrees(const ActionsDAG & inner_dag, const ActionsDAG
if (frame.mapped_children.size() > 1)
{
std::vector<Parents *> other_parents;
other_parents.reserve(frame.mapped_children.size());
for (size_t i = 1; i < frame.mapped_children.size(); ++i)
size_t mapped_children_size = frame.mapped_children.size();
other_parents.reserve(mapped_children_size);
for (size_t i = 1; i < mapped_children_size; ++i)
other_parents.push_back(&inner_parents[frame.mapped_children[i]]);
for (const auto * parent : *intersection)
@ -548,17 +550,17 @@ InputOrderInfoPtr buildInputOrderInfo(
///
/// So far, 0 means any direction is possible. It is ok for constant prefix.
int read_direction = 0;
size_t next_descr_column = 0;
size_t next_description_column = 0;
size_t next_sort_key = 0;
while (next_descr_column < description.size() && next_sort_key < sorting_key_columns.size())
while (next_description_column < description.size() && next_sort_key < sorting_key_columns.size())
{
const auto & sorting_key_column = sorting_key_columns[next_sort_key];
const auto & descr = description[next_descr_column];
const auto & sort_column_description = description[next_description_column];
/// If required order depends on collation, it cannot be matched with primary key order.
/// Because primary keys cannot have collations.
if (descr.collator)
if (sort_column_description.collator)
break;
/// Direction for current sort key.
@ -578,20 +580,20 @@ InputOrderInfoPtr buildInputOrderInfo(
if (sort_column_node->type != ActionsDAG::ActionType::INPUT)
break;
if (descr.column_name != sorting_key_column)
if (sort_column_description.column_name != sorting_key_column)
break;
current_direction = descr.direction;
current_direction = sort_column_description.direction;
//std::cerr << "====== (no dag) Found direct match" << std::endl;
++next_descr_column;
++next_description_column;
++next_sort_key;
}
else
{
const ActionsDAG::Node * sort_node = dag->tryFindInOutputs(descr.column_name);
const ActionsDAG::Node * sort_node = dag->tryFindInOutputs(sort_column_description.column_name);
/// It is possible when e.g. sort by array joined column.
if (!sort_node)
break;
@ -609,14 +611,14 @@ InputOrderInfoPtr buildInputOrderInfo(
/// 'SELECT x, y FROM table WHERE x = 42 ORDER BY x + 1, y + 1'
/// Here, 'x + 1' would be a fixed point. But it is reasonable to read-in-order.
current_direction = descr.direction;
current_direction = sort_column_description.direction;
if (match.monotonicity)
{
current_direction *= match.monotonicity->direction;
strict_monotonic = match.monotonicity->strict;
}
++next_descr_column;
++next_description_column;
++next_sort_key;
}
else if (fixed_key_columns.contains(sort_column_node))
@ -632,8 +634,8 @@ InputOrderInfoPtr buildInputOrderInfo(
if (!is_fixed_column)
break;
order_key_prefix_descr.push_back(descr);
++next_descr_column;
order_key_prefix_descr.push_back(sort_column_description);
++next_description_column;
}
}
@ -646,7 +648,7 @@ InputOrderInfoPtr buildInputOrderInfo(
read_direction = current_direction;
if (current_direction)
order_key_prefix_descr.push_back(descr);
order_key_prefix_descr.push_back(sort_column_description);
if (current_direction && !strict_monotonic)
break;
@ -712,9 +714,7 @@ InputOrderInfoPtr buildInputOrderInfo(
return order_info;
}
InputOrderInfoPtr buildInputOrderInfo(
SortingStep & sorting,
QueryPlan::Node & node)
InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & node)
{
QueryPlan::Node * reading_node = findReadingStep(node);
if (!reading_node)

View File

@ -441,7 +441,11 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (info.sum_marks == 0)
return {};
bool added_prewhere_output = false;
/// PREWHERE actions can remove some input columns (which are needed only for prewhere condition).
/// In case of read-in-order, PREWHERE is executed before sorting. But removed columns could be needed for sorting key.
/// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting.
/// See 02354_read_in_order_prewhere.sql as an example.
bool have_input_columns_removed_after_prewhere = false;
if (prewhere_info && prewhere_info->prewhere_actions)
{
auto & outputs = prewhere_info->prewhere_actions->getOutputs();
@ -451,7 +455,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (!outputs_set.contains(input))
{
outputs.push_back(input);
added_prewhere_output = true;
have_input_columns_removed_after_prewhere = true;
}
}
}
@ -626,7 +630,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
}
if (!pipes.empty() && (need_preliminary_merge || added_prewhere_output))
if (!pipes.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere))
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
out_projection = createProjection(pipe_header);

View File

@ -23,6 +23,23 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// Fill sorting settings from a query context.
/// Numeric limits and thresholds are copied from context.getSettingsRef();
/// the temporary data scope is taken from context.getTempDataOnDisk().
SortingStep::Settings::Settings(const Context & context)
{
    const auto & query_settings = context.getSettingsRef();

    /// Block size and limits on the amount of data to sort.
    max_block_size = query_settings.max_block_size;
    size_limits = SizeLimits(
        query_settings.max_rows_to_sort,
        query_settings.max_bytes_to_sort,
        query_settings.sort_overflow_mode);

    /// Remerge-related settings.
    max_bytes_before_remerge = query_settings.max_bytes_before_remerge_sort;
    remerge_lowered_memory_bytes_ratio = query_settings.remerge_sort_lowered_memory_bytes_ratio;

    /// External sort: threshold, required free disk space and temporary data scope.
    max_bytes_before_external_sort = query_settings.max_bytes_before_external_sort;
    min_free_disk_space = query_settings.min_free_disk_space_for_temporary_data;
    tmp_data = context.getTempDataOnDisk();
}
/// Create settings where only the block size is set explicitly;
/// this constructor does not assign any other field.
SortingStep::Settings::Settings(size_t max_block_size_)
    : max_block_size(max_block_size_)
{
}
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
@ -71,8 +88,8 @@ SortingStep::SortingStep(
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, limit(limit_)
, sort_settings(max_block_size_)
{
sort_settings.max_block_size = max_block_size_;
/// TODO: check input_stream is sorted by prefix_description.
output_stream->sort_description = result_description;
output_stream->sort_scope = DataStream::SortScope::Global;
@ -87,6 +104,7 @@ SortingStep::SortingStep(
, type(Type::MergingSorted)
, result_description(std::move(sort_description_))
, limit(limit_)
, sort_settings(max_block_size_)
{
sort_settings.max_block_size = max_block_size_;
/// TODO: check input_stream is partially sorted (each port) by the same description.

View File

@ -27,6 +27,9 @@ public:
size_t max_bytes_before_external_sort = 0;
TemporaryDataOnDiskScopePtr tmp_data = nullptr;
size_t min_free_disk_space = 0;
explicit Settings(const Context & context);
explicit Settings(size_t max_block_size_);
};
/// Full