diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index 1f01f1091e2..8a93dc5fd77 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -408,6 +408,29 @@ Block Aggregator::Params::getHeader(
     return materializeBlock(res);
 }
 
+ColumnRawPtrs Aggregator::Params::makeRawKeyColumns(const Block & block) const
+{
+    ColumnRawPtrs key_columns(keys_size);
+
+    for (size_t i = 0; i < keys_size; ++i)
+        key_columns[i] = block.safeGetByPosition(i).column.get();
+
+    return key_columns;
+}
+
+Aggregator::AggregateColumnsConstData Aggregator::Params::makeAggregateColumnsData(const Block & block) const
+{
+    AggregateColumnsConstData aggregate_columns(aggregates_size);
+
+    for (size_t i = 0; i < aggregates_size; ++i)
+    {
+        const auto & aggregate_column_name = aggregates[i].column_name;
+        aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
+    }
+
+    return aggregate_columns;
+}
+
 void Aggregator::Params::explain(WriteBuffer & out, size_t indent) const
 {
     Strings res;
@@ -865,6 +888,38 @@ void Aggregator::executeOnBlockSmall(
     executeImpl(result, row_begin, row_end, key_columns, aggregate_instructions);
 }
 
+void Aggregator::mergeOnBlockSmall(
+    AggregatedDataVariants & result,
+    size_t row_begin,
+    size_t row_end,
+    const AggregateColumnsConstData & aggregate_columns_data,
+    const ColumnRawPtrs & key_columns) const
+{
+    /// `result` will destroy the states of aggregate functions in the destructor
+    result.aggregator = this;
+
+    /// How to perform the aggregation?
+    if (result.empty())
+    {
+        initDataVariantsWithSizeHint(result, method_chosen, params);
+        result.keys_size = params.keys_size;
+        result.key_sizes = key_sizes;
+    }
+
+    if (false) {} // NOLINT
+#define M(NAME, IS_TWO_LEVEL) \
+    else if (result.type == AggregatedDataVariants::Type::NAME) \
+        mergeStreamsImpl(result.aggregates_pool, *result.NAME, result.NAME->data, \
+                         result.without_key, /* no_more_keys= */ false, \
+                         row_begin, row_end, \
+                         aggregate_columns_data, key_columns);
+
+    APPLY_FOR_AGGREGATED_VARIANTS(M)
+#undef M
+    else
+        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
+}
+
 void Aggregator::executeImpl(
     AggregatedDataVariants & result,
     size_t row_begin,
@@ -1181,8 +1236,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
     AggregatedDataVariants & data_variants,
     size_t row_begin,
     size_t row_end,
-    AggregateFunctionInstruction * aggregate_instructions,
-    Arena * arena) const
+    AggregateFunctionInstruction * aggregate_instructions) const
 {
     /// `data_variants` will destroy the states of aggregate functions in the destructor
     data_variants.aggregator = this;
@@ -1198,17 +1252,30 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
                 inst->offsets[static_cast<ssize_t>(row_begin) - 1],
                 inst->offsets[row_end - 1],
                 res + inst->state_offset,
-                inst->batch_arguments, arena);
+                inst->batch_arguments, data_variants.aggregates_pool);
         else
             inst->batch_that->addBatchSinglePlaceFromInterval(
                 row_begin,
                 row_end,
                 res + inst->state_offset,
                 inst->batch_arguments,
-                arena);
+                data_variants.aggregates_pool);
     }
 }
 
