This commit is contained in:
Alexey Milovidov 2018-09-26 04:30:07 +03:00
parent 042536177d
commit 57822ad592
2 changed files with 15 additions and 6 deletions

View File

@ -102,12 +102,20 @@ Block MergeSortingBlockInputStream::readImpl()
removeConstantsFromBlock(block);
blocks.push_back(block);
sum_rows_in_blocks += block.rows();
sum_bytes_in_blocks += block.allocatedBytes();
/** If significant amount of data was accumulated, perform preliminary merging step.
*/
if (blocks.size() > 1 && limit && remerge_is_useful && max_bytes_before_remerge && sum_bytes_in_blocks > max_bytes_before_remerge)
if (blocks.size() > 1
&& limit
&& limit * 2 < sum_rows_in_blocks /// 2 is just a guess.
&& remerge_is_useful
&& max_bytes_before_remerge
&& sum_bytes_in_blocks > max_bytes_before_remerge)
{
remerge();
}
/** If too many of them and if external sorting is enabled,
* will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file.
@ -130,6 +138,7 @@ Block MergeSortingBlockInputStream::readImpl()
blocks.clear();
sum_bytes_in_blocks = 0;
sum_rows_in_blocks = 0;
}
}
@ -264,21 +273,19 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
void MergeSortingBlockInputStream::remerge()
{
size_t num_rows = 0;
for (const auto & block : blocks)
num_rows += block.rows();
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << blocks.size() << " blocks with " << num_rows << " rows) to save memory consumption");
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << blocks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
/// NOTE Maybe concat all blocks and partial sort will be faster than merge?
MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit);
Blocks new_blocks;
size_t new_sum_rows_in_blocks = 0;
size_t new_sum_bytes_in_blocks = 0;
merger.readPrefix();
while (Block block = merger.read())
{
new_sum_rows_in_blocks += block.rows();
new_sum_bytes_in_blocks += block.allocatedBytes();
new_blocks.emplace_back(std::move(block));
}
@ -293,6 +300,7 @@ void MergeSortingBlockInputStream::remerge()
remerge_is_useful = false;
blocks = std::move(new_blocks);
sum_rows_in_blocks = new_sum_rows_in_blocks;
sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}

View File

@ -97,6 +97,7 @@ private:
Logger * log = &Logger::get("MergeSortingBlockInputStream");
Blocks blocks;
size_t sum_rows_in_blocks = 0;
size_t sum_bytes_in_blocks = 0;
std::unique_ptr<IBlockInputStream> impl;