Merge pull request #30335 from ClickHouse/single-sorting-step

Single sorting step: merge the four query-plan steps PartialSortingStep, MergeSortingStep, MergingSortedStep, and FinishSortingStep into a single SortingStep with three modes (Full, FinishSorting, MergingSorted).
Nikolai Kochetov 2021-10-22 11:34:29 +03:00 committed by GitHub
commit 1533f4af57
21 changed files with 406 additions and 741 deletions
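In one sketch, the shape of the new design: a single step whose constructor selects one of three modes, and whose pipeline builder dispatches on that mode. This is a toy model, not ClickHouse code — SortDescription is a stand-in, and the real Full-sort constructor also takes SizeLimits, remerge settings, and a temporary volume (see SortingStep.h below):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using SortDescription = std::vector<std::string>; // stand-in for DB::SortDescription

class SortingStep
{
public:
    enum class Type { Full, FinishSorting, MergingSorted };

    /// Full sort: replaces PartialSortingStep + MergeSortingStep + MergingSortedStep.
    SortingStep(SortDescription result_, size_t max_block_size_, uint64_t limit_, size_t max_bytes_before_external_sort_)
        : type(Type::Full), result_description(std::move(result_))
        , max_block_size(max_block_size_), limit(limit_)
        , max_bytes_before_external_sort(max_bytes_before_external_sort_) {}

    /// Finish sorting data already sorted by a prefix: replaces FinishSortingStep.
    SortingStep(SortDescription prefix_, SortDescription result_, size_t max_block_size_, uint64_t limit_)
        : type(Type::FinishSorting), prefix_description(std::move(prefix_))
        , result_description(std::move(result_)), max_block_size(max_block_size_), limit(limit_) {}

    /// Merge several sorted streams into one: replaces MergingSortedStep.
    SortingStep(SortDescription result_, size_t max_block_size_, uint64_t limit_)
        : type(Type::MergingSorted), result_description(std::move(result_))
        , max_block_size(max_block_size_), limit(limit_) {}

    /// Prints the transform chain each mode can add (the real step skips
    /// stages it can prove unnecessary, e.g. merging a single stream).
    void transformPipeline() const
    {
        switch (type)
        {
            case Type::Full:
                std::cout << "PartialSorting -> MergeSorting (spill after "
                          << max_bytes_before_external_sort << " bytes) -> MergingSorted";
                break;
            case Type::FinishSorting:
                std::cout << "MergingSorted (by " << prefix_description.size()
                          << "-column prefix) -> PartialSorting -> FinishSorting";
                break;
            case Type::MergingSorted:
                std::cout << "MergingSorted";
                break;
        }
        std::cout << " [" << result_description.size() << " sort columns, block="
                  << max_block_size << ", limit=" << limit << "]\n";
    }

private:
    Type type;
    SortDescription prefix_description;
    SortDescription result_description;
    size_t max_block_size = 0;
    uint64_t limit = 0;
    size_t max_bytes_before_external_sort = 0;
};

int main()
{
    SortingStep{{"key"}, 65536, 0, 1000000}.transformPipeline();          // Full
    SortingStep{{"key"}, {"key", "time"}, 65536, 10}.transformPipeline(); // FinishSorting
    SortingStep{{"key"}, 65536, 10}.transformPipeline();                  // MergingSorted
    return 0;
}

Overload resolution mirrors the three constructors declared in the real SortingStep.h below; the mode tag then drives the same three-way branch seen in SortingStep::transformPipeline.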

src/Interpreters/InterpreterSelectQuery.cpp

@@ -43,15 +43,12 @@
#include <Processors/QueryPlan/ExtremesStep.h>
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/OffsetStep.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
@@ -2261,35 +2258,20 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
// happens in case of `over ()`.
if (!w.full_sort_description.empty() && (i == 0 || !sortIsPrefix(w, *windows_sorted[i - 1])))
{
auto partial_sorting = std::make_unique<PartialSortingStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
0 /* LIMIT */,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode));
partial_sorting->setStepDescription("Sort each block for window '" + w.window_name + "'");
query_plan.addStep(std::move(partial_sorting));
auto merge_sorting_step = std::make_unique<MergeSortingStep>(
auto sorting_step = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
settings.max_block_size,
0 /* LIMIT */,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode),
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
merge_sorting_step->setStepDescription("Merge sorted blocks for window '" + w.window_name + "'");
query_plan.addStep(std::move(merge_sorting_step));
// First MergeSorted, now MergingSorted.
auto merging_sorted = std::make_unique<MergingSortedStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
settings.max_block_size,
0 /* LIMIT */);
merging_sorted->setStepDescription("Merge sorted streams for window '" + w.window_name + "'");
query_plan.addStep(std::move(merging_sorted));
sorting_step->setStepDescription("Sorting for window '" + w.window_name + "'");
query_plan.addStep(std::move(sorting_step));
}
auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), w, w.window_functions);
@@ -2304,7 +2286,7 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input
{
const Settings & settings = context->getSettingsRef();
auto finish_sorting_step = std::make_unique<FinishSortingStep>(
auto finish_sorting_step = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr,
output_order_descr,
@@ -2334,32 +2316,21 @@ void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfo
const Settings & settings = context->getSettingsRef();
auto partial_sorting = std::make_unique<PartialSortingStep>(
query_plan.getCurrentDataStream(),
output_order_descr,
limit,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode));
partial_sorting->setStepDescription("Sort each block for ORDER BY");
query_plan.addStep(std::move(partial_sorting));
/// Merge the sorted blocks.
auto merge_sorting_step = std::make_unique<MergeSortingStep>(
auto sorting_step = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(),
output_order_descr,
settings.max_block_size,
limit,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode),
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
merge_sorting_step->setStepDescription("Merge sorted blocks for ORDER BY");
query_plan.addStep(std::move(merge_sorting_step));
/// If there are several streams, we merge them into one
executeMergeSorted(query_plan, output_order_descr, limit, "for ORDER BY");
sorting_step->setStepDescription("Sorting for ORDER BY");
query_plan.addStep(std::move(sorting_step));
}
@@ -2377,7 +2348,7 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const So
const Settings & settings = context->getSettingsRef();
auto merging_sorted
= std::make_unique<MergingSortedStep>(query_plan.getCurrentDataStream(), sort_description, settings.max_block_size, limit);
= std::make_unique<SortingStep>(query_plan.getCurrentDataStream(), sort_description, settings.max_block_size, limit);
merging_sorted->setStepDescription("Merge sorted streams " + description);
query_plan.addStep(std::move(merging_sorted));

src/Processors/QueryPlan/FinishSortingStep.cpp (deleted)

@@ -1,115 +0,0 @@
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
namespace DB
{
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}
FinishSortingStep::FinishSortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_))
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
/// TODO: check input_stream is sorted by prefix_description.
output_stream->sort_description = result_description;
output_stream->sort_mode = DataStream::SortMode::Stream;
}
void FinishSortingStep::updateLimit(size_t limit_)
{
if (limit_ && (limit == 0 || limit_ < limit))
{
limit = limit_;
transform_traits.preserves_number_of_rows = false;
}
}
void FinishSortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
bool need_finish_sorting = (prefix_description.size() < result_description.size());
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
prefix_description,
max_block_size,
limit_for_merging);
pipeline.addTransform(std::move(transform));
}
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_description, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
header, prefix_description, result_description, max_block_size, limit);
});
}
}
void FinishSortingStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
settings.out << prefix << "Prefix sort description: ";
dumpSortDescription(prefix_description, input_streams.front().header, settings.out);
settings.out << '\n';
settings.out << prefix << "Result sort description: ";
dumpSortDescription(result_description, input_streams.front().header, settings.out);
settings.out << '\n';
if (limit)
settings.out << prefix << "Limit " << limit << '\n';
}
void FinishSortingStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add("Prefix Sort Description", explainSortDescription(prefix_description, input_streams.front().header));
map.add("Result Sort Description", explainSortDescription(result_description, input_streams.front().header));
if (limit)
map.add("Limit", limit);
}
}

src/Processors/QueryPlan/FinishSortingStep.h (deleted)

@@ -1,36 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
namespace DB
{
/// Finish sorting of pre-sorted data. See FinishSortingTransform.
class FinishSortingStep : public ITransformingStep
{
public:
FinishSortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_);
String getName() const override { return "FinishSorting"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription prefix_description;
SortDescription result_description;
size_t max_block_size;
UInt64 limit;
};
}

src/Processors/QueryPlan/MergeSortingStep.cpp (deleted)

@@ -1,96 +0,0 @@
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
namespace DB
{
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}
MergeSortingStep::MergeSortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, description(description_)
, max_merged_block_size(max_merged_block_size_)
, limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_)
, min_free_disk_space(min_free_disk_space_)
{
/// TODO: check input_stream is partially sorted by the same description.
output_stream->sort_description = description;
output_stream->sort_mode = input_stream.has_single_port ? DataStream::SortMode::Stream
: DataStream::SortMode::Port;
}
void MergeSortingStep::updateLimit(size_t limit_)
{
if (limit_ && (limit == 0 || limit_ < limit))
{
limit = limit_;
transform_traits.preserves_number_of_rows = false;
}
}
void MergeSortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return nullptr;
return std::make_shared<MergeSortingTransform>(
header, description, max_merged_block_size, limit,
max_bytes_before_remerge / pipeline.getNumStreams(),
remerge_lowered_memory_bytes_ratio,
max_bytes_before_external_sort,
tmp_volume,
min_free_disk_space);
});
}
void MergeSortingStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
settings.out << prefix << "Sort description: ";
dumpSortDescription(description, input_streams.front().header, settings.out);
settings.out << '\n';
if (limit)
settings.out << prefix << "Limit " << limit << '\n';
}
void MergeSortingStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add("Sort Description", explainSortDescription(description, input_streams.front().header));
if (limit)
map.add("Limit", limit);
}
}

src/Processors/QueryPlan/MergeSortingStep.h (deleted)

@@ -1,47 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB
{
/// Sorts stream of data. See MergeSortingTransform.
class MergeSortingStep : public ITransformingStep
{
public:
explicit MergeSortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
size_t max_bytes_before_remerge;
double remerge_lowered_memory_bytes_ratio;
size_t max_bytes_before_external_sort;
VolumePtr tmp_volume;
size_t min_free_disk_space;
};
}

src/Processors/QueryPlan/MergingSortedStep.cpp (deleted)

@@ -1,85 +0,0 @@
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
namespace DB
{
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}
MergingSortedStep::MergingSortedStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, sort_description(std::move(sort_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
/// TODO: check input_stream is partially sorted (each port) by the same description.
output_stream->sort_description = sort_description;
output_stream->sort_mode = DataStream::SortMode::Stream;
}
void MergingSortedStep::updateLimit(size_t limit_)
{
if (limit_ && (limit == 0 || limit_ < limit))
{
limit = limit_;
transform_traits.preserves_number_of_rows = false;
}
}
void MergingSortedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
max_block_size, limit);
pipeline.addTransform(std::move(transform));
}
}
void MergingSortedStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
settings.out << prefix << "Sort description: ";
dumpSortDescription(sort_description, input_streams.front().header, settings.out);
settings.out << '\n';
if (limit)
settings.out << prefix << "Limit " << limit << '\n';
}
void MergingSortedStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add("Sort Description", explainSortDescription(sort_description, input_streams.front().header));
if (limit)
map.add("Limit", limit);
}
}

src/Processors/QueryPlan/MergingSortedStep.h (deleted)

@@ -1,36 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB
{
/// Merge streams of data into single sorted stream.
class MergingSortedStep : public ITransformingStep
{
public:
explicit MergingSortedStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_ = 0);
String getName() const override { return "MergingSorted"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription sort_description;
size_t max_block_size;
UInt64 limit;
};
}

src/Processors/QueryPlan/Optimizations/filterPushDown.cpp

@@ -7,10 +7,7 @@
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
@@ -237,10 +234,7 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
// {
// }
if (typeid_cast<PartialSortingStep *>(child.get())
|| typeid_cast<MergeSortingStep *>(child.get())
|| typeid_cast<MergingSortedStep *>(child.get())
|| typeid_cast<FinishSortingStep *>(child.get()))
if (typeid_cast<SortingStep *>(child.get()))
{
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))

src/Processors/QueryPlan/Optimizations/limitPushDown.cpp

@@ -2,10 +2,7 @@
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Common/typeid_cast.h>
namespace DB::QueryPlanOptimizations
@@ -21,32 +18,15 @@ static bool tryUpdateLimitForSortingSteps(QueryPlan::Node * node, size_t limit)
QueryPlan::Node * child = nullptr;
bool updated = false;
if (auto * merging_sorted = typeid_cast<MergingSortedStep *>(step.get()))
if (auto * sorting = typeid_cast<SortingStep *>(step.get()))
{
/// TODO: remove LimitStep here.
merging_sorted->updateLimit(limit);
sorting->updateLimit(limit);
updated = true;
child = node->children.front();
}
else if (auto * finish_sorting = typeid_cast<FinishSortingStep *>(step.get()))
{
/// TODO: remove LimitStep here.
finish_sorting->updateLimit(limit);
updated = true;
}
else if (auto * merge_sorting = typeid_cast<MergeSortingStep *>(step.get()))
{
merge_sorting->updateLimit(limit);
updated = true;
child = node->children.front();
}
else if (auto * partial_sorting = typeid_cast<PartialSortingStep *>(step.get()))
{
partial_sorting->updateLimit(limit);
updated = true;
}
/// We often have chain PartialSorting -> MergeSorting -> MergingSorted
/// In case we have several sorting steps.
/// Try update limit for them also if possible.
if (child)
tryUpdateLimitForSortingSteps(child, limit);
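The recursion kept at the end matters because adjacent sorting steps still occur — e.g. a "merge sorted streams" Sorting sitting on top of a per-replica "Sorting for ORDER BY" in the distributed plans further down — and each should learn the limit. A self-contained sketch of that descent, with a simplified Node standing in for QueryPlan::Node and a bool for the typeid_cast:

#include <cstddef>
#include <iostream>
#include <vector>

struct Node
{
    bool is_sorting;              // stands in for typeid_cast<SortingStep *>
    size_t limit;                 // 0 means "no limit"
    std::vector<Node *> children;
};

void tryUpdateLimitForSortingSteps(Node * node, size_t limit)
{
    if (!node || !node->is_sorting)
        return;
    if (limit && (node->limit == 0 || limit < node->limit))
        node->limit = limit;     // same rule as SortingStep::updateLimit
    if (!node->children.empty())
        tryUpdateLimitForSortingSteps(node->children.front(), limit);
}

int main()
{
    Node leaf{true, 0, {}};
    Node top{true, 0, {&leaf}};  // Sorting over Sorting
    tryUpdateLimitForSortingSteps(&top, 10);
    std::cout << top.limit << ' ' << leaf.limit << '\n'; // prints: 10 10
    return 0;
}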

src/Processors/QueryPlan/PartialSortingStep.cpp (deleted)

@@ -1,93 +0,0 @@
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
namespace DB
{
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}
PartialSortingStep::PartialSortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
UInt64 limit_,
SizeLimits size_limits_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, sort_description(std::move(sort_description_))
, limit(limit_)
, size_limits(size_limits_)
{
output_stream->sort_description = sort_description;
output_stream->sort_mode = DataStream::SortMode::Chunk;
}
void PartialSortingStep::updateLimit(size_t limit_)
{
if (limit_ && (limit == 0 || limit_ < limit))
{
limit = limit_;
transform_traits.preserves_number_of_rows = false;
}
}
void PartialSortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, sort_description, limit);
});
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048
limits.size_limits = size_limits;
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
return transform;
});
}
void PartialSortingStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
settings.out << prefix << "Sort description: ";
dumpSortDescription(sort_description, input_streams.front().header, settings.out);
settings.out << '\n';
if (limit)
settings.out << prefix << "Limit " << limit << '\n';
}
void PartialSortingStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add("Sort Description", explainSortDescription(sort_description, input_streams.front().header));
if (limit)
map.add("Limit", limit);
}
}

src/Processors/QueryPlan/PartialSortingStep.h (deleted)

@@ -1,35 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <QueryPipeline/SizeLimits.h>
namespace DB
{
/// Sort separate chunks of data.
class PartialSortingStep : public ITransformingStep
{
public:
explicit PartialSortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
UInt64 limit_,
SizeLimits size_limits_);
String getName() const override { return "PartialSorting"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription sort_description;
UInt64 limit;
SizeLimits size_limits;
};
}

src/Processors/QueryPlan/SortingStep.cpp (new file)

@@ -0,0 +1,241 @@
#include <Processors/QueryPlan/SortingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
namespace DB
{
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}
SortingStep::SortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_block_size_,
UInt64 limit_,
SizeLimits size_limits_,
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, type(Type::Full)
, result_description(description_)
, max_block_size(max_block_size_)
, limit(limit_)
, size_limits(size_limits_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_)
, min_free_disk_space(min_free_disk_space_)
{
/// TODO: check input_stream is partially sorted by the same description.
output_stream->sort_description = result_description;
output_stream->sort_mode = DataStream::SortMode::Stream;
}
SortingStep::SortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_))
, type(Type::FinishSorting)
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
/// TODO: check input_stream is sorted by prefix_description.
output_stream->sort_description = result_description;
output_stream->sort_mode = DataStream::SortMode::Stream;
}
SortingStep::SortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, type(Type::MergingSorted)
, result_description(std::move(sort_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
/// TODO: check input_stream is partially sorted (each port) by the same description.
output_stream->sort_description = result_description;
output_stream->sort_mode = DataStream::SortMode::Stream;
}
void SortingStep::updateLimit(size_t limit_)
{
if (limit_ && (limit == 0 || limit_ < limit))
{
limit = limit_;
transform_traits.preserves_number_of_rows = false;
}
}
void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (type == Type::FinishSorting)
{
bool need_finish_sorting = (prefix_description.size() < result_description.size());
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
prefix_description,
max_block_size,
limit_for_merging);
pipeline.addTransform(std::move(transform));
}
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_description, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
header, prefix_description, result_description, max_block_size, limit);
});
}
}
else if (type == Type::Full)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_description, limit);
});
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048
limits.size_limits = size_limits;
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
return transform;
});
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return nullptr;
return std::make_shared<MergeSortingTransform>(
header, result_description, max_block_size, limit,
max_bytes_before_remerge / pipeline.getNumStreams(),
remerge_lowered_memory_bytes_ratio,
max_bytes_before_external_sort,
tmp_volume,
min_free_disk_space);
});
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
result_description,
max_block_size, limit);
pipeline.addTransform(std::move(transform));
}
}
else if (type == Type::MergingSorted)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
result_description,
max_block_size, limit);
pipeline.addTransform(std::move(transform));
}
}
}
void SortingStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
if (!prefix_description.empty())
{
settings.out << prefix << "Prefix sort description: ";
dumpSortDescription(prefix_description, input_streams.front().header, settings.out);
settings.out << '\n';
settings.out << prefix << "Result sort description: ";
dumpSortDescription(result_description, input_streams.front().header, settings.out);
settings.out << '\n';
}
else
{
settings.out << prefix << "Sort description: ";
dumpSortDescription(result_description, input_streams.front().header, settings.out);
settings.out << '\n';
}
if (limit)
settings.out << prefix << "Limit " << limit << '\n';
}
void SortingStep::describeActions(JSONBuilder::JSONMap & map) const
{
if (!prefix_description.empty())
{
map.add("Prefix Sort Description", explainSortDescription(prefix_description, input_streams.front().header));
map.add("Result Sort Description", explainSortDescription(result_description, input_streams.front().header));
}
else
map.add("Sort Description", explainSortDescription(result_description, input_streams.front().header));
if (limit)
map.add("Limit", limit);
}
}
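A note on updateLimit, which previously existed in identical form on all four removed steps and now survives once on SortingStep: a new limit is accepted only if it tightens the current one, with 0 meaning "no limit" (accepting one also drops the preserves_number_of_rows trait). A self-contained check of that rule:

#include <cassert>
#include <cstdint>

uint64_t mergeLimit(uint64_t current, uint64_t incoming)
{
    if (incoming && (current == 0 || incoming < current))
        return incoming; // a tighter (or first) limit wins
    return current;      // 0 or a looser incoming limit never overwrites
}

int main()
{
    assert(mergeLimit(0, 10) == 10); // no limit yet -> take the new one
    assert(mergeLimit(10, 5) == 5);  // tighter limit wins
    assert(mergeLimit(5, 10) == 5);  // looser limit is ignored
    assert(mergeLimit(5, 0) == 5);   // 0 means "no limit", never overwrites
    return 0;
}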

src/Processors/QueryPlan/SortingStep.h (new file)

@@ -0,0 +1,76 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB
{
/// Sort data stream
class SortingStep : public ITransformingStep
{
public:
/// Full
SortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_block_size_,
UInt64 limit_,
SizeLimits size_limits_,
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_);
/// FinishSorting
SortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_);
/// MergingSorted
SortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_ = 0);
String getName() const override { return "Sorting"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
enum class Type
{
Full,
FinishSorting,
MergingSorted,
};
Type type;
SortDescription prefix_description;
SortDescription result_description;
size_t max_block_size;
UInt64 limit;
SizeLimits size_limits;
size_t max_bytes_before_remerge = 0;
double remerge_lowered_memory_bytes_ratio = 0;
size_t max_bytes_before_external_sort = 0;
VolumePtr tmp_volume;
size_t min_free_disk_space = 0;
};
}


@@ -6,12 +6,10 @@ ORDER BY timestamp ASC
LIMIT 10
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
SELECT
timestamp,
key
@@ -20,7 +18,7 @@ ORDER BY toDate(timestamp) ASC
LIMIT 10
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
FinishSorting
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
@@ -34,7 +32,7 @@ ORDER BY
LIMIT 10
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
FinishSorting
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree


@@ -23,21 +23,19 @@ lambda
optimize_read_in_order
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
FinishSorting
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
FinishSorting
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree


@@ -908,12 +908,10 @@ Expression ((Projection + Before ORDER BY))
Window (Window step for window \'\')
Window (Window step for window \'PARTITION BY p\')
Window (Window step for window \'PARTITION BY p ORDER BY o ASC\')
MergingSorted (Merge sorted streams for window \'PARTITION BY p ORDER BY o ASC\')
MergeSorting (Merge sorted blocks for window \'PARTITION BY p ORDER BY o ASC\')
PartialSorting (Sort each block for window \'PARTITION BY p ORDER BY o ASC\')
Expression ((Before window functions + (Projection + Before ORDER BY)))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
Sorting (Sorting for window \'PARTITION BY p ORDER BY o ASC\')
Expression ((Before window functions + (Projection + Before ORDER BY)))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
explain select
count(*) over (order by o, number),
count(*) over (order by number)
@@ -923,16 +921,12 @@ from
;
Expression ((Projection + Before ORDER BY))
Window (Window step for window \'ORDER BY o ASC, number ASC\')
MergingSorted (Merge sorted streams for window \'ORDER BY o ASC, number ASC\')
MergeSorting (Merge sorted blocks for window \'ORDER BY o ASC, number ASC\')
PartialSorting (Sort each block for window \'ORDER BY o ASC, number ASC\')
Window (Window step for window \'ORDER BY number ASC\')
MergingSorted (Merge sorted streams for window \'ORDER BY number ASC\')
MergeSorting (Merge sorted blocks for window \'ORDER BY number ASC\')
PartialSorting (Sort each block for window \'ORDER BY number ASC\')
Expression ((Before window functions + (Projection + Before ORDER BY)))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
Sorting (Sorting for window \'ORDER BY o ASC, number ASC\')
Window (Window step for window \'ORDER BY number ASC\')
Sorting (Sorting for window \'ORDER BY number ASC\')
Expression ((Before window functions + (Projection + Before ORDER BY)))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
-- A test case for the sort comparator found by fuzzer.
SELECT
max(number) OVER (ORDER BY number DESC NULLS FIRST),


@@ -5,11 +5,8 @@ FUNCTION sipHash64
Filter column: equals
> sorting steps should know about limit
Limit 10
MergingSorted
Limit 10
MergeSorting
Limit 10
PartialSorting
Sorting
Sorting
Limit 10
-- filter push down --
> filter should be pushed down after aggregating
@@ -108,9 +105,8 @@ Filter column: notEquals(y, 2)
1 0
1 1
> filter is pushed down before sorting steps
MergingSorted
MergeSorting
PartialSorting
Sorting
Sorting
Filter column: and(notEquals(x, 0), notEquals(y, 0))
1 2
1 1


@@ -10,7 +10,7 @@ $CLICKHOUSE_CLIENT -q "select x + 1 from (select y + 2 as x from (select dummy +
echo "> sipHash should be calculated after filtration"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select sum(x), sum(y) from (select sipHash64(number) as x, bitAnd(number, 1024) as y from numbers_mt(1000000000) limit 1000000000) where y = 0" | grep -o "FUNCTION sipHash64\|Filter column: equals"
echo "> sorting steps should know about limit"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select number from (select number from numbers(500000000) order by -number) limit 10" | grep -o "MergingSorted\|MergeSorting\|PartialSorting\|Limit 10"
$CLICKHOUSE_CLIENT -q "explain actions = 1 select number from (select number from numbers(500000000) order by -number) limit 10" | grep -o "Sorting\|Limit 10"
echo "-- filter push down --"
echo "> filter should be pushed down after aggregating"
@@ -132,7 +132,7 @@ $CLICKHOUSE_CLIENT -q "
select number % 2 as x, number % 3 as y from numbers(6) order by y desc
) where x != 0 and y != 0
settings enable_optimize_predicate_expression = 0" |
grep -o "MergingSorted\|MergeSorting\|PartialSorting\|Filter column: and(notEquals(x, 0), notEquals(y, 0))"
grep -o "Sorting\|Filter column: and(notEquals(x, 0), notEquals(y, 0))"
$CLICKHOUSE_CLIENT -q "
select x, y from (
select number % 2 as x, number % 3 as y from numbers(6) order by y desc


@@ -111,31 +111,3 @@
}
],
"Limit": 3,
--
"Sort Description": [
{
"Column": "number",
"Ascending": false,
"With Fill": false
},
{
"Column": "plus(number, 1)",
"Ascending": true,
"With Fill": false
}
],
"Limit": 3,
--
"Sort Description": [
{
"Column": "number",
"Ascending": false,
"With Fill": false
},
{
"Column": "plus(number, 1)",
"Ascending": true,
"With Fill": false
}
],
"Limit": 3,


@@ -2,31 +2,27 @@
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=0;
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Limit (preliminary LIMIT (with OFFSET))
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)


@@ -50,66 +50,58 @@ SettingQuotaAndLimits (Set limits and quota after reading from storage)
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
Expression (Projection)
Distinct
MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
Sorting (Merge sorted streams for ORDER BY, without aggregation)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
Sorting (Sorting for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Distinct
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
Sorting (Sorting for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
Expression (Projection)
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
Sorting (Merge sorted streams for ORDER BY, without aggregation)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression (Before LIMIT BY)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)