diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 72df102a57f..f630494ae2f 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -48,37 +48,34 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column return; } - if (count_positive == count_negative && !last_is_positive) + if (last_is_positive || count_positive != count_negative) { - /// Input rows exactly cancel out. - return; - } + if (count_positive <= count_negative) + { + ++merged_rows; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num); - if (count_positive <= count_negative) - { - ++merged_rows; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num); + if (out_row_sources_buf) + current_row_sources[first_negative_pos].setSkipFlag(false); + } - if (out_row_sources_buf) - current_row_sources[first_negative_pos].setSkipFlag(false); - } + if (count_positive >= count_negative) + { + ++merged_rows; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num); - if (count_positive >= count_negative) - { - ++merged_rows; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num); + if (out_row_sources_buf) + current_row_sources[last_positive_pos].setSkipFlag(false); + } - if (out_row_sources_buf) - current_row_sources[last_positive_pos].setSkipFlag(false); - } - - if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) - { - if (count_incorrect_data < MAX_ERROR_MESSAGES) - reportIncorrectData(); - ++count_incorrect_data; + if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) + { + if (count_incorrect_data < MAX_ERROR_MESSAGES) + reportIncorrectData(); + ++count_incorrect_data; + } } if (out_row_sources_buf) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index dc4b97ff6e3..7218e8b20d5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -735,6 +735,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor rows_sources_write_buf->next(); rows_sources_uncompressed_write_buf->next(); + + size_t rows_sources_count = rows_sources_write_buf->count(); + /// In special case, when there is only one source part, and no rows were skipped, we may have + /// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total + /// number of input rows. + if ((rows_sources_count > 0 || parts.size() > 1) && sum_input_rows_exact != rows_sources_count) + throw Exception("Number of rows in source parts (" + toString(sum_input_rows_exact) + + ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count) + + "). It is a bug.", ErrorCodes::LOGICAL_ERROR); + CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0); for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();