Implement max_rows_to_sort again

This commit is contained in:
Alexey Milovidov 2020-05-17 10:27:55 +03:00
parent de8120d69a
commit f2d438b79f

View File

@ -2063,10 +2063,9 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
const Settings & settings = context->getSettingsRef();
/// TODO: Limits on sorting
// IBlockInputStream::LocalLimits limits;
// limits.mode = IBlockInputStream::LIMITS_TOTAL;
// limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
if (input_sorting_info)
{
@ -2103,6 +2102,8 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
@ -2122,6 +2123,15 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
});
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
return transform;
});
/// Merge the sorted blocks.
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{