2012-07-18 19:16:16 +00:00
# include <boost/bind.hpp>
2012-11-28 08:52:15 +00:00
# include <numeric>
2013-12-13 14:23:04 +00:00
# include <sys/statvfs.h>
2012-07-17 20:04:39 +00:00
2012-07-19 20:32:10 +00:00
# include <Poco/DirectoryIterator.h>
2013-04-08 19:47:25 +00:00
# include <Poco/Ext/ScopedTry.h>
2012-07-19 20:32:10 +00:00
# include <Yandex/time2str.h>
2012-07-17 20:04:39 +00:00
# include <DB/Common/escapeForFileName.h>
# include <DB/IO/WriteBufferFromString.h>
# include <DB/IO/WriteBufferFromFile.h>
# include <DB/IO/CompressedWriteBuffer.h>
2012-07-21 03:45:48 +00:00
# include <DB/IO/ReadBufferFromString.h>
2012-07-19 20:32:10 +00:00
# include <DB/IO/ReadBufferFromFile.h>
# include <DB/IO/CompressedReadBuffer.h>
2012-07-17 20:04:39 +00:00
# include <DB/Columns/ColumnsNumber.h>
2012-08-30 17:43:31 +00:00
# include <DB/Columns/ColumnArray.h>
2013-07-12 13:35:05 +00:00
# include <DB/Columns/ColumnNested.h>
2012-07-17 20:04:39 +00:00
2012-07-21 03:45:48 +00:00
# include <DB/DataTypes/DataTypesNumberFixed.h>
2012-08-30 17:43:31 +00:00
# include <DB/DataTypes/DataTypeArray.h>
2013-07-12 13:35:05 +00:00
# include <DB/DataTypes/DataTypeNested.h>
2012-07-21 03:45:48 +00:00
2012-07-19 20:32:10 +00:00
# include <DB/DataStreams/IProfilingBlockInputStream.h>
2012-07-30 20:32:36 +00:00
# include <DB/DataStreams/MergingSortedBlockInputStream.h>
2012-08-16 17:27:40 +00:00
# include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
2013-10-01 20:39:26 +00:00
# include <DB/DataStreams/SummingSortedBlockInputStream.h>
2013-04-25 15:46:14 +00:00
# include <DB/DataStreams/CollapsingFinalBlockInputStream.h>
2012-07-31 17:22:40 +00:00
# include <DB/DataStreams/ExpressionBlockInputStream.h>
2012-11-28 08:52:15 +00:00
# include <DB/DataStreams/ConcatBlockInputStream.h>
2012-07-21 06:47:17 +00:00
# include <DB/DataStreams/narrowBlockInputStreams.h>
2012-07-30 20:32:36 +00:00
# include <DB/DataStreams/copyData.h>
2012-12-12 14:25:55 +00:00
# include <DB/DataStreams/FilterBlockInputStream.h>
2012-07-19 20:32:10 +00:00
2012-07-21 03:45:48 +00:00
# include <DB/Parsers/ASTExpressionList.h>
# include <DB/Parsers/ASTSelectQuery.h>
# include <DB/Parsers/ASTFunction.h>
# include <DB/Parsers/ASTLiteral.h>
2012-12-12 14:25:55 +00:00
# include <DB/Parsers/ASTIdentifier.h>
2013-08-07 13:07:42 +00:00
# include <DB/Parsers/ASTNameTypePair.h>
2012-07-21 03:45:48 +00:00
2012-07-17 20:04:39 +00:00
# include <DB/Interpreters/sortBlock.h>
2013-06-03 13:17:17 +00:00
# include <DB/Interpreters/ExpressionAnalyzer.h>
2012-07-17 20:04:39 +00:00
# include <DB/Storages/StorageMergeTree.h>
2013-04-24 10:31:32 +00:00
# include <DB/Storages/MergeTree/PKCondition.h>
# include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h>
# include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
# include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
2012-07-19 20:32:10 +00:00
2013-08-08 09:50:15 +00:00
# include <algorithm>
2012-07-19 20:32:10 +00:00
2012-07-17 20:04:39 +00:00
namespace DB
{
2013-12-13 14:23:04 +00:00
/// Combined size of the parts currently being merged, shared across all tables
/// (static). Units not visible here — presumably rows or bytes; TODO confirm against header.
size_t StorageMergeTree::total_size_of_currently_merging_parts = 0;
2012-07-17 20:04:39 +00:00
/** Constructor: wires up the storage for an existing or new table directory.
  * Creates the directory if needed, builds the primary key sort description and
  * expressions, starts the merge thread pool, loads the parts from disk and
  * repairs the increment counter if it lags behind the parts found on disk.
  */
StorageMergeTree::StorageMergeTree(
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const StorageMergeTreeSettings & settings_)
	: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
	context(context_), primary_expr_ast(primary_expr_ast_->clone()),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	mode(mode_), sign_column(sign_column_),
	settings(settings_),
	increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name)), shutdown_called(false),
	file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)")
{
	/// Convert the row-based thresholds from the settings into mark counts, rounding up.
	min_marks_for_seek = (settings.min_rows_for_seek + index_granularity - 1) / index_granularity;
	min_marks_for_concurrent_read = (settings.min_rows_for_concurrent_read + index_granularity - 1) / index_granularity;
	max_marks_to_use_cache = (settings.max_rows_to_use_cache + index_granularity - 1) / index_granularity;

	/// Create the table directory if it does not exist yet.
	Poco::File(full_path).createDirectories();

	/// Initialize the sort description: ascending order on every column of the primary key expression.
	sort_descr.reserve(primary_expr_ast->children.size());
	for (ASTs::iterator child = primary_expr_ast->children.begin();
		child != primary_expr_ast->children.end();
		++child)
	{
		String column_name = (*child)->getColumnName();
		sort_descr.push_back(SortColumnDescription(column_name, 1));
	}

	primary_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(false);

	ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(true);
	primary_key_sample = projected_expr->getSampleBlock();

	merge_threads = new boost::threadpool::pool(settings.merging_threads);

	loadDataParts();
	clearOldParts();

	/// If the increment file is broken or behind, bump it past the largest part number found on disk.
	UInt64 max_part_id = 0;
	for (DataParts::iterator part = data_parts.begin(); part != data_parts.end(); ++part)
		max_part_id = std::max(max_part_id, (*part)->right);

	increment.fixIfBroken(max_part_id);
}
2013-02-06 11:26:35 +00:00
/// Factory: constructs a StorageMergeTree and returns it as a shared StoragePtr
/// (via thisPtr(), so the storage holds itself the way the rest of the code expects).
StoragePtr StorageMergeTree::create(
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const StorageMergeTreeSettings & settings_)
{
	StorageMergeTree * storage = new StorageMergeTree(
		path_, name_, columns_, context_, primary_expr_ast_, date_column_name_,
		sampling_expression_, index_granularity_, mode_, sign_column_, settings_);

	return storage->thisPtr();
}
2012-07-17 20:04:39 +00:00
2013-09-30 01:29:19 +00:00
/// Stop background activity. Idempotent: the second and later calls return immediately.
void StorageMergeTree::shutdown()
{
	/// NOTE(review): shutdown_called looks like a plain bool flag — no atomics or locking
	/// visible here, so two concurrent first calls could both proceed; confirm callers serialize this.
	if (shutdown_called)
		return;
	shutdown_called = true;
	/// Presumably joins the merge thread pool started in the constructor — TODO confirm.
	joinMergeThreads();
}
2013-09-30 01:29:19 +00:00
/// Destructor delegates to shutdown(), which is a no-op if shutdown() was already called.
StorageMergeTree::~StorageMergeTree()
{
	shutdown();
}
2012-07-18 19:44:04 +00:00
/// INSERT entry point: every written block goes through a MergeTreeBlockOutputStream,
/// which keeps a shared pointer to this storage. The query AST is not inspected here.
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
{
	return new MergeTreeBlockOutputStream(thisPtr());
}
2012-07-21 05:07:14 +00:00
/** SELECT entry point.
  * 1. Selects the data parts whose date range can satisfy the date condition.
  * 2. If SAMPLE is requested, converts an absolute sample size into a relative one,
  *    tightens the primary key condition with `sampling_expression <= limit`,
  *    and prepares a filter expression applied to the result streams.
  * 3. Computes the mark ranges to read from each part via the primary key index.
  * 4. Spreads the ranges among `threads` streams (FINAL gets its own collapsing path).
  */
