2012-07-18 19:16:16 +00:00
# include <boost/bind.hpp>
2012-11-28 08:52:15 +00:00
# include <numeric>
2012-07-17 20:04:39 +00:00
2012-07-19 20:32:10 +00:00
# include <Poco/DirectoryIterator.h>
2013-04-08 19:47:25 +00:00
# include <Poco/Ext/ScopedTry.h>
2012-07-19 20:32:10 +00:00
# include <Yandex/time2str.h>
2012-07-17 20:04:39 +00:00
# include <DB/Common/escapeForFileName.h>
# include <DB/IO/WriteBufferFromString.h>
# include <DB/IO/WriteBufferFromFile.h>
# include <DB/IO/CompressedWriteBuffer.h>
2012-07-21 03:45:48 +00:00
# include <DB/IO/ReadBufferFromString.h>
2012-07-19 20:32:10 +00:00
# include <DB/IO/ReadBufferFromFile.h>
# include <DB/IO/CompressedReadBuffer.h>
2012-07-17 20:04:39 +00:00
# include <DB/Columns/ColumnsNumber.h>
2012-08-30 17:43:31 +00:00
# include <DB/Columns/ColumnArray.h>
2013-07-12 13:35:05 +00:00
# include <DB/Columns/ColumnNested.h>
2012-07-17 20:04:39 +00:00
2012-07-21 03:45:48 +00:00
# include <DB/DataTypes/DataTypesNumberFixed.h>
2012-08-30 17:43:31 +00:00
# include <DB/DataTypes/DataTypeArray.h>
2013-07-12 13:35:05 +00:00
# include <DB/DataTypes/DataTypeNested.h>
2012-07-21 03:45:48 +00:00
2012-07-19 20:32:10 +00:00
# include <DB/DataStreams/IProfilingBlockInputStream.h>
2012-07-30 20:32:36 +00:00
# include <DB/DataStreams/MergingSortedBlockInputStream.h>
2012-08-16 17:27:40 +00:00
# include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
2013-04-25 15:46:14 +00:00
# include <DB/DataStreams/CollapsingFinalBlockInputStream.h>
2012-07-31 17:22:40 +00:00
# include <DB/DataStreams/ExpressionBlockInputStream.h>
2012-11-28 08:52:15 +00:00
# include <DB/DataStreams/ConcatBlockInputStream.h>
2012-07-21 06:47:17 +00:00
# include <DB/DataStreams/narrowBlockInputStreams.h>
2012-07-30 20:32:36 +00:00
# include <DB/DataStreams/copyData.h>
2012-12-12 14:25:55 +00:00
# include <DB/DataStreams/FilterBlockInputStream.h>
2012-07-19 20:32:10 +00:00
2012-07-21 03:45:48 +00:00
# include <DB/Parsers/ASTExpressionList.h>
# include <DB/Parsers/ASTSelectQuery.h>
# include <DB/Parsers/ASTFunction.h>
# include <DB/Parsers/ASTLiteral.h>
2012-12-12 14:25:55 +00:00
# include <DB/Parsers/ASTIdentifier.h>
2013-08-07 13:07:42 +00:00
# include <DB/Parsers/ASTNameTypePair.h>
2012-07-21 03:45:48 +00:00
2012-07-17 20:04:39 +00:00
# include <DB/Interpreters/sortBlock.h>
2013-06-03 13:17:17 +00:00
# include <DB/Interpreters/ExpressionAnalyzer.h>
2012-07-17 20:04:39 +00:00
# include <DB/Storages/StorageMergeTree.h>
2013-04-24 10:31:32 +00:00
# include <DB/Storages/MergeTree/PKCondition.h>
# include <DB/Storages/MergeTree/MergeTreeBlockOutputStream.h>
# include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
# include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
2012-07-19 20:32:10 +00:00
2013-08-08 09:50:15 +00:00
# include <algorithm>
2012-07-19 20:32:10 +00:00
2012-07-17 20:04:39 +00:00
namespace DB
{
/** Constructor: wires up the table's on-disk location, primary key expression,
  * partition (date) column, optional sampling expression and merge settings,
  * then loads the existing data parts from disk.
  * Note: `full_path` is derived from `path` + escaped `name`, so the member
  * init order (path, name before full_path) matters.
  */
StorageMergeTree::StorageMergeTree(
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	const String & sign_column_,
	const StorageMergeTreeSettings & settings_)
	: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
	context(context_), primary_expr_ast(primary_expr_ast_->clone()),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	sign_column(sign_column_),
	settings(settings_),
	increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name)),
	file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)")
{
	/// Convert the row-based thresholds from settings into mark counts, rounding up.
	min_marks_for_seek = (settings.min_rows_for_seek + index_granularity - 1) / index_granularity;
	min_marks_for_concurrent_read = (settings.min_rows_for_concurrent_read + index_granularity - 1) / index_granularity;
	max_marks_to_use_cache = (settings.max_rows_to_use_cache + index_granularity - 1) / index_granularity;

	/// Create the directory if it does not exist.
	Poco::File(full_path).createDirectories();

	/// Initialize the sort description: ascending by each primary key expression, in order.
	sort_descr.reserve(primary_expr_ast->children.size());
	for (ASTs::iterator it = primary_expr_ast->children.begin();
		it != primary_expr_ast->children.end();
		++it)
	{
		String name = (*it)->getColumnName();
		sort_descr.push_back(SortColumnDescription(name, 1));
	}

	primary_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(false);

	/// The projected expression yields the sample block describing the primary key columns.
	ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(true);
	primary_key_sample = projected_expr->getSampleBlock();

	merge_threads = new boost::threadpool::pool(settings.merging_threads);

	loadDataParts();
}
2013-02-06 11:26:35 +00:00
/** Factory: constructs a StorageMergeTree and returns it as a shared StoragePtr
  * (ownership is handed over via thisPtr()). Parameters mirror the constructor.
  */
