Merge pull request #3049 from yandex/fix-collapsing-vertical-merge

Fix collapsing vertical merge regression
This commit is contained in:
alexey-milovidov 2018-09-05 21:47:23 +03:00 committed by GitHub
commit 4032e4d22c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 26 deletions

View File

@ -48,37 +48,34 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
return;
}
if (count_positive == count_negative && !last_is_positive)
if (last_is_positive || count_positive != count_negative)
{
/// Input rows exactly cancel out.
return;
}
if (count_positive <= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
if (count_positive <= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
}
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
}
if (count_positive >= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (count_positive >= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
}
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportIncorrectData();
++count_incorrect_data;
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportIncorrectData();
++count_incorrect_data;
}
}
if (out_row_sources_buf)

View File

@ -735,6 +735,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_write_buf->next();
rows_sources_uncompressed_write_buf->next();
size_t rows_sources_count = rows_sources_write_buf->count();
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || parts.size() > 1) && sum_input_rows_exact != rows_sources_count)
throw Exception("Number of rows in source parts (" + toString(sum_input_rows_exact)
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0);
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();

View File

@ -0,0 +1,3 @@
k1 k1v2 1
k3 k3v1 1
k4 k4v1 -1

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS test.collapsing;
CREATE TABLE test.collapsing(key String, value String, sign Int8) ENGINE CollapsingMergeTree(sign)
ORDER BY key
SETTINGS enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate=0,
vertical_merge_algorithm_min_columns_to_activate=0;
INSERT INTO test.collapsing VALUES ('k1', 'k1v1', 1);
INSERT INTO test.collapsing VALUES ('k1', 'k1v1', -1), ('k1', 'k1v2', 1);
INSERT INTO test.collapsing VALUES ('k2', 'k2v1', 1), ('k2', 'k2v1', -1), ('k3', 'k3v1', 1);
INSERT INTO test.collapsing VALUES ('k4', 'k4v1', -1), ('k4', 'k4v2', 1), ('k4', 'k4v2', -1);
OPTIMIZE TABLE test.collapsing PARTITION tuple() FINAL;
SELECT * FROM test.collapsing ORDER BY key;
DROP TABLE test.collapsing;