Fix distributed processing when using projection

Amos Bird 2021-05-16 22:04:13 +08:00
parent 1a8ce039b4
commit 07b1be5a76
5 changed files with 73 additions and 22 deletions
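
What changed, in brief (as read from the hunks below): Aggregator::Params gains an optional intermediate_header, and MergeTreeDataSelectExecutor::read now builds the merge-only AggregatingTransformParams (intermediate header set, only_merge = true) only when a projection part is actually read, falling back to ordinary aggregation params otherwise. The new test at the bottom exercises the previously broken case: aggregate queries over a Distributed table backed by a local table with a projection.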


@@ -915,8 +915,10 @@ public:
         size_t max_bytes_before_external_group_by_,
         bool empty_result_for_aggregation_by_empty_set_,
         VolumePtr tmp_volume_, size_t max_threads_,
-        size_t min_free_disk_space_)
+        size_t min_free_disk_space_,
+        const Block & intermediate_header_ = {})
         : src_header(src_header_),
+          intermediate_header(intermediate_header_),
         keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
         overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
         group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
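
Note: the new intermediate_header_ argument defaults to an empty Block, so existing Aggregator::Params call sites compile unchanged; within this commit, only the projection read path below passes it explicitly.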


@@ -339,7 +339,7 @@ private:
     {
         params->aggregator.mergeWithoutKeyDataImpl(*data);
         auto block = params->aggregator.prepareBlockAndFillWithoutKey(
-                *first, params->final, first->type != AggregatedDataVariants::Type::without_key);
+            *first, params->final, first->type != AggregatedDataVariants::Type::without_key);
         setCurrentChunk(convertToChunk(block));
     }

@@ -381,8 +381,7 @@ private:
     {
         /// Select Arena to avoid race conditions
         Arena * arena = first->aggregates_pools.at(thread).get();
-        auto source = std::make_shared<ConvertingAggregatedToChunksSource>(
-            params, data, shared_data, arena);
+        auto source = std::make_shared<ConvertingAggregatedToChunksSource>(params, data, shared_data, arena);
         processors.emplace_back(std::move(source));
     }

@@ -614,7 +613,12 @@ void AggregatingTransform::initGenerate()
             pipe = Pipe::unitePipes(std::move(pipes));
         }

-        LOG_DEBUG(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
+        LOG_DEBUG(
+            log,
+            "Will merge {} temporary files of size {} compressed, {} uncompressed.",
+            files.files.size(),
+            ReadableSize(files.sum_size_compressed),
+            ReadableSize(files.sum_size_uncompressed));

         addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);
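
The three AggregatingTransform.cpp hunks above appear to be formatting-only (a re-indent, a joined call, a re-wrapped LOG_DEBUG); the behavioral changes live in Aggregator.h above and in MergeTreeDataSelectExecutor.cpp below.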


@@ -284,25 +284,27 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                 descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
             }

-            Aggregator::Params params(
-                header_before_aggregation,
-                keys,
-                aggregates,
-                query_info.projection->aggregate_overflow_row,
-                settings.max_rows_to_group_by,
-                settings.group_by_overflow_mode,
-                settings.group_by_two_level_threshold,
-                settings.group_by_two_level_threshold_bytes,
-                settings.max_bytes_before_external_group_by,
-                settings.empty_result_for_aggregation_by_empty_set,
-                context->getTemporaryVolume(),
-                settings.max_threads,
-                settings.min_free_disk_space_for_temporary_data);
-            auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
+            AggregatingTransformParamsPtr transform_params;
+            if (projection)
+            {
+                Aggregator::Params params(
+                    header_before_aggregation,
+                    keys,
+                    aggregates,
+                    query_info.projection->aggregate_overflow_row,
+                    settings.max_rows_to_group_by,
+                    settings.group_by_overflow_mode,
+                    settings.group_by_two_level_threshold,
+                    settings.group_by_two_level_threshold_bytes,
+                    settings.max_bytes_before_external_group_by,
+                    settings.empty_result_for_aggregation_by_empty_set,
+                    context->getTemporaryVolume(),
+                    settings.max_threads,
+                    settings.min_free_disk_space_for_temporary_data,
+                    header_before_aggregation); // The source header is also an intermediate header
+                transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);

                 /// This part is hacky.
                 /// We want AggregatingTransform to work with aggregate states instead of normal columns.
                 /// It is almost the same, just instead of adding new data to aggregation state we merge it with existing.

@@ -312,6 +314,25 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                 /// * is not split into buckets (so if we just use MergingAggregated, it will use single thread)
                 transform_params->only_merge = true;
             }
+            else
+            {
+                Aggregator::Params params(
+                    header_before_aggregation,
+                    keys,
+                    aggregates,
+                    query_info.projection->aggregate_overflow_row,
+                    settings.max_rows_to_group_by,
+                    settings.group_by_overflow_mode,
+                    settings.group_by_two_level_threshold,
+                    settings.group_by_two_level_threshold_bytes,
+                    settings.max_bytes_before_external_group_by,
+                    settings.empty_result_for_aggregation_by_empty_set,
+                    context->getTemporaryVolume(),
+                    settings.max_threads,
+                    settings.min_free_disk_space_for_temporary_data);
+                transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
+            }

             pipe.resize(pipe.numOutputPorts(), true, true);
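
The comment block above carries the core idea: projection parts store rows that are already partial aggregation states, so AggregatingTransform must merge them rather than aggregate them again. A minimal sketch of that distinction in plain ClickHouse SQL (not part of the commit; sumState/sumMerge are the standard combinators for producing and merging intermediate states):

-- Each subquery stands in for one "part" holding a partial aggregate state.
select sumMerge(s) as total
from
(
    select sumState(toInt64(number)) as s from numbers(10)
    union all
    select sumState(toInt64(number + 10)) as s from numbers(10)
);
-- total = 190, the same as: select sum(number) from numbers(20);
-- treating the states as ordinary columns and re-aggregating them would give
-- a wrong result, which is what the only_merge flag guards against here.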


@@ -0,0 +1,4 @@
+2020-10-24 00:00:00 -93943555724557365
+-93943555724557365
+2020-10-24 00:00:00 -187887111449114730
+-187887111449114730


@@ -0,0 +1,20 @@
+drop table if exists projection_test;
+
+create table projection_test (dt DateTime, cost Int64, projection p (select toStartOfMinute(dt) dt_m, sum(cost) group by dt_m)) engine MergeTree partition by toDate(dt) order by dt;
+
+insert into projection_test with rowNumberInAllBlocks() as id select toDateTime('2020-10-24 00:00:00') + (id / 20), * from generateRandom('cost Int64', 10, 10, 1) limit 1000 settings max_threads = 1;
+
+set allow_experimental_projection_optimization = 1, force_optimize_projection = 1;
+
+select toStartOfMinute(dt) dt_m, sum(cost) from projection_test group by dt_m;
+select sum(cost) from projection_test;
+
+drop table if exists projection_test_d;
+
+create table projection_test_d (dt DateTime, cost Int64) engine Distributed(test_cluster_two_shards, currentDatabase(), projection_test);
+
+select toStartOfMinute(dt) dt_m, sum(cost) from projection_test_d group by dt_m;
+select sum(cost) from projection_test_d;
+
+drop table projection_test;
+drop table projection_test_d;
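
Why these reference values hold: every inserted row lands within a single minute (id / 20 caps the time offset at 49.95 s), so both GROUP BY queries return exactly one group; and since test_cluster_two_shards resolves both shards to the same local table, the distributed sums are exactly double the local ones: -93943555724557365 * 2 = -187887111449114730.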