reuse sort order

This commit is contained in:
Alexander Kuzmenkov 2021-02-10 15:37:05 +03:00
parent d5b0f30732
commit b57b3244f5

View File

@ -1831,46 +1831,117 @@ void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const Act
query_plan.addStep(std::move(expression_step));
}
static bool windowDescriptionComparator(const WindowDescription * _left,
const WindowDescription * _right)
{
const auto & left = _left->full_sort_description;
const auto & right = _right->full_sort_description;
if (left.size() != right.size())
{
return left.size() < right.size();
}
for (size_t i = 0; i < left.size(); ++i)
{
if (left[i].column_name < right[i].column_name)
{
return true;
}
if (left[i].column_number < right[i].column_number)
{
return true;
}
if (left[i].direction < right[i].direction)
{
return true;
}
if (left[i].nulls_direction < right[i].nulls_direction)
{
return true;
}
assert(left[i] == right[i]);
}
return false;
}
static bool sortIsPrefix(const WindowDescription & _prefix,
const WindowDescription & _full)
{
const auto & prefix = _prefix.full_sort_description;
const auto & full = _full.full_sort_description;
if (prefix.size() > full.size())
{
return false;
}
for (size_t i = 0; i < prefix.size(); ++i)
{
if (full[i] != prefix[i])
{
return false;
}
}
return true;
}
void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
{
std::vector<const WindowDescription *> windows_sorted;
for (const auto & [_, w] : query_analyzer->windowDescriptions())
{
const Settings & settings = context->getSettingsRef();
windows_sorted.push_back(&w);
}
auto partial_sorting = std::make_unique<PartialSortingStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
0 /* LIMIT */,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort,
settings.sort_overflow_mode));
partial_sorting->setStepDescription("Sort each block for window '"
+ w.window_name + "'");
query_plan.addStep(std::move(partial_sorting));
std::sort(windows_sorted.begin(), windows_sorted.end(),
windowDescriptionComparator);
auto merge_sorting_step = std::make_unique<MergeSortingStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
settings.max_block_size,
0 /* LIMIT */,
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
merge_sorting_step->setStepDescription("Merge sorted blocks for window '"
+ w.window_name + "'");
query_plan.addStep(std::move(merge_sorting_step));
const Settings & settings = context->getSettingsRef();
for (size_t i = 0; i < windows_sorted.size(); ++i)
{
const auto & w = *windows_sorted[i];
if (i == 0 || !sortIsPrefix(w, *windows_sorted[i - 1]))
{
auto partial_sorting = std::make_unique<PartialSortingStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
0 /* LIMIT */,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort,
settings.sort_overflow_mode));
partial_sorting->setStepDescription("Sort each block for window '"
+ w.window_name + "'");
query_plan.addStep(std::move(partial_sorting));
// First MergeSorted, now MergingSorted.
auto merging_sorted = std::make_unique<MergingSortedStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
settings.max_block_size,
0 /* LIMIT */);
merging_sorted->setStepDescription("Merge sorted streams for window '"
+ w.window_name + "'");
query_plan.addStep(std::move(merging_sorted));
auto merge_sorting_step = std::make_unique<MergeSortingStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
settings.max_block_size,
0 /* LIMIT */,
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
merge_sorting_step->setStepDescription(
"Merge sorted blocks for window '" + w.window_name + "'");
query_plan.addStep(std::move(merge_sorting_step));
// First MergeSorted, now MergingSorted.
auto merging_sorted = std::make_unique<MergingSortedStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
settings.max_block_size,
0 /* LIMIT */);
merging_sorted->setStepDescription(
"Merge sorted streams for window '" + w.window_name + "'");
query_plan.addStep(std::move(merging_sorted));
}
auto window_step = std::make_unique<WindowStep>(
query_plan.getCurrentDataStream(),