BlockInputStreams StorageMergeTree::read(
	const Names & column_names_to_return,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	Poco::ScopedReadRWLock lock(read_lock);

	check(column_names_to_return);
	processed_stage = QueryProcessingStage::FetchColumns;

	/// Conditions on the primary key and on the date column, extracted from the query.
	PKCondition key_condition(query, context, *columns, sort_descr);
	PKCondition date_condition(query, context, *columns, SortDescription(1, SortColumnDescription(date_column_name, 1)));

	typedef std::vector<DataPartPtr> PartsList;
	PartsList parts;

	/// Select the parts that may contain data satisfying date_condition.
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

		for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
		{
			Field left = static_cast<UInt64>((*it)->left_date);
			Field right = static_cast<UInt64>((*it)->right_date);

			if (date_condition.mayBeTrueInRange(&left, &right))
				parts.push_back(*it);
		}
	}

	/// Sampling.
	Names column_names_to_read = column_names_to_return;
	UInt64 sampling_column_value_limit = 0;
	typedef Poco::SharedPtr<ASTFunction> ASTFunctionPtr;
	ASTFunctionPtr filter_function;
	ExpressionActionsPtr filter_expression;

	ASTSelectQuery & select = *dynamic_cast<ASTSelectQuery *>(&*query);
	if (select.sample_size)
	{
		double size = apply_visitor(FieldVisitorConvertToNumber<double>(),
			dynamic_cast<ASTLiteral &>(*select.sample_size).value);

		if (size < 0)
			throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

		if (size > 1)
		{
			/// SAMPLE n with n > 1 means an absolute row count, not a fraction.
			size_t requested_count = apply_visitor(FieldVisitorConvertToNumber<UInt64>(), dynamic_cast<ASTLiteral &>(*select.sample_size).value);

			/// Find out how many rows we would read without sampling.
			LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());

			size_t total_count = 0;
			for (size_t i = 0; i < parts.size(); ++i)
			{
				DataPartPtr & part = parts[i];
				MarkRanges ranges = MergeTreeBlockInputStream::markRangesFromPkRange(part->index, *this, key_condition);

				for (size_t j = 0; j < ranges.size(); ++j)
					total_count += ranges[j].end - ranges[j].begin;
			}
			total_count *= index_granularity;

			/// Convert the absolute count into a relative sample size, capped at 1.
			size = std::min(1., static_cast<double>(requested_count) / total_count);

			LOG_DEBUG(log, "Selected relative sample size: " << size);
		}

		/// The sampling column must be an unsigned integer; its max value maps sample size 1.0.
		UInt64 sampling_column_max = 0;
		DataTypePtr type = primary_expr->getSampleBlock().getByName(sampling_expression->getColumnName()).type;

		if (type->getName() == "UInt64")
			sampling_column_max = std::numeric_limits<UInt64>::max();
		else if (type->getName() == "UInt32")
			sampling_column_max = std::numeric_limits<UInt32>::max();
		else if (type->getName() == "UInt16")
			sampling_column_max = std::numeric_limits<UInt16>::max();
		else if (type->getName() == "UInt8")
			sampling_column_max = std::numeric_limits<UInt8>::max();
		else
			throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

		/// Add a condition so the repeated index scan can cut off even more.
		sampling_column_value_limit = static_cast<UInt64>(size * sampling_column_max);
		if (!key_condition.addCondition(sampling_expression->getColumnName(),
			Range::createRightBounded(sampling_column_value_limit, true)))
			throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

		/// Filter expression: sampling_expression <= sampling_column_value_limit
		ASTPtr filter_function_args = new ASTExpressionList;
		filter_function_args->children.push_back(sampling_expression);
		filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit));

		filter_function = new ASTFunction;
		filter_function->name = "lessOrEquals";
		filter_function->arguments = filter_function_args;
		filter_function->children.push_back(filter_function->arguments);

		filter_expression = ExpressionAnalyzer(filter_function, context, *columns).getActions(false);

		/// Add the columns needed to evaluate sampling_expression (deduplicated).
		std::vector<String> add_columns = filter_expression->getRequiredColumns();
		column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
		std::sort(column_names_to_read.begin(), column_names_to_read.end());
		column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
	}

	LOG_DEBUG(log, "Key condition: " << key_condition.toString());
	LOG_DEBUG(log, "Date condition: " << date_condition.toString());

	/// PREWHERE
	ExpressionActionsPtr prewhere_actions;
	String prewhere_column;
	if (select.prewhere_expression)
	{
		ExpressionAnalyzer analyzer(select.prewhere_expression, context, *columns);
		prewhere_actions = analyzer.getActions(false);
		prewhere_column = select.prewhere_expression->getColumnName();
	}

	RangesInDataParts parts_with_ranges;

	/// Find which mark ranges to read from each part.
	size_t sum_marks = 0;
	size_t sum_ranges = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		DataPartPtr & part = parts[i];
		RangesInDataPart ranges(part);
		ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(part->index, *this, key_condition);

		if (!ranges.ranges.empty())
		{
			parts_with_ranges.push_back(ranges);

			sum_ranges += ranges.ranges.size();
			for (size_t j = 0; j < ranges.ranges.size(); ++j)
			{
				sum_marks += ranges.ranges[j].end - ranges.ranges[j].begin;
			}
		}
	}

	LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
		<< sum_marks << " marks to read from " << sum_ranges << " ranges");

	BlockInputStreams res;

	if (select.final)
	{
		/// Add the columns needed to compute the primary key and the sign (deduplicated).
		std::vector<String> add_columns = primary_expr->getRequiredColumns();
		column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
		column_names_to_read.push_back(sign_column);
		std::sort(column_names_to_read.begin(), column_names_to_read.end());
		column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());

		res = spreadMarkRangesAmongThreadsFinal(
			parts_with_ranges,
			threads,
			column_names_to_read,
			max_block_size,
			settings.use_uncompressed_cache,
			prewhere_actions,
			prewhere_column);
	}
	else
	{
		res = spreadMarkRangesAmongThreads(
			parts_with_ranges,
			threads,
			column_names_to_read,
			max_block_size,
			settings.use_uncompressed_cache,
			prewhere_actions,
			prewhere_column);
	}

	/// Apply the sampling filter on top of every result stream.
	if (select.sample_size)
	{
		for (size_t i = 0; i < res.size(); ++i)
		{
			BlockInputStreamPtr original_stream = res[i];
			BlockInputStreamPtr expression_stream = new ExpressionBlockInputStream(original_stream, filter_expression);
			BlockInputStreamPtr filter_stream = new FilterBlockInputStream(expression_stream, filter_function->getColumnName());
			res[i] = filter_stream;
		}
	}

	return res;
}
/// Distribute the marks approximately evenly among the threads.
/// Each thread gets about sum_marks / threads marks, assembled from whole parts when a
/// part is small enough, otherwise from a prefix of a part's ranges. Streams of one
/// thread are concatenated so they are read sequentially.
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(
	RangesInDataParts parts,
	size_t threads,
	const Names & column_names,
	size_t max_block_size,
	bool use_uncompressed_cache,
	ExpressionActionsPtr prewhere_actions,
	const String & prewhere_column)
{
	/// Shuffle the parts, just in case.
	std::random_shuffle(parts.begin(), parts.end());

	/// Count the marks in each part.
	std::vector<size_t> sum_marks_in_parts(parts.size());
	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		/// Keep the ranges listed right-to-left so the leftmost range can be dropped with pop_back().
		std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());

		sum_marks_in_parts[i] = 0;
		for (size_t j = 0; j < parts[i].ranges.size(); ++j)
		{
			MarkRange & range = parts[i].ranges[j];
			sum_marks_in_parts[i] += range.end - range.begin;
		}

		sum_marks += sum_marks_in_parts[i];
	}

	/// Too large a read would evict everything useful from the uncompressed cache.
	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	BlockInputStreams res;

	if (sum_marks > 0)
	{
		size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

		for (size_t i = 0; i < threads && !parts.empty(); ++i)
		{
			size_t need_marks = min_marks_per_thread;
			BlockInputStreams streams;

			/// Loop over the parts.
			while (need_marks > 0 && !parts.empty())
			{
				RangesInDataPart & part = parts.back();
				size_t & marks_in_part = sum_marks_in_parts.back();

				/// Don't take too few rows from a part.
				if (marks_in_part >= min_marks_for_concurrent_read &&
					need_marks < min_marks_for_concurrent_read)
					need_marks = min_marks_for_concurrent_read;

				/// Don't leave too few rows in a part either.
				if (marks_in_part > need_marks &&
					marks_in_part - need_marks < min_marks_for_concurrent_read)
					need_marks = marks_in_part;

				/// Take the whole part if it is small enough.
				if (marks_in_part <= need_marks)
				{
					/// Restore the original order of the ranges.
					std::reverse(part.ranges.begin(), part.ranges.end());

					streams.push_back(new MergeTreeBlockInputStream(
						full_path + part.data_part->name + '/', max_block_size, column_names, *this,
						part.data_part, part.ranges, thisPtr(), use_uncompressed_cache,
						prewhere_actions, prewhere_column));
					need_marks -= marks_in_part;
					parts.pop_back();
					sum_marks_in_parts.pop_back();
					continue;
				}

				MarkRanges ranges_to_get_from_part;

				/// Loop over the ranges of the part, taking marks from the (reversed) back.
				while (need_marks > 0)
				{
					if (part.ranges.empty())
						throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR);

					MarkRange & range = part.ranges.back();
					size_t marks_in_range = range.end - range.begin;

					size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
					ranges_to_get_from_part.push_back(MarkRange(range.begin, range.begin + marks_to_get_from_range));
					range.begin += marks_to_get_from_range;
					marks_in_part -= marks_to_get_from_range;
					need_marks -= marks_to_get_from_range;
					if (range.begin == range.end)
						part.ranges.pop_back();
				}

				streams.push_back(new MergeTreeBlockInputStream(
					full_path + part.data_part->name + '/', max_block_size, column_names, *this,
					part.data_part, ranges_to_get_from_part, thisPtr(), use_uncompressed_cache,
					prewhere_actions, prewhere_column));
			}

			if (streams.size() == 1)
				res.push_back(streams[0]);
			else
				res.push_back(new ConcatBlockInputStream(streams));
		}

		if (!parts.empty())
			throw Exception("Couldn't spread marks among threads", ErrorCodes::LOGICAL_ERROR);
	}

	return res;
}
2013-04-24 11:39:12 +00:00
/// Distribute the marks among the threads and make sure that in the result (almost) all
/// the data is collapsed (the FINAL modifier). One stream per part; for multiple parts
/// the streams are merged through CollapsingFinalBlockInputStream, for a single part
/// only the positive-sign filter is applied.
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(
	RangesInDataParts parts,
	size_t threads,
	const Names & column_names,
	size_t max_block_size,
	bool use_uncompressed_cache,
	ExpressionActionsPtr prewhere_actions,
	const String & prewhere_column)
{
	/// Disable the uncompressed cache for reads that would flush it entirely.
	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
		for (size_t j = 0; j < parts[i].ranges.size(); ++j)
			sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;

	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	/// Expression `sign_column = 1`, used for the single-part case.
	ExpressionActionsPtr sign_filter_expression;
	String sign_filter_column;
	createPositiveSignCondition(sign_filter_expression, sign_filter_column);

	BlockInputStreams to_collapse;

	for (size_t part_index = 0; part_index < parts.size(); ++part_index)
	{
		RangesInDataPart & part = parts[part_index];

		BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
			full_path + part.data_part->name + '/', max_block_size, column_names, *this,
			part.data_part, part.ranges, thisPtr(), use_uncompressed_cache,
			prewhere_actions, prewhere_column);

		/// The collapsing merge needs the primary key columns computed on every block.
		to_collapse.push_back(new ExpressionBlockInputStream(source_stream, primary_expr));
	}

	BlockInputStreams res;
	if (to_collapse.size() == 1)
		res.push_back(new FilterBlockInputStream(new ExpressionBlockInputStream(to_collapse[0], sign_filter_expression), sign_filter_column));
	else if (to_collapse.size() > 1)
		res.push_back(new CollapsingFinalBlockInputStream(to_collapse, sort_descr, sign_column));

	return res;
}
2013-06-03 13:17:17 +00:00
void StorageMergeTree : : createPositiveSignCondition ( ExpressionActionsPtr & out_expression , String & out_column )
2013-04-26 13:20:42 +00:00
{
ASTFunction * function = new ASTFunction ;
ASTPtr function_ptr = function ;
ASTExpressionList * arguments = new ASTExpressionList ;
ASTPtr arguments_ptr = arguments ;
ASTIdentifier * sign = new ASTIdentifier ;
ASTPtr sign_ptr = sign ;
ASTLiteral * one = new ASTLiteral ;
ASTPtr one_ptr = one ;
function - > name = " equals " ;
function - > arguments = arguments_ptr ;
function - > children . push_back ( arguments_ptr ) ;
arguments - > children . push_back ( sign_ptr ) ;
arguments - > children . push_back ( one_ptr ) ;
sign - > name = sign_column ;
sign - > kind = ASTIdentifier : : Column ;
one - > type = new DataTypeInt8 ;
one - > value = Field ( static_cast < Int64 > ( 1 ) ) ;
2013-06-03 13:17:17 +00:00
out_expression = ExpressionAnalyzer ( function_ptr , context , * columns ) . getActions ( false ) ;
2013-04-26 13:20:42 +00:00
out_column = function - > getColumnName ( ) ;
}
2013-08-11 03:40:14 +00:00
/// Compose a part directory name of the form YYYYMMDD_YYYYMMDD_N_N_L
/// (min date, max date, min block number, max block number, level).
String StorageMergeTree::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
{
	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	/// Render both day numbers as ordered YYYYMMDD identifiers.
	unsigned left_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
	unsigned right_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(right_date));

	String res;
	{
		/// Scoped so the buffer is finished (flushed into res) before we return.
		WriteBufferFromString wb(res);

		writeIntText(left_date_id, wb);
		writeChar('_', wb);
		writeIntText(right_date_id, wb);
		writeChar('_', wb);
		writeIntText(left_id, wb);
		writeChar('_', wb);
		writeIntText(right_id, wb);
		writeChar('_', wb);
		writeIntText(level, wb);
	}

	return res;
}
2012-07-19 20:32:10 +00:00
2013-10-03 12:46:17 +00:00
/// Fill in the fields of `part` from a file name already matched against file_name_regexp.
/// Match groups (see getPartName): 1 = min date YYYYMMDD, 2 = max date YYYYMMDD,
/// 3 = min block number, 4 = max block number, 5 = level.
void StorageMergeTree::parsePartName(const String & file_name, const Poco::RegularExpression::MatchVec & matches, DataPart & part)
{
	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	part.left_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
	part.right_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length)));
	part.left = parse<UInt64>(file_name.substr(matches[3].offset, matches[3].length));
	part.right = parse<UInt64>(file_name.substr(matches[4].offset, matches[4].length));
	part.level = parse<UInt32>(file_name.substr(matches[5].offset, matches[5].length));

	/// Cache the first day of the month for both boundary dates.
	part.left_month = date_lut.toFirstDayNumOfMonth(part.left_date);
	part.right_month = date_lut.toFirstDayNumOfMonth(part.right_date);
}
2012-07-19 20:32:10 +00:00
void StorageMergeTree : : loadDataParts ( )
{
LOG_DEBUG ( log , " Loading data parts " ) ;
2012-08-10 20:04:34 +00:00
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
data_parts . clear ( ) ;
2012-07-19 20:32:10 +00:00
2013-10-03 12:46:17 +00:00
Strings part_file_names ;
Strings old_file_names ;
2012-07-19 20:32:10 +00:00
Poco : : DirectoryIterator end ;
for ( Poco : : DirectoryIterator it ( full_path ) ; it ! = end ; + + it )
{
2013-10-03 12:46:17 +00:00
String file_name = it . name ( ) ;
2013-09-15 03:44:32 +00:00
/// Удаляем временные директории старше суток.
if ( 0 = = file_name . compare ( 0 , strlen ( " tmp_ " ) , " tmp_ " ) )
{
Poco : : File tmp_dir ( full_path + file_name ) ;
2013-10-03 12:46:17 +00:00
2013-09-15 03:44:32 +00:00
if ( tmp_dir . isDirectory ( ) & & tmp_dir . getLastModified ( ) . epochTime ( ) + 86400 < time ( 0 ) )
{
LOG_WARNING ( log , " Removing temporary directory " < < full_path < < file_name ) ;
Poco : : File ( full_path + file_name ) . remove ( true ) ;
}
2013-10-03 12:46:17 +00:00
2013-09-15 03:44:32 +00:00
continue ;
}
2013-10-03 12:46:17 +00:00
if ( 0 = = file_name . compare ( 0 , strlen ( " old_ " ) , " old_ " ) )
old_file_names . push_back ( file_name ) ;
else
part_file_names . push_back ( file_name ) ;
}
Poco : : RegularExpression : : MatchVec matches ;
while ( ! part_file_names . empty ( ) )
{
String file_name = part_file_names . back ( ) ;
part_file_names . pop_back ( ) ;
2013-08-09 00:12:59 +00:00
if ( ! isPartDirectory ( file_name , matches ) )
2012-07-19 20:32:10 +00:00
continue ;
2013-09-15 03:44:32 +00:00
2014-03-01 20:37:04 +00:00
DataPartPtr part = new DataPart ( * this ) ;
parsePartName ( file_name , matches , * part ) ;
2014-03-04 00:17:20 +00:00
part - > name = file_name ;
2014-03-01 20:37:04 +00:00
/// Для битых кусков, которые могут образовываться после г р у б о г о перезапуска сервера, попытаться восстановить куски, из которых они сделаны.
2013-10-03 12:46:17 +00:00
if ( isBrokenPart ( full_path + file_name ) )
{
2014-03-01 20:37:04 +00:00
if ( part - > level = = 0 )
{
/// Восстановить куски нулевого уровня невозможно.
LOG_ERROR ( log , " Removing broken part " < < path + file_name < < " because is't impossible to repair. " ) ;
part - > remove ( ) ;
}
else
{
Strings new_parts = tryRestorePart ( full_path , file_name , old_file_names ) ;
part_file_names . insert ( part_file_names . begin ( ) , new_parts . begin ( ) , new_parts . end ( ) ) ;
}
2013-09-15 03:44:32 +00:00
continue ;
2013-10-03 12:46:17 +00:00
}
2012-07-19 20:32:10 +00:00
/// Размер - в количестве засечек.
2012-07-23 06:23:29 +00:00
part - > size = Poco : : File ( full_path + file_name + " / " + escapeForFileName ( columns - > front ( ) . first ) + " .mrk " ) . getSize ( )
2012-07-19 20:32:10 +00:00
/ MERGE_TREE_MARK_SIZE ;
2013-10-03 12:46:17 +00:00
part - > modification_time = Poco : : File ( full_path + file_name ) . getLastModified ( ) . epochTime ( ) ;
2012-07-19 20:32:10 +00:00
2013-12-09 00:29:24 +00:00
try
{
part - > loadIndex ( ) ;
}
catch ( . . . )
{
/// Н е будем вставлять в набор кусок с битым индексом. Пропустим кусок и позволим серверу запуститься.
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
continue ;
}
2012-08-10 20:04:34 +00:00
data_parts . insert ( part ) ;
2012-07-19 20:32:10 +00:00
}
2013-10-03 12:46:17 +00:00
2012-08-10 20:04:34 +00:00
all_data_parts = data_parts ;
2012-07-31 20:03:53 +00:00
2012-07-31 20:13:14 +00:00
/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
* н о п о к а к и м - т о п р и ч и н а м о с т а л и с ь л е ж а т ь в ф а й л о в о й с и с т е м е .
* У д а л е н и е ф а й л о в б у д е т п р о и з в е д е н о п о т о м в м е т о д е clearOldParts .
*/
2012-08-10 20:04:34 +00:00
if ( data_parts . size ( ) > = 2 )
2012-07-31 20:13:14 +00:00
{
2012-08-10 20:04:34 +00:00
DataParts : : iterator prev_jt = data_parts . begin ( ) ;
2012-08-07 20:37:45 +00:00
DataParts : : iterator curr_jt = prev_jt ;
+ + curr_jt ;
2012-08-10 20:04:34 +00:00
while ( curr_jt ! = data_parts . end ( ) )
2012-07-31 20:13:14 +00:00
{
2012-08-07 20:37:45 +00:00
/// Куски данных за разные месяцы рассматривать не будем
if ( ( * curr_jt ) - > left_month ! = ( * curr_jt ) - > right_month
| | ( * curr_jt ) - > right_month ! = ( * prev_jt ) - > left_month
| | ( * prev_jt ) - > left_month ! = ( * prev_jt ) - > right_month )
{
+ + prev_jt ;
+ + curr_jt ;
continue ;
}
2012-07-31 20:13:14 +00:00
2012-08-07 20:37:45 +00:00
if ( ( * curr_jt ) - > contains ( * * prev_jt ) )
{
LOG_WARNING ( log , " Part " < < ( * curr_jt ) - > name < < " contains " < < ( * prev_jt ) - > name ) ;
2012-08-10 20:04:34 +00:00
data_parts . erase ( prev_jt ) ;
2012-08-07 20:37:45 +00:00
prev_jt = curr_jt ;
+ + curr_jt ;
}
else if ( ( * prev_jt ) - > contains ( * * curr_jt ) )
{
LOG_WARNING ( log , " Part " < < ( * prev_jt ) - > name < < " contains " < < ( * curr_jt ) - > name ) ;
2012-08-10 20:04:34 +00:00
data_parts . erase ( curr_jt + + ) ;
2012-08-07 20:37:45 +00:00
}
else
{
+ + prev_jt ;
+ + curr_jt ;
}
2012-07-31 20:13:14 +00:00
}
}
2012-07-31 20:03:53 +00:00
2012-08-10 20:04:34 +00:00
LOG_DEBUG ( log , " Loaded data parts ( " < < data_parts . size ( ) < < " items) " ) ;
2012-07-19 20:32:10 +00:00
}
2012-07-23 06:23:29 +00:00
void StorageMergeTree::clearOldParts()
{
	/// Rename parts that are no longer referenced anywhere to "old_<name>", and physically
	/// delete "old_" directories that have outlived settings.old_parts_lifetime.
	Poco::ScopedTry<Poco::FastMutex> lock;

	/// If the method is already being executed from another thread (or if all_data_parts is being
	/// modified right now), we can simply do nothing: another invocation will do the cleanup.
	if (!lock.lock(&all_data_parts_mutex))
	{
		LOG_TRACE(log, "Already clearing or modifying old parts");
		return;
	}

	LOG_TRACE(log, "Clearing old parts");
	for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
	{
		/// Count of SharedPtr owners of the part. Exactly 1 means only this set holds it:
		/// no query or merge is using the part anymore.
		int ref_count = it->referenceCount();

		if (ref_count == 1)		/// After this, ref_count cannot increase.
		{
			LOG_DEBUG(log, "'Removing' part " << (*it)->name << " (prepending old_ to its name)");

			(*it)->renameToOld();
			/// Post-increment so the iterator is advanced before the element is erased
			/// (erasing invalidates only the erased position in a set).
			all_data_parts.erase(it++);
		}
		else
			++it;
	}

	/// Delete stale old_ directories from disk.
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
	{
		if (0 != it.name().compare(0, strlen("old_"), "old_"))
			continue;
		if (it->isDirectory() && it->getLastModified().epochTime() + settings.old_parts_lifetime < time(0))
		{
			it->remove(true);
		}
	}
}
2013-12-16 03:51:30 +00:00
void StorageMergeTree : : merge ( size_t iterations , bool async , bool aggressive )
2012-07-23 06:23:29 +00:00
{
2012-11-28 08:52:15 +00:00
bool while_can = false ;
2013-09-13 23:52:10 +00:00
if ( iterations = = 0 )
{
2012-11-28 08:52:15 +00:00
while_can = true ;
iterations = settings . merging_threads ;
2012-08-01 20:08:59 +00:00
}
2012-11-28 17:17:17 +00:00
for ( size_t i = 0 ; i < iterations ; + + i )
2013-12-16 03:51:30 +00:00
merge_threads - > schedule ( boost : : bind ( & StorageMergeTree : : mergeThread , this , while_can , aggressive ) ) ;
2012-11-28 08:52:15 +00:00
if ( ! async )
joinMergeThreads ( ) ;
2012-08-13 19:13:11 +00:00
}
2013-12-16 03:51:30 +00:00
/// Body of one merge task: repeatedly select and merge parts until there is nothing left
/// to merge, shutdown is requested, or (when while_can is false) one iteration completes.
/// All exceptions are caught and logged so they never escape the thread-pool worker.
void StorageMergeTree::mergeThread(bool while_can, bool aggressive)
{
	try
	{
		while (!shutdown_called)
		{
			/// Remove old parts first. In case something is broken in merging and the next block throws an exception.
			clearOldParts();

			{
				Poco::ScopedReadRWLock lock(merge_lock);

				/// The destructor must run by the end of this logical block (it untags the parts),
				/// so that removed parts are correctly determined afterwards.
				Poco::SharedPtr<CurrentlyMergingPartsTagger> what;

				/// First try without relaxations for old months; if nothing is found, retry with them.
				if (!selectPartsToMerge(what, false, aggressive) && !selectPartsToMerge(what, true, aggressive))
					break;

				mergeParts(what);
			}

			if (shutdown_called)
				break;

			/// Remove the parts that we have just merged.
			clearOldParts();

			if (!while_can)
				break;
		}
	}
	catch (const Exception & e)
	{
		LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
			<< std::endl
			<< "Stack trace:" << std::endl
			<< e.getStackTrace().toString());
	}
	catch (const Poco::Exception & e)
	{
		LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
	}
	catch (const std::exception & e)
	{
		LOG_ERROR(log, "std::exception: " << e.what());
	}
	catch (...)
	{
		LOG_ERROR(log, "Unknown exception");
	}
}
2012-11-28 08:52:15 +00:00
/// Block the calling thread until all currently scheduled merge tasks have finished.
void StorageMergeTree::joinMergeThreads()
{
	LOG_DEBUG(log, "Waiting for merge threads to finish.");
	merge_threads->wait();
}
2012-11-29 10:50:17 +00:00
/// Choose a segment of at most max_parts_to_merge_at_once parts such that the maximal size is less than
/// max_size_ratio_to_merge_parts times the sum of the others.
/// This guarantees O(n log n) total merge time in the worst case, regardless of the choice of merged
/// parts, the merge order and the insertion order.