StoragePtr StorageMergeTree::create(
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	const String & sign_column_,
	const StorageMergeTreeSettings & settings_)
{
	StorageMergeTree * storage = new StorageMergeTree(
		path_, name_, columns_, context_, primary_expr_ast_, date_column_name_,
		sampling_expression_, index_granularity_, sign_column_, settings_);
	return storage->thisPtr();
}
2012-07-17 20:04:39 +00:00
2012-07-30 20:32:36 +00:00
/// Destructor: block until every scheduled merge finishes, so no merge thread
/// can touch members of a half-destroyed object.
StorageMergeTree::~StorageMergeTree()
{
	joinMergeThreads();
}
2012-07-18 19:44:04 +00:00
/// Create an output stream for INSERTs. The query AST itself is not inspected;
/// all insertion logic lives in MergeTreeBlockOutputStream.
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
{
	BlockOutputStreamPtr out = new MergeTreeBlockOutputStream(thisPtr());
	return out;
}
2012-07-21 05:07:14 +00:00
/** Plan and create the input streams for a SELECT.
  * Steps: prune parts by the date (partition) condition, optionally apply SAMPLE
  * (converting an absolute sample size into a relative one via a preliminary index
  * scan), compute mark ranges per part from the primary key condition, then spread
  * the ranges among `threads` streams (FINAL gets its own collapsing path).
  */
