diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index 687d5ae19f4..96ac8a98355 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -17,7 +17,7 @@ void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns, if (out_row_sources_buf) { /// true flag value means "skip row" - current_row_sources.back().setSkipFlag(false); + current_row_sources[max_pos].setSkipFlag(false); out_row_sources_buf->write(reinterpret_cast(current_row_sources.data()), current_row_sources.size() * sizeof(RowSourcePart)); @@ -96,6 +96,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std } /// Initially, skip all rows. Unskip last on insert. + size_t current_pos = current_row_sources.size(); if (out_row_sources_buf) current_row_sources.emplace_back(current.impl->order, true); @@ -103,6 +104,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (version >= max_version) { max_version = version; + max_pos = current_pos; setRowRef(selected_row, current); } diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index 0ab6b185833..dabc1c7e3af 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -43,6 +43,7 @@ private: RowRef selected_row; /// Last row with maximum version for current primary key. UInt64 max_version = 0; /// Max version for current primary key. + size_t max_pos = 0; PODArray current_row_sources; /// Sources of rows with the current primary key diff --git a/dbms/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.reference b/dbms/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.reference new file mode 100644 index 00000000000..e0f8c3bae3f --- /dev/null +++ b/dbms/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.reference @@ -0,0 +1,4 @@ +2018-01-01 0 0 +2018-01-01 1 1 +2018-01-01 2 2 +2018-01-01 2 2 diff --git a/dbms/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql b/dbms/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql new file mode 100644 index 00000000000..25b53d9b169 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql @@ -0,0 +1,8 @@ +drop table if exists test.tab; +create table test.tab (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0; +insert into test.tab values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1); +insert into test.tab values ('2018-01-01', 0, 0); +select * from test.tab order by version; +OPTIMIZE TABLE test.tab; +select * from test.tab; +