From 07b1be5a7687bac84cea111fcc50b0438ecb71e1 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sun, 16 May 2021 22:04:13 +0800 Subject: [PATCH] Fix distributed processing when using projection --- src/Interpreters/Aggregator.h | 4 +- .../Transforms/AggregatingTransform.cpp | 12 ++-- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 55 +++++++++++++------ ...projections_in_distributed_query.reference | 4 ++ ...01710_projections_in_distributed_query.sql | 20 +++++++ 5 files changed, 73 insertions(+), 22 deletions(-) create mode 100644 tests/queries/0_stateless/01710_projections_in_distributed_query.reference create mode 100644 tests/queries/0_stateless/01710_projections_in_distributed_query.sql diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index b9be4b76c8b..da55831f952 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -915,8 +915,10 @@ public: size_t max_bytes_before_external_group_by_, bool empty_result_for_aggregation_by_empty_set_, VolumePtr tmp_volume_, size_t max_threads_, - size_t min_free_disk_space_) + size_t min_free_disk_space_, + const Block & intermediate_header_ = {}) : src_header(src_header_), + intermediate_header(intermediate_header_), keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()), overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_), diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index 90e0e7cb066..7802bf6e3bf 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -339,7 +339,7 @@ private: { params->aggregator.mergeWithoutKeyDataImpl(*data); auto block = params->aggregator.prepareBlockAndFillWithoutKey( - *first, params->final, first->type != AggregatedDataVariants::Type::without_key); + *first, params->final, first->type != AggregatedDataVariants::Type::without_key); setCurrentChunk(convertToChunk(block)); } @@ -381,8 +381,7 @@ private: { /// Select Arena to avoid race conditions Arena * arena = first->aggregates_pools.at(thread).get(); - auto source = std::make_shared( - params, data, shared_data, arena); + auto source = std::make_shared(params, data, shared_data, arena); processors.emplace_back(std::move(source)); } @@ -614,7 +613,12 @@ void AggregatingTransform::initGenerate() pipe = Pipe::unitePipes(std::move(pipes)); } - LOG_DEBUG(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed)); + LOG_DEBUG( + log, + "Will merge {} temporary files of size {} compressed, {} uncompressed.", + files.files.size(), + ReadableSize(files.sum_size_compressed), + ReadableSize(files.sum_size_uncompressed)); addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads); diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index e22b9461b26..0f5e69448d2 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -284,25 +284,27 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( descr.arguments.push_back(header_before_aggregation.getPositionByName(name)); } - Aggregator::Params params( - header_before_aggregation, - keys, - aggregates, - query_info.projection->aggregate_overflow_row, - settings.max_rows_to_group_by, - settings.group_by_overflow_mode, - settings.group_by_two_level_threshold, - settings.group_by_two_level_threshold_bytes, - settings.max_bytes_before_external_group_by, - settings.empty_result_for_aggregation_by_empty_set, - context->getTemporaryVolume(), - settings.max_threads, - settings.min_free_disk_space_for_temporary_data); - - auto transform_params = std::make_shared(std::move(params), query_info.projection->aggregate_final); - + AggregatingTransformParamsPtr transform_params; if (projection) { + Aggregator::Params params( + header_before_aggregation, + keys, + aggregates, + query_info.projection->aggregate_overflow_row, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold, + settings.group_by_two_level_threshold_bytes, + settings.max_bytes_before_external_group_by, + settings.empty_result_for_aggregation_by_empty_set, + context->getTemporaryVolume(), + settings.max_threads, + settings.min_free_disk_space_for_temporary_data, + header_before_aggregation); // The source header is also an intermediate header + + transform_params = std::make_shared(std::move(params), query_info.projection->aggregate_final); + /// This part is hacky. /// We want AggregatingTransform to work with aggregate states instead of normal columns. /// It is almost the same, just instead of adding new data to aggregation state we merge it with existing. @@ -312,6 +314,25 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( /// * is not split into buckets (so if we just use MergingAggregated, it will use single thread) transform_params->only_merge = true; } + else + { + Aggregator::Params params( + header_before_aggregation, + keys, + aggregates, + query_info.projection->aggregate_overflow_row, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold, + settings.group_by_two_level_threshold_bytes, + settings.max_bytes_before_external_group_by, + settings.empty_result_for_aggregation_by_empty_set, + context->getTemporaryVolume(), + settings.max_threads, + settings.min_free_disk_space_for_temporary_data); + + transform_params = std::make_shared(std::move(params), query_info.projection->aggregate_final); + } pipe.resize(pipe.numOutputPorts(), true, true); diff --git a/tests/queries/0_stateless/01710_projections_in_distributed_query.reference b/tests/queries/0_stateless/01710_projections_in_distributed_query.reference new file mode 100644 index 00000000000..27aaf6330f4 --- /dev/null +++ b/tests/queries/0_stateless/01710_projections_in_distributed_query.reference @@ -0,0 +1,4 @@ +2020-10-24 00:00:00 -93943555724557365 +-93943555724557365 +2020-10-24 00:00:00 -187887111449114730 +-187887111449114730 diff --git a/tests/queries/0_stateless/01710_projections_in_distributed_query.sql b/tests/queries/0_stateless/01710_projections_in_distributed_query.sql new file mode 100644 index 00000000000..475e8372fc6 --- /dev/null +++ b/tests/queries/0_stateless/01710_projections_in_distributed_query.sql @@ -0,0 +1,20 @@ +drop table if exists projection_test; + +create table projection_test (dt DateTime, cost Int64, projection p (select toStartOfMinute(dt) dt_m, sum(cost) group by dt_m)) engine MergeTree partition by toDate(dt) order by dt; + +insert into projection_test with rowNumberInAllBlocks() as id select toDateTime('2020-10-24 00:00:00') + (id / 20), * from generateRandom('cost Int64', 10, 10, 1) limit 1000 settings max_threads = 1; + +set allow_experimental_projection_optimization = 1, force_optimize_projection = 1; + +select toStartOfMinute(dt) dt_m, sum(cost) from projection_test group by dt_m; +select sum(cost) from projection_test; + +drop table if exists projection_test_d; + +create table projection_test_d (dt DateTime, cost Int64) engine Distributed(test_cluster_two_shards, currentDatabase(), projection_test); + +select toStartOfMinute(dt) dt_m, sum(cost) from projection_test_d group by dt_m; +select sum(cost) from projection_test_d; + +drop table projection_test; +drop table projection_test_d;