BlockInputStreams StorageMergeTree::read(
	const Names & column_names_to_return,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	check(column_names_to_return);
	processed_stage = QueryProcessingStage::FetchColumns;

	PKCondition key_condition(query, context, *columns, sort_descr);
	PKCondition date_condition(query, context, *columns, SortDescription(1, SortColumnDescription(date_column_name, 1)));

	typedef std::vector<DataPartPtr> PartsList;
	PartsList parts;

	/// Select the parts that may contain data satisfying date_condition.
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

		for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
			if (date_condition.mayBeTrueInRange(Row(1, static_cast<UInt64>((*it)->left_date)), Row(1, static_cast<UInt64>((*it)->right_date))))
				parts.push_back(*it);
	}

	/// Sampling.
	Names column_names_to_read = column_names_to_return;
	UInt64 sampling_column_value_limit = 0;
	typedef Poco::SharedPtr<ASTFunction> ASTFunctionPtr;
	ASTFunctionPtr filter_function;
	ExpressionActionsPtr filter_expression;

	ASTSelectQuery & select = *dynamic_cast<ASTSelectQuery *>(&*query);
	if (select.sample_size)
	{
		double size = apply_visitor(FieldVisitorConvertToNumber<double>(),
			dynamic_cast<ASTLiteral &>(*select.sample_size).value);

		if (size < 0)
			throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

		/// A value > 1 means an absolute number of rows; convert it to a relative fraction.
		if (size > 1)
		{
			size_t requested_count = apply_visitor(FieldVisitorConvertToNumber<UInt64>(), dynamic_cast<ASTLiteral &>(*select.sample_size).value);

			/// Find out how many rows we would read without sampling.
			LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());

			size_t total_count = 0;
			for (size_t i = 0; i < parts.size(); ++i)
			{
				DataPartPtr & part = parts[i];
				MarkRanges ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + '/',
					part->size,
					*this,
					key_condition);

				for (size_t j = 0; j < ranges.size(); ++j)
					total_count += ranges[j].end - ranges[j].begin;
			}

			total_count *= index_granularity;

			size = std::min(1., static_cast<double>(requested_count) / total_count);

			LOG_DEBUG(log, "Selected relative sample size: " << size);
		}

		/// The sampling column must be an unsigned integer; its max value maps fraction -> threshold.
		UInt64 sampling_column_max = 0;
		DataTypePtr type = primary_expr->getSampleBlock().getByName(sampling_expression->getColumnName()).type;

		if (type->getName() == "UInt64")
			sampling_column_max = std::numeric_limits<UInt64>::max();
		else if (type->getName() == "UInt32")
			sampling_column_max = std::numeric_limits<UInt32>::max();
		else if (type->getName() == "UInt16")
			sampling_column_max = std::numeric_limits<UInt16>::max();
		else if (type->getName() == "UInt8")
			sampling_column_max = std::numeric_limits<UInt8>::max();
		else
			throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

		/// Add the condition to the key condition so the second index scan cuts off more.
		sampling_column_value_limit = static_cast<UInt64>(size * sampling_column_max);
		if (!key_condition.addCondition(sampling_expression->getColumnName(),
			Range::RightBounded(sampling_column_value_limit, true)))
			throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

		/// The filtering expression: sampling_expression <= sampling_column_value_limit

		ASTPtr filter_function_args = new ASTExpressionList;
		filter_function_args->children.push_back(sampling_expression);
		filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit));

		filter_function = new ASTFunction;
		filter_function->name = "lessOrEquals";
		filter_function->arguments = filter_function_args;
		filter_function->children.push_back(filter_function->arguments);

		filter_expression = ExpressionAnalyzer(filter_function, context, *columns).getActions(false);

		/// Add the columns needed to evaluate sampling_expression (deduplicated).
		std::vector<String> add_columns = filter_expression->getRequiredColumns();
		column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
		std::sort(column_names_to_read.begin(), column_names_to_read.end());
		column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
	}

	LOG_DEBUG(log, "Key condition: " << key_condition.toString());
	LOG_DEBUG(log, "Date condition: " << date_condition.toString());

	RangesInDataParts parts_with_ranges;

	/// Find which mark ranges to read from each part.
	size_t sum_marks = 0;
	size_t sum_ranges = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		DataPartPtr & part = parts[i];
		RangesInDataPart ranges(part);
		ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + '/',
			part->size,
			*this,
			key_condition);

		if (!ranges.ranges.empty())
		{
			parts_with_ranges.push_back(ranges);

			sum_ranges += ranges.ranges.size();
			for (size_t j = 0; j < ranges.ranges.size(); ++j)
			{
				sum_marks += ranges.ranges[j].end - ranges.ranges[j].begin;
			}
		}
	}

	LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
		<< sum_marks << " marks to read from " << sum_ranges << " ranges");

	BlockInputStreams res;

	if (select.final)
	{
		/// Add the columns needed to compute the primary key and the sign (deduplicated).
		std::vector<String> add_columns = primary_expr->getRequiredColumns();
		column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
		column_names_to_read.push_back(sign_column);
		std::sort(column_names_to_read.begin(), column_names_to_read.end());
		column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());

		res = spreadMarkRangesAmongThreadsFinal(parts_with_ranges, threads, column_names_to_read, max_block_size, settings.use_uncompressed_cache);
	}
	else
	{
		res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names_to_read, max_block_size, settings.use_uncompressed_cache);
	}

	/// Wrap each stream with the sampling filter, if sampling was requested.
	if (select.sample_size)
	{
		for (size_t i = 0; i < res.size(); ++i)
		{
			BlockInputStreamPtr original_stream = res[i];
			BlockInputStreamPtr expression_stream = new ExpressionBlockInputStream(original_stream, filter_expression);
			BlockInputStreamPtr filter_stream = new FilterBlockInputStream(expression_stream, filter_function->getColumnName());
			res[i] = filter_stream;
		}
	}

	return res;
}
/// Distribute the marks approximately evenly among the threads.
/// `parts` is taken by value: it is consumed (ranges are popped) while assigning work.
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(
	RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size, bool use_uncompressed_cache)
{
	/// Shuffle the parts, just in case.
	std::random_shuffle(parts.begin(), parts.end());

	/// Count the marks in each part.
	std::vector<size_t> sum_marks_in_parts(parts.size());
	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		/// List the ranges right-to-left so the leftmost range can be removed with pop_back().
		std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());

		sum_marks_in_parts[i] = 0;
		for (size_t j = 0; j < parts[i].ranges.size(); ++j)
		{
			MarkRange & range = parts[i].ranges[j];
			sum_marks_in_parts[i] += range.end - range.begin;
		}

		sum_marks += sum_marks_in_parts[i];
	}

	/// Too much data to read: don't pollute the uncompressed block cache.
	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	BlockInputStreams res;

	if (sum_marks > 0)
	{
		size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

		for (size_t i = 0; i < threads && !parts.empty(); ++i)
		{
			size_t need_marks = min_marks_per_thread;
			BlockInputStreams streams;

			/// Loop over the parts.
			while (need_marks > 0 && !parts.empty())
			{
				RangesInDataPart & part = parts.back();
				size_t & marks_in_part = sum_marks_in_parts.back();

				/// Do not take too few rows from a part.
				if (marks_in_part >= min_marks_for_concurrent_read &&
					need_marks < min_marks_for_concurrent_read)
					need_marks = min_marks_for_concurrent_read;

				/// Do not leave too few rows in a part.
				if (marks_in_part > need_marks &&
					marks_in_part - need_marks < min_marks_for_concurrent_read)
					need_marks = marks_in_part;

				/// Take the whole part if it is small enough.
				if (marks_in_part <= need_marks)
				{
					/// Restore the original order of the ranges.
					std::reverse(part.ranges.begin(), part.ranges.end());

					streams.push_back(new MergeTreeBlockInputStream(
						full_path + part.data_part->name + '/', max_block_size, column_names, *this,
						part.data_part, part.ranges, thisPtr(), use_uncompressed_cache));
					need_marks -= marks_in_part;
					parts.pop_back();
					sum_marks_in_parts.pop_back();
					continue;
				}

				MarkRanges ranges_to_get_from_part;

				/// Loop over the ranges of the part.
				while (need_marks > 0)
				{
					if (part.ranges.empty())
						throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR);

					MarkRange & range = part.ranges.back();
					size_t marks_in_range = range.end - range.begin;

					/// Take a prefix of the range and shrink it accordingly.
					size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
					ranges_to_get_from_part.push_back(MarkRange(range.begin, range.begin + marks_to_get_from_range));
					range.begin += marks_to_get_from_range;
					marks_in_part -= marks_to_get_from_range;
					need_marks -= marks_to_get_from_range;
					if (range.begin == range.end)
						part.ranges.pop_back();
				}

				streams.push_back(new MergeTreeBlockInputStream(
					full_path + part.data_part->name + '/', max_block_size, column_names, *this,
					part.data_part, ranges_to_get_from_part, thisPtr(), use_uncompressed_cache));
			}

			/// One stream per thread; concatenate if the thread got pieces of several parts.
			if (streams.size() == 1)
				res.push_back(streams[0]);
			else
				res.push_back(new ConcatBlockInputStream(streams));
		}

		if (!parts.empty())
			throw Exception("Couldn't spread marks among threads", ErrorCodes::LOGICAL_ERROR);
	}

	return res;
}
2013-04-24 11:39:12 +00:00
/// Distribute the marks among threads so that in the result (almost) all data is collapsed (the FINAL modifier).
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(
	RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size, bool use_uncompressed_cache)
{
	/// Too much data to read: don't pollute the uncompressed block cache.
	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
		for (size_t j = 0; j < parts[i].ranges.size(); ++j)
			sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;

	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	/// Expression `sign = 1`, used to filter an already-collapsed single part.
	ExpressionActionsPtr sign_filter_expression;
	String sign_filter_column;
	createPositiveSignCondition(sign_filter_expression, sign_filter_column);

	BlockInputStreams to_collapse;

	for (size_t part_index = 0; part_index < parts.size(); ++part_index)
	{
		RangesInDataPart & part = parts[part_index];

		BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
			full_path + part.data_part->name + '/', max_block_size, column_names, *this,
			part.data_part, part.ranges, thisPtr(), use_uncompressed_cache);

		/// The collapsing stream needs the primary key columns computed.
		to_collapse.push_back(new ExpressionBlockInputStream(source_stream, primary_expr));
	}

	BlockInputStreams res;
	if (to_collapse.size() == 1)
		/// A single part is already collapsed within itself: keeping only sign = 1 rows suffices.
		res.push_back(new FilterBlockInputStream(new ExpressionBlockInputStream(to_collapse[0], sign_filter_expression), sign_filter_column));
	else if (to_collapse.size() > 1)
		res.push_back(new CollapsingFinalBlockInputStream(to_collapse, sort_descr, sign_column));

	return res;
}
2013-06-03 13:17:17 +00:00
void StorageMergeTree : : createPositiveSignCondition ( ExpressionActionsPtr & out_expression , String & out_column )
2013-04-26 13:20:42 +00:00
{
ASTFunction * function = new ASTFunction ;
ASTPtr function_ptr = function ;
ASTExpressionList * arguments = new ASTExpressionList ;
ASTPtr arguments_ptr = arguments ;
ASTIdentifier * sign = new ASTIdentifier ;
ASTPtr sign_ptr = sign ;
ASTLiteral * one = new ASTLiteral ;
ASTPtr one_ptr = one ;
function - > name = " equals " ;
function - > arguments = arguments_ptr ;
function - > children . push_back ( arguments_ptr ) ;
arguments - > children . push_back ( sign_ptr ) ;
arguments - > children . push_back ( one_ptr ) ;
sign - > name = sign_column ;
sign - > kind = ASTIdentifier : : Column ;
one - > type = new DataTypeInt8 ;
one - > value = Field ( static_cast < Int64 > ( 1 ) ) ;
2013-06-03 13:17:17 +00:00
out_expression = ExpressionAnalyzer ( function_ptr , context , * columns ) . getActions ( false ) ;
2013-04-26 13:20:42 +00:00
out_column = function - > getColumnName ( ) ;
}
2013-08-11 03:40:14 +00:00
/// Compose the part directory name. The format is: YYYYMMDD_YYYYMMDD_N_N_L
/// (min date, max date, min block id, max block id, merge level).
String StorageMergeTree::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
{
	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	String part_name;
	{
		/// The buffer must go out of scope (flush) before part_name is returned.
		unsigned left_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
		unsigned right_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(right_date));

		WriteBufferFromString out(part_name);

		writeIntText(left_date_id, out);
		writeChar('_', out);
		writeIntText(right_date_id, out);
		writeChar('_', out);
		writeIntText(left_id, out);
		writeChar('_', out);
		writeIntText(right_id, out);
		writeChar('_', out);
		writeIntText(level, out);
	}

	return part_name;
}
2012-07-19 20:32:10 +00:00
/** Scan the storage directory, reconstruct the set of data parts from directory
  * names, clean up stale temporary directories and broken parts, and drop parts
  * that are fully contained in a larger (merged) part.
  */