+void NO_INLINE Aggregator::mergeOnIntervalWithoutKeyImpl(
+    AggregatedDataVariants & data_variants,
+    size_t row_begin,
+    size_t row_end,
+    const AggregateColumnsConstData & aggregate_columns_data) const
+{
+    /// `data_variants` will destroy the states of aggregate functions in the destructor
+    data_variants.aggregator = this;
+    data_variants.init(AggregatedDataVariants::Type::without_key);
+
+    mergeWithoutKeyStreamsImpl(data_variants, row_begin, row_end, aggregate_columns_data);
+}
+
 void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
     AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder) const
 {
@@ -2569,33 +2636,20 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
 
 template <bool no_more_keys, typename Method, typename Table>
 void NO_INLINE Aggregator::mergeStreamsImplCase(
-    Block & block,
     Arena * aggregates_pool,
     Method & method [[maybe_unused]],
     Table & data,
-    AggregateDataPtr overflow_row) const
+    AggregateDataPtr overflow_row,
+    size_t row_begin,
+    size_t row_end,
+    const AggregateColumnsConstData & aggregate_columns_data,
+    const ColumnRawPtrs & key_columns) const
 {
-    ColumnRawPtrs key_columns(params.keys_size);
-    AggregateColumnsConstData aggregate_columns(params.aggregates_size);
-
-    /// Remember the columns we will work with
-    for (size_t i = 0; i < params.keys_size; ++i)
-        key_columns[i] = block.safeGetByPosition(i).column.get();
-
-    for (size_t i = 0; i < params.aggregates_size; ++i)
-    {
-        const auto & aggregate_column_name = params.aggregates[i].column_name;
-        aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
-    }
-
     typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
 
-    /// For all rows.
-    size_t rows = block.rows();
+    std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
 
-    std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
-
-    for (size_t i = 0; i < rows; ++i)
+    for (size_t i = row_begin; i < row_end; ++i)
     {
         AggregateDataPtr aggregate_data = nullptr;
 
@@ -2631,45 +2685,69 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
     {
         /// Merge state of aggregate functions.
         aggregate_functions[j]->mergeBatch(
-            0, rows,
+            row_begin, row_end,
             places.get(), offsets_of_aggregate_states[j],
-            aggregate_columns[j]->data(),
+            aggregate_columns_data[j]->data(),
             aggregates_pool);
     }
-
-    /// Early release memory.
-    block.clear();
 }
 
 template <typename Method, typename Table>
 void NO_INLINE Aggregator::mergeStreamsImpl(
-    Block & block,
+    Block block,
     Arena * aggregates_pool,
     Method & method,
     Table & data,
     AggregateDataPtr overflow_row,
     bool no_more_keys) const
+{
+    const AggregateColumnsConstData & aggregate_columns_data = params.makeAggregateColumnsData(block);
+    const ColumnRawPtrs & key_columns = params.makeRawKeyColumns(block);
+
+    mergeStreamsImpl(
+        aggregates_pool,
+        method,
+        data,
+        overflow_row,
+        no_more_keys,
+        0,
+        block.rows(),
+        aggregate_columns_data,
+        key_columns);
+}
+
+template <typename Method, typename Table>
+void NO_INLINE Aggregator::mergeStreamsImpl(
+    Arena * aggregates_pool,
+    Method & method,
+    Table & data,
+    AggregateDataPtr overflow_row,
+    bool no_more_keys,
+    size_t row_begin,
+    size_t row_end,
+    const AggregateColumnsConstData & aggregate_columns_data,
+    const ColumnRawPtrs & key_columns) const
 {
     if (!no_more_keys)
-        mergeStreamsImplCase<false>(block, aggregates_pool, method, data, overflow_row);
+        mergeStreamsImplCase<false>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
     else
-        mergeStreamsImplCase<true>(block, aggregates_pool, method, data, overflow_row);
+        mergeStreamsImplCase<true>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
 }
 
-void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
-    Block & block,
+void NO_INLINE Aggregator::mergeBlockWithoutKeyStreamsImpl(
+    Block block,
     AggregatedDataVariants & result) const
 {
-    AggregateColumnsConstData aggregate_columns(params.aggregates_size);
-
-    /// Remember the columns we will work with
-    for (size_t i = 0; i < params.aggregates_size; ++i)
-    {
-        const auto & aggregate_column_name = params.aggregates[i].column_name;
-        aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
-    }
-
+    AggregateColumnsConstData aggregate_columns = params.makeAggregateColumnsData(block);
+    mergeWithoutKeyStreamsImpl(result, 0, block.rows(), aggregate_columns);
+}
+
+void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
+    AggregatedDataVariants & result,
+    size_t row_begin,
+    size_t row_end,
+    const AggregateColumnsConstData & aggregate_columns_data) const
+{
     AggregatedDataWithoutKey & res = result.without_key;
     if (!res)
     {
@@ -2678,17 +2756,15 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
         res = place;
     }
 
-    for (size_t row = 0, rows = block.rows(); row < rows; ++row)
+    for (size_t row = row_begin; row < row_end; ++row)
     {
         /// Adding Values
         for (size_t i = 0; i < params.aggregates_size; ++i)
-            aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[row], result.aggregates_pool);
+            aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool);
     }
-
-    /// Early release memory.
-    block.clear();
 }
 
+
 bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const
 {
     /// `result` will destroy the states of aggregate functions in the destructor
@@ -2704,11 +2780,10 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
     }
 
     if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
-        mergeWithoutKeyStreamsImpl(block, result);
-
+        mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
 #define M(NAME, IS_TWO_LEVEL) \
     else if (result.type == AggregatedDataVariants::Type::NAME) \
-        mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
+        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
 
     APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
