mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #3205 from yandex/order-by-remerge
Added remerge step to process ORDER BY with LIMIT when memory usage is high
This commit is contained in:
commit
b326b95592
@ -16,6 +16,9 @@ namespace DB
|
||||
}
|
||||
|
||||
|
||||
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
|
||||
|
||||
|
||||
MemoryTracker::~MemoryTracker()
|
||||
{
|
||||
if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
|
||||
@ -52,6 +55,13 @@ void MemoryTracker::logPeakMemoryUsage() const
|
||||
<< ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
|
||||
}
|
||||
|
||||
static void logMemoryUsage(Int64 amount)
|
||||
{
|
||||
LOG_DEBUG(&Logger::get("MemoryTracker"),
|
||||
"Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
|
||||
}
|
||||
|
||||
|
||||
|
||||
void MemoryTracker::alloc(Int64 size)
|
||||
{
|
||||
@ -101,9 +111,15 @@ void MemoryTracker::alloc(Int64 size)
|
||||
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
|
||||
}
|
||||
|
||||
if (will_be > peak.load(std::memory_order_relaxed)) /// Races doesn't matter. Could rewrite with CAS, but not worth.
|
||||
auto peak_old = peak.load(std::memory_order_relaxed);
|
||||
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
|
||||
{
|
||||
peak.store(will_be, std::memory_order_relaxed);
|
||||
|
||||
if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
|
||||
logMemoryUsage(will_be);
|
||||
}
|
||||
|
||||
if (auto loaded_next = parent.load(std::memory_order_relaxed))
|
||||
loaded_next->alloc(size);
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <DataStreams/MergingSortedBlockInputStream.h>
|
||||
#include <DataStreams/NativeBlockOutputStream.h>
|
||||
#include <DataStreams/copyData.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/CompressedWriteBuffer.h>
|
||||
|
||||
@ -65,9 +66,10 @@ static void enrichBlockWithConstants(Block & block, const Block & header)
|
||||
|
||||
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
|
||||
const BlockInputStreamPtr & input, SortDescription & description_,
|
||||
size_t max_merged_block_size_, size_t limit_,
|
||||
size_t max_merged_block_size_, size_t limit_, size_t max_bytes_before_remerge_,
|
||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
|
||||
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
|
||||
max_bytes_before_remerge(max_bytes_before_remerge_),
|
||||
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
|
||||
{
|
||||
children.push_back(input);
|
||||
@ -100,7 +102,20 @@ Block MergeSortingBlockInputStream::readImpl()
|
||||
removeConstantsFromBlock(block);
|
||||
|
||||
blocks.push_back(block);
|
||||
sum_bytes_in_blocks += block.bytes();
|
||||
sum_rows_in_blocks += block.rows();
|
||||
sum_bytes_in_blocks += block.allocatedBytes();
|
||||
|
||||
/** If significant amount of data was accumulated, perform preliminary merging step.
|
||||
*/
|
||||
if (blocks.size() > 1
|
||||
&& limit
|
||||
&& limit * 2 < sum_rows_in_blocks /// 2 is just a guess.
|
||||
&& remerge_is_useful
|
||||
&& max_bytes_before_remerge
|
||||
&& sum_bytes_in_blocks > max_bytes_before_remerge)
|
||||
{
|
||||
remerge();
|
||||
}
|
||||
|
||||
/** If too many of them and if external sorting is enabled,
|
||||
* will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file.
|
||||
@ -123,6 +138,7 @@ Block MergeSortingBlockInputStream::readImpl()
|
||||
|
||||
blocks.clear();
|
||||
sum_bytes_in_blocks = 0;
|
||||
sum_rows_in_blocks = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -255,4 +271,37 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
|
||||
}
|
||||
|
||||
|
||||
void MergeSortingBlockInputStream::remerge()
|
||||
{
|
||||
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << blocks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
|
||||
|
||||
/// NOTE Maybe concat all blocks and partial sort will be faster than merge?
|
||||
MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit);
|
||||
|
||||
Blocks new_blocks;
|
||||
size_t new_sum_rows_in_blocks = 0;
|
||||
size_t new_sum_bytes_in_blocks = 0;
|
||||
|
||||
merger.readPrefix();
|
||||
while (Block block = merger.read())
|
||||
{
|
||||
new_sum_rows_in_blocks += block.rows();
|
||||
new_sum_bytes_in_blocks += block.allocatedBytes();
|
||||
new_blocks.emplace_back(std::move(block));
|
||||
}
|
||||
merger.readSuffix();
|
||||
|
||||
LOG_DEBUG(log, "Memory usage is lowered from "
|
||||
<< formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to "
|
||||
<< formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
|
||||
|
||||
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
|
||||
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
|
||||
remerge_is_useful = false;
|
||||
|
||||
blocks = std::move(new_blocks);
|
||||
sum_rows_in_blocks = new_sum_rows_in_blocks;
|
||||
sum_bytes_in_blocks = new_sum_bytes_in_blocks;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -72,6 +72,7 @@ public:
|
||||
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
|
||||
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
|
||||
size_t max_merged_block_size_, size_t limit_,
|
||||
size_t max_bytes_before_remerge_,
|
||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
|
||||
|
||||
String getName() const override { return "MergeSorting"; }
|
||||
@ -89,12 +90,14 @@ private:
|
||||
size_t max_merged_block_size;
|
||||
size_t limit;
|
||||
|
||||
size_t max_bytes_before_remerge;
|
||||
size_t max_bytes_before_external_sort;
|
||||
const std::string tmp_path;
|
||||
|
||||
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
||||
|
||||
Blocks blocks;
|
||||
size_t sum_rows_in_blocks = 0;
|
||||
size_t sum_bytes_in_blocks = 0;
|
||||
std::unique_ptr<IBlockInputStream> impl;
|
||||
|
||||
@ -121,6 +124,11 @@ private:
|
||||
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
|
||||
|
||||
BlockInputStreams inputs_to_merge;
|
||||
|
||||
/// Merge all accumulated blocks to keep no more than limit rows.
|
||||
void remerge();
|
||||
/// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
|
||||
bool remerge_is_useful = true;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1199,6 +1199,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline)
|
||||
/// Merge the sorted blocks.
|
||||
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
|
||||
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
|
||||
settings.max_bytes_before_remerge_sort,
|
||||
settings.max_bytes_before_external_sort, context.getTemporaryPath());
|
||||
}
|
||||
|
||||
|
@ -218,6 +218,7 @@ struct Settings
|
||||
M(SettingUInt64, max_bytes_to_sort, 0, "") \
|
||||
M(SettingOverflowMode<false>, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
|
||||
M(SettingUInt64, max_bytes_before_external_sort, 0, "") \
|
||||
M(SettingUInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.") \
|
||||
\
|
||||
M(SettingUInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.") \
|
||||
M(SettingUInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.") \
|
||||
|
@ -1,3 +1,3 @@
|
||||
SET max_memory_usage = 100000000;
|
||||
SET max_bytes_before_external_sort = 10000000;
|
||||
SET max_bytes_before_external_sort = 20000000;
|
||||
SELECT number FROM (SELECT number FROM system.numbers LIMIT 10000000) ORDER BY number * 1234567890123456789 LIMIT 9999990, 10;
|
||||
|
20
dbms/tests/queries/0_stateless/00723_remerge_sort.reference
Normal file
20
dbms/tests/queries/0_stateless/00723_remerge_sort.reference
Normal file
@ -0,0 +1,20 @@
|
||||
0
|
||||
1
|
||||
10
|
||||
100
|
||||
1000
|
||||
10000
|
||||
100000
|
||||
1000000
|
||||
1000001
|
||||
1000002
|
||||
0
|
||||
1
|
||||
10
|
||||
100
|
||||
1000
|
||||
10000
|
||||
100000
|
||||
1000000
|
||||
1000001
|
||||
1000002
|
3
dbms/tests/queries/0_stateless/00723_remerge_sort.sql
Normal file
3
dbms/tests/queries/0_stateless/00723_remerge_sort.sql
Normal file
@ -0,0 +1,3 @@
|
||||
SELECT * FROM (SELECT x FROM (SELECT toString(number) AS x FROM system.numbers LIMIT 2000000) ORDER BY x LIMIT 10000) LIMIT 10;
|
||||
SET max_bytes_before_remerge_sort = 1000000;
|
||||
SELECT * FROM (SELECT x FROM (SELECT toString(number) AS x FROM system.numbers LIMIT 2000000) ORDER BY x LIMIT 10000) LIMIT 10;
|
Loading…
Reference in New Issue
Block a user