more usable settings for delayed insert. issue: #METR-22563

This commit is contained in:
Yuri Dyachenko 2016-10-19 20:48:02 +03:00
parent cd33a9dbda
commit 297a1547c5
3 changed files with 28 additions and 29 deletions

View File

@ -15,9 +15,6 @@
#define DBMS_DEFAULT_PING_TIMEOUT_SEC 5
#define DBMS_DEFAULT_POLL_INTERVAL 10
/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков.
#define DBMS_MAX_DELAY_OF_INSERT 200.0
/// Размер буфера ввода-вывода по-умолчанию.
#define DBMS_DEFAULT_BUFFER_SIZE 1048576ULL

View File

@ -55,9 +55,11 @@ struct MergeTreeSettings
/// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу.
size_t parts_to_delay_insert = 150;
/// Если в таблице parts_to_delay_insert + k кусков, спать insert_delay_step^k миллисекунд перед вставкой каждого блока.
/// Таким образом, скорость вставок автоматически замедлится примерно до скорости слияний.
double insert_delay_step = 1.1;
/// Если в таблице хотя бы столько активных кусков, выдавать ошибку 'Too much parts ...'
size_t parts_to_throw_insert = 300;
/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков.
size_t max_delay_to_insert = 200;
/** Настройки репликации. */
@ -127,7 +129,8 @@ struct MergeTreeSettings
SET_SIZE_T(old_parts_lifetime);
SET_SIZE_T(temporary_directories_lifetime);
SET_SIZE_T(parts_to_delay_insert);
SET_DOUBLE(insert_delay_step);
SET_SIZE_T(parts_to_throw_insert);
SET_SIZE_T(max_delay_to_insert);
SET_SIZE_T(replicated_deduplication_window);
SET_SIZE_T(replicated_logs_to_keep);
SET_SIZE_T(prefer_fetch_merged_part_time_threshold);

View File

@ -1224,32 +1224,31 @@ bool MergeTreeData::hasBlockNumberInMonth(Int64 block_number, DayNum_t month) co
return false;
}
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{
size_t parts_count = getMaxPartsCountForMonth();
if (parts_count > settings.parts_to_delay_insert)
const size_t parts_count = getMaxPartsCountForMonth();
if (parts_count < settings.parts_to_delay_insert)
return;
if (parts_count >= settings.parts_to_throw_insert)
{
double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert);
delay /= 1000;
if (delay > DBMS_MAX_DELAY_OF_INSERT)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
}
ProfileEvents::increment(ProfileEvents::DelayedInserts);
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay * 1000);
LOG_INFO(log, "Delaying inserting block by "
<< std::fixed << std::setprecision(4) << delay << " sec. because there are " << parts_count << " parts");
if (until)
until->tryWait(delay * 1000);
else
std::this_thread::sleep_for(std::chrono::duration<double>(delay));
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
}
const size_t max_k = settings.parts_to_throw_insert - settings.parts_to_delay_insert; /// always > 0
const size_t k = 1 + parts_count - settings.parts_to_delay_insert; /// from 1 to max_k
const double delay_sec = ::pow(settings.max_delay_to_insert, static_cast<double>(k)/static_cast<double>(max_k));
ProfileEvents::increment(ProfileEvents::DelayedInserts);
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_sec * 1000);
LOG_INFO(log, "Delaying inserting block by "
<< std::fixed << std::setprecision(4) << delay_sec << " sec. because there are " << parts_count << " parts");
if (until)
until->tryWait(delay_sec * 1000);
else
std::this_thread::sleep_for(std::chrono::duration<double>(delay_sec));
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)