From 297a1547c5eaedd18e6ca0af0c01d25cdb50ee5a Mon Sep 17 00:00:00 2001 From: Yuri Dyachenko Date: Wed, 19 Oct 2016 20:48:02 +0300 Subject: [PATCH] more usable settings for delayed insert. issue: #METR-22563 --- dbms/include/DB/Core/Defines.h | 3 -- .../DB/Storages/MergeTree/MergeTreeSettings.h | 11 +++-- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 43 +++++++++---------- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/dbms/include/DB/Core/Defines.h b/dbms/include/DB/Core/Defines.h index 711453b05c2..7ad42b4c6ed 100644 --- a/dbms/include/DB/Core/Defines.h +++ b/dbms/include/DB/Core/Defines.h @@ -15,9 +15,6 @@ #define DBMS_DEFAULT_PING_TIMEOUT_SEC 5 #define DBMS_DEFAULT_POLL_INTERVAL 10 -/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков. -#define DBMS_MAX_DELAY_OF_INSERT 200.0 - /// Размер буфера ввода-вывода по-умолчанию. #define DBMS_DEFAULT_BUFFER_SIZE 1048576ULL diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h index c6261d5af98..03f67a28eb1 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h @@ -55,9 +55,11 @@ struct MergeTreeSettings /// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу. size_t parts_to_delay_insert = 150; - /// Если в таблице parts_to_delay_insert + k кусков, спать insert_delay_step^k миллисекунд перед вставкой каждого блока. - /// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний. - double insert_delay_step = 1.1; + /// Если в таблице хотя бы столько активных кусков, выдавать ошибку 'Too much parts ...' + size_t parts_to_throw_insert = 300; + + /// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков. + size_t max_delay_to_insert = 200; /** Настройки репликации. */ @@ -127,7 +129,8 @@ struct MergeTreeSettings SET_SIZE_T(old_parts_lifetime); SET_SIZE_T(temporary_directories_lifetime); SET_SIZE_T(parts_to_delay_insert); - SET_DOUBLE(insert_delay_step); + SET_SIZE_T(parts_to_throw_insert); + SET_SIZE_T(max_delay_to_insert); SET_SIZE_T(replicated_deduplication_window); SET_SIZE_T(replicated_logs_to_keep); SET_SIZE_T(prefer_fetch_merged_part_time_threshold); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 67081a26b8e..7daa39a642a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1224,32 +1224,31 @@ bool MergeTreeData::hasBlockNumberInMonth(Int64 block_number, DayNum_t month) co return false; } - void MergeTreeData::delayInsertIfNeeded(Poco::Event * until) { - size_t parts_count = getMaxPartsCountForMonth(); - if (parts_count > settings.parts_to_delay_insert) + const size_t parts_count = getMaxPartsCountForMonth(); + if (parts_count < settings.parts_to_delay_insert) + return; + if (parts_count >= settings.parts_to_throw_insert) { - double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert); - delay /= 1000; - - if (delay > DBMS_MAX_DELAY_OF_INSERT) - { - ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS); - } - - ProfileEvents::increment(ProfileEvents::DelayedInserts); - ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay * 1000); - - LOG_INFO(log, "Delaying inserting block by " - << std::fixed << std::setprecision(4) << delay << " sec. because there are " << parts_count << " parts"); - - if (until) - until->tryWait(delay * 1000); - else - std::this_thread::sleep_for(std::chrono::duration(delay)); + ProfileEvents::increment(ProfileEvents::RejectedInserts); + throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS); } + + const size_t max_k = settings.parts_to_throw_insert - settings.parts_to_delay_insert; /// always > 0 + const size_t k = 1 + parts_count - settings.parts_to_delay_insert; /// from 1 to max_k + const double delay_sec = ::pow(settings.max_delay_to_insert, static_cast(k)/static_cast(max_k)); + + ProfileEvents::increment(ProfileEvents::DelayedInserts); + ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_sec * 1000); + + LOG_INFO(log, "Delaying inserting block by " + << std::fixed << std::setprecision(4) << delay_sec << " sec. because there are " << parts_count << " parts"); + + if (until) + until->tryWait(delay_sec * 1000); + else + std::this_thread::sleep_for(std::chrono::duration(delay_sec)); } MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)