Merge pull request #10063 from ClickHouse/fix-GroupingAggregatedTransform-single-level

Fix GroupingAggregatedTransform for single-level aggregation. Add test.
This commit is contained in:
alesapin 2020-04-06 22:13:15 +03:00 committed by GitHub
commit 107c5b7860
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 63 additions and 2 deletions

View File

@ -69,6 +69,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/; \ ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/; \ ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/; \ ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/clusters.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/server.key /etc/clickhouse-server/; \ ln -s /usr/share/clickhouse-test/config/server.key /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/server.crt /etc/clickhouse-server/; \ ln -s /usr/share/clickhouse-test/config/server.crt /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/dhparam.pem /etc/clickhouse-server/; \ ln -s /usr/share/clickhouse-test/config/dhparam.pem /etc/clickhouse-server/; \

View File

@ -275,15 +275,20 @@ void GroupingAggregatedTransform::work()
{ {
if (!single_level_chunks.empty()) if (!single_level_chunks.empty())
{ {
auto & header = getOutputs().front().getHeader(); auto & header = getInputs().front().getHeader(); /// Take header from input port. Output header is empty.
auto block = header.cloneWithColumns(single_level_chunks.back().detachColumns()); auto block = header.cloneWithColumns(single_level_chunks.back().detachColumns());
single_level_chunks.pop_back(); single_level_chunks.pop_back();
auto blocks = params->aggregator.convertBlockToTwoLevel(block); auto blocks = params->aggregator.convertBlockToTwoLevel(block);
for (auto & cur_block : blocks) for (auto & cur_block : blocks)
{ {
if (!cur_block)
continue;
Int32 bucket = cur_block.info.bucket_num; Int32 bucket = cur_block.info.bucket_num;
chunks_map[bucket].emplace_back(Chunk(cur_block.getColumns(), cur_block.rows())); auto chunk_info = std::make_shared<AggregatedChunkInfo>();
chunk_info->bucket_num = bucket;
chunks_map[bucket].emplace_back(Chunk(cur_block.getColumns(), cur_block.rows(), std::move(chunk_info)));
} }
} }
} }

20
tests/config/clusters.xml Normal file
View File

@ -0,0 +1,20 @@
<yandex>
<remote_servers>
<test_cluster_two_shards_different_databases>
<shard>
<replica>
<default_database>shard_0</default_database>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<default_database>shard_1</default_database>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards_different_databases>
</remote_servers>
</yandex>

View File

@ -0,0 +1,10 @@
0 2
1 1
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1

View File

@ -0,0 +1,25 @@
set send_logs_level = 'error';
create database if not exists shard_0;
create database if not exists shard_1;
drop table if exists shard_0.shard_01231_distributed_aggregation_memory_efficient;
drop table if exists shard_1.shard_01231_distributed_aggregation_memory_efficient;
drop table if exists ma_dist;
create table shard_0.shard_01231_distributed_aggregation_memory_efficient (x UInt64) engine = MergeTree order by x;
create table shard_1.shard_01231_distributed_aggregation_memory_efficient (x UInt64) engine = MergeTree order by x;
insert into shard_0.shard_01231_distributed_aggregation_memory_efficient select * from numbers(1);
insert into shard_1.shard_01231_distributed_aggregation_memory_efficient select * from numbers(10);
create table ma_dist (x UInt64) ENGINE = Distributed(test_cluster_two_shards_different_databases, '', 'shard_01231_distributed_aggregation_memory_efficient');
set distributed_aggregation_memory_efficient = 1;
set group_by_two_level_threshold = 2;
set max_bytes_before_external_group_by = 16;
select x, count() from ma_dist group by x order by x;
drop table if exists shard_0.shard_01231_distributed_aggregation_memory_efficient;
drop table if exists shard_1.shard_01231_distributed_aggregation_memory_efficient;