diff --git a/src/DataStreams/ColumnGathererStream.cpp b/src/DataStreams/ColumnGathererStream.cpp index 2468ae872a2..683b8012efe 100644 --- a/src/DataStreams/ColumnGathererStream.cpp +++ b/src/DataStreams/ColumnGathererStream.cpp @@ -53,7 +53,7 @@ ColumnGathererStream::ColumnGathererStream( Block ColumnGathererStream::readImpl() { /// Special case: single source and there are no skipped rows - if (children.size() == 1 && row_sources_buf.eof()) + if (children.size() == 1 && row_sources_buf.eof() && !source_to_fully_copy) return children[0]->read(); if (!source_to_fully_copy && row_sources_buf.eof()) diff --git a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp index 132241844d7..b8c788ed1fc 100644 --- a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp @@ -93,6 +93,10 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge() } } + /// If have enough rows, return block, because it prohibited to overflow requested number of rows. + if (merged_data.hasEnoughRows()) + return Status(merged_data.pull()); + /// We will write the data for the last primary key. if (!selected_row.empty()) insertRow(); diff --git a/tests/queries/0_stateless/01825_replacing_vertical_merge.reference b/tests/queries/0_stateless/01825_replacing_vertical_merge.reference new file mode 100644 index 00000000000..18fcfcd8f8e --- /dev/null +++ b/tests/queries/0_stateless/01825_replacing_vertical_merge.reference @@ -0,0 +1,4 @@ +1720 32 +220 17 +33558527 8193 +33550336 8192 diff --git a/tests/queries/0_stateless/01825_replacing_vertical_merge.sql b/tests/queries/0_stateless/01825_replacing_vertical_merge.sql new file mode 100644 index 00000000000..832fd4173b3 --- /dev/null +++ b/tests/queries/0_stateless/01825_replacing_vertical_merge.sql @@ -0,0 +1,44 @@ +SET optimize_on_insert = 0; + +DROP TABLE IF EXISTS replacing_table; + +CREATE TABLE replacing_table (a UInt32, b UInt32, c UInt32) +ENGINE = ReplacingMergeTree ORDER BY a +SETTINGS vertical_merge_algorithm_min_rows_to_activate = 1, + vertical_merge_algorithm_min_columns_to_activate = 1, + index_granularity = 16, + min_bytes_for_wide_part = 0, + merge_max_block_size = 16; + +INSERT INTO replacing_table SELECT number, number, number from numbers(16); +INSERT INTO replacing_table SELECT 100, number, number from numbers(16); + +SELECT sum(a), count() FROM replacing_table; + +OPTIMIZE TABLE replacing_table FINAL; + +SELECT sum(a), count() FROM replacing_table; + +DROP TABLE IF EXISTS replacing_table; + +CREATE TABLE replacing_table +( + key UInt64, + value UInt64 +) +ENGINE = ReplacingMergeTree +ORDER BY key +SETTINGS + vertical_merge_algorithm_min_rows_to_activate=0, + vertical_merge_algorithm_min_columns_to_activate=0, + min_bytes_for_wide_part = 0; + +INSERT INTO replacing_table SELECT if(number == 8192, 8191, number), 1 FROM numbers(8193); + +SELECT sum(key), count() from replacing_table; + +OPTIMIZE TABLE replacing_table FINAL; + +SELECT sum(key), count() from replacing_table; + +DROP TABLE IF EXISTS replacing_table;