diff --git a/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h index 70ff57e7928..14fe81a8dcf 100644 --- a/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AggregatingSortedBlockInputStream.h @@ -50,7 +50,7 @@ protected: Block readImpl() override; private: - Logger * log = &Logger::get("SummingSortedBlockInputStream"); + Logger * log = &Logger::get("AggregatingSortedBlockInputStream"); /// Прочитали до конца. bool finished = false; @@ -68,7 +68,7 @@ private: * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. */ template - void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue); + void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); /// Вставить в результат первую строку для текущей группы. void insertCurrentRow(ColumnPlainPtrs & merged_columns); diff --git a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h index 7ccec1bfdf5..96799afa5a4 100644 --- a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h @@ -81,7 +81,7 @@ private: * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. */ template - void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue); + void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); /// Вставить в результат строки для текущего идентификатора "визита". void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream = false); diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h index 4a70524ae2a..66fc321e5d2 100644 --- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -136,7 +136,7 @@ private: void initQueue(std::priority_queue & queue); template - void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue); + void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); Logger * log = &Logger::get("MergingSortedBlockInputStream"); diff --git a/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h index 7c7862cbca5..1ded9c19d2b 100644 --- a/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/SummingSortedBlockInputStream.h @@ -99,7 +99,7 @@ private: * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. */ template - void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue); + void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); /// Вставить в результат просуммированную строку для текущей группы. void insertCurrentRow(ColumnPlainPtrs & merged_columns); diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 949c41116ea..5f25260cc42 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -61,16 +61,16 @@ Block AggregatingSortedBlockInputStream::readImpl() columns_to_aggregate[i] = typeid_cast(merged_columns[column_numbers_to_aggregate[i]]); if (has_collation) - merge(merged_block, merged_columns, queue_with_collation); + merge(merged_columns, queue_with_collation); else - merge(merged_block, merged_columns, queue); + merge(merged_columns, queue); return merged_block; } template -void AggregatingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue) +void AggregatingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; @@ -81,13 +81,15 @@ void AggregatingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainP setPrimaryKey(next_key, current); + bool key_differs = next_key != current_key; + /// если накопилось достаточно строк и последняя посчитана полностью - if (next_key != current_key && merged_rows >= max_block_size) + if (key_differs && merged_rows >= max_block_size) return; queue.pop(); - if (next_key != current_key) + if (key_differs) { current_key = std::move(next_key); next_key.resize(description.size()); diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 4225ad5c4f8..c90bea9636e 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -104,15 +104,15 @@ Block CollapsingSortedBlockInputStream::readImpl() } if (has_collation) - merge(merged_block, merged_columns, queue_with_collation); + merge(merged_columns, queue_with_collation); else - merge(merged_block, merged_columns, queue); + merge(merged_columns, queue); return merged_block; } template -void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue) +void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; @@ -120,12 +120,22 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt while (!queue.empty()) { TSortCursor current = queue.top(); - queue.pop(); Int8 sign = get((*current->all_columns[sign_column_number])[current->pos]); setPrimaryKey(next_key, current); - if (next_key != current_key) + bool key_differs = next_key != current_key; + + /// если накопилось достаточно строк и последняя посчитана полностью + if (key_differs && merged_rows >= max_block_size) + { + ++blocks_written; + return; + } + + queue.pop(); + + if (key_differs) { /// Запишем данные для предыдущего визита. insertRows(merged_columns, merged_rows); @@ -168,12 +178,6 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt /// Достаём из соответствующего источника следующий блок, если есть. fetchNextBlock(current, queue); } - - if (merged_rows >= max_block_size) - { - ++blocks_written; - return; - } } /// Запишем данные для последнего визита. diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index ddbc2d0195b..7051057b19c 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -111,15 +111,15 @@ Block MergingSortedBlockInputStream::readImpl() return Block(); if (has_collation) - merge(merged_block, merged_columns, queue_with_collation); + merge(merged_columns, queue_with_collation); else - merge(merged_block, merged_columns, queue); + merge(merged_columns, queue); return merged_block; } template -void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue) +void MergingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 5e1244bb4fe..0a41e5ffd15 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -142,16 +142,16 @@ Block SummingSortedBlockInputStream::readImpl() } if (has_collation) - merge(merged_block, merged_columns, queue_with_collation); + merge(merged_columns, queue_with_collation); else - merge(merged_block, merged_columns, queue); + merge(merged_columns, queue); return merged_block; } template -void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue) +void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; @@ -159,11 +159,18 @@ void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs while (!queue.empty()) { TSortCursor current = queue.top(); - queue.pop(); setPrimaryKey(next_key, current); - if (next_key != current_key) + bool key_differs = next_key != current_key; + + /// если накопилось достаточно строк и последняя посчитана полностью + if (key_differs && merged_rows >= max_block_size) + return; + + queue.pop(); + + if (key_differs) { /// Запишем данные для предыдущей группы. if (!current_key[0].isNull() && !current_row_is_zero) @@ -194,9 +201,6 @@ void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs /// Достаём из соответствующего источника следующий блок, если есть. fetchNextBlock(current, queue); } - - if (merged_rows >= max_block_size) - return; } /// Запишем данные для последней группы, если она ненулевая. diff --git a/dbms/tests/queries/0_stateless/00155_merges.reference b/dbms/tests/queries/0_stateless/00155_merges.reference new file mode 100644 index 00000000000..f07cdf6765b --- /dev/null +++ b/dbms/tests/queries/0_stateless/00155_merges.reference @@ -0,0 +1,120 @@ +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + +1 1 +1 1 + +1 1 +1 1 + +1 1 +1 1 + + diff --git a/dbms/tests/queries/0_stateless/00155_merges.sh b/dbms/tests/queries/0_stateless/00155_merges.sh new file mode 100755 index 00000000000..6d8a5a92196 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00155_merges.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +function create { + clickhouse-client --query="DROP TABLE IF EXISTS test.summing" + clickhouse-client --query="DROP TABLE IF EXISTS test.collapsing" + clickhouse-client --query="DROP TABLE IF EXISTS test.aggregating" + + clickhouse-client --query="CREATE TABLE test.summing (d Date DEFAULT today(), x UInt64, s UInt64 DEFAULT 1) ENGINE = SummingMergeTree(d, x, 8192)" + clickhouse-client --query="CREATE TABLE test.collapsing (d Date DEFAULT today(), x UInt64, s UInt64 DEFAULT 1) ENGINE = CollapsingMergeTree(d, x, 8192, s)" + clickhouse-client --query="CREATE TABLE test.aggregating (d Date DEFAULT today(), x UInt64, s AggregateFunction(sum, UInt64)) ENGINE = AggregatingMergeTree(d, x, 8192)" +} + + +function cleanup { + clickhouse-client --query="DROP TABLE test.summing" + clickhouse-client --query="DROP TABLE test.collapsing" + clickhouse-client --query="DROP TABLE test.aggregating" +} + + +function test { + create + + SUM=$(( $1 + $2 )) + MAX=$(( $1 > $2 ? $1 : $2 )) + + clickhouse-client --query="INSERT INTO test.summing (x) SELECT number AS x FROM system.numbers LIMIT $1" + clickhouse-client --query="INSERT INTO test.summing (x) SELECT number AS x FROM system.numbers LIMIT $2" + + clickhouse-client --query="INSERT INTO test.collapsing (x) SELECT number AS x FROM system.numbers LIMIT $1" + clickhouse-client --query="INSERT INTO test.collapsing (x) SELECT number AS x FROM system.numbers LIMIT $2" + + clickhouse-client --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $1) GROUP BY number" + clickhouse-client --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $2) GROUP BY number" + + clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.summing" + clickhouse-client --query="OPTIMIZE TABLE test.summing" + clickhouse-client --query="SELECT count() = $MAX, sum(s) = $SUM FROM test.summing" + echo + clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.collapsing" + clickhouse-client --query="OPTIMIZE TABLE test.collapsing" + clickhouse-client --query="SELECT count() = $MAX, sum(s) = $MAX FROM test.collapsing" + echo + clickhouse-client --query="SELECT count() = $SUM, sumMerge(s) = $SUM FROM test.aggregating" + clickhouse-client --query="OPTIMIZE TABLE test.aggregating" + clickhouse-client --query="SELECT count() = $MAX, sumMerge(s) = $SUM FROM test.aggregating" + echo + echo +} + +test 8191 8191 +test 8191 8192 +test 8192 8191 +test 8192 8192 +test 8192 8193 +test 8193 8192 +test 8193 8193 +test 8191 8193 +test 8193 8191 +test 8193 8194 +test 8194 8193 +test 8194 8194 + +cleanup