2014-03-13 12:48:07 +00:00
# include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
# include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
# include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
# include <DB/DataStreams/ExpressionBlockInputStream.h>
# include <DB/DataStreams/MergingSortedBlockInputStream.h>
# include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
# include <DB/DataStreams/SummingSortedBlockInputStream.h>
2014-05-26 16:11:20 +00:00
# include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
2014-03-13 12:48:07 +00:00
namespace DB
{
/// Н е будем соглашаться мерджить куски, если места на диске менее чем во столько раз больше суммарного размера кусков.
2014-03-13 17:44:00 +00:00
static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 1.6 ;
/// Объединяя куски, зарезервируем столько места на диске. Лучше сделать немного меньше, чем DISK_USAGE_COEFFICIENT_TO_SELECT,
/// потому что между выбором кусков и резервированием места места может стать немного меньше.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4 ;
2014-03-13 12:48:07 +00:00
/// Выбираем отрезок из не более чем max_parts_to_merge_at_once кусков так, чтобы максимальный размер был меньше чем в max_size_ratio_to_merge_parts раз больше суммы остальных.
/// Это обеспечивает в худшем случае время O(n log n) на все слияния, независимо от выбора сливаемых кусков, порядка слияния и добавления.
2014-07-23 15:24:45 +00:00
/// При max_parts_to_merge_at_once >= log(max_bytes_to_merge_parts)/log(max_size_ratio_to_merge_parts),
2014-03-13 12:48:07 +00:00
/// несложно доказать, что всегда будет что сливать, пока количество кусков больше
2014-07-23 15:24:45 +00:00
/// log(max_bytes_to_merge_parts)/log(max_size_ratio_to_merge_parts)*(количество кусков размером больше max_bytes_to_merge_parts).
2014-03-13 12:48:07 +00:00
/// Дальше эвристики.
/// Будем выбирать максимальный по включению подходящий отрезок.
/// Из всех таких выбираем отрезок с минимальным максимумом размера.
/// Из всех таких выбираем отрезок с минимальным минимумом размера.
/// Из всех таких выбираем отрезок с максимальной длиной.
/// Дополнительно:
/// 1) с 1:00 до 5:00 ограничение сверху на размер куска в основном потоке увеличивается в несколько раз
/// 2) в зависимоти от возраста кусков меняется допустимая неравномерность при слиянии
/// 3) Молодые куски крупного размера (примерно больше 1 Гб) можно сливать не меньше чем по три
/// 4) Если в одном из потоков идет мердж крупных кусков, то во втором сливать только маленькие кусочки
/// 5) С ростом логарифма суммарного размера кусочков в мердже увеличиваем требование сбалансированности
2014-04-04 10:37:33 +00:00
bool MergeTreeDataMerger : : selectPartsToMerge ( MergeTreeData : : DataPartsVector & parts , String & merged_name , size_t available_disk_space ,
2014-03-13 17:44:00 +00:00
bool merge_anything_for_old_months , bool aggressive , bool only_small , const AllowedMergingPredicate & can_merge )
2014-03-13 12:48:07 +00:00
{
MergeTreeData : : DataParts data_parts = data . getDataParts ( ) ;
2014-07-08 23:52:53 +00:00
DateLUT & date_lut = DateLUT : : instance ( ) ;
2014-03-13 12:48:07 +00:00
size_t min_max = - 1U ;
size_t min_min = - 1U ;
int max_len = 0 ;
MergeTreeData : : DataParts : : iterator best_begin ;
bool found = false ;
DayNum_t now_day = date_lut . toDayNum ( time ( 0 ) ) ;
DayNum_t now_month = date_lut . toFirstDayNumOfMonth ( now_day ) ;
int now_hour = date_lut . toHourInaccurate ( time ( 0 ) ) ;
/// Сколько кусков, начиная с текущего, можно включить в валидный отрезок, начинающийся левее текущего куска.
/// Нужно для определения максимальности по включению.
int max_count_from_left = 0 ;
2014-07-23 15:24:45 +00:00
size_t cur_max_bytes_to_merge_parts = data . settings . max_bytes_to_merge_parts ;
2014-03-13 12:48:07 +00:00
/// Если ночь, можем мерджить сильно большие куски
if ( now_hour > = 1 & & now_hour < = 5 )
2014-07-23 15:24:45 +00:00
cur_max_bytes_to_merge_parts * = data . settings . merge_parts_at_night_inc ;
2014-03-13 12:48:07 +00:00
2014-03-13 17:44:00 +00:00
if ( only_small )
2014-07-23 15:24:45 +00:00
cur_max_bytes_to_merge_parts = data . settings . max_bytes_to_merge_parts_small ;
2014-03-13 12:48:07 +00:00
2014-04-10 12:30:59 +00:00
/// Найдем суммарный размер еще не пройденных кусков (то есть всех).
2014-07-23 15:24:45 +00:00
size_t size_in_bytes_of_remaining_parts = 0 ;
2014-04-10 12:30:59 +00:00
for ( const auto & part : data_parts )
2014-07-23 15:24:45 +00:00
size_in_bytes_of_remaining_parts + = part - > size_in_bytes ;
2014-04-10 12:30:59 +00:00
2014-03-13 12:48:07 +00:00
/// Левый конец отрезка.
for ( MergeTreeData : : DataParts : : iterator it = data_parts . begin ( ) ; it ! = data_parts . end ( ) ; + + it )
{
const MergeTreeData : : DataPartPtr & first_part = * it ;
max_count_from_left = std : : max ( 0 , max_count_from_left - 1 ) ;
2014-07-23 15:24:45 +00:00
size_in_bytes_of_remaining_parts - = first_part - > size_in_bytes ;
2014-03-13 12:48:07 +00:00
/// Кусок достаточно мал или слияние "агрессивное".
2014-07-23 15:24:45 +00:00
if ( first_part - > size_in_bytes > cur_max_bytes_to_merge_parts
2014-03-13 12:48:07 +00:00
& & ! aggressive )
2014-04-11 13:05:17 +00:00
{
2014-03-13 12:48:07 +00:00
continue ;
2014-04-11 13:05:17 +00:00
}
2014-03-13 12:48:07 +00:00
/// Кусок в одном месяце.
if ( first_part - > left_month ! = first_part - > right_month )
{
LOG_WARNING ( log , " Part " < < first_part - > name < < " spans more than one month " ) ;
continue ;
}
/// Самый длинный валидный отрезок, начинающийся здесь.
size_t cur_longest_max = - 1U ;
size_t cur_longest_min = - 1U ;
int cur_longest_len = 0 ;
/// Текущий отрезок, не обязательно валидный.
2014-07-23 15:24:45 +00:00
size_t cur_max = first_part - > size_in_bytes ;
size_t cur_min = first_part - > size_in_bytes ;
size_t cur_sum = first_part - > size_in_bytes ;
2014-03-13 12:48:07 +00:00
int cur_len = 1 ;
DayNum_t month = first_part - > left_month ;
UInt64 cur_id = first_part - > right ;
/// Этот месяц кончился хотя бы день назад.
bool is_old_month = now_day - now_month > = 1 & & now_month > month ;
time_t oldest_modification_time = first_part - > modification_time ;
/// Правый конец отрезка.
MergeTreeData : : DataParts : : iterator jt = it ;
2014-03-13 17:44:00 +00:00
for ( ; cur_len < static_cast < int > ( data . settings . max_parts_to_merge_at_once ) ; )
2014-03-13 12:48:07 +00:00
{
2014-03-13 17:44:00 +00:00
const MergeTreeData : : DataPartPtr & prev_part = * jt ;
+ + jt ;
if ( jt = = data_parts . end ( ) )
break ;
2014-03-13 12:48:07 +00:00
const MergeTreeData : : DataPartPtr & last_part = * jt ;
2014-03-13 17:44:00 +00:00
/// Кусок разрешено сливать с предыдущим, и в одном правильном месяце.
2014-05-07 13:56:01 +00:00
if ( last_part - > left_month ! = last_part - > right_month | |
last_part - > left_month ! = month | |
! can_merge ( prev_part , last_part ) )
2014-04-11 13:05:17 +00:00
{
2014-03-13 12:48:07 +00:00
break ;
2014-04-11 13:05:17 +00:00
}
2014-03-13 12:48:07 +00:00
/// Кусок достаточно мал или слияние "агрессивное".
2014-07-23 15:24:45 +00:00
if ( last_part - > size_in_bytes > cur_max_bytes_to_merge_parts
2014-03-13 12:48:07 +00:00
& & ! aggressive )
break ;
/// Кусок правее предыдущего.
if ( last_part - > left < cur_id )
{
LOG_WARNING ( log , " Part " < < last_part - > name < < " intersects previous part " ) ;
break ;
}
oldest_modification_time = std : : max ( oldest_modification_time , last_part - > modification_time ) ;
2014-07-23 15:24:45 +00:00
cur_max = std : : max ( cur_max , static_cast < size_t > ( last_part - > size_in_bytes ) ) ;
cur_min = std : : min ( cur_min , static_cast < size_t > ( last_part - > size_in_bytes ) ) ;
cur_sum + = last_part - > size_in_bytes ;
2014-03-13 12:48:07 +00:00
+ + cur_len ;
cur_id = last_part - > right ;
int min_len = 2 ;
int cur_age_in_sec = time ( 0 ) - oldest_modification_time ;
2014-07-23 15:24:45 +00:00
/// Если куски больше 1 Gb и образовались меньше 6 часов назад, то мерджить не меньше чем по 3.
if ( cur_max > 1024 * 1024 * 1024 & & cur_age_in_sec < 6 * 3600 )
2014-03-13 12:48:07 +00:00
min_len = 3 ;
2014-04-11 13:05:17 +00:00
/// Размер кусков после текущих, делить на максимальный из текущих кусков. Чем меньше, тем новее текущие куски.
2014-07-23 15:24:45 +00:00
size_t oldness_coef = ( size_in_bytes_of_remaining_parts + first_part - > size_in_bytes - cur_sum + 0.0 ) / cur_max ;
2014-03-13 12:48:07 +00:00
2014-04-10 12:30:59 +00:00
/// Эвристика: если после этой группы кусков еще накопилось мало строк, не будем соглашаться на плохо
2014-04-11 13:05:17 +00:00
/// сбалансированные слияния, расчитывая, что после будущих вставок данных появятся более привлекательные слияния.
2014-04-10 12:30:59 +00:00
double ratio = ( oldness_coef + 1 ) * data . settings . size_ratio_coefficient_to_merge_parts ;
2014-03-13 12:48:07 +00:00
/// Если отрезок валидный, то он самый длинный валидный, начинающийся тут.
if ( cur_len > = min_len
& & ( static_cast < double > ( cur_max ) / ( cur_sum - cur_max ) < ratio
2014-04-10 12:30:59 +00:00
/// З а старый месяц объединяем что угодно, если разрешено и если этому куску хотя бы 5 дней
2014-07-23 15:24:45 +00:00
| | ( is_old_month & & merge_anything_for_old_months & & cur_age_in_sec > 3600 * 24 * 5 )
2014-03-13 12:48:07 +00:00
/// Если слияние "агрессивное", то сливаем что угодно
| | aggressive ) )
{
/// Достаточно места на диске, чтобы покрыть новый мердж с запасом.
2014-07-23 15:24:45 +00:00
if ( available_disk_space > cur_sum * DISK_USAGE_COEFFICIENT_TO_SELECT )
2014-03-13 12:48:07 +00:00
{
cur_longest_max = cur_max ;
cur_longest_min = cur_min ;
cur_longest_len = cur_len ;
}
else
2014-05-21 10:20:41 +00:00
{
time_t now = time ( 0 ) ;
if ( now - disk_space_warning_time > 3600 )
{
disk_space_warning_time = now ;
LOG_WARNING ( log , " Won't merge parts from " < < first_part - > name < < " to " < < last_part - > name
< < " because not enough free space: " < < available_disk_space < < " free and unreserved, "
2014-07-23 15:24:45 +00:00
< < cur_sum < < " required now (+ " < < static_cast < int > ( ( DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0 ) * 100 )
2014-05-21 10:20:41 +00:00
< < " % on overhead); suppressing similar warnings for the next hour " ) ;
}
2014-05-21 10:37:53 +00:00
break ;
2014-05-21 10:20:41 +00:00
}
2014-03-13 12:48:07 +00:00
}
}
/// Это максимальный по включению валидный отрезок.
if ( cur_longest_len > max_count_from_left )
{
max_count_from_left = cur_longest_len ;
if ( ! found
2014-07-21 09:11:20 +00:00
| | std : : forward_as_tuple ( cur_longest_max , cur_longest_min , - cur_longest_len )
< std : : forward_as_tuple ( min_max , min_min , - max_len ) )
2014-03-13 12:48:07 +00:00
{
found = true ;
min_max = cur_longest_max ;
min_min = cur_longest_min ;
max_len = cur_longest_len ;
best_begin = it ;
}
}
}
if ( found )
{
parts . clear ( ) ;
2014-04-04 10:37:33 +00:00
DayNum_t left_date = DayNum_t ( std : : numeric_limits < UInt16 > : : max ( ) ) ;
DayNum_t right_date = DayNum_t ( std : : numeric_limits < UInt16 > : : min ( ) ) ;
UInt32 level = 0 ;
2014-03-13 12:48:07 +00:00
MergeTreeData : : DataParts : : iterator it = best_begin ;
for ( int i = 0 ; i < max_len ; + + i )
{
parts . push_back ( * it ) ;
2014-04-04 10:37:33 +00:00
level = std : : max ( level , parts [ i ] - > level ) ;
left_date = std : : min ( left_date , parts [ i ] - > left_date ) ;
right_date = std : : max ( right_date , parts [ i ] - > right_date ) ;
2014-03-13 12:48:07 +00:00
+ + it ;
}
2014-05-26 10:41:40 +00:00
merged_name = ActiveDataPartSet : : getPartName (
2014-04-04 10:37:33 +00:00
left_date , right_date , parts . front ( ) - > left , parts . back ( ) - > right , level + 1 ) ;
2014-04-11 13:05:17 +00:00
LOG_DEBUG ( log , " Selected " < < parts . size ( ) < < " parts from " < < parts . front ( ) - > name < < " to " < < parts . back ( ) - > name
< < ( only_small ? " (only small) " : " " ) ) ;
2014-03-13 12:48:07 +00:00
}
return found ;
}
/// parts должны быть отсортированы.
2014-07-01 15:58:25 +00:00
MergeTreeData : : DataPartPtr MergeTreeDataMerger : : mergeParts (
const MergeTreeData : : DataPartsVector & parts , const String & merged_name , MergeTreeData : : Transaction * out_transaction )
2014-03-13 12:48:07 +00:00
{
2014-06-30 13:57:20 +00:00
LOG_DEBUG ( log , " Merging " < < parts . size ( ) < < " parts: from " < < parts . front ( ) - > name < < " to " < < parts . back ( ) - > name < < " into " < < merged_name ) ;
2014-03-13 12:48:07 +00:00
2014-07-15 09:56:17 +00:00
NameSet union_columns_set ;
for ( const MergeTreeData : : DataPartPtr & part : parts )
{
Poco : : ScopedReadRWLock part_lock ( part - > columns_lock ) ;
Names part_columns = part - > columns . getNames ( ) ;
union_columns_set . insert ( part_columns . begin ( ) , part_columns . end ( ) ) ;
}
2014-03-19 10:45:13 +00:00
NamesAndTypesList columns_list = data . getColumnsList ( ) ;
2014-07-15 09:56:17 +00:00
NamesAndTypesList union_columns = columns_list . filter ( union_columns_set ) ;
Names union_column_names = union_columns . getNames ( ) ;
2014-03-13 12:48:07 +00:00
2014-03-14 17:19:38 +00:00
MergeTreeData : : MutableDataPartPtr new_data_part = std : : make_shared < MergeTreeData : : DataPart > ( data ) ;
2014-05-26 10:41:40 +00:00
ActiveDataPartSet : : parsePartName ( merged_name , * new_data_part ) ;
2014-04-07 15:45:46 +00:00
new_data_part - > name = " tmp_ " + merged_name ;
2014-07-17 10:44:17 +00:00
new_data_part - > is_temp = true ;
2014-03-13 12:48:07 +00:00
/** Читаем из всех кусков, сливаем и пишем в новый.
* П о п у т н о в ы ч и с л я е м в ы р а ж е н и е д л я с о р т и р о в к и .
*/
BlockInputStreams src_streams ;
for ( size_t i = 0 ; i < parts . size ( ) ; + + i )
{
MarkRanges ranges ( 1 , MarkRange ( 0 , parts [ i ] - > size ) ) ;
src_streams . push_back ( new ExpressionBlockInputStream ( new MergeTreeBlockInputStream (
2014-07-15 09:56:17 +00:00
data . getFullPath ( ) + parts [ i ] - > name + ' / ' , DEFAULT_MERGE_BLOCK_SIZE , union_column_names , data ,
2014-04-08 07:58:53 +00:00
parts [ i ] , ranges , false , nullptr , " " ) , data . getPrimaryExpression ( ) ) ) ;
2014-03-13 12:48:07 +00:00
}
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.
2014-06-30 10:02:25 +00:00
/// В слитом куске строки с одинаковым ключом должны идти в порядке возрастания идентификатора исходного куска,
/// то есть (примерного) возрастания времени вставки.
2014-03-13 12:48:07 +00:00
BlockInputStreamPtr merged_stream ;
switch ( data . mode )
{
case MergeTreeData : : Ordinary :
2014-03-14 17:03:52 +00:00
merged_stream = new MergingSortedBlockInputStream ( src_streams , data . getSortDescription ( ) , DEFAULT_MERGE_BLOCK_SIZE ) ;
2014-03-13 12:48:07 +00:00
break ;
case MergeTreeData : : Collapsing :
2014-03-14 17:03:52 +00:00
merged_stream = new CollapsingSortedBlockInputStream ( src_streams , data . getSortDescription ( ) , data . sign_column , DEFAULT_MERGE_BLOCK_SIZE ) ;
2014-03-13 12:48:07 +00:00
break ;
case MergeTreeData : : Summing :
2014-03-14 17:03:52 +00:00
merged_stream = new SummingSortedBlockInputStream ( src_streams , data . getSortDescription ( ) , DEFAULT_MERGE_BLOCK_SIZE ) ;
2014-03-13 12:48:07 +00:00
break ;
2014-05-26 16:11:20 +00:00
case MergeTreeData : : Aggregating :
merged_stream = new AggregatingSortedBlockInputStream ( src_streams , data . getSortDescription ( ) , DEFAULT_MERGE_BLOCK_SIZE ) ;
break ;
2014-03-13 12:48:07 +00:00
default :
throw Exception ( " Unknown mode of operation for MergeTreeData: " + toString ( data . mode ) , ErrorCodes : : LOGICAL_ERROR ) ;
}
2014-04-07 15:45:46 +00:00
String new_part_tmp_path = data . getFullPath ( ) + " tmp_ " + merged_name + " / " ;
2014-03-27 12:32:37 +00:00
2014-07-15 09:56:17 +00:00
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream ( data , new_part_tmp_path , union_columns ) ;
2014-03-13 12:48:07 +00:00
merged_stream - > readPrefix ( ) ;
to - > writePrefix ( ) ;
Block block ;
while ( ! canceled & & ( block = merged_stream - > read ( ) ) )
to - > write ( block ) ;
if ( canceled )
2014-04-04 13:27:47 +00:00
throw Exception ( " Canceled merging parts " , ErrorCodes : : ABORTED ) ;
2014-03-13 12:48:07 +00:00
merged_stream - > readSuffix ( ) ;
2014-07-15 09:56:17 +00:00
new_data_part - > columns = union_columns ;
2014-03-27 17:30:04 +00:00
new_data_part - > checksums = to - > writeSuffixAndGetChecksums ( ) ;
2014-03-27 12:32:37 +00:00
new_data_part - > index . swap ( to - > getIndex ( ) ) ;
2014-04-04 17:20:45 +00:00
/// Для удобства, даже CollapsingSortedBlockInputStream не может выдать ноль строк.
if ( 0 = = to - > marksCount ( ) )
2014-03-13 12:48:07 +00:00
throw Exception ( " Empty part after merge " , ErrorCodes : : LOGICAL_ERROR ) ;
new_data_part - > size = to - > marksCount ( ) ;
new_data_part - > modification_time = time ( 0 ) ;
2014-05-14 17:51:37 +00:00
new_data_part - > size_in_bytes = MergeTreeData : : DataPart : : calcTotalSize ( new_part_tmp_path ) ;
2014-03-13 12:48:07 +00:00
2014-04-07 15:45:46 +00:00
/// Переименовываем новый кусок, добавляем в набор и убираем исходные куски.
2014-07-01 15:58:25 +00:00
auto replaced_parts = data . renameTempPartAndReplace ( new_data_part , nullptr , out_transaction ) ;
2014-03-13 12:48:07 +00:00
2014-04-07 15:45:46 +00:00
if ( new_data_part - > name ! = merged_name )
LOG_ERROR ( log , " Unexpected part name: " < < new_data_part - > name < < " instead of " < < merged_name ) ;
/// Проверим, что удалились все исходные куски и только они.
if ( replaced_parts . size ( ) ! = parts . size ( ) )
{
2014-05-27 12:08:40 +00:00
LOG_WARNING ( log , " Unexpected number of parts removed when adding " < < new_data_part - > name < < " : " < < replaced_parts . size ( )
2014-04-07 15:45:46 +00:00
< < " instead of " < < parts . size ( ) ) ;
}
else
{
for ( size_t i = 0 ; i < parts . size ( ) ; + + i )
{
if ( parts [ i ] - > name ! = replaced_parts [ i ] - > name )
{
LOG_ERROR ( log , " Unexpected part removed when adding " < < new_data_part - > name < < " : " < < replaced_parts [ i ] - > name
< < " instead of " < < parts [ i ] - > name ) ;
}
}
}
2014-03-13 12:48:07 +00:00
LOG_TRACE ( log , " Merged " < < parts . size ( ) < < " parts: from " < < parts . front ( ) - > name < < " to " < < parts . back ( ) - > name ) ;
2014-04-04 13:27:47 +00:00
return new_data_part ;
2014-03-13 12:48:07 +00:00
}
2014-03-13 17:44:00 +00:00
size_t MergeTreeDataMerger : : estimateDiskSpaceForMerge ( const MergeTreeData : : DataPartsVector & parts )
{
size_t res = 0 ;
for ( const MergeTreeData : : DataPartPtr & part : parts )
{
res + = part - > size_in_bytes ;
}
return static_cast < size_t > ( res * DISK_USAGE_COEFFICIENT_TO_RESERVE ) ;
}
2014-03-13 12:48:07 +00:00
}