Merge pull request #1938 from yandex/fix-replacing-merge-tree

Fix replacing merge tree vertical merge
This commit is contained in:
alexey-milovidov 2018-02-22 02:06:24 +03:00 committed by GitHub
commit 533b5cbe00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 1 deletions

View File

@ -17,7 +17,7 @@ void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns,
if (out_row_sources_buf) if (out_row_sources_buf)
{ {
/// true flag value means "skip row" /// true flag value means "skip row"
current_row_sources.back().setSkipFlag(false); current_row_sources[max_pos].setSkipFlag(false);
out_row_sources_buf->write(reinterpret_cast<const char *>(current_row_sources.data()), out_row_sources_buf->write(reinterpret_cast<const char *>(current_row_sources.data()),
current_row_sources.size() * sizeof(RowSourcePart)); current_row_sources.size() * sizeof(RowSourcePart));
@ -96,6 +96,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
} }
/// Initially, skip all rows. Unskip last on insert. /// Initially, skip all rows. Unskip last on insert.
size_t current_pos = current_row_sources.size();
if (out_row_sources_buf) if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true); current_row_sources.emplace_back(current.impl->order, true);
@ -103,6 +104,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
if (version >= max_version) if (version >= max_version)
{ {
max_version = version; max_version = version;
max_pos = current_pos;
setRowRef(selected_row, current); setRowRef(selected_row, current);
} }

View File

@ -43,6 +43,7 @@ private:
RowRef selected_row; /// Last row with maximum version for current primary key. RowRef selected_row; /// Last row with maximum version for current primary key.
UInt64 max_version = 0; /// Max version for current primary key. UInt64 max_version = 0; /// Max version for current primary key.
size_t max_pos = 0;
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key

View File

@ -0,0 +1,4 @@
2018-01-01 0 0
2018-01-01 1 1
2018-01-01 2 2
2018-01-01 2 2

View File

@ -0,0 +1,8 @@
drop table if exists test.tab;
create table test.tab (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.tab values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
insert into test.tab values ('2018-01-01', 0, 0);
select * from test.tab order by version;
OPTIMIZE TABLE test.tab;
select * from test.tab;