Merge pull request #145 from yurial/yurial-delayed-insert-settings-METR-22563

more usable settings for delayed insert. issue: #METR-22563
This commit is contained in:
alexey-milovidov 2016-10-20 01:43:19 +04:00 committed by GitHub
commit 2611048743
3 changed files with 28 additions and 29 deletions

View File

@ -15,9 +15,6 @@
#define DBMS_DEFAULT_PING_TIMEOUT_SEC 5 #define DBMS_DEFAULT_PING_TIMEOUT_SEC 5
#define DBMS_DEFAULT_POLL_INTERVAL 10 #define DBMS_DEFAULT_POLL_INTERVAL 10
/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков.
#define DBMS_MAX_DELAY_OF_INSERT 200.0
/// Размер буфера ввода-вывода по-умолчанию. /// Размер буфера ввода-вывода по-умолчанию.
#define DBMS_DEFAULT_BUFFER_SIZE 1048576ULL #define DBMS_DEFAULT_BUFFER_SIZE 1048576ULL

View File

@ -55,9 +55,11 @@ struct MergeTreeSettings
/// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу. /// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу.
size_t parts_to_delay_insert = 150; size_t parts_to_delay_insert = 150;
/// Если в таблице parts_to_delay_insert + k кусков, спать insert_delay_step^k миллисекунд перед вставкой каждого блока. /// Если в таблице хотя бы столько активных кусков, выдавать ошибку 'Too much parts ...'
/// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний. size_t parts_to_throw_insert = 300;
double insert_delay_step = 1.1;
/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков.
size_t max_delay_to_insert = 200;
/** Настройки репликации. */ /** Настройки репликации. */
@ -127,7 +129,8 @@ struct MergeTreeSettings
SET_SIZE_T(old_parts_lifetime); SET_SIZE_T(old_parts_lifetime);
SET_SIZE_T(temporary_directories_lifetime); SET_SIZE_T(temporary_directories_lifetime);
SET_SIZE_T(parts_to_delay_insert); SET_SIZE_T(parts_to_delay_insert);
SET_DOUBLE(insert_delay_step); SET_SIZE_T(parts_to_throw_insert);
SET_SIZE_T(max_delay_to_insert);
SET_SIZE_T(replicated_deduplication_window); SET_SIZE_T(replicated_deduplication_window);
SET_SIZE_T(replicated_logs_to_keep); SET_SIZE_T(replicated_logs_to_keep);
SET_SIZE_T(prefer_fetch_merged_part_time_threshold); SET_SIZE_T(prefer_fetch_merged_part_time_threshold);

View File

@ -1224,32 +1224,31 @@ bool MergeTreeData::hasBlockNumberInMonth(Int64 block_number, DayNum_t month) co
return false; return false;
} }
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until) void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{ {
size_t parts_count = getMaxPartsCountForMonth(); const size_t parts_count = getMaxPartsCountForMonth();
if (parts_count > settings.parts_to_delay_insert) if (parts_count < settings.parts_to_delay_insert)
return;
if (parts_count >= settings.parts_to_throw_insert)
{ {
double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert); ProfileEvents::increment(ProfileEvents::RejectedInserts);
delay /= 1000; throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
if (delay > DBMS_MAX_DELAY_OF_INSERT)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
}
ProfileEvents::increment(ProfileEvents::DelayedInserts);
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay * 1000);
LOG_INFO(log, "Delaying inserting block by "
<< std::fixed << std::setprecision(4) << delay << " sec. because there are " << parts_count << " parts");
if (until)
until->tryWait(delay * 1000);
else
std::this_thread::sleep_for(std::chrono::duration<double>(delay));
} }
const size_t max_k = settings.parts_to_throw_insert - settings.parts_to_delay_insert; /// always > 0
const size_t k = 1 + parts_count - settings.parts_to_delay_insert; /// from 1 to max_k
const double delay_sec = ::pow(settings.max_delay_to_insert, static_cast<double>(k)/static_cast<double>(max_k));
ProfileEvents::increment(ProfileEvents::DelayedInserts);
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_sec * 1000);
LOG_INFO(log, "Delaying inserting block by "
<< std::fixed << std::setprecision(4) << delay_sec << " sec. because there are " << parts_count << " parts");
if (until)
until->tryWait(delay_sec * 1000);
else
std::this_thread::sleep_for(std::chrono::duration<double>(delay_sec));
} }
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)