Merge pull request #57790 from CurtizJ/fix-external-aggregation-with-projections

Fix result of external aggregation in case of partially materialized projection
This commit is contained in:
Anton Popov 2023-12-13 22:56:02 +01:00 committed by GitHub
commit 82ebb5e2d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 121 additions and 8 deletions

View File

@ -741,8 +741,11 @@ void AggregatingTransform::initGenerate()
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
Pipes pipes;
for (auto & variant : prepared_data)
{
/// Converts hash tables to blocks with data (finalized or not).
pipes.emplace_back(std::make_shared<ConvertingAggregatedToChunksSource>(params, variant));
}
Pipe pipe = Pipe::unitePipes(std::move(pipes));
if (!pipe.empty())
{
@ -796,21 +799,23 @@ void AggregatingTransform::initGenerate()
}
}
const auto & tmp_data = params->aggregator.getTemporaryData();
size_t num_streams = 0;
size_t compressed_size = 0;
size_t uncompressed_size = 0;
Pipe pipe;
Pipes pipes;
/// Merge external data from all aggregators used in query.
for (const auto & aggregator : *params->aggregator_list_ptr)
{
Pipes pipes;
const auto & tmp_data = aggregator.getTemporaryData();
for (auto * tmp_stream : tmp_data.getStreams())
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
pipe = Pipe::unitePipes(std::move(pipes));
num_streams += tmp_data.getStreams().size();
compressed_size += tmp_data.getStat().compressed_size;
uncompressed_size += tmp_data.getStat().uncompressed_size;
}
size_t num_streams = tmp_data.getStreams().size();
size_t compressed_size = tmp_data.getStat().compressed_size;
size_t uncompressed_size = tmp_data.getStat().uncompressed_size;
LOG_DEBUG(
log,
"Will merge {} temporary files of size {} compressed, {} uncompressed.",
@ -818,6 +823,7 @@ void AggregatingTransform::initGenerate()
ReadableSize(compressed_size),
ReadableSize(uncompressed_size));
auto pipe = Pipe::unitePipes(std::move(pipes));
addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);
processors = Pipe::detachProcessors(std::move(pipe));

View File

@ -0,0 +1,41 @@
*** correct aggregation ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
*** correct aggregation with projection ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
*** optimize_aggregation_in_order = 0, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
*** optimize_aggregation_in_order = 1, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
*** after materialization ***
*** correct aggregation ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
*** correct aggregation with projection ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
*** optimize_aggregation_in_order = 0, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
*** optimize_aggregation_in_order = 1, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000

View File

@ -0,0 +1,66 @@
DROP TABLE IF EXISTS t_proj_external;
CREATE TABLE t_proj_external
(
k1 UInt32,
k2 UInt32,
k3 UInt32,
value UInt32
)
ENGINE = MergeTree
ORDER BY tuple();
INSERT INTO t_proj_external SELECT 1, number%2, number%4, number FROM numbers(50000);
SYSTEM STOP MERGES t_proj_external;
ALTER TABLE t_proj_external ADD PROJECTION aaaa (
SELECT
k1,
k2,
k3,
sum(value)
GROUP BY k1, k2, k3
);
INSERT INTO t_proj_external SELECT 1, number%2, number%4, number FROM numbers(100000) LIMIT 50000, 100000;
SELECT '*** correct aggregation ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_use_projections = 0;
SELECT '*** correct aggregation with projection ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3;
SELECT '*** optimize_aggregation_in_order = 0, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order = 0, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1;
SELECT '*** optimize_aggregation_in_order = 1, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order = 1, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1;
SYSTEM START MERGES t_proj_external;
ALTER TABLE t_proj_external MATERIALIZE PROJECTION aaaa SETTINGS mutations_sync = 2;
SELECT '*** after materialization ***';
SELECT '*** correct aggregation ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_use_projections = 0;
SELECT '*** correct aggregation with projection ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3;
SELECT '*** optimize_aggregation_in_order = 0, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order = 0, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1;
SELECT '*** optimize_aggregation_in_order = 1, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1 ***';
SELECT k1, k2, k3, sum(value) v FROM t_proj_external GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order = 1, max_bytes_before_external_group_by = 1, group_by_two_level_threshold = 1;
DROP TABLE IF EXISTS t_proj_external;