@@ -2824,7 +2899,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
         {
 #define M(NAME) \
             else if (result.type == AggregatedDataVariants::Type::NAME) \
-                mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
+                mergeStreamsImpl(std::move(block), aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
 
             if (false) {} // NOLINT
             APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -2875,11 +2950,11 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
                 break;
 
             if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
-                mergeWithoutKeyStreamsImpl(block, result);
+                mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
 
 #define M(NAME, IS_TWO_LEVEL) \
             else if (result.type == AggregatedDataVariants::Type::NAME) \
-                mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
+                mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
 
             APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
@@ -2942,11 +3017,11 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
             bucket_num = -1;
 
         if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
-            mergeWithoutKeyStreamsImpl(block, result);
+            mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
 
 #define M(NAME, IS_TWO_LEVEL) \
         else if (result.type == AggregatedDataVariants::Type::NAME) \
-            mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
+            mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
 
         APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h
index 1806465db4a..475fcd9e249 100644
--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h
@@ -890,6 +890,11 @@ class NativeWriter;
 class Aggregator final
 {
 public:
+    using AggregateColumns = std::vector<ColumnRawPtrs>;
+    using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
+    using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
+    using AggregateFunctionsPlainPtrs = std::vector<const IAggregateFunction *>;
+
     struct Params
     {
         /// Data structure of source blocks.
@@ -1015,6 +1020,10 @@ public:
             return getHeader(src_header, intermediate_header, keys, aggregates, final);
         }
 
+        /// Remember the columns we will work with
+        ColumnRawPtrs makeRawKeyColumns(const Block & block) const;
+        AggregateColumnsConstData makeAggregateColumnsData(const Block & block) const;
+
         /// Returns keys and aggregated for EXPLAIN query
         void explain(WriteBuffer & out, size_t indent) const;
         void explain(JSONBuilder::JSONMap & map) const;
@@ -1022,11 +1031,6 @@ public:
 
     explicit Aggregator(const Params & params_);
 
-    using AggregateColumns = std::vector<ColumnRawPtrs>;
-    using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
-    using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
-    using AggregateFunctionsPlainPtrs = std::vector<const IAggregateFunction *>;
-
     /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
     bool executeOnBlock(const Block & block,
         AggregatedDataVariants & result,
@@ -1181,6 +1185,14 @@ private:
         size_t row_end,
         ColumnRawPtrs & key_columns,
         AggregateFunctionInstruction * aggregate_instructions) const;
+    void mergeOnBlockSmall(
+        AggregatedDataVariants & result,
+        size_t row_begin,
+        size_t row_end,
+        const AggregateColumnsConstData & aggregate_columns_data,
+        const ColumnRawPtrs & key_columns) const;
+
+    void mergeOnBlockImpl(Block block, AggregatedDataVariants & result, bool no_more_keys) const;
 
     void executeImpl(
         AggregatedDataVariants & result,
@@ -1227,8 +1239,12 @@ private:
         AggregatedDataVariants & data_variants,
         size_t row_begin,
         size_t row_end,
-        AggregateFunctionInstruction * aggregate_instructions,
-        Arena * arena) const;
+        AggregateFunctionInstruction * aggregate_instructions) const;
+    void mergeOnIntervalWithoutKeyImpl(
+        AggregatedDataVariants & data_variants,
+        size_t row_begin,
+        size_t row_end,
+        const AggregateColumnsConstData & aggregate_columns_data) const;
 
     template <typename Method>
     void writeToTemporaryFileImpl(
@@ -1338,24 +1354,43 @@ private:
 
     template <bool no_more_keys, typename Method, typename Table>
     void mergeStreamsImplCase(
-        Block & block,
         Arena * aggregates_pool,
         Method & method,
         Table & data,
-        AggregateDataPtr overflow_row) const;
+        AggregateDataPtr overflow_row,
+        size_t row_begin,
+        size_t row_end,
+        const AggregateColumnsConstData & aggregate_columns_data,
+        const ColumnRawPtrs & key_columns) const;
 
     template <typename Method, typename Table>
     void mergeStreamsImpl(
-        Block & block,
+        Block block,
         Arena * aggregates_pool,
         Method & method,
         Table & data,
         AggregateDataPtr overflow_row,
         bool no_more_keys) const;
+    template <typename Method, typename Table>
+    void mergeStreamsImpl(
+        Arena * aggregates_pool,
+        Method & method,
+        Table & data,
+        AggregateDataPtr overflow_row,
+        bool no_more_keys,
+        size_t row_begin,
+        size_t row_end,
+        const AggregateColumnsConstData & aggregate_columns_data,
+        const ColumnRawPtrs & key_columns) const;
 
-    void mergeWithoutKeyStreamsImpl(
-        Block & block,
+    void mergeBlockWithoutKeyStreamsImpl(
+        Block block,
         AggregatedDataVariants & result) const;
+    void mergeWithoutKeyStreamsImpl(
+        AggregatedDataVariants & result,
+        size_t row_begin,
+        size_t row_end,
+        const AggregateColumnsConstData & aggregate_columns_data) const;
 
     template <typename Method>
     void mergeBucketImpl(
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 6999aefc830..1bf188596d7 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1617,7 +1617,10 @@ static void executeMergeAggregatedImpl(
 
     Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads);
 
-    auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
+    auto transform_params = std::make_shared<AggregatingTransformParams>(
+        params,
+        final,
+        /* only_merge_= */ false);
 
     auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
         query_plan.getCurrentDataStream(),
@@ -2289,6 +2292,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
         std::move(aggregator_params),
         std::move(grouping_sets_params),
         final,
+        /* only_merge_= */ false,
         settings.max_block_size,
         settings.aggregation_in_order_max_block_bytes,
         merge_threads,
@@ -2362,7 +2366,10 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
         keys.push_back(header_before_transform.getPositionByName(key.name));
 
     auto params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0);
-    auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), true);
+    auto transform_params = std::make_shared<AggregatingTransformParams>(
+        std::move(params),
+        /* final_= */ true,
+        /* only_merge_= */ false);
 
     QueryPlanStepPtr step;
     if (modificator == Modificator::ROLLUP)
diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp
index 17a0498fb7e..28f821d6f3f 100644
--- a/src/Processors/QueryPlan/AggregatingStep.cpp
+++ b/src/Processors/QueryPlan/AggregatingStep.cpp
@@ -69,6 +69,7 @@ AggregatingStep::AggregatingStep(
     Aggregator::Params params_,
     GroupingSetsParamsList grouping_sets_params_,
     bool final_,
+    bool only_merge_,
     size_t max_block_size_,
     size_t aggregation_in_order_max_block_bytes_,
     size_t merge_threads_,
@@ -79,7 +80,8 @@ AggregatingStep::AggregatingStep(
     : ITransformingStep(input_stream_, appendGroupingColumn(params_.getHeader(final_), grouping_sets_params_), getTraits(), false)
     , params(std::move(params_))
     , grouping_sets_params(std::move(grouping_sets_params_))
-    , final(std::move(final_))
+    , final(final_)
+    , only_merge(only_merge_)
     , max_block_size(max_block_size_)
     , aggregation_in_order_max_block_bytes(aggregation_in_order_max_block_bytes_)
     , merge_threads(merge_threads_)
@@ -119,7 +121,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
      * 1. Parallel aggregation is done, and the results should be merged in parallel.
      * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
      */
-    auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final);
+    auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final, only_merge);
 
     if (!grouping_sets_params.empty())
     {
@@ -169,7 +171,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
                     transform_params->params.intermediate_header,
                     transform_params->params.stats_collecting_params
                 };
-                auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(std::move(params_for_set), final);
+                auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(std::move(params_for_set), final, only_merge);
 
                 if (streams > 1)
                 {
diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h
index 4dd3d956350..1be74da583a 100644
--- a/src/Processors/QueryPlan/AggregatingStep.h
+++ b/src/Processors/QueryPlan/AggregatingStep.h
@@ -7,9 +7,6 @@
 namespace DB
 {
 
-struct AggregatingTransformParams;
-using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
-
 struct GroupingSetsParams
 {
     GroupingSetsParams() = default;
@@ -36,6 +33,7 @@ public:
         Aggregator::Params params_,
         GroupingSetsParamsList grouping_sets_params_,
         bool final_,
+        bool only_merge_,
         size_t max_block_size_,
         size_t aggregation_in_order_max_block_bytes_,
         size_t merge_threads_,
@@ -59,6 +57,7 @@ private:
     Aggregator::Params params;
     GroupingSetsParamsList grouping_sets_params;
     bool final;
+    bool only_merge;
     size_t max_block_size;
     size_t aggregation_in_order_max_block_bytes;
     size_t merge_threads;
diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp
index 7491dda8164..f435d46a066 100644
--- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp
+++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp
@@ -31,6 +31,7 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
     , max_block_size(max_block_size_)
     , max_block_bytes(max_block_bytes_)
     , params(std::move(params_))
+    , aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
     , group_by_info(group_by_info_)
     , sort_description(group_by_description_)
     , aggregate_columns(params->params.aggregates_size)
@@ -66,6 +67,7 @@ static Int64 getCurrentMemoryUsage()
 
 void AggregatingInOrderTransform::consume(Chunk chunk)
 {
+    const Columns & columns = chunk.getColumns();
     Int64 initial_memory_usage = getCurrentMemoryUsage();
 
     size_t rows = chunk.getNumRows();
@@ -85,7 +87,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
     Columns key_columns(params->params.keys_size);
     for (size_t i = 0; i < params->params.keys_size; ++i)
     {
-        materialized_columns.push_back(chunk.getColumns().at(params->params.keys[i])->convertToFullColumnIfConst());
+        materialized_columns.push_back(columns.at(params->params.keys[i])->convertToFullColumnIfConst());
         key_columns[i] = materialized_columns.back();
         if (group_by_key)
             key_columns_raw[i] = materialized_columns.back().get();
@@ -93,7 +95,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
 
     Aggregator::NestedColumnsHolder nested_columns_holder;
     Aggregator::AggregateFunctionInstructions aggregate_function_instructions;
-    params->aggregator.prepareAggregateInstructions(chunk.getColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions, nested_columns_holder);
+    params->aggregator.prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_function_instructions, nested_columns_holder);
 
     size_t key_end = 0;
     size_t key_begin = 0;
@@ -120,6 +122,17 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
 
     Int64 current_memory_usage = 0;
 
+    Aggregator::AggregateColumnsConstData aggregate_columns_data(params->params.aggregates_size);
+    if (params->only_merge)
+    {
+        for (size_t i = 0, j = 0; i < columns.size(); ++i)
+        {
+            if (!aggregates_mask[i])
+                continue;
+            aggregate_columns_data[j++] = &typeid_cast<const ColumnAggregateFunction &>(*columns[i]).getData();
+        }
+    }
+
     /// Will split block into segments with the same key
     while (key_end != rows)
     {
@@ -136,10 +149,20 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
         /// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block.
         if (key_begin != key_end)
         {
-            if (group_by_key)
-                params->aggregator.executeOnBlockSmall(variants, key_begin, key_end, key_columns_raw, aggregate_function_instructions.data());
+            if (params->only_merge)
+            {
+                if (group_by_key)
+                    params->aggregator.mergeOnBlockSmall(variants, key_begin, key_end, aggregate_columns_data, key_columns_raw);
+                else
+                    params->aggregator.mergeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_columns_data);
+            }
             else
-                params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
+            {
+                if (group_by_key)
+                    params->aggregator.executeOnBlockSmall(variants, key_begin, key_end, key_columns_raw, aggregate_function_instructions.data());
+                else
+                    params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data());
+            }
         }
 
         current_memory_usage = getCurrentMemoryUsage() - initial_memory_usage;
diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.h b/src/Processors/Transforms/AggregatingInOrderTransform.h
index 9632b107463..ee9ab0f4b79 100644
--- a/src/Processors/Transforms/AggregatingInOrderTransform.h
+++ b/src/Processors/Transforms/AggregatingInOrderTransform.h
@@ -56,6 +56,7 @@ private:
     MutableColumns res_aggregate_columns;
 
     AggregatingTransformParamsPtr params;
+    ColumnsMask aggregates_mask;
 
     InputOrderInfoPtr group_by_info;
     /// For sortBlock()
diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h
index bfc3904e5d8..8d62664da59 100644
--- a/src/Processors/Transforms/AggregatingTransform.h
+++ b/src/Processors/Transforms/AggregatingTransform.h
@@ -34,21 +34,24 @@ struct AggregatingTransformParams
     AggregatorListPtr aggregator_list_ptr;
     Aggregator & aggregator;
     bool final;
+    /// Merge data for aggregate projections.
     bool only_merge = false;
 
-    AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
+    AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool only_merge_)
         : params(params_)
         , aggregator_list_ptr(std::make_shared<AggregatorList>())
         , aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
         , final(final_)
+        , only_merge(only_merge_)
     {
     }
 
-    AggregatingTransformParams(const Aggregator::Params & params_, const AggregatorListPtr & aggregator_list_ptr_, bool final_)
+    AggregatingTransformParams(const Aggregator::Params & params_, const AggregatorListPtr & aggregator_list_ptr_, bool final_, bool only_merge_)
         : params(params_)
         , aggregator_list_ptr(aggregator_list_ptr_)
         , aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
         , final(final_)
+        , only_merge(only_merge_)
     {
     }
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index dc94266bc95..9498c4a52b2 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -5593,8 +5593,9 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
         // If optimize_aggregation_in_order = true, we need additional information to transform the projection's pipeline.
         auto attach_aggregation_in_order_info = [&]()
         {
-            for (const auto & key : keys)
+            for (const auto & desc : select.getQueryAnalyzer()->aggregationKeys())
             {
+                const String & key = desc.name;
                 auto actions_dag = analysis_result.before_aggregation->clone();
                 actions_dag->foldActionsByProjection({key}, sample_block_for_keys);
                 candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings));
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index f23a6e7834e..b39f91fd51c 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <Processors/QueryPlan/AggregatingStep.h>
 #include
 #include
 
@@ -115,6 +116,21 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
     return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows));
 }
 
