diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 19d3453d3c3..7275cb11726 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -146,6 +146,9 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const if (!count_average) return total_rows == max_block_size; + if (total_rows == 0) + return false; + size_t average = sum / total_rows; return total_rows >= average; } @@ -155,6 +158,9 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop(size_t total_r if (!count_average) return total_rows == max_block_size; + if (total_rows == 0) + return false; + size_t sum = 0; for (const auto & [granularity, rows_count] : blocks_granularity) sum += granularity * rows_count; diff --git a/dbms/src/Storages/MergeTree/MergeList.cpp b/dbms/src/Storages/MergeTree/MergeList.cpp index 5f590de9bde..179f0354999 100644 --- a/dbms/src/Storages/MergeTree/MergeList.cpp +++ b/dbms/src/Storages/MergeTree/MergeList.cpp @@ -29,6 +29,7 @@ MergeListElement::MergeListElement(const std::string & database, const std::stri total_size_bytes_compressed += source_part->bytes_on_disk; total_size_marks += source_part->getMarksCount(); + total_rows_count += source_part->index_granularity.getTotalRows(); } if (!future_part.parts.empty()) @@ -60,6 +61,7 @@ MergeInfo MergeListElement::getInfo() const res.num_parts = num_parts; res.total_size_bytes_compressed = total_size_bytes_compressed; res.total_size_marks = total_size_marks; + res.total_rows_count = total_rows_count; res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed); res.bytes_written_uncompressed = bytes_written_uncompressed.load(std::memory_order_relaxed); res.rows_read = rows_read.load(std::memory_order_relaxed); diff --git a/dbms/src/Storages/MergeTree/MergeList.h b/dbms/src/Storages/MergeTree/MergeList.h index 2d800c85896..dc2d1c80682 100644 --- a/dbms/src/Storages/MergeTree/MergeList.h +++ b/dbms/src/Storages/MergeTree/MergeList.h @@ -36,6 +36,7 @@ struct MergeInfo UInt64 num_parts; UInt64 total_size_bytes_compressed; UInt64 total_size_marks; + UInt64 total_rows_count; UInt64 bytes_read_uncompressed; UInt64 bytes_written_uncompressed; UInt64 rows_read; @@ -67,6 +68,7 @@ struct MergeListElement : boost::noncopyable UInt64 total_size_bytes_compressed{}; UInt64 total_size_marks{}; + UInt64 total_rows_count{}; std::atomic bytes_read_uncompressed{}; std::atomic bytes_written_uncompressed{}; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index ac3c8d6fb0f..9b5ae02274e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -536,8 +536,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor new_data_part->relative_path = TMP_PREFIX + future_part.name; new_data_part->is_temp = true; - /// TODO(alesap) fixme to sum of all index_granularity - size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity_info.fixed_index_granularity; + size_t sum_input_rows_upper_bound = merge_entry->total_rows_count; MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate);