This commit is contained in:
Michael Kolupaev 2012-11-29 10:50:17 +00:00
parent f55d19f71a
commit 982686f51c
2 changed files with 92 additions and 111 deletions

View File

@ -49,27 +49,31 @@ struct Range;
struct StorageMergeTreeSettings
{
/// В каких случаях можно объединять куски разного уровня.
ssize_t delay_time_to_merge_different_level_parts;
size_t max_level_to_merge_different_level_parts;
size_t max_rows_to_merge_different_level_parts;
/// Набор кусков разрешено объединить, если среди них максимальный размер не более чем во столько раз больше суммы остальных.
/// Должно быть больше 1.
double max_size_ratio_to_merge_parts;
/// Сколько за раз сливать кусков.
/// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once), так что не следует делать это число слишком большим.
/// С другой стороны, чтобы слияния точно не могли зайти в тупик, нужно хотя бы
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts).
size_t max_parts_to_merge_at_once;
/// Куски настолько большого размера объединять нельзя вообще.
size_t max_rows_to_merge_parts;
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read;
/// Сколько потоков использовать для объединения кусков.
size_t merging_threads;
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read;
StorageMergeTreeSettings() :
delay_time_to_merge_different_level_parts(36000),
max_level_to_merge_different_level_parts(10),
max_rows_to_merge_different_level_parts(10 * 1024 * 1024),
max_size_ratio_to_merge_parts(5),
max_parts_to_merge_at_once(10),
max_rows_to_merge_parts(100 * 1024 * 1024),
min_rows_for_concurrent_read(20 * 8192),
merging_threads(2) {}
merging_threads(2),
min_rows_for_concurrent_read(20 * 8192) {}
};
@ -162,6 +166,7 @@ private:
Yandex::DayNum_t right_date;
UInt64 left;
UInt64 right;
/// Уровень игнорируется. Использовался предыдущей эвристикой слияния.
UInt32 level;
std::string name;

View File

