From 8e7c6598e8816d7b95bf2bdd717e9c179d3cfff7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 24 Sep 2018 23:07:30 +0300 Subject: [PATCH 1/6] Added remerge step to process ORDER BY with LIMIT when memory usage is high #3179 --- .../MergeSortingBlockInputStream.cpp | 40 ++++++++++++++++++- .../MergeSortingBlockInputStream.h | 7 ++++ .../Interpreters/InterpreterSelectQuery.cpp | 1 + dbms/src/Interpreters/Settings.h | 1 + 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index b062c679c0a..97f7398c32c 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -65,9 +66,10 @@ static void enrichBlockWithConstants(Block & block, const Block & header) MergeSortingBlockInputStream::MergeSortingBlockInputStream( const BlockInputStreamPtr & input, SortDescription & description_, - size_t max_merged_block_size_, size_t limit_, + size_t max_merged_block_size_, size_t limit_, size_t max_bytes_before_remerge_, size_t max_bytes_before_external_sort_, const std::string & tmp_path_) : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), + max_bytes_before_remerge(max_bytes_before_remerge_), max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) { children.push_back(input); @@ -100,7 +102,12 @@ Block MergeSortingBlockInputStream::readImpl() removeConstantsFromBlock(block); blocks.push_back(block); - sum_bytes_in_blocks += block.bytes(); + sum_bytes_in_blocks += block.allocatedBytes(); + + /** If significant amount of data was accumulated, perform preliminary merging step. + */ + if (blocks.size() > 1 && limit && remerge_is_useful && max_bytes_before_remerge && sum_bytes_in_blocks > max_bytes_before_remerge) + remerge(); /** If too many of them and if external sorting is enabled, * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file. @@ -255,4 +262,33 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue sum_bytes_in_blocks) + remerge_is_useful = false; + + blocks = std::move(new_blocks); + sum_bytes_in_blocks = new_sum_bytes_in_blocks; +} + } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index ad6d81984cc..5fd7490bdae 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -72,6 +72,7 @@ public: /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_, size_t max_merged_block_size_, size_t limit_, + size_t max_bytes_before_remerge_, size_t max_bytes_before_external_sort_, const std::string & tmp_path_); String getName() const override { return "MergeSorting"; } @@ -89,6 +90,7 @@ private: size_t max_merged_block_size; size_t limit; + size_t max_bytes_before_remerge; size_t max_bytes_before_external_sort; const std::string tmp_path; @@ -121,6 +123,11 @@ private: std::vector> temporary_inputs; BlockInputStreams inputs_to_merge; + + /// Merge all accumulated blocks to keep no more than limit rows. + void remerge(); + /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore. + bool remerge_is_useful = true; }; } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 65acba867a6..45edccf723c 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -1199,6 +1199,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline) /// Merge the sorted blocks. pipeline.firstStream() = std::make_shared( pipeline.firstStream(), order_descr, settings.max_block_size, limit, + settings.max_bytes_before_remerge_sort, settings.max_bytes_before_external_sort, context.getTemporaryPath()); } diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 1928d9d0828..60bd04fd5f2 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -218,6 +218,7 @@ struct Settings M(SettingUInt64, max_bytes_to_sort, 0, "") \ M(SettingOverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ M(SettingUInt64, max_bytes_before_external_sort, 0, "") \ + M(SettingUInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.") \ \ M(SettingUInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.") \ M(SettingUInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.") \ From 81e2fe14e5e494b01dcee3aef3f12dfa95834013 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 24 Sep 2018 23:30:02 +0300 Subject: [PATCH 2/6] Better messages #3205 --- dbms/src/DataStreams/MergeSortingBlockInputStream.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 97f7398c32c..c8b3e8aecaa 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -264,8 +264,13 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue Date: Mon, 24 Sep 2018 23:57:10 +0300 Subject: [PATCH 3/6] Memory tracker: added watermark logging #3205 --- dbms/src/Common/MemoryTracker.cpp | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 7e957ae1ae4..082477c1d9a 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -16,6 +16,9 @@ namespace DB } +static constexpr size_t log_peak_memory_usage_every = 1ULL << 30; + + MemoryTracker::~MemoryTracker() { if (static_cast(level) < static_cast(VariableContext::Process) && peak) @@ -52,6 +55,13 @@ void MemoryTracker::logPeakMemoryUsage() const << ": " << formatReadableSizeWithBinarySuffix(peak) << "."); } +static void logMemoryUsage(Int64 amount) +{ + LOG_DEBUG(&Logger::get("MemoryTracker"), + "Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << "."); +} + + void MemoryTracker::alloc(Int64 size) { @@ -101,9 +111,15 @@ void MemoryTracker::alloc(Int64 size) throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED); } - if (will_be > peak.load(std::memory_order_relaxed)) /// Races doesn't matter. Could rewrite with CAS, but not worth. + auto peak_old = peak.load(std::memory_order_relaxed); + if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth. + { peak.store(will_be, std::memory_order_relaxed); + if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every) + logMemoryUsage(will_be); + } + if (auto loaded_next = parent.load(std::memory_order_relaxed)) loaded_next->alloc(size); } From 5cc8e802d511b36b167f77f32b7e1d5757313993 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 25 Sep 2018 00:02:40 +0300 Subject: [PATCH 4/6] Added test #3205 --- .../0_stateless/00723_remerge_sort.reference | 20 +++++++++++++++++++ .../0_stateless/00723_remerge_sort.sql | 3 +++ 2 files changed, 23 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/00723_remerge_sort.reference create mode 100644 dbms/tests/queries/0_stateless/00723_remerge_sort.sql diff --git a/dbms/tests/queries/0_stateless/00723_remerge_sort.reference b/dbms/tests/queries/0_stateless/00723_remerge_sort.reference new file mode 100644 index 00000000000..e22c5f2d430 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00723_remerge_sort.reference @@ -0,0 +1,20 @@ +0 +1 +10 +100 +1000 +10000 +100000 +1000000 +1000001 +1000002 +0 +1 +10 +100 +1000 +10000 +100000 +1000000 +1000001 +1000002 diff --git a/dbms/tests/queries/0_stateless/00723_remerge_sort.sql b/dbms/tests/queries/0_stateless/00723_remerge_sort.sql new file mode 100644 index 00000000000..562a5e80223 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00723_remerge_sort.sql @@ -0,0 +1,3 @@ +SELECT * FROM (SELECT x FROM (SELECT toString(number) AS x FROM system.numbers LIMIT 2000000) ORDER BY x LIMIT 10000) LIMIT 10; +SET max_bytes_before_remerge_sort = 1000000; +SELECT * FROM (SELECT x FROM (SELECT toString(number) AS x FROM system.numbers LIMIT 2000000) ORDER BY x LIMIT 10000) LIMIT 10; From 57822ad59213ec5a70843e0bc418007083bf15e1 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 26 Sep 2018 04:30:07 +0300 Subject: [PATCH 5/6] Improvement #3205 --- .../MergeSortingBlockInputStream.cpp | 20 +++++++++++++------ .../MergeSortingBlockInputStream.h | 1 + 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index c8b3e8aecaa..6122f54630d 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -102,12 +102,20 @@ Block MergeSortingBlockInputStream::readImpl() removeConstantsFromBlock(block); blocks.push_back(block); + sum_rows_in_blocks += block.rows(); sum_bytes_in_blocks += block.allocatedBytes(); /** If significant amount of data was accumulated, perform preliminary merging step. */ - if (blocks.size() > 1 && limit && remerge_is_useful && max_bytes_before_remerge && sum_bytes_in_blocks > max_bytes_before_remerge) + if (blocks.size() > 1 + && limit + && limit * 2 < sum_rows_in_blocks /// 2 is just a guess. + && remerge_is_useful + && max_bytes_before_remerge + && sum_bytes_in_blocks > max_bytes_before_remerge) + { remerge(); + } /** If too many of them and if external sorting is enabled, * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file. @@ -130,6 +138,7 @@ Block MergeSortingBlockInputStream::readImpl() blocks.clear(); sum_bytes_in_blocks = 0; + sum_rows_in_blocks = 0; } } @@ -264,21 +273,19 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue impl; From 33e48652405d2a308d7f2eb1c1e251a16895fcbb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 26 Sep 2018 05:33:29 +0300 Subject: [PATCH 6/6] Adjusted test #3205 --- dbms/tests/queries/0_stateless/00110_external_sort.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/tests/queries/0_stateless/00110_external_sort.sql b/dbms/tests/queries/0_stateless/00110_external_sort.sql index 6f56de66e3b..91459d2dabb 100644 --- a/dbms/tests/queries/0_stateless/00110_external_sort.sql +++ b/dbms/tests/queries/0_stateless/00110_external_sort.sql @@ -1,3 +1,3 @@ SET max_memory_usage = 100000000; -SET max_bytes_before_external_sort = 10000000; +SET max_bytes_before_external_sort = 20000000; SELECT number FROM (SELECT number FROM system.numbers LIMIT 10000000) ORDER BY number * 1234567890123456789 LIMIT 9999990, 10;