/// With max_parts_to_merge_at_once >= log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts),
/// it is easy to prove that there will always be something to merge while the number of parts is greater than
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts) * (the number of parts larger than max_rows_to_merge_parts).

/// Then heuristics.
/// We pick a suitable segment that is maximal by inclusion.
/// Of all such, the one with the minimal maximum size.
/// Of all such, the one with the minimal minimum size.
/// Of all such, the longest one.

/// Additionally:
/// 1) from 1:00 to 5:00 the upper bound on part size in the main thread is increased several times
/// 2) the allowed imbalance of a merge depends on the age of the parts
/// 3) young parts of large size (roughly over 1 GB) are merged no fewer than three at a time
/// 4) if one of the threads is merging large parts, the other one merges only small pieces
/// 5) as the logarithm of the total size of the pieces in the merge grows, the balance requirement tightens

bool StorageMergeTree::selectPartsToMerge(Poco::SharedPtr<CurrentlyMergingPartsTagger> & what, bool merge_anything_for_old_months, bool aggressive)
{
	LOG_DEBUG(log, "Selecting parts to merge");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	/// Characteristics of the best segment found so far (see the selection criteria above).
	size_t min_max = -1U;
	size_t min_min = -1U;
	int max_len = 0;
	DataParts::iterator best_begin;
	bool found = false;

	DayNum_t now_day = date_lut.toDayNum(time(0));
	DayNum_t now_month = date_lut.toFirstDayNumOfMonth(now_day);
	int now_hour = date_lut.toHourInaccurate(time(0));

	size_t maybe_used_bytes = total_size_of_currently_merging_parts;
	size_t total_free_bytes = 0;
	struct statvfs fs;

	/// Look at the amount of free space in the file system.
	if (statvfs(full_path.c_str(), &fs) != 0)
		throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
	total_free_bytes = fs.f_bfree * fs.f_bsize;

	/// How many parts, starting from the current one, can be included in a valid segment that starts
	/// to the left of the current part. Needed to determine maximality by inclusion.
	int max_count_from_left = 0;

	size_t cur_max_rows_to_merge_parts = settings.max_rows_to_merge_parts;

	/// If it is night, we may merge considerably larger parts.
	if (now_hour >= 1 && now_hour <= 5)
		cur_max_rows_to_merge_parts *= settings.merge_parts_at_night_inc;

	/// If there is an active merge of large parts, restrict ourselves to merging only small pieces.
	for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
	{
		if ((*it)->currently_merging && (*it)->size * index_granularity > 25 * 1024 * 1024)
		{
			cur_max_rows_to_merge_parts = settings.max_rows_to_merge_parts_second;
			break;
		}
	}

	/// Left end of the segment.
	for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
	{
		const DataPartPtr & first_part = *it;

		max_count_from_left = std::max(0, max_count_from_left - 1);

		/// The part is not occupied by another merge.
		if (first_part->currently_merging)
			continue;

		/// The part is small enough, or the merge is "aggressive".
		if (first_part->size * index_granularity > cur_max_rows_to_merge_parts
			&& !aggressive)
			continue;

		/// The part lies within a single month.
		if (first_part->left_month != first_part->right_month)
		{
			LOG_WARNING(log, "Part " << first_part->name << " spans more than one month");
			continue;
		}

		/// The longest valid segment starting here.
		size_t cur_longest_max = -1U;
		size_t cur_longest_min = -1U;
		int cur_longest_len = 0;

		/// The current segment, not necessarily valid.
		size_t cur_max = first_part->size;
		size_t cur_min = first_part->size;
		size_t cur_sum = first_part->size;
		size_t cur_total_size = first_part->size_in_bytes;
		int cur_len = 1;

		DayNum_t month = first_part->left_month;
		UInt64 cur_id = first_part->right;

		/// This month ended at least a day ago.
		bool is_old_month = now_day - now_month >= 1 && now_month > month;

		time_t oldest_modification_time = first_part->modification_time;

		/// Right end of the segment.
		DataParts::iterator jt = it;
		for (++jt; jt != data_parts.end() && cur_len < static_cast<int>(settings.max_parts_to_merge_at_once); ++jt)
		{
			const DataPartPtr & last_part = *jt;

			/// The part is not occupied and lies within a single, correct month.
			if (last_part->currently_merging ||
				last_part->left_month != last_part->right_month ||
				last_part->left_month != month)
				break;

			/// The part is small enough, or the merge is "aggressive".
			if (last_part->size * index_granularity > cur_max_rows_to_merge_parts
				&& !aggressive)
				break;

			/// The part is to the right of the previous one.
			if (last_part->left < cur_id)
			{
				LOG_WARNING(log, "Part " << last_part->name << " intersects previous part");
				break;
			}

			/// NOTE(review): despite the name, std::max tracks the NEWEST modification time of the
			/// segment, so cur_age_in_sec below is the age of the youngest part - confirm this is intended.
			oldest_modification_time = std::max(oldest_modification_time, last_part->modification_time);
			cur_max = std::max(cur_max, last_part->size);
			cur_min = std::min(cur_min, last_part->size);
			cur_sum += last_part->size;
			cur_total_size += last_part->size_in_bytes;
			++cur_len;
			cur_id = last_part->right;

			int min_len = 2;
			int cur_age_in_sec = time(0) - oldest_modification_time;

			/// If the parts are roughly larger than 1 GB and were created less than 6 hours ago, merge no fewer than 3 at a time.
			if (cur_max * index_granularity * 150 > 1024 * 1024 * 1024 && cur_age_in_sec < 6 * 3600)
				min_len = 3;

			/// Equals 0.5 when the age is about 0, and 5 when the age is about a month.
			double time_ratio_modifier = 0.5 + 9 * static_cast<double>(cur_age_in_sec) / (3600 * 24 * 30 + cur_age_in_sec);

			/// Binary logarithm of the total size of the pieces.
			double log_cur_sum = std::log(cur_sum * index_granularity) / std::log(2);

			/// Equals ~2 when the parts are small; decreases towards 0.5 as the total size grows to 2^25.
			double size_ratio_modifier = std::max(0.25, 2 - 3 * (log_cur_sum) / (25 + log_cur_sum));

			/// Combine everything into a single constant.
			double ratio = std::max(0.5, time_ratio_modifier * size_ratio_modifier * settings.max_size_ratio_to_merge_parts);

			/// If the segment is valid, it is the longest valid segment starting here.
			if (cur_len >= min_len
				&& (static_cast<double>(cur_max) / (cur_sum - cur_max) < ratio
					/// For an old month merge anything at all, if allowed and if it is at least 15 days old
					|| (is_old_month && merge_anything_for_old_months && cur_age_in_sec > 3600 * 24 * 15)
					/// If the merge is "aggressive", merge anything at all
					|| aggressive))
			{
				/// Enough disk space to cover the already active merges plus the new one, with a 50% margin.
				if (total_free_bytes > (maybe_used_bytes + cur_total_size) * 1.5)
				{
					cur_longest_max = cur_max;
					cur_longest_min = cur_min;
					cur_longest_len = cur_len;
				}
				else
					LOG_WARNING(log, "Won't merge parts from " << first_part->name << " to " << last_part->name
						<< " because not enough free space: " << total_free_bytes << " free, "
						<< maybe_used_bytes << " already involved in merge, "
						<< cur_total_size << " required now (+50% on overhead)");
			}
		}

		/// This is a valid segment that is maximal by inclusion.
		if (cur_longest_len > max_count_from_left)
		{
			max_count_from_left = cur_longest_len;

			/// Lexicographic comparison implements the criteria: min max size, then min min size, then max length.
			if (!found
				|| std::make_pair(std::make_pair(cur_longest_max, cur_longest_min), -cur_longest_len)
					< std::make_pair(std::make_pair(min_max, min_min), -max_len))
			{
				found = true;
				min_max = cur_longest_max;
				min_min = cur_longest_min;
				max_len = cur_longest_len;
				best_begin = it;
			}
		}
	}

	if (found)
	{
		std::vector<DataPartPtr> parts;

		DataParts::iterator it = best_begin;
		for (int i = 0; i < max_len; ++i)
		{
			parts.push_back(*it);
			++it;
		}

		/// The tagger marks the parts as currently_merging and untags them in its destructor.
		what = new CurrentlyMergingPartsTagger(parts, data_parts_mutex);

		LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
	}
	else
	{
		LOG_DEBUG(log, "No parts to merge");
	}

	return found;
}
2013-04-24 10:31:32 +00:00
/// parts must be sorted.
/// Merge the given parts into one new part: read all rows through the mode-specific merging
/// stream, write them into a new part, then atomically replace the sources in data_parts.
void StorageMergeTree::mergeParts(Poco::SharedPtr<CurrentlyMergingPartsTagger> & what)
{
	const std::vector<DataPartPtr> & parts(what->parts);

	LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);

	Names all_column_names;
	for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
		all_column_names.push_back(it->first);

	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	/// The new part covers the union of the source ranges; its level is one more than the maximal source level.
	StorageMergeTree::DataPartPtr new_data_part = new DataPart(*this);
	new_data_part->left_date = std::numeric_limits<UInt16>::max();
	new_data_part->right_date = std::numeric_limits<UInt16>::min();
	new_data_part->left = parts.front()->left;
	new_data_part->right = parts.back()->right;
	new_data_part->level = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		new_data_part->level = std::max(new_data_part->level, parts[i]->level);
		new_data_part->left_date = std::min(new_data_part->left_date, parts[i]->left_date);
		new_data_part->right_date = std::max(new_data_part->right_date, parts[i]->right_date);
	}
	++new_data_part->level;
	new_data_part->name = getPartName(
		new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
	new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
	new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);

	/** Read from all parts, merge and write into a new one.
	  * Along the way, compute the expression for sorting.
	  */
	BlockInputStreams src_streams;

	for (size_t i = 0; i < parts.size(); ++i)
	{
		MarkRanges ranges(1, MarkRange(0, parts[i]->size));
		src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
			full_path + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, all_column_names, *this, parts[i], ranges,
			StoragePtr(), false, NULL, ""), primary_expr));
	}

	/// The order of the streams is important: when keys collide, elements go in the order of the source stream number.
	/// In the merged part, rows with the same key must go in ascending order of the source part identifier,
	/// that is, in (approximately) ascending order of insertion time.
	BlockInputStreamPtr merged_stream;

	switch (mode)
	{
		case Ordinary:
			merged_stream = new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_MERGE_BLOCK_SIZE);
			break;

		case Collapsing:
			merged_stream = new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_MERGE_BLOCK_SIZE);
			break;

		case Summing:
			merged_stream = new SummingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_MERGE_BLOCK_SIZE);
			break;

		default:
			throw Exception("Unknown mode of operation for StorageMergeTree: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
	}

	MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
		new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);

	merged_stream->readPrefix();
	to->writePrefix();

	Block block;
	while (!shutdown_called && (block = merged_stream->read()))
		to->write(block);

	/// On shutdown, abort without touching data_parts: the unfinished new part is simply abandoned.
	if (shutdown_called)
	{
		LOG_INFO(log, "Shutdown requested while merging parts.");
		return;
	}

	merged_stream->readSuffix();
	to->writeSuffix();

	/// In Ordinary mode, rows cannot disappear during a merge.
	if (0 == to->marksCount() && mode == Ordinary)
		throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR);

	new_data_part->size = to->marksCount();
	new_data_part->modification_time = time(0);

	if (0 != to->marksCount())
		new_data_part->loadIndex();	/// NOTE The just-written index is read back from disk. It could be formed directly while writing.

	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

		/// Add the new part to the set, if it is not empty.

		for (size_t i = 0; i < parts.size(); ++i)
			if (data_parts.end() == data_parts.find(parts[i]))
				throw Exception("Logical error: cannot find data part " + parts[i]->name + " in list", ErrorCodes::LOGICAL_ERROR);

		if (0 == to->marksCount())
		{
			LOG_INFO(log, "All rows have been deleted while merging from " << parts.front()->name << " to " << parts.back()->name);
		}
		else
		{
			data_parts.insert(new_data_part);
			all_data_parts.insert(new_data_part);
		}

		for (size_t i = 0; i < parts.size(); ++i)
			data_parts.erase(data_parts.find(parts[i]));
	}

	LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
}
2012-08-16 18:17:01 +00:00
2013-01-23 11:16:32 +00:00
void StorageMergeTree : : rename ( const String & new_path_to_db , const String & new_name )
{
joinMergeThreads ( ) ;
2014-01-09 13:39:06 +00:00
/// Кажется тут race condition - в этот момент мердж может запуститься снова.
2013-01-23 11:16:32 +00:00
std : : string new_full_path = new_path_to_db + escapeForFileName ( new_name ) + ' / ' ;
Poco : : File ( full_path ) . renameTo ( new_full_path ) ;
path = new_path_to_db ;
full_path = new_full_path ;
name = new_name ;
increment . setPath ( full_path + " increment.txt " ) ;
}
2013-01-23 17:43:19 +00:00
void StorageMergeTree : : dropImpl ( )
2012-08-16 18:17:01 +00:00
{
2012-11-28 08:52:15 +00:00
joinMergeThreads ( ) ;
2012-08-16 18:17:01 +00:00
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
data_parts . clear ( ) ;
all_data_parts . clear ( ) ;
Poco : : File ( full_path ) . remove ( true ) ;
}
2013-09-23 12:01:19 +00:00
void StorageMergeTree : : removeColumnFiles ( String column_name )
2013-08-07 13:07:42 +00:00
{
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
2013-08-14 16:52:40 +00:00
/// Регэксп выбирает файлы столбца для удаления
Poco : : RegularExpression re ( column_name + " (?:(?: \\ .| \\ %2E).+){0,1} " + " (?: \\ .mrk| \\ .bin| \\ .size \\ d+ \\ .bin| \\ .size \\ d+ \\ .mrk) " ) ;
/// Цикл по всем директориям кусочков
2013-08-09 00:12:59 +00:00
Poco : : RegularExpression : : MatchVec matches ;
Poco : : DirectoryIterator end ;
2013-08-14 16:52:40 +00:00
for ( Poco : : DirectoryIterator it_dir = Poco : : DirectoryIterator ( full_path ) ; it_dir ! = end ; + + it_dir )
2013-08-07 13:07:42 +00:00
{
2013-08-14 16:52:40 +00:00
std : : string dir_name = it_dir . name ( ) ;
2013-08-07 13:07:42 +00:00
2013-08-14 16:52:40 +00:00
if ( ! isPartDirectory ( dir_name , matches ) )
2013-08-07 13:07:42 +00:00
continue ;
2013-08-14 16:52:40 +00:00
/// Цикл по каждому из файлов в директории кусочков
String full_dir_name = full_path + dir_name + " / " ;
for ( Poco : : DirectoryIterator it_file ( full_dir_name ) ; it_file ! = end ; + + it_file )
2013-08-09 00:12:59 +00:00
{
2013-08-14 16:52:40 +00:00
if ( re . match ( it_file . name ( ) ) )
{
Poco : : File file ( full_dir_name + it_file . name ( ) ) ;
if ( file . exists ( ) )
file . remove ( ) ;
}
2013-08-09 00:12:59 +00:00
}
2013-08-07 13:07:42 +00:00
}
}
2014-03-04 11:30:50 +00:00
void StorageMergeTree : : createConvertExpression ( const String & in_column_name , const String & out_type , ExpressionActionsPtr & out_expression , String & out_column )
{
ASTFunction * function = new ASTFunction ;
ASTPtr function_ptr = function ;
ASTExpressionList * arguments = new ASTExpressionList ;
ASTPtr arguments_ptr = arguments ;
function - > name = " to " + out_type ;
function - > arguments = arguments_ptr ;
function - > children . push_back ( arguments_ptr ) ;
ASTIdentifier * in_column = new ASTIdentifier ;
ASTPtr in_column_ptr = in_column ;
arguments - > children . push_back ( in_column_ptr ) ;
in_column - > name = in_column_name ;
in_column - > kind = ASTIdentifier : : Column ;
out_expression = ExpressionAnalyzer ( function_ptr , context , * columns ) . getActions ( false ) ;
out_column = function - > getColumnName ( ) ;
}
2013-08-09 00:12:59 +00:00
void StorageMergeTree : : alter ( const ASTAlterQuery : : Parameters & params )
2013-08-07 13:07:42 +00:00
{
2014-03-04 11:30:50 +00:00
if ( params . type = = ASTAlterQuery : : MODIFY )
{
Poco : : ScopedWriteRWLock mlock ( merge_lock ) ;
Poco : : ScopedWriteRWLock wlock ( write_lock ) ;
typedef std : : vector < DataPartPtr > PartsList ;
PartsList parts ;
{
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
for ( auto & data_part : data_parts )
{
parts . push_back ( data_part ) ;
}
}
/*for (DataPart & part : parts)
{
ReadBufferFromFile in ( full_path + part . name + ' / ' ) ;
while ( DB : : Block b = in . read ( ) )
{
b .
}
} */
Names column_name ;
const ASTNameTypePair & name_type = dynamic_cast < const ASTNameTypePair & > ( * params . name_type ) ;
StringRange type_range = name_type . type - > range ;
String type ( type_range . first , type_range . second - type_range . first ) ;
column_name . push_back ( name_type . name ) ;
DB : : ExpressionActionsPtr expr ;
String out_column ;
createConvertExpression ( name_type . name , type , expr , out_column ) ;
ColumnNumbers num ( 1 , 0 ) ;
for ( DataPartPtr & part : parts )
{
MarkRanges ranges ( 1 , MarkRange ( 0 , part - > size ) ) ;
ExpressionBlockInputStream in ( new MergeTreeBlockInputStream ( full_path + part - > name + ' / ' ,
DEFAULT_MERGE_BLOCK_SIZE , column_name , * this , part , ranges , StoragePtr ( ) , false , NULL , " " ) , expr ) ;
MergedColumnOnlyOutputStream out ( * this , full_path + part - > name + ' / ' ) ;
while ( DB : : Block b = in . read ( ) )
{
std : : stringstream s ;
for ( auto & column : b . getColumnsList ( ) )
s < < " " < < column . first ;
LOG_TRACE ( log , " Block after modification has " < < b . columns ( ) < < " columns with names " < < s . str ( ) ) ;
/// писать только некоторые столбцы и добавить и изменить название
out . write ( b ) ;
}
}
/// переименовываем файлы
Poco : : ScopedWriteRWLock rlock ( read_lock ) ;
for ( DataPartPtr & part : parts )
{
std : : string path = full_path + part - > name + ' / ' + name_type . name ;
Poco : : File ( path + " .bin " ) . renameTo ( path + " .bin " + " .old " ) ;
Poco : : File ( path + " .mrk " ) . renameTo ( path + " .mrk " + " .old " ) ;
}
for ( DataPartPtr & part : parts )
{
std : : string path = full_path + part - > name + ' / ' + out_column ;
std : : string new_path = full_path + part - > name + ' / ' + name_type . name ;
Poco : : File ( path + " .bin " ) . renameTo ( new_path + " .bin " + " .old " ) ;
Poco : : File ( path + " .mrk " ) . renameTo ( new_path + " .mrk " + " .old " ) ;
}
for ( DataPartPtr & part : parts )
{
std : : string path = full_path + part - > name + ' / ' + name_type . name ;
Poco : : File ( path + " .bin " + " .old " ) . remove ( ) ;
Poco : : File ( path + " .mrk " + " .old " ) . remove ( ) ;
}
}
2013-08-08 09:50:15 +00:00
{
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
2013-11-13 09:47:12 +00:00
alterColumns ( params , columns , context ) ;
2013-08-08 09:50:15 +00:00
}
2013-09-23 12:01:19 +00:00
if ( params . type = = ASTAlterQuery : : DROP )
2013-08-07 13:07:42 +00:00
{
2013-08-09 00:12:59 +00:00
String column_name = dynamic_cast < const ASTIdentifier & > ( * params . column ) . name ;
2013-09-23 12:01:19 +00:00
removeColumnFiles ( column_name ) ;
2013-08-07 13:07:42 +00:00
}
}
2013-08-09 00:12:59 +00:00
bool StorageMergeTree : : isPartDirectory ( const String & dir_name , Poco : : RegularExpression : : MatchVec & matches ) const
2013-08-07 13:07:42 +00:00
{
return ( file_name_regexp . match ( dir_name , 0 , matches ) & & 6 = = matches . size ( ) ) ;
}
2013-08-09 00:12:59 +00:00
2013-09-15 03:44:32 +00:00
2013-10-03 12:46:17 +00:00
bool StorageMergeTree : : isBrokenPart ( const String & path )
2013-09-15 03:44:32 +00:00
{
/// Проверяем, что первичный ключ непуст.
Poco : : File index_file ( path + " /primary.idx " ) ;
if ( ! index_file . exists ( ) | | index_file . getSize ( ) = = 0 )
{
2013-10-03 12:46:17 +00:00
LOG_ERROR ( log , " Part " < < path < < " is broken: primary key is empty. " ) ;
2013-09-15 03:44:32 +00:00
return true ;
}
/// Проверяем, что все засечки непусты и имеют одинаковый размер.
ssize_t marks_size = - 1 ;
for ( NamesAndTypesList : : const_iterator it = columns - > begin ( ) ; it ! = columns - > end ( ) ; + + it )
{
Poco : : File marks_file ( path + " / " + escapeForFileName ( it - > first ) + " .mrk " ) ;
2013-09-23 12:01:19 +00:00
/// при добавлении нового столбца в таблицу файлы .mrk не создаются. Н е будем ничего удалять.
2013-09-19 15:20:31 +00:00
if ( ! marks_file . exists ( ) )
2013-10-03 12:46:17 +00:00
continue ;
2013-09-15 03:44:32 +00:00
if ( marks_size = = - 1 )
{
marks_size = marks_file . getSize ( ) ;
if ( 0 = = marks_size )
{
2013-10-03 12:46:17 +00:00
LOG_ERROR ( log , " Part " < < path < < " is broken: " < < marks_file . path ( ) < < " is empty. " ) ;
2013-09-15 03:44:32 +00:00
return true ;
}
}
else
{
if ( static_cast < ssize_t > ( marks_file . getSize ( ) ) ! = marks_size )
{
2013-10-03 12:46:17 +00:00
LOG_ERROR ( log , " Part " < < path < < " is broken: marks have different sizes. " ) ;
2013-09-15 03:44:32 +00:00
return true ;
}
}
}
return false ;
}
2013-10-03 12:46:17 +00:00
/** Try to replace a broken part with the `old_`-prefixed parts it covers.
  * `path` is the table's data directory, `file_name` is the broken part's directory name,
  * and `old_parts` is the list of `old_`-prefixed directory names; entries that are restored
  * or unparsable are erased from it. Returns the names of the parts that were restored.
  * The broken part directory itself is removed only if at least two parts were restored
  * in its place; otherwise the situation is left for manual resolution.
  */
