Small fixes

This commit is contained in:
alesapin 2019-03-26 15:37:42 +03:00
parent 0378400972
commit 48fb090cda
4 changed files with 11 additions and 2 deletions

View File

@ -146,6 +146,9 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const
if (!count_average) if (!count_average)
return total_rows == max_block_size; return total_rows == max_block_size;
if (total_rows == 0)
return false;
size_t average = sum / total_rows; size_t average = sum / total_rows;
return total_rows >= average; return total_rows >= average;
} }
@ -155,6 +158,9 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop(size_t total_r
if (!count_average) if (!count_average)
return total_rows == max_block_size; return total_rows == max_block_size;
if (total_rows == 0)
return false;
size_t sum = 0; size_t sum = 0;
for (const auto & [granularity, rows_count] : blocks_granularity) for (const auto & [granularity, rows_count] : blocks_granularity)
sum += granularity * rows_count; sum += granularity * rows_count;

View File

@ -29,6 +29,7 @@ MergeListElement::MergeListElement(const std::string & database, const std::stri
total_size_bytes_compressed += source_part->bytes_on_disk; total_size_bytes_compressed += source_part->bytes_on_disk;
total_size_marks += source_part->getMarksCount(); total_size_marks += source_part->getMarksCount();
total_rows_count += source_part->index_granularity.getTotalRows();
} }
if (!future_part.parts.empty()) if (!future_part.parts.empty())
@ -60,6 +61,7 @@ MergeInfo MergeListElement::getInfo() const
res.num_parts = num_parts; res.num_parts = num_parts;
res.total_size_bytes_compressed = total_size_bytes_compressed; res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_marks = total_size_marks; res.total_size_marks = total_size_marks;
res.total_rows_count = total_rows_count;
res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed); res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);
res.bytes_written_uncompressed = bytes_written_uncompressed.load(std::memory_order_relaxed); res.bytes_written_uncompressed = bytes_written_uncompressed.load(std::memory_order_relaxed);
res.rows_read = rows_read.load(std::memory_order_relaxed); res.rows_read = rows_read.load(std::memory_order_relaxed);

View File

@ -36,6 +36,7 @@ struct MergeInfo
UInt64 num_parts; UInt64 num_parts;
UInt64 total_size_bytes_compressed; UInt64 total_size_bytes_compressed;
UInt64 total_size_marks; UInt64 total_size_marks;
UInt64 total_rows_count;
UInt64 bytes_read_uncompressed; UInt64 bytes_read_uncompressed;
UInt64 bytes_written_uncompressed; UInt64 bytes_written_uncompressed;
UInt64 rows_read; UInt64 rows_read;
@ -67,6 +68,7 @@ struct MergeListElement : boost::noncopyable
UInt64 total_size_bytes_compressed{}; UInt64 total_size_bytes_compressed{};
UInt64 total_size_marks{}; UInt64 total_size_marks{};
UInt64 total_rows_count{};
std::atomic<UInt64> bytes_read_uncompressed{}; std::atomic<UInt64> bytes_read_uncompressed{};
std::atomic<UInt64> bytes_written_uncompressed{}; std::atomic<UInt64> bytes_written_uncompressed{};

View File

@ -536,8 +536,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
new_data_part->relative_path = TMP_PREFIX + future_part.name; new_data_part->relative_path = TMP_PREFIX + future_part.name;
new_data_part->is_temp = true; new_data_part->is_temp = true;
/// TODO(alesap) fixme to sum of all index_granularity size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity_info.fixed_index_granularity;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate); MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate);