limit fixes + func(primary_key) group by works

This commit is contained in:
Dmitry 2020-05-08 16:13:50 +03:00
parent 465dfe47fc
commit e7b747b0b7
5 changed files with 30 additions and 9 deletions

View File

@ -1746,10 +1746,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
{
auto & query = getSelectQuery();
SortDescription group_by_descr = getSortDescriptionFromGroupBy(query, *context);
UInt64 limit = getLimitForSorting(query, *context);
executeOrderOptimized(pipeline, group_by_info, limit, group_by_descr);
executeOrderOptimized(pipeline, group_by_info, 0, group_by_descr);
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_descr, group_by_descr);

View File

@ -97,6 +97,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (!res_block_size)
{
// std::cerr << "Creating first state with key " << key_begin << "\n";
LOG_TRACE(log, "AggregatingInOrder");
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++res_block_size;
}
@ -228,6 +229,7 @@ void AggregatingInOrderTransform::generate()
if (res_block_size)
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
LOG_TRACE(log, "Aggregated");
Block res = params->getHeader().cloneEmpty();
for (size_t i = 0; i < res_key_columns.size(); ++i)

View File

@ -47,6 +47,8 @@ private:
bool is_consume_finished = false;
Chunk current_chunk;
Logger * log = &Logger::get("AggregatingInOrderTransform");
};
}

View File

@ -834,6 +834,14 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
return res;
}
/// Builds an expression whose only action is a projection onto the columns
/// currently present in the header of `pipe`.
///
/// The expression takes the names and types of the pipe's header as its input
/// columns and is created with `data.global_context`. Callers capture it before
/// appending a sorting-key expression to the pipe and apply it after merging,
/// so that the stream is projected back onto the header it had at capture time.
static ExpressionActionsPtr createProjection(const Pipe & pipe, const MergeTreeData & data)
{
    const auto & source_header = pipe.getHeader();

    /// Input of the expression: every column of the current header, with its type.
    auto actions = std::make_shared<ExpressionActions>(
        source_header.getNamesAndTypesList(),
        data.global_context);

    /// Single step: project onto the header's own column names.
    actions->add(ExpressionAction::project(source_header.getNames()));

    return actions;
}
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
@ -1033,13 +1041,20 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
sort_description.emplace_back(data.sorting_key_columns[j], input_sorting_info->direction, 1);
}
/// Project input columns to drop columns from sorting_key_prefix_expr
/// to allow executing the same expression later.
/// NOTE: It may lead to double computation of the expression.
auto projection = createProjection(pipes.back(), data);
for (auto & pipe : pipes)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));
auto merging_sorted = std::make_shared<MergingSortedTransform>(
pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);
res.emplace_back(std::move(pipes), std::move(merging_sorted));
Pipe merged(std::move(pipes), std::move(merging_sorted));
merged.addSimpleTransform(std::make_shared<ExpressionTransform>(merged.getHeader(), projection));
res.emplace_back(std::move(merged));
}
else
res.emplace_back(std::move(pipes.front()));
@ -1085,6 +1100,10 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
use_uncompressed_cache = false;
Pipes pipes;
/// Project input columns to drop columns from sorting_key_expr
/// to allow executing the same expression later.
/// NOTE: It may lead to double computation of the expression.
ExpressionActionsPtr projection;
for (const auto & part : parts)
{
@ -1095,6 +1114,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
virt_columns, part.part_index_in_query);
Pipe pipe(std::move(source_processor));
if (!projection)
projection = createProjection(pipe, data);
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr));
pipes.emplace_back(std::move(pipe));
}
@ -1167,6 +1189,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (merged_processor)
{
Pipe pipe(std::move(pipes), std::move(merged_processor));
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), projection));
pipes = Pipes();
pipes.emplace_back(std::move(pipe));
}

View File

@ -55,7 +55,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
int read_direction = required_sort_description.at(0).direction;
size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size());
std::cerr << "Looking for common prefix\n";
for (size_t i = 0; i < prefix_size; ++i)
{
if (forbidden_columns.count(required_sort_description[i].column_name))
@ -83,7 +82,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
}
else
found_function = true;
std::cerr << "Function was found\n";
if (action.argument_names.size() != 1 || action.argument_names.at(0) != sorting_key_columns[i])
{
current_direction = 0;
@ -96,7 +94,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
current_direction = 0;
break;
}
std::cerr << "Function has info about monotonicity\n";
auto monotonicity = func.getMonotonicityForRange(*func.getArgumentTypes().at(0), {}, {});
if (!monotonicity.is_monotonic)
{
@ -105,15 +102,12 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
}
else if (!monotonicity.is_positive)
current_direction *= -1;
std::cerr << "Function is monotonic\n";
}
if (!found_function)
current_direction = 0;
std::cerr << current_direction << " " << read_direction << "\n";
if (!current_direction || (i > 0 && current_direction != read_direction))
break;
std::cerr << "Adding function\n";
if (i == 0)
read_direction = current_direction;