void StorageMergeTree::loadDataParts()
{
	LOG_DEBUG(log, "Loading data parts");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	data_parts.clear();

	Poco::DirectoryIterator end;
	Poco::RegularExpression::MatchVec matches;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
	{
		std::string file_name = it.name();

		/// Remove temporary directories older than one day.
		if (0 == file_name.compare(0, strlen("tmp_"), "tmp_"))
		{
			Poco::File tmp_dir(full_path + file_name);

			if (tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + 86400 < time(0))
			{
				LOG_WARNING(log, "Removing temporary directory " << full_path << file_name);
				Poco::File(full_path + file_name).remove(true);
			}

			continue;
		}

		if (!isPartDirectory(file_name, matches))
			continue;

		/// Remove broken parts that can appear after a rough server restart.
		if (removeIfBroken(full_path + file_name))
			continue;

		/// Parse the part metadata out of the directory name (YYYYMMDD_YYYYMMDD_N_N_L).
		DataPartPtr part = new DataPart(*this);
		part->left_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
		part->right_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length)));
		part->left = parse<UInt64>(file_name.substr(matches[3].offset, matches[3].length));
		part->right = parse<UInt64>(file_name.substr(matches[4].offset, matches[4].length));
		part->level = parse<UInt32>(file_name.substr(matches[5].offset, matches[5].length));
		part->name = file_name;

		/// Size — in the number of marks (taken from the .mrk file of the first column).
		part->size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize()
			/ MERGE_TREE_MARK_SIZE;

		part->modification_time = it->getLastModified().epochTime();

		part->left_month = date_lut.toFirstDayNumOfMonth(part->left_date);
		part->right_month = date_lut.toFirstDayNumOfMonth(part->right_date);

		data_parts.insert(part);
	}

	all_data_parts = data_parts;

	/** Remove from the set of actual parts the parts contained in another part (that were merged),
	  * but which for some reason remain in the file system.
	  * The files themselves will be removed later by the clearOldParts method.
	  */
	if (data_parts.size() >= 2)
	{
		DataParts::iterator prev_jt = data_parts.begin();
		DataParts::iterator curr_jt = prev_jt;
		++curr_jt;
		while (curr_jt != data_parts.end())
		{
			/// Do not consider data parts from different months.
			if ((*curr_jt)->left_month != (*curr_jt)->right_month
				|| (*curr_jt)->right_month != (*prev_jt)->left_month
				|| (*prev_jt)->left_month != (*prev_jt)->right_month)
			{
				++prev_jt;
				++curr_jt;
				continue;
			}

			if ((*curr_jt)->contains(**prev_jt))
			{
				LOG_WARNING(log, "Part " << (*curr_jt)->name << " contains " << (*prev_jt)->name);
				data_parts.erase(prev_jt);
				prev_jt = curr_jt;
				++curr_jt;
			}
			else if ((*prev_jt)->contains(**curr_jt))
			{
				LOG_WARNING(log, "Part " << (*prev_jt)->name << " contains " << (*curr_jt)->name);
				data_parts.erase(curr_jt++);
			}
			else
			{
				++prev_jt;
				++curr_jt;
			}
		}
	}

	LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}
2012-07-23 06:23:29 +00:00
/** Physically remove parts that are no longer referenced by anything but
  * all_data_parts itself. Uses a try-lock so concurrent callers simply skip.
  */
void StorageMergeTree::clearOldParts()
{
	Poco::ScopedTry<Poco::FastMutex> lock;

	/// If the method is already running in another thread (or all_data_parts is being modified right now), do nothing.
	if (!lock.lock(&all_data_parts_mutex))
	{
		LOG_TRACE(log, "Already clearing or modifying old parts");
		return;
	}

	LOG_TRACE(log, "Clearing old parts");
	for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
	{
		int ref_count = it->referenceCount();
		LOG_TRACE(log, (*it)->name << ": ref_count = " << ref_count);
		/// Only this set holds the part; under the lock the ref_count cannot increase anymore.
		if (ref_count == 1)
		{
			LOG_DEBUG(log, "Removing part " << (*it)->name);
			(*it)->remove();
			all_data_parts.erase(it++);
		}
		else
			++it;
	}
}
2012-09-10 19:05:06 +00:00
void StorageMergeTree : : merge ( size_t iterations , bool async )
2012-07-23 06:23:29 +00:00
{
2012-11-28 08:52:15 +00:00
bool while_can = false ;
2013-09-13 23:52:10 +00:00
if ( iterations = = 0 )
{
2012-11-28 08:52:15 +00:00
while_can = true ;
iterations = settings . merging_threads ;
2012-08-01 20:08:59 +00:00
}
2012-11-28 17:17:17 +00:00
for ( size_t i = 0 ; i < iterations ; + + i )
2012-11-28 08:52:15 +00:00
merge_threads - > schedule ( boost : : bind ( & StorageMergeTree : : mergeThread , this , while_can ) ) ;
if ( ! async )
joinMergeThreads ( ) ;
2012-08-13 19:13:11 +00:00
}
2012-11-28 08:52:15 +00:00
/** Worker body executed on the merge thread pool: repeatedly select parts to
  * merge (first without, then with "aggressive" selection) and merge them.
  * All exceptions are caught and logged — a merge thread must never propagate.
  */
void StorageMergeTree::mergeThread(bool while_can)
{
	try
	{
		std::vector<DataPartPtr> parts;
		while (selectPartsToMerge(parts, false) ||
			selectPartsToMerge(parts, true))
		{
			mergeParts(parts);

			/// Remove old parts.
			parts.clear();
			clearOldParts();

			/// Single-shot mode: do one merge and exit.
			if (!while_can)
				break;
		}
	}
	catch (const Exception & e)
	{
		LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
			<< std::endl
			<< "Stack trace:" << std::endl
			<< e.getStackTrace().toString());
	}
	catch (const Poco::Exception & e)
	{
		LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
	}
	catch (const std::exception & e)
	{
		LOG_ERROR(log, "std::exception: " << e.what());
	}
	catch (...)
	{
		LOG_ERROR(log, "Unknown exception");
	}
}
2012-11-28 08:52:15 +00:00
/// Block until every merge job currently scheduled on the pool has completed.
void StorageMergeTree::joinMergeThreads()
{
	LOG_DEBUG(log, "Waiting for merge threads to finish.");
	merge_threads->wait();
}
2012-11-29 10:50:17 +00:00
/// Select a contiguous run of at most max_parts_to_merge_at_once parts such that the largest part
/// is less than max_size_ratio_to_merge_parts times the sum of the others.
/// This guarantees O(n log n) total merge time in the worst case, regardless of which parts are
/// chosen, in which order they are merged, and how parts are added.

