Added remerge step to process ORDER BY with LIMIT when memory usage is high #3179

This commit is contained in:
Alexey Milovidov 2018-09-24 23:07:30 +03:00
parent 17b8e20922
commit 8e7c6598e8
4 changed files with 47 additions and 2 deletions

View File

@ -2,6 +2,7 @@
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
@ -65,9 +66,10 @@ static void enrichBlockWithConstants(Block & block, const Block & header)
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_,
size_t max_merged_block_size_, size_t limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{
children.push_back(input);
@ -100,7 +102,12 @@ Block MergeSortingBlockInputStream::readImpl()
removeConstantsFromBlock(block);
blocks.push_back(block);
sum_bytes_in_blocks += block.bytes();
sum_bytes_in_blocks += block.allocatedBytes();
/** If significant amount of data was accumulated, perform preliminary merging step.
*/
if (blocks.size() > 1 && limit && remerge_is_useful && max_bytes_before_remerge && sum_bytes_in_blocks > max_bytes_before_remerge)
remerge();
/** If too many of them and if external sorting is enabled,
* will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file.
@ -255,4 +262,33 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
}
void MergeSortingBlockInputStream::remerge()
{
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data to save memory consumption");
MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit);
Blocks new_blocks;
size_t new_sum_bytes_in_blocks = 0;
merger.readPrefix();
while (Block block = merger.read())
{
new_sum_bytes_in_blocks += block.allocatedBytes();
new_blocks.emplace_back(std::move(block));
}
merger.readSuffix();
LOG_DEBUG(log, "Memory usage is lowered from "
<< formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to "
<< formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
remerge_is_useful = false;
blocks = std::move(new_blocks);
sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}
}

View File

@ -72,6 +72,7 @@ public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
String getName() const override { return "MergeSorting"; }
@ -89,6 +90,7 @@ private:
size_t max_merged_block_size;
size_t limit;
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
@ -121,6 +123,11 @@ private:
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
BlockInputStreams inputs_to_merge;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();
/// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
bool remerge_is_useful = true;
};
}

View File

@ -1199,6 +1199,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline)
/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
}

View File

@ -218,6 +218,7 @@ struct Settings
M(SettingUInt64, max_bytes_to_sort, 0, "") \
M(SettingOverflowMode<false>, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
M(SettingUInt64, max_bytes_before_external_sort, 0, "") \
M(SettingUInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.") \
\
M(SettingUInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.") \
M(SettingUInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.") \