+static SortDescription getSortDescriptionFromGroupBy(const ASTSelectQuery & query)
+{
+    SortDescription order_descr;
+    order_descr.reserve(query.groupBy()->children.size());
+
+    for (const auto & elem : query.groupBy()->children)
+    {
+        /// Note, here aliases should not be used, since there will be no such column in a block.
+        String name = elem->getColumnNameWithoutAlias();
+        order_descr.emplace_back(name, 1, 1);
+    }
+
+    return order_descr;
+}
+
 
 QueryPlanPtr MergeTreeDataSelectExecutor::read(
     const Names & column_names_to_return,
@@ -168,9 +184,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
             query_info.projection->desc->type,
             query_info.projection->desc->name);
 
-        Pipes pipes;
-        Pipe projection_pipe;
-        Pipe ordinary_pipe;
         QueryPlanResourceHolder resources;
 
         auto projection_plan = std::make_unique<QueryPlan>();
@@ -217,12 +230,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                 expression_before_aggregation->setStepDescription("Before GROUP BY");
                 projection_plan->addStep(std::move(expression_before_aggregation));
             }
-
-            auto builder = projection_plan->buildQueryPipeline(
-                QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
-            projection_pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
         }
 
+        auto ordinary_query_plan = std::make_unique<QueryPlan>();
         if (query_info.projection->merge_tree_normal_select_result_ptr)
         {
             auto storage_from_base_parts_of_projection
@@ -234,49 +244,27 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                 nullptr,
                 SelectQueryOptions{processed_stage}.projectionQuery());
 
