fix CollapsingMergeTree + vertical merge [#CLICKHOUSE-3963]

don't skip writing to rows sources even if all rows have collapsed.
This commit is contained in:
Alexey Zatelepin 2018-09-05 19:13:22 +03:00
parent 60ca1287c1
commit 6275bfd8cd
2 changed files with 33 additions and 26 deletions

View File

@ -48,37 +48,34 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
return;
}
if (count_positive == count_negative && !last_is_positive)
if (last_is_positive || count_positive != count_negative)
{
/// Input rows exactly cancel out.
return;
}
if (count_positive <= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
if (count_positive <= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
}
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
}
if (count_positive >= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (count_positive >= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
}
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportIncorrectData();
++count_incorrect_data;
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportIncorrectData();
++count_incorrect_data;
}
}
if (out_row_sources_buf)

View File

@ -735,6 +735,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_write_buf->next();
rows_sources_uncompressed_write_buf->next();
size_t rows_sources_count = rows_sources_write_buf->count();
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || parts.size() > 1) && sum_input_rows_exact != rows_sources_count)
throw Exception("Number of rows in source parts (" + toString(sum_input_rows_exact)
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0);
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();