diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index f630494ae2f..4722c260621 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -40,7 +40,7 @@ void CollapsingSortedBlockInputStream::reportIncorrectData() } -void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t & merged_rows) +void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition) { if (count_positive == 0 && count_negative == 0) { @@ -52,7 +52,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column { if (count_positive <= count_negative) { - ++merged_rows; + condition.incrementRowsCountFromGranularity(block_size); for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num); @@ -62,7 +62,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column if (count_positive >= count_negative) { - ++merged_rows; + condition.incrementRowsCountFromGranularity(block_size); for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num); @@ -106,12 +106,14 @@ Block CollapsingSortedBlockInputStream::readImpl() void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) { - size_t merged_rows = 0; + MergeStopCondition stop_condition(average_block_sizes, max_block_size); + size_t current_block_granularity; /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` for (; !queue.empty(); ++current_pos) { SortCursor current = queue.top(); + current_block_granularity = current->rows; if (current_key.empty()) setPrimaryKeyRef(current_key, current); @@ -122,7 +124,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st bool key_differs = next_key != current_key; /// if there are enough rows and the last one is calculated completely - if (key_differs && merged_rows >= max_block_size) + if (key_differs && stop_condition.checkStop()) { ++blocks_written; return; @@ -133,7 +135,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st if (key_differs) { /// We write data for the previous primary key. - insertRows(merged_columns, merged_rows); + insertRows(merged_columns, current_block_granularity, stop_condition); current_key.swap(next_key); @@ -167,7 +169,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st first_negative_pos = current_pos; } - if (!blocks_written && !merged_rows) + if (!blocks_written && stop_condition.empty()) { setRowRef(last_negative, current); last_negative_pos = current_pos; @@ -193,7 +195,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st } /// Write data for last primary key. - insertRows(merged_columns, merged_rows); + insertRows(merged_columns, current_block_granularity, stop_condition); finished = true; } diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index cf72df30dbd..664e7a55bf9 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -26,8 +26,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream public: CollapsingSortedBlockInputStream( BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) + const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr, bool average_block_sizes = false) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes) { sign_column_number = header.getPositionByName(sign_column); } @@ -75,7 +75,7 @@ private: void merge(MutableColumns & merged_columns, std::priority_queue & queue); /// Output to result rows for the current primary key. - void insertRows(MutableColumns & merged_columns, size_t & merged_rows); + void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition); void reportIncorrectData(); }; diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index 18573a81463..c30e062485c 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -134,6 +134,10 @@ protected: bool checkStop(size_t total_rows) const; bool checkStop() const; + bool empty() const + { + return blocks_granularity.empty(); + } }; diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp index fc24bef60bc..90e61218e14 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp @@ -16,8 +16,8 @@ namespace ErrorCodes VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream( const BlockInputStreams & inputs_, const SortDescription & description_, const String & sign_column_, size_t max_block_size_, - WriteBuffer * out_row_sources_buf_) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) + WriteBuffer * out_row_sources_buf_, bool average_block_sizes_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_) , max_rows_in_queue(std::min(std::max(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) , current_keys(max_rows_in_queue + 1) { @@ -83,7 +83,7 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) { - size_t merged_rows = 0; + MergeStopCondition stop_condition(average_block_sizes, max_block_size); auto update_queue = [this, & queue](SortCursor & cursor) { @@ -108,6 +108,7 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co while (!queue.empty()) { SortCursor current = queue.top(); + size_t current_block_granularity = current->rows; RowRef next_key; @@ -154,10 +155,10 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co current_keys.popFront(); - ++merged_rows; + stop_condition.incrementRowsCountFromGranularity(current_block_granularity); --rows_to_merge; - if (merged_rows >= max_block_size) + if (stop_condition.checkStop()) { ++blocks_written; return; @@ -173,7 +174,6 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co insertRow(gap, row, merged_columns); current_keys.popFront(); - ++merged_rows; } /// Write information about last collapsed rows. diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h index 573fd66920d..d3da6cffd09 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h @@ -178,7 +178,7 @@ public: VersionedCollapsingSortedBlockInputStream( const BlockInputStreams & inputs_, const SortDescription & description_, const String & sign_column_, size_t max_block_size_, - WriteBuffer * out_row_sources_buf_ = nullptr); + WriteBuffer * out_row_sources_buf_ = nullptr, bool average_block_sizes_ = false); String getName() const override { return "VersionedCollapsingSorted"; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 34bb134748a..ac3c8d6fb0f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -642,7 +642,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor case MergeTreeData::MergingParams::Collapsing: merged_stream = std::make_unique( - src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get()); + src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get(), blocks_are_granules_size); break; case MergeTreeData::MergingParams::Summing: @@ -668,7 +668,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor case MergeTreeData::MergingParams::VersionedCollapsing: merged_stream = std::make_unique( - src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get()); + src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get(), blocks_are_granules_size); break; }