Strings StorageMergeTree::tryRestorePart(const String & path, const String & file_name, Strings & old_parts)
{
	LOG_ERROR(log, "Restoring all old_ parts covered by " << file_name);

	Poco::RegularExpression::MatchVec matches;
	Strings restored_parts;

	/// NOTE(review): the return value of isPartDirectory is ignored here — presumably the
	/// caller guarantees `file_name` already matched; verify, since parsePartName would
	/// otherwise read stale `matches`.
	isPartDirectory(file_name, matches);
	DataPart broken_part(*this);
	parsePartName(file_name, matches, broken_part);

	/// Iterate backwards so erasing the current element does not shift unvisited indices.
	for (int i = static_cast<int>(old_parts.size()) - 1; i >= 0; --i)
	{
		DataPart old_part(*this);
		/// Strip the "old_" prefix to recover the original part name.
		String name = old_parts[i].substr(strlen("old_"));
		if (!isPartDirectory(name, matches))
		{
			LOG_ERROR(log, "Strange file name: " << path + old_parts[i] << "; ignoring");
			old_parts.erase(old_parts.begin() + i);
			continue;
		}
		parsePartName(name, matches, old_part);
		if (broken_part.contains(old_part))
		{
			/// Restore all contained parts. If some of them are contained in others, loadDataParts will delete them.
			LOG_ERROR(log, "Restoring part " << path + old_parts[i]);
			Poco::File(path + old_parts[i]).renameTo(path + name);
			old_parts.erase(old_parts.begin() + i);
			restored_parts.push_back(name);
		}
	}

	if (restored_parts.size() >= 2)
	{
		LOG_ERROR(log, "Removing broken part " << path + file_name << " because at least 2 old_ parts were restored in its place");
		/// Recursive removal of the broken part's directory.
		Poco::File(path + file_name).remove(true);
	}
	else
	{
		LOG_ERROR(log, "Not removing broken part " << path + file_name
			<< " because less than 2 old_ parts were restored in its place. You need to resolve this manually");
	}

	return restored_parts;
}
2012-07-17 20:04:39 +00:00
}