diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 5376b1e4d5c..a5a409d0f1d 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1746,10 +1746,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const { auto & query = getSelectQuery(); SortDescription group_by_descr = getSortDescriptionFromGroupBy(query, *context); - UInt64 limit = getLimitForSorting(query, *context); - executeOrderOptimized(pipeline, group_by_info, limit, group_by_descr); + executeOrderOptimized(pipeline, group_by_info, 0, group_by_descr); + pipeline.resize(1); pipeline.addSimpleTransform([&](const Block & header) { return std::make_shared(header, transform_params, group_by_descr, group_by_descr); diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index fc9473bfd6c..20e623010d4 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -97,6 +97,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk) if (!res_block_size) { // std::cerr << "Creating first state with key " << key_begin << "\n"; + LOG_TRACE(log, "AggregatingInOrder"); params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns); ++res_block_size; } @@ -228,6 +229,7 @@ void AggregatingInOrderTransform::generate() if (res_block_size) params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns); + LOG_TRACE(log, "Aggregated"); Block res = params->getHeader().cloneEmpty(); for (size_t i = 0; i < res_key_columns.size(); ++i) diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.h b/src/Processors/Transforms/AggregatingInOrderTransform.h index 8afb83232db..5928ab97972 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.h +++ b/src/Processors/Transforms/AggregatingInOrderTransform.h @@ -47,6 +47,8 @@ private: bool is_consume_finished = false; Chunk current_chunk; + + Logger * log = &Logger::get("AggregatingInOrderTransform"); }; } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 76314e823f9..61c81c9f2b4 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -834,6 +834,14 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( return res; } +static ExpressionActionsPtr createProjection(const Pipe & pipe, const MergeTreeData & data) +{ + const auto & header = pipe.getHeader(); + auto projection = std::make_shared(header.getNamesAndTypesList(), data.global_context); + projection->add(ExpressionAction::project(header.getNames())); + return projection; +} + Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( RangesInDataParts && parts, size_t num_streams, @@ -1033,13 +1041,20 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( sort_description.emplace_back(data.sorting_key_columns[j], input_sorting_info->direction, 1); } + /// Project input columns to drop columns from sorting_key_prefix_expr + /// to allow execute the same expression later. + /// NOTE: It may lead to double computation of expression. + auto projection = createProjection(pipes.back(), data); + for (auto & pipe : pipes) pipe.addSimpleTransform(std::make_shared(pipe.getHeader(), sorting_key_prefix_expr)); auto merging_sorted = std::make_shared( pipes.back().getHeader(), pipes.size(), sort_description, max_block_size); - res.emplace_back(std::move(pipes), std::move(merging_sorted)); + Pipe merged(std::move(pipes), std::move(merging_sorted)); + merged.addSimpleTransform(std::make_shared(merged.getHeader(), projection)); + res.emplace_back(std::move(merged)); } else res.emplace_back(std::move(pipes.front())); @@ -1085,6 +1100,10 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( use_uncompressed_cache = false; Pipes pipes; + /// Project input columns to drop columns from sorting_key_expr + /// to allow execute the same expression later. + /// NOTE: It may lead to double computation of expression. + ExpressionActionsPtr projection; for (const auto & part : parts) { @@ -1095,6 +1114,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( virt_columns, part.part_index_in_query); Pipe pipe(std::move(source_processor)); + if (!projection) + projection = createProjection(pipe, data); + pipe.addSimpleTransform(std::make_shared(pipe.getHeader(), data.sorting_key_expr)); pipes.emplace_back(std::move(pipe)); } @@ -1167,6 +1189,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (merged_processor) { Pipe pipe(std::move(pipes), std::move(merged_processor)); + pipe.addSimpleTransform(std::make_shared(pipe.getHeader(), projection)); pipes = Pipes(); pipes.emplace_back(std::move(pipe)); } diff --git a/src/Storages/ReadInOrderOptimizer.cpp b/src/Storages/ReadInOrderOptimizer.cpp index ece90c97ce6..e164f1928cf 100644 --- a/src/Storages/ReadInOrderOptimizer.cpp +++ b/src/Storages/ReadInOrderOptimizer.cpp @@ -55,7 +55,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora int read_direction = required_sort_description.at(0).direction; size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size()); - std::cerr << "Looking for common prefix\n"; for (size_t i = 0; i < prefix_size; ++i) { if (forbidden_columns.count(required_sort_description[i].column_name)) @@ -83,7 +82,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora } else found_function = true; - std::cerr << "Function was found\n"; if (action.argument_names.size() != 1 || action.argument_names.at(0) != sorting_key_columns[i]) { current_direction = 0; @@ -96,7 +94,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora current_direction = 0; break; } - std::cerr << "Function has info about monotonicity\n"; auto monotonicity = func.getMonotonicityForRange(*func.getArgumentTypes().at(0), {}, {}); if (!monotonicity.is_monotonic) { @@ -105,15 +102,12 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora } else if (!monotonicity.is_positive) current_direction *= -1; - std::cerr << "Function is monotonic\n"; } if (!found_function) current_direction = 0; - std::cerr << current_direction << " " << read_direction << "\n"; if (!current_direction || (i > 0 && current_direction != read_direction)) break; - std::cerr << "Adding function\n"; if (i == 0) read_direction = current_direction;