Fix tests and change pipeline

This commit is contained in:
Anastasiya Rodigina 2019-05-09 18:44:51 +03:00
parent 63b3d7dbdb
commit 23a8efe367

View File

@ -1465,25 +1465,39 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SelectQueryInfo &
if (need_sorting)
{
pipeline.transform([&](auto & stream)
if (!prefix_order_descr.empty())
{
stream = std::make_shared<FinishSortingBlockInputStream>(
stream,
prefix_order_descr,
order_descr,
settings.max_block_size,
limit);
});
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FinishSortingBlockInputStream>(
stream,
prefix_order_descr,
order_descr,
settings.max_block_size,
limit);
});
}
else
{
pipeline.transform([&](auto & stream)
{
auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, limit);
/// Limits on sorting
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sorting_stream->setLimits(limits);
stream = sorting_stream;
});
}
}
// in order to read blocks in fixed order
query_info.do_not_steal_task = true;
if (order_direction == -1)
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<AsynchronousBlockInputStream>(stream);
});
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<ReverseBlockInputStream>(stream);
@ -1499,8 +1513,10 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SelectQueryInfo &
if (settings.optimize_pk_order)
{
if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()))
{
optimize_pk_order(*merge_tree);
return;
return;
}
}
pipeline.transform([&](auto & stream)