diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 3dc33584657..39bcebee9ea 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -49,27 +49,31 @@ struct Range; struct StorageMergeTreeSettings { - /// В каких случаях можно объединять куски разного уровня. - ssize_t delay_time_to_merge_different_level_parts; - size_t max_level_to_merge_different_level_parts; - size_t max_rows_to_merge_different_level_parts; + /// Набор кусков разрешено объединить, если среди них максимальный размер не более чем во столько раз больше суммы остальных. + /// Должно быть больше 1. + double max_size_ratio_to_merge_parts; + + /// Сколько за раз сливать кусков. + /// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once), так что не следует делать это число слишком большим. + /// С другой стороны, чтобы слияния точно не могли зайти в тупик, нужно хотя бы + /// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts). + size_t max_parts_to_merge_at_once; /// Куски настолько большого размера объединять нельзя вообще. size_t max_rows_to_merge_parts; - /// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить. - size_t min_rows_for_concurrent_read; - /// Сколько потоков использовать для объединения кусков. size_t merging_threads; + /// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить. + size_t min_rows_for_concurrent_read; + StorageMergeTreeSettings() : - delay_time_to_merge_different_level_parts(36000), - max_level_to_merge_different_level_parts(10), - max_rows_to_merge_different_level_parts(10 * 1024 * 1024), + max_size_ratio_to_merge_parts(5), + max_parts_to_merge_at_once(10), max_rows_to_merge_parts(100 * 1024 * 1024), - min_rows_for_concurrent_read(20 * 8192), - merging_threads(2) {} + merging_threads(2), + min_rows_for_concurrent_read(20 * 8192) {} }; @@ -162,6 +166,7 @@ private: Yandex::DayNum_t right_date; UInt64 left; UInt64 right; + /// Уровень игнорируется. Использовался предыдущей эвристикой слияния. UInt32 level; std::string name; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 4322246be91..9a8faed30d0 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -1297,119 +1297,95 @@ void StorageMergeTree::joinMergeThreads() } +/// Выбираем отрезок из не более чем max_parts_to_merge_at_once кусков так, чтобы максимальный размер был меньше чем в max_size_ratio_to_merge_parts раз больше суммы остальных. +/// Это обеспечивает в худшем случае время O(n log n) на все слияния, независимо от выбора сливаемых кусков, порядка слияния и добавления. +/// При этом ограничении стараемся выбрать куски поменьше размером, но побольше количеством: +/// Из подходящих отрезков выбираем отрезок с минимальным максимумом размера. +/// Из всех таких отрезков выбираем отрезок с минимальным минимумом размера. +/// Из всех таких отрезков выбираем отрезок с максимальной длиной. bool StorageMergeTree::selectPartsToMerge(std::vector & parts) { LOG_DEBUG(log, "Selecting parts to merge"); - parts.clear(); - parts.resize(2); - DataPartPtr & left = parts[0]; - DataPartPtr & right = parts[1]; - Poco::ScopedLock lock(data_parts_mutex); - if (data_parts.size() < 2) + size_t min_max; + size_t min_min; + int max_len; + DataParts::iterator best_begin; + bool found = false; + + for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it) { - LOG_DEBUG(log, "Too few parts"); - return false; - } - - DataParts::iterator first = data_parts.begin(); - DataParts::iterator second = first; - DataParts::iterator argmin_first = data_parts.end(); - DataParts::iterator argmin_second = data_parts.end(); - ++second; - - /** Два первых подряд идущих куска одинакового минимального уровня, за один, одинаковый месяц. - * Также проверяем, что куски неперекрываются. - * (обратное может быть только после неправильного объединения кусков, если старые куски не были удалены) - * Также проверяем ограничение в settings. - */ - - UInt32 min_adjacent_level = -1U; - while (second != data_parts.end()) - { - if ( !(*first)->currently_merging - && !(*second)->currently_merging - && (*first)->left_month == (*first)->right_month - && (*first)->right_month == (*second)->left_month - && (*second)->left_month == (*second)->right_month - && (*first)->right < (*second)->left - && (*first)->level == (*second)->level - && (*first)->level < min_adjacent_level - && (*first)->size * index_granularity <= settings.max_rows_to_merge_parts - && (*second)->size * index_granularity <= settings.max_rows_to_merge_parts) + const DataPartPtr & part = *it; + + /// Кусок не занят, достаточно мал и в одном месяце. + if (part->currently_merging || + part->size * index_granularity > settings.max_rows_to_merge_parts || + part->left_month != part->right_month) + continue; + + size_t cur_max = part->size; + size_t cur_min = part->size; + size_t cur_sum = part->size; + int cur_len = 1; + + Yandex::DayNum_t month = part->left_month; + UInt64 cur_id = part->right; + + DataParts::iterator jt = it; + for (++jt; jt != data_parts.end() && cur_len < settings.max_parts_to_merge_at_once; ++jt) { - min_adjacent_level = (*first)->level; - argmin_first = first; - argmin_second = second; + /// Кусок не занят, достаточно мал, в одном правильном месяце, правее предыдущего. + if (part->currently_merging || + part->size * index_granularity > settings.max_rows_to_merge_parts || + part->left_month != part->right_month || + part->left_month != month || + part->left < cur_id) + break; + + cur_max = std::max(cur_max, part->size); + cur_min = std::min(cur_min, part->size); + cur_sum += part->size; + ++cur_len; + cur_id = part->right; + + if (cur_max > min_max) + break; + if (cur_len >= 2 && + static_cast(cur_max) / (cur_sum - cur_max) < settings.max_size_ratio_to_merge_parts && + (!found || + std::make_pair(std::make_pair(cur_max, cur_min), -cur_len) < + std::make_pair(std::make_pair(min_max, min_min), -max_len))) + { + found = true; + min_max = cur_max; + min_min = cur_min; + max_len = cur_len; + best_begin = it; + } } - - ++first; - ++second; } - - if (argmin_first != data_parts.end()) + + if (found) { - left = *argmin_first; - right = *argmin_second; - left->currently_merging = true; - right->currently_merging = true; - LOG_DEBUG(log, "Selected parts " << left->name << " and " << right->name); - return true; - } - - first = data_parts.begin(); - second = first; - argmin_first = data_parts.end(); - argmin_second = data_parts.end(); - ++second; - - /** Два подряд идущих куска минимального суммарного размера с временем создания - * раньше текущего минус заданное, за один, одинаковый месяц. - * Также проверяем ограничения в settings. - */ - - time_t cutoff_time = time(0) - settings.delay_time_to_merge_different_level_parts; - size_t min_adjacent_size = -1ULL; - while (second != data_parts.end()) - { - if ( !(*first)->currently_merging - && !(*second)->currently_merging - && (*first)->left_month == (*first)->right_month - && (*first)->right_month == (*second)->left_month - && (*second)->left_month == (*second)->right_month - && (*first)->right < (*second)->left /// Куски неперекрываются. - && (*first)->modification_time < cutoff_time - && (*second)->modification_time < cutoff_time - && (*first)->size + (*second)->size < min_adjacent_size - && (*first)->size * index_granularity <= settings.max_rows_to_merge_different_level_parts - && (*second)->size * index_granularity <= settings.max_rows_to_merge_different_level_parts - && (*first)->level <= settings.max_level_to_merge_different_level_parts - && (*second)->level <= settings.max_level_to_merge_different_level_parts) + parts.clear(); + + DataParts::iterator it = best_begin; + for (int i = 0; i < max_len; ++i) { - min_adjacent_size = (*first)->size + (*second)->size; - argmin_first = first; - argmin_second = second; + parts.push_back(*it); + ++it; } - - ++first; - ++second; + + LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts[0]->name << " to " << parts.back()->name); } - - if (argmin_first != data_parts.end()) + else { - left = *argmin_first; - right = *argmin_second; - left->currently_merging = true; - right->currently_merging = true; - LOG_DEBUG(log, "Selected parts " << left->name << " and " << right->name); - return true; + LOG_DEBUG(log, "No parts to merge"); } - - LOG_DEBUG(log, "No parts to merge"); - parts.clear(); - return false; + + return found; } @@ -1441,9 +1417,9 @@ void StorageMergeTree::mergeParts(std::vector parts) new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date); new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date); - /** Читаем из левого и правого куска, сливаем и пишем в новый. + /** Читаем из всех кусков куска, сливаем и пишем в новый. * Попутно вычисляем выражение для сортировки. - * Также, если в одном из кусков есть не все столбцы, то добавляем столбцы с значениями по-умолчанию. + * Также, если некоторых кусках есть не все столбцы, то добавляем столбцы с значениями по-умолчанию. */ BlockInputStreams src_streams;