@ -1297,119 +1297,95 @@ void StorageMergeTree::joinMergeThreads()
}
/// Выбираем отрезок из не более чем max_parts_to_merge_at_once кусков так, чтобы максимальный размер был меньше чем в max_size_ratio_to_merge_parts раз больше суммы остальных.
/// Это обеспечивает в худшем случае время O(n log n) на все слияния, независимо от выбора сливаемых кусков, порядка слияния и добавления.
/// При этом ограничении стараемся выбрать куски поменьше размером, но побольше количеством:
/// Из подходящих отрезков выбираем отрезок с минимальным максимумом размера.
/// Из всех таких отрезков выбираем отрезок с минимальным минимумом размера.
/// Из всех таких отрезков выбираем отрезок с максимальной длиной.
bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts)
{
LOG_DEBUG(log, "Selecting parts to merge");
parts.clear();
parts.resize(2);
DataPartPtr & left = parts[0];
DataPartPtr & right = parts[1];
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
if (data_parts.size() < 2)
size_t min_max;
size_t min_min;
int max_len;
DataParts::iterator best_begin;
bool found = false;
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
{
LOG_DEBUG(log, "Too few parts");
return false;
}
DataParts::iterator first = data_parts.begin();
DataParts::iterator second = first;
DataParts::iterator argmin_first = data_parts.end();
DataParts::iterator argmin_second = data_parts.end();
++second;
/** Два первых подряд идущих куска одинакового минимального уровня, за один, одинаковый месяц.
* Также проверяем, что куски неперекрываются.
* (обратное может быть только после неправильного объединения кусков, если старые куски не были удалены)
* Также проверяем ограничение в settings.
*/
UInt32 min_adjacent_level = -1U;
while (second != data_parts.end())
{
if ( !(*first)->currently_merging
&& !(*second)->currently_merging
&& (*first)->left_month == (*first)->right_month
&& (*first)->right_month == (*second)->left_month
&& (*second)->left_month == (*second)->right_month
&& (*first)->right < (*second)->left
&& (*first)->level == (*second)->level
&& (*first)->level < min_adjacent_level
&& (*first)->size * index_granularity <= settings.max_rows_to_merge_parts
&& (*second)->size * index_granularity <= settings.max_rows_to_merge_parts)
const DataPartPtr & part = *it;
/// Кусок не занят, достаточно мал и в одном месяце.
if (part->currently_merging ||
part->size * index_granularity > settings.max_rows_to_merge_parts ||
part->left_month != part->right_month)
continue;
size_t cur_max = part->size;
size_t cur_min = part->size;
size_t cur_sum = part->size;
int cur_len = 1;
Yandex::DayNum_t month = part->left_month;
UInt64 cur_id = part->right;
DataParts::iterator jt = it;
for (++jt; jt != data_parts.end() && cur_len < settings.max_parts_to_merge_at_once; ++jt)
{
min_adjacent_level = (*first)->level;
argmin_first = first;
argmin_second = second;
/// Кусок не занят, достаточно мал, в одном правильном месяце, правее предыдущего.
if (part->currently_merging ||
part->size * index_granularity > settings.max_rows_to_merge_parts ||
part->left_month != part->right_month ||
part->left_month != month ||
part->left < cur_id)
break;
cur_max = std::max(cur_max, part->size);
cur_min = std::min(cur_min, part->size);
cur_sum += part->size;
++cur_len;
cur_id = part->right;
if (cur_max > min_max)
break;
if (cur_len >= 2 &&
static_cast<double>(cur_max) / (cur_sum - cur_max) < settings.max_size_ratio_to_merge_parts &&
(!found ||
std::make_pair(std::make_pair(cur_max, cur_min), -cur_len) <
std::make_pair(std::make_pair(min_max, min_min), -max_len)))
{
found = true;
min_max = cur_max;
min_min = cur_min;
max_len = cur_len;
best_begin = it;
}
}
++first;
++second;
}
if (argmin_first != data_parts.end())
if (found)
{
left = *argmin_first;
right = *argmin_second;
left->currently_merging = true;
right->currently_merging = true;
LOG_DEBUG(log, "Selected parts " << left->name << " and " << right->name);
return true;
}
first = data_parts.begin();
second = first;
argmin_first = data_parts.end();
argmin_second = data_parts.end();
++second;
/** Два подряд идущих куска минимального суммарного размера с временем создания
* раньше текущего минус заданное, за один, одинаковый месяц.
* Также проверяем ограничения в settings.
*/
time_t cutoff_time = time(0) - settings.delay_time_to_merge_different_level_parts;
size_t min_adjacent_size = -1ULL;
while (second != data_parts.end())
{
if ( !(*first)->currently_merging
&& !(*second)->currently_merging
&& (*first)->left_month == (*first)->right_month
&& (*first)->right_month == (*second)->left_month
&& (*second)->left_month == (*second)->right_month
&& (*first)->right < (*second)->left /// Куски неперекрываются.
&& (*first)->modification_time < cutoff_time
&& (*second)->modification_time < cutoff_time
&& (*first)->size + (*second)->size < min_adjacent_size
&& (*first)->size * index_granularity <= settings.max_rows_to_merge_different_level_parts
&& (*second)->size * index_granularity <= settings.max_rows_to_merge_different_level_parts
&& (*first)->level <= settings.max_level_to_merge_different_level_parts
&& (*second)->level <= settings.max_level_to_merge_different_level_parts)
parts.clear();
DataParts::iterator it = best_begin;
for (int i = 0; i < max_len; ++i)
{
min_adjacent_size = (*first)->size + (*second)->size;
argmin_first = first;
argmin_second = second;
parts.push_back(*it);
++it;
}
++first;
++second;
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts[0]->name << " to " << parts.back()->name);
}
if (argmin_first != data_parts.end())
else
{
left = *argmin_first;
right = *argmin_second;
left->currently_merging = true;
right->currently_merging = true;
LOG_DEBUG(log, "Selected parts " << left->name << " and " << right->name);
return true;
LOG_DEBUG(log, "No parts to merge");
}
LOG_DEBUG(log, "No parts to merge");
parts.clear();
return false;
return found;
}
@ -1441,9 +1417,9 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
/** Читаем из левого и правого куска, сливаем и пишем в новый.
/** Читаем из всех кусков куска, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
* Также, если в одном из кусков есть не все столбцы, то добавляем столбцы с значениями по-умолчанию.
* Также, если некоторых кусках есть не все столбцы, то добавляем столбцы с значениями по-умолчанию.
*/
BlockInputStreams src_streams;