-            QueryPlan ordinary_query_plan;
-            interpreter.buildQueryPlan(ordinary_query_plan);
+            interpreter.buildQueryPlan(*ordinary_query_plan);
 
             const auto & expressions = interpreter.getAnalysisResult();
             if (processed_stage == QueryProcessingStage::Enum::FetchColumns && expressions.before_where)
             {
                 auto where_step = std::make_unique<FilterStep>(
-                    ordinary_query_plan.getCurrentDataStream(),
+                    ordinary_query_plan->getCurrentDataStream(),
                     expressions.before_where,
                     expressions.where_column_name,
                     expressions.remove_where_filter);
                 where_step->setStepDescription("WHERE");
-                ordinary_query_plan.addStep(std::move(where_step));
+                ordinary_query_plan->addStep(std::move(where_step));
             }
-
-            auto builder = ordinary_query_plan.buildQueryPipeline(
-                QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
-            ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
         }
 
+        Pipe projection_pipe;
+        Pipe ordinary_pipe;
         if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
         {
-            /// Here we create shared ManyAggregatedData for both projection and ordinary data.
-            /// For ordinary data, AggregatedData is filled in a usual way.
-            /// For projection data, AggregatedData is filled by merging aggregation states.
-            /// When all AggregatedData is filled, we merge aggregation states together in a usual way.
-            /// Pipeline will look like:
-            /// ReadFromProjection   -> Aggregating (only merge states) ->
-            /// ReadFromProjection   -> Aggregating (only merge states) ->
-            /// ...                                                     -> Resize -> ConvertingAggregatedToChunks
-            /// ReadFromOrdinaryPart -> Aggregating (usual)             ->            (added by last Aggregating)
-            /// ReadFromOrdinaryPart -> Aggregating (usual)             ->
-            /// ...
-            auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
-            size_t counter = 0;
-
-            AggregatorListPtr aggregator_list_ptr = std::make_shared<AggregatorList>();
-
-            // TODO apply in_order_optimization here
-            auto build_aggregate_pipe = [&](Pipe & pipe, bool projection)
+            auto make_aggregator_params = [&](const Block & header_before_aggregation, bool projection)
             {
-                const auto & header_before_aggregation = pipe.getHeader();
-
                 ColumnNumbers keys;
                 for (const auto & key : query_info.projection->aggregation_keys)
                     keys.push_back(header_before_aggregation.getPositionByName(key.name));
@@ -290,29 +278,28 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                         descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
                 }
 
-                AggregatingTransformParamsPtr transform_params;
+                Aggregator::Params params(
+                    header_before_aggregation,
+                    keys,
+                    aggregates,
+                    query_info.projection->aggregate_overflow_row,
+                    settings.max_rows_to_group_by,
+                    settings.group_by_overflow_mode,
+                    settings.group_by_two_level_threshold,
+                    settings.group_by_two_level_threshold_bytes,
+                    settings.max_bytes_before_external_group_by,
+                    settings.empty_result_for_aggregation_by_empty_set,
+                    context->getTemporaryVolume(),
+                    settings.max_threads,
+                    settings.min_free_disk_space_for_temporary_data,
+                    settings.compile_aggregate_expressions,
+                    settings.min_count_to_compile_aggregate_expression);
+
+                bool only_merge = false;
                 if (projection)
                 {
-                    Aggregator::Params params(
-                        header_before_aggregation,
-                        keys,
-                        aggregates,
-                        query_info.projection->aggregate_overflow_row,
-                        settings.max_rows_to_group_by,
-                        settings.group_by_overflow_mode,
-                        settings.group_by_two_level_threshold,
-                        settings.group_by_two_level_threshold_bytes,
-                        settings.max_bytes_before_external_group_by,
-                        settings.empty_result_for_aggregation_by_empty_set,
-                        context->getTemporaryVolume(),
-                        settings.max_threads,
-                        settings.min_free_disk_space_for_temporary_data,
-                        settings.compile_expressions,
-                        settings.min_count_to_compile_aggregate_expression,
-                        header_before_aggregation); // The source header is also an intermediate header
-
-                    transform_params = std::make_shared<AggregatingTransformParams>(
-                        std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);
+                    /// The source header is also an intermediate header
+                    params.intermediate_header = header_before_aggregation;
 
                     /// This part is hacky.
                     /// We want AggregatingTransform to work with aggregate states instead of normal columns.
@@ -321,51 +308,135 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                     /// It is needed because data in projection:
                     /// * is not merged completely (we may have states with the same key in different parts)
                     /// * is not split into buckets (so if we just use MergingAggregated, it will use single thread)
-                    transform_params->only_merge = true;
-                }
-                else
-                {
-                    Aggregator::Params params(
-                        header_before_aggregation,
-                        keys,
-                        aggregates,
-                        query_info.projection->aggregate_overflow_row,
-                        settings.max_rows_to_group_by,
-                        settings.group_by_overflow_mode,
-                        settings.group_by_two_level_threshold,
-                        settings.group_by_two_level_threshold_bytes,
-                        settings.max_bytes_before_external_group_by,
-                        settings.empty_result_for_aggregation_by_empty_set,
-                        context->getTemporaryVolume(),
-                        settings.max_threads,
-                        settings.min_free_disk_space_for_temporary_data,
-                        settings.compile_aggregate_expressions,
-                        settings.min_count_to_compile_aggregate_expression);
-
-                    transform_params = std::make_shared<AggregatingTransformParams>(
-                        std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);
+                    only_merge = true;
                 }
 
-                pipe.resize(pipe.numOutputPorts(), true, true);
-
-                auto merge_threads = num_streams;
-                auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
-                    ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
-                    : static_cast<size_t>(settings.max_threads);
-
-                pipe.addSimpleTransform([&](const Block & header)
-                {
-                    return std::make_shared<AggregatingTransform>(
-                        header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
-                });
+                return std::make_pair(params, only_merge);
             };
 
-            if (!projection_pipe.empty())
-                build_aggregate_pipe(projection_pipe, true);
-            if (!ordinary_pipe.empty())
-                build_aggregate_pipe(ordinary_pipe, false);
+            if (ordinary_query_plan->isInitialized() && projection_plan->isInitialized())
+            {
+                auto projection_builder = projection_plan->buildQueryPipeline(
+                    QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
+                projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources);
+
+                auto ordinary_builder = ordinary_query_plan->buildQueryPipeline(
+                    QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
+                ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources);
+
+                /// Here we create shared ManyAggregatedData for both projection and ordinary data.
+                /// For ordinary data, AggregatedData is filled in a usual way.
+                /// For projection data, AggregatedData is filled by merging aggregation states.
+                /// When all AggregatedData is filled, we merge aggregation states together in a usual way.
+                /// Pipeline will look like:
+                /// ReadFromProjection   -> Aggregating (only merge states) ->
+                /// ReadFromProjection   -> Aggregating (only merge states) ->
+                /// ...                                                     -> Resize -> ConvertingAggregatedToChunks
+                /// ReadFromOrdinaryPart -> Aggregating (usual)             ->            (added by last Aggregating)
+                /// ReadFromOrdinaryPart -> Aggregating (usual)             ->
+                /// ...
+                auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
+                size_t counter = 0;
+
+                AggregatorListPtr aggregator_list_ptr = std::make_shared<AggregatorList>();
+
+                /// TODO apply optimize_aggregation_in_order here too (like below)
+                auto build_aggregate_pipe = [&](Pipe & pipe, bool projection)
+                {
+                    auto [params, only_merge] = make_aggregator_params(pipe.getHeader(), projection);
+
+                    AggregatingTransformParamsPtr transform_params = std::make_shared<AggregatingTransformParams>(
+                        std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final, only_merge);
+
+                    pipe.resize(pipe.numOutputPorts(), true, true);
+
+                    auto merge_threads = num_streams;
+                    auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
+                        ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
+                        : static_cast<size_t>(settings.max_threads);
+
+                    pipe.addSimpleTransform([&](const Block & header)
+                    {
+                        return std::make_shared<AggregatingTransform>(
+                            header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
+                    });
+                };
+
+                if (!projection_pipe.empty())
+                    build_aggregate_pipe(projection_pipe, true);
+                if (!ordinary_pipe.empty())
+                    build_aggregate_pipe(ordinary_pipe, false);
+            }
+            else
+            {
+                auto add_aggregating_step = [&](QueryPlanPtr & query_plan, bool projection)
+                {
+                    auto [params, only_merge] = make_aggregator_params(query_plan->getCurrentDataStream().header, projection);
+
+                    auto merge_threads = num_streams;
+                    auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
+                        ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
+                        : static_cast<size_t>(settings.max_threads);
+
+                    InputOrderInfoPtr group_by_info = query_info.projection->input_order_info;
+                    SortDescription group_by_sort_description;
+                    if (group_by_info && settings.optimize_aggregation_in_order)
+                        group_by_sort_description = getSortDescriptionFromGroupBy(query_info.query->as<ASTSelectQuery &>());
+                    else
+                        group_by_info = nullptr;
+
+                    auto aggregating_step = std::make_unique<AggregatingStep>(
+                        query_plan->getCurrentDataStream(),
+                        std::move(params),
+                        /* grouping_sets_params_= */ GroupingSetsParamsList{},
+                        query_info.projection->aggregate_final,
+                        only_merge,
+                        settings.max_block_size,
+                        settings.aggregation_in_order_max_block_bytes,
+                        merge_threads,
+                        temporary_data_merge_threads,
+                        /* storage_has_evenly_distributed_read_= */ false,
+                        std::move(group_by_info),
+                        std::move(group_by_sort_description));
+                    query_plan->addStep(std::move(aggregating_step));
+                };
+
+                if (projection_plan->isInitialized())
+                {
+                    add_aggregating_step(projection_plan, true);
+
+                    auto projection_builder = projection_plan->buildQueryPipeline(
+                        QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
+                    projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources);
+                }
+                if (ordinary_query_plan->isInitialized())
+                {
+                    add_aggregating_step(ordinary_query_plan, false);
+
+                    auto ordinary_builder = ordinary_query_plan->buildQueryPipeline(
+                        QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
+                    ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources);
+                }
+            }
+        }
+        else
+        {
+            if (projection_plan->isInitialized())
+            {
+                auto projection_builder = projection_plan->buildQueryPipeline(
+                    QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
+                projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources);
+            }
+
+            if (ordinary_query_plan->isInitialized())
+            {
+                auto ordinary_builder = ordinary_query_plan->buildQueryPipeline(
+                    QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
+                ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources);
+            }
         }
 
+        Pipes pipes;
         pipes.emplace_back(std::move(projection_pipe));
         pipes.emplace_back(std::move(ordinary_pipe));
         auto pipe = Pipe::unitePipes(std::move(pipes));
diff --git a/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql b/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql
index add38dbd3f8..06f05e36237 100644
--- a/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql
+++ b/tests/queries/0_stateless/01710_projection_aggregation_in_order.sql
@@ -1,3 +1,6 @@
+-- Test that checks the correctness of the result for optimize_aggregation_in_order and projections,
+-- not that this optimization will take place.
+
 DROP TABLE IF EXISTS normal;
 
 CREATE TABLE normal
diff --git a/tests/queries/0_stateless/01710_projections_optimize_aggregation_in_order.reference b/tests/queries/0_stateless/01710_projections_optimize_aggregation_in_order.reference
new file mode 100644
index 00000000000..30b0f53ced8
--- /dev/null
+++ b/tests/queries/0_stateless/01710_projections_optimize_aggregation_in_order.reference
@@ -0,0 +1,28 @@
+SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0
+1 0 0 1249950000
+1 0 2 1250000000
+1 1 1 1249975000
+1 1 3 1250025000
+Used processors:
+AggregatingTransform
+SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1
+1 0 0 1249950000
+1 0 2 1250000000
+1 1 1 1249975000
+1 1 3 1250025000
+Used processors:
+AggregatingInOrderTransform
+SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0
+1 0 1249950000
+1 1 1249975000
+1 2 1250000000
+1 3 1250025000
+Used processors:
+AggregatingTransform
+SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1
+1 0 1249950000
+1 1 1249975000
+1 2 1250000000
+1 3 1250025000
+Used processors:
+AggregatingInOrderTransform
diff --git a/tests/queries/0_stateless/01710_projections_optimize_aggregation_in_order.sh b/tests/queries/0_stateless/01710_projections_optimize_aggregation_in_order.sh
new file mode 100755
index 00000000000..0cafa904a71
--- /dev/null
+++ b/tests/queries/0_stateless/01710_projections_optimize_aggregation_in_order.sh
@@ -0,0 +1,63 @@
+#!/usr/bin/env bash
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " + DROP TABLE IF EXISTS in_order_agg_01710; + + CREATE TABLE in_order_agg_01710 + ( + k1 UInt32, + k2 UInt32, + k3 UInt32, + value UInt32, + PROJECTION aaaa + ( + SELECT + k1, + k2, + k3, + sum(value) + GROUP BY k1, k2, k3 + ) + ) + ENGINE = MergeTree + ORDER BY tuple(); + + INSERT INTO in_order_agg_01710 SELECT 1, number%2, number%4, number FROM numbers(100000); +" + +function random_str() +{ + local n=$1 && shift + tr -cd '[:lower:]' < /dev/urandom | head -c"$n" +} + +function run_query() +{ + local query=$1 && shift + + local query_id + query_id="$CLICKHOUSE_TEST_UNIQUE_NAME-$(random_str 6)" + + echo "$query" + local opts=( + --allow_experimental_projection_optimization 1 + --force_optimize_projection 1 + --log_processors_profiles 1 + --query_id "$query_id" + ) + $CLICKHOUSE_CLIENT "${opts[@]}" "$@" -q "$query" + + $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS" + + echo "Used processors:" + $CLICKHOUSE_CLIENT --param_query_id "$query_id" -q "SELECT DISTINCT name FROM system.processors_profile_log WHERE query_id = {query_id:String} AND name LIKE 'Aggregating%'" +} + +run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0" +run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1" +run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0" +run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1" diff --git a/tests/queries/0_stateless/01710_projections_partial_optimize_aggregation_in_order.reference b/tests/queries/0_stateless/01710_projections_partial_optimize_aggregation_in_order.reference new file mode 100644 index 00000000000..6e0a46509bd --- /dev/null +++ b/tests/queries/0_stateless/01710_projections_partial_optimize_aggregation_in_order.reference @@ -0,0 +1,28 @@ +SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0 +1 0 0 1249950000 +1 0 2 1250000000 +1 1 1 1249975000 +1 1 3 1250025000 +Used processors: +AggregatingTransform +SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1 +1 0 0 1249950000 +1 0 2 1250000000 +1 1 1 1249975000 +1 1 3 1250025000 +Used processors: +AggregatingTransform +SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0 +1 0 1249950000 +1 1 1249975000 +1 2 1250000000 +1 3 1250025000 +Used processors: +AggregatingTransform +SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1 +1 0 1249950000 +1 1 1249975000 +1 2 1250000000 +1 3 1250025000 +Used processors: +AggregatingTransform diff --git a/tests/queries/0_stateless/01710_projections_partial_optimize_aggregation_in_order.sh b/tests/queries/0_stateless/01710_projections_partial_optimize_aggregation_in_order.sh new file mode 100755 index 00000000000..f66dc9ff872 --- /dev/null +++ b/tests/queries/0_stateless/01710_projections_partial_optimize_aggregation_in_order.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash + +# Test for optimize_aggregation_in_order with partial projections, i.e.: +# - first 
+# - the first part will not have the projection
+# - the second part will have the projection
+# And so two different aggregations should be done.
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT -nm -q "
+    DROP TABLE IF EXISTS in_order_agg_partial_01710;
+
+    CREATE TABLE in_order_agg_partial_01710
+    (
+        k1 UInt32,
+        k2 UInt32,
+        k3 UInt32,
+        value UInt32
+    )
+    ENGINE = MergeTree
+    ORDER BY tuple();
+
+    INSERT INTO in_order_agg_partial_01710 SELECT 1, number%2, number%4, number FROM numbers(50000);
+    SYSTEM STOP MERGES in_order_agg_partial_01710;
+    ALTER TABLE in_order_agg_partial_01710 ADD PROJECTION aaaa (
+        SELECT
+            k1,
+            k2,
+            k3,
+            sum(value)
+        GROUP BY k1, k2, k3
+    );
+    INSERT INTO in_order_agg_partial_01710 SELECT 1, number%2, number%4, number FROM numbers(100000) LIMIT 50000, 100000;
+"
+
+function random_str()
+{
+    local n=$1 && shift
+    tr -cd '[:lower:]' < /dev/urandom | head -c"$n"
+}
+
+function run_query()
+{
+    local query=$1 && shift
+
+    local query_id
+    query_id="$CLICKHOUSE_TEST_UNIQUE_NAME-$(random_str 6)"
+
+    echo "$query"
+    local opts=(
+        --allow_experimental_projection_optimization 1
+        --force_optimize_projection 1
+        --log_processors_profiles 1
+        --query_id "$query_id"
+    )
+    $CLICKHOUSE_CLIENT "${opts[@]}" "$@" -q "$query"
+
+    $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
+
+    echo "Used processors:"
+    $CLICKHOUSE_CLIENT --param_query_id "$query_id" -q "SELECT DISTINCT name FROM system.processors_profile_log WHERE query_id = {query_id:String} AND name LIKE 'Aggregating%'"
+}
+
+run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0"
+run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1"
+run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0"
+run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1"
diff --git a/tests/queries/1_stateful/00172_early_constant_folding.reference b/tests/queries/1_stateful/00172_early_constant_folding.reference
index 27cd6b545e0..da564dc694e 100644
--- a/tests/queries/1_stateful/00172_early_constant_folding.reference
+++ b/tests/queries/1_stateful/00172_early_constant_folding.reference
@@ -2,6 +2,5 @@
 ExpressionTransform
   (ReadFromStorage)
   AggregatingTransform
-    StrictResize
-      ExpressionTransform
-        SourceFromSingleChunk 0 → 1
+    ExpressionTransform
+      SourceFromSingleChunk 0 → 1