/// With max_parts_to_merge_at_once >= log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts),
/// it is easy to prove there is always something to merge while the number of parts exceeds
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts)*(number of parts larger than max_rows_to_merge_parts).

/// Further heuristics:
/// Pick a valid segment that is maximal by inclusion.
/// Among those, pick the segment with the smallest maximum part size.
/// Among those, pick the segment with the smallest minimum part size.
/// Among those, pick the longest segment.
bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts, bool merge_anything_for_old_months)
{
	LOG_DEBUG(log, "Selecting parts to merge");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	/// Best segment found so far (see heuristics above); -1U acts as "+infinity".
	size_t min_max = -1U;
	size_t min_min = -1U;
	int max_len = 0;
	DataParts::iterator best_begin;
	bool found = false;

	DayNum_t now_day = date_lut.toDayNum(time(0));
	DayNum_t now_month = date_lut.toFirstDayNumOfMonth(now_day);

	/// How many parts, starting from the current one, can still be covered by a valid segment that
	/// begins to the LEFT of the current part. Needed to test maximality by inclusion.
	int max_count_from_left = 0;

	/// Left end of the candidate segment.
	for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
	{
		const DataPartPtr & first_part = *it;

		max_count_from_left = std::max(0, max_count_from_left - 1);

		/// The part must be free and small enough.
		if (first_part->currently_merging ||
			first_part->size * index_granularity > settings.max_rows_to_merge_parts)
			continue;

		/// The part must lie within a single month.
		if (first_part->left_month != first_part->right_month)
		{
			LOG_WARNING(log, "Part " << first_part->name << " spans more than one month");
			continue;
		}

		/// The longest valid segment starting here.
		size_t cur_longest_max = -1U;
		size_t cur_longest_min = -1U;
		int cur_longest_len = 0;

		/// The current segment, not necessarily valid.
		size_t cur_max = first_part->size;
		size_t cur_min = first_part->size;
		size_t cur_sum = first_part->size;
		int cur_len = 1;

		DayNum_t month = first_part->left_month;
		UInt64 cur_id = first_part->right;

		/// This month ended at least one day ago.
		bool is_old_month = now_day - now_month >= 1 && now_month > month;

		/// Right end of the candidate segment.
		DataParts::iterator jt = it;
		for (++jt; jt != data_parts.end() && cur_len < static_cast<int>(settings.max_parts_to_merge_at_once); ++jt)
		{
			const DataPartPtr & last_part = *jt;

			/// The part must be free, small enough, and within the same single month.
			if (last_part->currently_merging ||
				last_part->size * index_granularity > settings.max_rows_to_merge_parts ||
				last_part->left_month != last_part->right_month ||
				last_part->left_month != month)
				break;

			/// The part must lie strictly to the right of the previous one.
			if (last_part->left < cur_id)
			{
				LOG_WARNING(log, "Part " << last_part->name << " intersects previous part");
				break;
			}

			cur_max = std::max(cur_max, last_part->size);
			cur_min = std::min(cur_min, last_part->size);
			cur_sum += last_part->size;
			++cur_len;
			cur_id = last_part->right;

			/// If the segment is valid, it is the longest valid one starting here.
			if (cur_len >= 2 &&
				(static_cast<double>(cur_max) / (cur_sum - cur_max) < settings.max_size_ratio_to_merge_parts ||
				(is_old_month && merge_anything_for_old_months))) /// For an old month, merge anything at all, if allowed.
			{
				cur_longest_max = cur_max;
				cur_longest_min = cur_min;
				cur_longest_len = cur_len;
			}
		}

		/// This valid segment is maximal by inclusion (no valid segment starting earlier covers it).
		if (cur_longest_len > max_count_from_left)
		{
			max_count_from_left = cur_longest_len;

			/// Lexicographic comparison implements the (min max-size, min min-size, max length) heuristics.
			if (!found ||
				std::make_pair(std::make_pair(cur_longest_max, cur_longest_min), -cur_longest_len) <
				std::make_pair(std::make_pair(min_max, min_min), -max_len))
			{
				found = true;
				min_max = cur_longest_max;
				min_min = cur_longest_min;
				max_len = cur_longest_len;
				best_begin = it;
			}
		}
	}

	if (found)
	{
		parts.clear();

		DataParts::iterator it = best_begin;
		for (int i = 0; i < max_len; ++i)
		{
			parts.push_back(*it);
			/// Mark the chosen parts as busy so a concurrent selection does not pick them again.
			parts.back()->currently_merging = true;
			++it;
		}

		LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
	}
	else
	{
		LOG_DEBUG(log, "No parts to merge");
	}

	return found;
}
2013-04-24 10:31:32 +00:00
/// Merge the given parts into one new part; parts must be sorted.
/// Reads all source parts through the primary-key expression, merges the sorted streams
/// (collapsing by sign_column if one is configured), writes the result as a new part, and
/// atomically swaps it into data_parts in place of the sources.
void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
{
	LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);

	Names all_column_names;
	for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
		all_column_names.push_back(it->first);

	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	/// The new part covers the union of the source parts' ranges; its level is one more than
	/// the deepest source level.
	StorageMergeTree::DataPartPtr new_data_part = new DataPart(*this);
	new_data_part->left_date = std::numeric_limits<UInt16>::max();
	new_data_part->right_date = std::numeric_limits<UInt16>::min();
	new_data_part->left = parts.front()->left;
	new_data_part->right = parts.back()->right;
	new_data_part->level = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		new_data_part->level = std::max(new_data_part->level, parts[i]->level);
		new_data_part->left_date = std::min(new_data_part->left_date, parts[i]->left_date);
		new_data_part->right_date = std::max(new_data_part->right_date, parts[i]->right_date);
	}
	++new_data_part->level;
	new_data_part->name = getPartName(
		new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
	new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
	new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);

	/** Read from all the parts, merge, and write into a new one.
	  * Along the way, compute the expression for sorting.
	  */
	BlockInputStreams src_streams;

	for (size_t i = 0; i < parts.size(); ++i)
	{
		/// One range covering the whole part.
		MarkRanges ranges(1, MarkRange(0, parts[i]->size));
		src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
			full_path + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, all_column_names, *this, parts[i], ranges, StoragePtr(), false), primary_expr));
	}

	/// The order of the streams matters: on equal keys, elements come out in source-stream order.
	/// In the merged part, rows with the same key must go in ascending order of the source part id,
	/// i.e. in (approximately) ascending insertion time.
	BlockInputStreamPtr merged_stream = sign_column.empty()
		? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_MERGE_BLOCK_SIZE)
		: new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_MERGE_BLOCK_SIZE);

	MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
		new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);

	copyData(*merged_stream, *to);

	new_data_part->size = to->marksCount();
	new_data_part->modification_time = time(0);

	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

		/// Sanity check before the swap: every source part must still be in data_parts.
		for (size_t i = 0; i < parts.size(); ++i)
		{
			if (data_parts.end() == data_parts.find(parts[i]))
				throw Exception("Logical error: cannot find data part " + parts[i]->name + " in list", ErrorCodes::LOGICAL_ERROR);
		}

		/// Add the new part to both sets; the sources stay in all_data_parts until
		/// clearOldParts() sees they are unreferenced.
		data_parts.insert(new_data_part);
		all_data_parts.insert(new_data_part);

		for (size_t i = 0; i < parts.size(); ++i)
		{
			data_parts.erase(data_parts.find(parts[i]));
		}
	}

	LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
}
2012-08-16 18:17:01 +00:00
2013-01-23 11:16:32 +00:00
void StorageMergeTree : : rename ( const String & new_path_to_db , const String & new_name )
{
joinMergeThreads ( ) ;
std : : string new_full_path = new_path_to_db + escapeForFileName ( new_name ) + ' / ' ;
Poco : : File ( full_path ) . renameTo ( new_full_path ) ;
path = new_path_to_db ;
full_path = new_full_path ;
name = new_name ;
increment . setPath ( full_path + " increment.txt " ) ;
}
2013-01-23 17:43:19 +00:00
void StorageMergeTree : : dropImpl ( )
2012-08-16 18:17:01 +00:00
{
2012-11-28 08:52:15 +00:00
joinMergeThreads ( ) ;
2012-08-16 18:17:01 +00:00
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
data_parts . clear ( ) ;
all_data_parts . clear ( ) ;
Poco : : File ( full_path ) . remove ( true ) ;
}
2013-08-14 16:52:40 +00:00
/// одинаковыми считаются имена, если они совпадают целиком или nameWithoutDot совпадает с частью имени до точки
bool namesEqual ( const String & nameWithoutDot , const DB : : NameAndTypePair & name_type )
2013-08-08 09:50:15 +00:00
{
2013-08-14 16:52:40 +00:00
String nameWithDot = nameWithoutDot + " . " ;
return ( nameWithDot = = name_type . first . substr ( 0 , nameWithoutDot . length ( ) + 1 ) | | nameWithoutDot = = name_type . first ) ;
2013-08-08 09:50:15 +00:00
}
2013-08-09 00:12:59 +00:00
2013-08-08 09:50:15 +00:00
/// Drop a column: remove it (and any nested sub-columns named column_name.*) from the columns
/// list, then delete the column's data files from every part directory on disk.
void StorageMergeTree::removeColumn(String column_name)
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	/// Remove the matching columns from the columns list; keep erasing until no match remains.
	bool is_first = true;
	NamesAndTypesList::iterator column_it;
	do
	{
		column_it = std::find_if(columns->begin(), columns->end(), boost::bind(namesEqual, column_name, _1));

		if (column_it == columns->end())
		{
			/// Nothing matched on the very first pass: the column to drop does not exist.
			if (is_first)
				throw DB::Exception("Wrong column name. Cannot find column to drop", DB::ErrorCodes::ILLEGAL_COLUMN);
		}
		else
			columns->erase(column_it);
		is_first = false;
	}
	while (column_it != columns->end());

	/// Regexp that selects the column's files for removal: the column name itself or a nested
	/// sub-column (".something" or "%2E-escaped"), followed by a data/marks extension.
	/// NOTE(review): column_name is interpolated into the pattern unescaped — a name containing
	/// regex metacharacters would match unintended files; confirm names are restricted upstream.
	Poco::RegularExpression re(column_name + "(?:(?:\\.|\\%2E).+){0,1}" + "(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)");

	/// Walk every part directory under the storage path.
	Poco::RegularExpression::MatchVec matches;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it_dir = Poco::DirectoryIterator(full_path); it_dir != end; ++it_dir)
	{
		std::string dir_name = it_dir.name();

		/// Skip anything that is not a part directory (e.g. temporary or service files).
		if (!isPartDirectory(dir_name, matches))
			continue;

		/// Remove every file of the column inside this part directory.
		String full_dir_name = full_path + dir_name + "/";
		for (Poco::DirectoryIterator it_file(full_dir_name); it_file != end; ++it_file)
		{
			if (re.match(it_file.name()))
			{
				Poco::File file(full_dir_name + it_file.name());
				if (file.exists())
					file.remove();
			}
		}
	}
}
2013-08-08 09:50:15 +00:00
2013-08-09 00:12:59 +00:00
/// Execute an ALTER query: ADD inserts a new column (after a named column if given, else at the
/// end); DROP delegates to removeColumn. Any other parameter type is a logical error.
void StorageMergeTree::alter(const ASTAlterQuery::Parameters & params)
{
	if (params.type == ASTAlterQuery::ADD)
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

		NamesAndTypesList::iterator insert_it = columns->end();
		if (params.column)
		{
			String column_name = dynamic_cast<const ASTIdentifier &>(*params.column).name;

			/// Search from the end for the last column named column_name or column_name.*
			NamesAndTypesList::reverse_iterator reverse_insert_it = std::find_if(columns->rbegin(), columns->rend(), boost::bind(namesEqual, column_name, _1));

			if (reverse_insert_it == columns->rend())
				throw DB::Exception("Wrong column name. Cannot find column to insert after", DB::ErrorCodes::ILLEGAL_COLUMN);
			else
			{
				/// base() yields the forward iterator one element to the right of the match,
				/// which is exactly the position to insert after it.
				insert_it = reverse_insert_it.base();
			}
		}

		/// Reconstruct the type name from the source text range of the AST node and parse it.
		const ASTNameTypePair & ast_name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
		StringRange type_range = ast_name_type.type->range;
		String type_string = String(type_range.first, type_range.second - type_range.first);
		DB::DataTypePtr data_type = context.getDataTypeFactory().get(type_string);
		NameAndTypePair pair(ast_name_type.name, data_type);
		columns->insert(insert_it, pair);

		/// Slow, because the whole list is copied each time.
		columns = DataTypeNested::expandNestedColumns(*columns);
		return;
	}
	else if (params.type == ASTAlterQuery::DROP)
	{
		String column_name = dynamic_cast<const ASTIdentifier &>(*params.column).name;
		removeColumn(column_name);
	}
	else
		throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
2013-08-09 00:12:59 +00:00
bool StorageMergeTree : : isPartDirectory ( const String & dir_name , Poco : : RegularExpression : : MatchVec & matches ) const
2013-08-07 13:07:42 +00:00
{
return ( file_name_regexp . match ( dir_name , 0 , matches ) & & 6 = = matches . size ( ) ) ;
}
2013-08-09 00:12:59 +00:00
2013-09-15 03:44:32 +00:00
bool StorageMergeTree : : removeIfBroken ( const String & path )
{
/// Проверяем, что первичный ключ непуст.
Poco : : File index_file ( path + " /primary.idx " ) ;
if ( ! index_file . exists ( ) | | index_file . getSize ( ) = = 0 )
{
LOG_ERROR ( log , " Part " < < path < < " is broken: primary key is empty. Removing. " ) ;
Poco : : File ( path ) . remove ( true ) ;
return true ;
}
/// Проверяем, что все засечки непусты и имеют одинаковый размер.
ssize_t marks_size = - 1 ;
for ( NamesAndTypesList : : const_iterator it = columns - > begin ( ) ; it ! = columns - > end ( ) ; + + it )
{
Poco : : File marks_file ( path + " / " + escapeForFileName ( it - > first ) + " .mrk " ) ;
if ( ! marks_file . exists ( ) ) /// Возможно, логическая ошибка. Н е будем ничего удалять.
throw Exception ( " File " + marks_file . path ( ) + " doesn't exist. " , ErrorCodes : : FILE_DOESNT_EXIST ) ;
if ( marks_size = = - 1 )
{
marks_size = marks_file . getSize ( ) ;
if ( 0 = = marks_size )
{
LOG_ERROR ( log , " Part " < < path < < " is broken: " < < marks_file . path ( ) < < " is empty. Removing. " ) ;
Poco : : File ( path ) . remove ( true ) ;
return true ;
}
}
else
{
if ( static_cast < ssize_t > ( marks_file . getSize ( ) ) ! = marks_size )
{
LOG_ERROR ( log , " Part " < < path < < " is broken: marks have different sizes. Removing. " ) ;
Poco : : File ( path ) . remove ( true ) ;
return true ;
}
}
}
return false ;
}
2012-07-17 20:04:39 +00:00
}