2012-07-18 19:16:16 +00:00
# include <boost/bind.hpp>
2012-11-28 08:52:15 +00:00
# include <numeric>
2012-07-17 20:04:39 +00:00
2012-07-19 20:32:10 +00:00
# include <Poco/DirectoryIterator.h>
# include <Poco/NumberParser.h>
2012-11-28 08:52:15 +00:00
# include <Poco/Ext/scopedTry.h>
2012-07-19 20:32:10 +00:00
# include <Yandex/time2str.h>
2012-07-17 20:04:39 +00:00
# include <DB/Common/escapeForFileName.h>
# include <DB/IO/WriteBufferFromString.h>
# include <DB/IO/WriteBufferFromFile.h>
# include <DB/IO/CompressedWriteBuffer.h>
2012-07-21 03:45:48 +00:00
# include <DB/IO/ReadBufferFromString.h>
2012-07-19 20:32:10 +00:00
# include <DB/IO/ReadBufferFromFile.h>
# include <DB/IO/CompressedReadBuffer.h>
2012-07-17 20:04:39 +00:00
# include <DB/Columns/ColumnsNumber.h>
2012-08-30 17:43:31 +00:00
# include <DB/Columns/ColumnArray.h>
2012-07-17 20:04:39 +00:00
2012-07-21 03:45:48 +00:00
# include <DB/DataTypes/DataTypesNumberFixed.h>
2012-08-30 17:43:31 +00:00
# include <DB/DataTypes/DataTypeArray.h>
2012-07-21 03:45:48 +00:00
2012-07-19 20:32:10 +00:00
# include <DB/DataStreams/IProfilingBlockInputStream.h>
2012-07-30 20:32:36 +00:00
# include <DB/DataStreams/MergingSortedBlockInputStream.h>
2012-08-16 17:27:40 +00:00
# include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
2012-07-31 17:22:40 +00:00
# include <DB/DataStreams/ExpressionBlockInputStream.h>
2012-11-28 08:52:15 +00:00
# include <DB/DataStreams/ConcatBlockInputStream.h>
2012-07-21 06:47:17 +00:00
# include <DB/DataStreams/narrowBlockInputStreams.h>
2012-07-30 20:32:36 +00:00
# include <DB/DataStreams/copyData.h>
2012-12-12 14:25:55 +00:00
# include <DB/DataStreams/FilterBlockInputStream.h>
2012-07-19 20:32:10 +00:00
2012-07-21 03:45:48 +00:00
# include <DB/Parsers/ASTExpressionList.h>
# include <DB/Parsers/ASTSelectQuery.h>
# include <DB/Parsers/ASTFunction.h>
# include <DB/Parsers/ASTLiteral.h>
2012-12-12 14:25:55 +00:00
# include <DB/Parsers/ASTIdentifier.h>
2012-07-21 03:45:48 +00:00
2012-07-17 20:04:39 +00:00
# include <DB/Interpreters/sortBlock.h>
# include <DB/Storages/StorageMergeTree.h>
2012-12-05 12:44:55 +00:00
# include <DB/Storages/PkCondition.h>
2012-07-17 20:04:39 +00:00
2012-07-19 20:32:10 +00:00
/// Size in bytes of one mark record: the reader below seeks to mark_number * MERGE_TREE_MARK_SIZE
/// in a marks file and then reads two size_t values (offset in the compressed file, offset inside the decompressed block).
# define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
2012-07-17 20:04:39 +00:00
namespace DB
{
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public :
2012-08-30 17:43:31 +00:00
MergeTreeBlockOutputStream ( StorageMergeTree & storage_ ) : storage ( storage_ ) , flags ( O_TRUNC | O_CREAT | O_WRONLY )
2012-07-17 20:04:39 +00:00
{
}
void write ( const Block & block )
{
storage . check ( block ) ;
Yandex : : DateLUTSingleton & date_lut = Yandex : : DateLUTSingleton : : instance ( ) ;
size_t rows = block . rows ( ) ;
size_t columns = block . columns ( ) ;
/// Достаём столбец с датой.
2012-07-18 19:16:16 +00:00
const ColumnUInt16 : : Container_t & dates =
dynamic_cast < const ColumnUInt16 & > ( * block . getByName ( storage . date_column_name ) . column ) . getData ( ) ;
2012-07-17 20:04:39 +00:00
/// Минимальная и максимальная дата.
UInt16 min_date = std : : numeric_limits < UInt16 > : : max ( ) ;
UInt16 max_date = std : : numeric_limits < UInt16 > : : min ( ) ;
for ( ColumnUInt16 : : Container_t : : const_iterator it = dates . begin ( ) ; it ! = dates . end ( ) ; + + it )
{
if ( * it < min_date )
min_date = * it ;
if ( * it > max_date )
max_date = * it ;
}
/// Разделяем на блоки по месяцам. Для каждого ещё посчитаем минимальную и максимальную дату.
typedef std : : map < UInt16 , BlockWithDateInterval > BlocksByMonth ;
BlocksByMonth blocks_by_month ;
2012-08-31 20:38:05 +00:00
UInt16 min_month = date_lut . toFirstDayNumOfMonth ( Yandex : : DayNum_t ( min_date ) ) ;
UInt16 max_month = date_lut . toFirstDayNumOfMonth ( Yandex : : DayNum_t ( max_date ) ) ;
2012-07-17 20:04:39 +00:00
/// Типичный случай - когда месяц один (ничего разделять не нужно).
if ( min_month = = max_month )
blocks_by_month [ min_month ] = BlockWithDateInterval ( block , min_date , max_date ) ;
else
{
for ( size_t i = 0 ; i < rows ; + + i )
{
2012-08-31 20:38:05 +00:00
UInt16 month = date_lut . toFirstDayNumOfMonth ( Yandex : : DayNum_t ( dates [ i ] ) ) ;
2012-08-31 18:40:21 +00:00
2012-07-17 20:04:39 +00:00
BlockWithDateInterval & block_for_month = blocks_by_month [ month ] ;
if ( ! block_for_month . block )
block_for_month . block = block . cloneEmpty ( ) ;
if ( dates [ i ] < block_for_month . min_date )
block_for_month . min_date = dates [ i ] ;
if ( dates [ i ] > block_for_month . max_date )
block_for_month . max_date = dates [ i ] ;
for ( size_t j = 0 ; j < columns ; + + j )
block_for_month . block . getByPosition ( j ) . column - > insert ( ( * block . getByPosition ( j ) . column ) [ i ] ) ;
}
}
/// Для каждого месяца.
2012-07-18 19:16:16 +00:00
for ( BlocksByMonth : : iterator it = blocks_by_month . begin ( ) ; it ! = blocks_by_month . end ( ) ; + + it )
2012-07-17 20:04:39 +00:00
writePart ( it - > second . block , it - > second . min_date , it - > second . max_date ) ;
}
BlockOutputStreamPtr clone ( ) { return new MergeTreeBlockOutputStream ( storage ) ; }
private :
StorageMergeTree & storage ;
2012-08-30 17:43:31 +00:00
const int flags ;
2012-07-17 20:04:39 +00:00
struct BlockWithDateInterval
{
Block block ;
UInt16 min_date ;
UInt16 max_date ;
BlockWithDateInterval ( ) : min_date ( std : : numeric_limits < UInt16 > : : max ( ) ) , max_date ( 0 ) { }
2012-07-18 19:16:16 +00:00
BlockWithDateInterval ( const Block & block_ , UInt16 min_date_ , UInt16 max_date_ )
: block ( block_ ) , min_date ( min_date_ ) , max_date ( max_date_ ) { }
2012-07-17 20:04:39 +00:00
} ;
void writePart ( Block & block , UInt16 min_date , UInt16 max_date )
{
2012-07-21 07:21:41 +00:00
Yandex : : DateLUTSingleton & date_lut = Yandex : : DateLUTSingleton : : instance ( ) ;
2012-07-18 19:16:16 +00:00
size_t rows = block . rows ( ) ;
2012-07-17 20:04:39 +00:00
size_t columns = block . columns ( ) ;
UInt64 part_id = storage . increment . get ( true ) ;
2012-07-21 07:21:41 +00:00
String part_name = storage . getPartName (
Yandex : : DayNum_t ( min_date ) , Yandex : : DayNum_t ( max_date ) ,
part_id , part_id , 0 ) ;
2012-07-18 19:16:16 +00:00
2012-07-21 07:21:41 +00:00
String part_tmp_path = storage . full_path + " tmp_ " + part_name + " / " ;
String part_res_path = storage . full_path + part_name + " / " ;
2012-07-17 20:04:39 +00:00
Poco : : File ( part_tmp_path ) . createDirectories ( ) ;
2012-07-31 16:58:37 +00:00
LOG_TRACE ( storage . log , " Calculating primary expression. " ) ;
2012-07-17 20:04:39 +00:00
/// Если для сортировки надо вычислить некоторые столбцы - делаем это.
2012-07-18 20:14:41 +00:00
storage . primary_expr - > execute ( block ) ;
2012-07-31 16:58:37 +00:00
LOG_TRACE ( storage . log , " Sorting by primary key. " ) ;
2012-07-17 20:04:39 +00:00
/// Сортируем.
sortBlock ( block , storage . sort_descr ) ;
/// Наконец-то можно писать данные на диск.
2012-07-31 16:58:37 +00:00
LOG_TRACE ( storage . log , " Writing index. " ) ;
2012-07-18 19:16:16 +00:00
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
{
WriteBufferFromFile index ( part_tmp_path + " primary.idx " , DBMS_DEFAULT_BUFFER_SIZE , flags ) ;
typedef std : : vector < const ColumnWithNameAndType * > PrimaryColumns ;
PrimaryColumns primary_columns ;
for ( size_t i = 0 , size = storage . sort_descr . size ( ) ; i < size ; + + i )
primary_columns . push_back (
! storage . sort_descr [ i ] . column_name . empty ( )
? & block . getByName ( storage . sort_descr [ i ] . column_name )
: & block . getByPosition ( storage . sort_descr [ i ] . column_number ) ) ;
for ( size_t i = 0 ; i < rows ; i + = storage . index_granularity )
for ( PrimaryColumns : : const_iterator it = primary_columns . begin ( ) ; it ! = primary_columns . end ( ) ; + + it )
( * it ) - > type - > serializeBinary ( ( * ( * it ) - > column ) [ i ] , index ) ;
}
2012-07-31 16:58:37 +00:00
LOG_TRACE ( storage . log , " Writing data. " ) ;
2012-07-17 20:04:39 +00:00
for ( size_t i = 0 ; i < columns ; + + i )
{
const ColumnWithNameAndType & column = block . getByPosition ( i ) ;
2012-08-30 17:43:31 +00:00
writeData ( part_tmp_path , column . name , * column . type , * column . column ) ;
2012-07-18 19:16:16 +00:00
}
2012-07-31 16:58:37 +00:00
LOG_TRACE ( storage . log , " Renaming. " ) ;
2012-07-17 20:04:39 +00:00
2012-07-18 19:16:16 +00:00
/// Переименовываем кусок.
Poco : : File ( part_tmp_path ) . renameTo ( part_res_path ) ;
2012-07-21 07:21:41 +00:00
/// Добавляем новый кусок в набор.
2012-07-23 06:23:29 +00:00
{
2012-08-10 20:04:34 +00:00
Poco : : ScopedLock < Poco : : FastMutex > lock ( storage . data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( storage . all_data_parts_mutex ) ;
StorageMergeTree : : DataPartPtr new_data_part = new StorageMergeTree : : DataPart ( storage ) ;
new_data_part - > left_date = Yandex : : DayNum_t ( min_date ) ;
new_data_part - > right_date = Yandex : : DayNum_t ( max_date ) ;
new_data_part - > left = part_id ;
new_data_part - > right = part_id ;
new_data_part - > level = 0 ;
new_data_part - > name = part_name ;
2012-11-29 17:04:12 +00:00
new_data_part - > size = ( rows + storage . index_granularity - 1 ) / storage . index_granularity ;
2012-08-10 20:04:34 +00:00
new_data_part - > modification_time = time ( 0 ) ;
2012-08-31 20:38:05 +00:00
new_data_part - > left_month = date_lut . toFirstDayNumOfMonth ( new_data_part - > left_date ) ;
new_data_part - > right_month = date_lut . toFirstDayNumOfMonth ( new_data_part - > right_date ) ;
2012-08-10 20:04:34 +00:00
storage . data_parts . insert ( new_data_part ) ;
2012-07-23 06:23:29 +00:00
storage . all_data_parts . insert ( new_data_part ) ;
}
2012-11-28 11:49:14 +00:00
/// Если на каждую запись делать по две итерации слияния, то дерево будет максимально компактно.
storage . merge ( 2 ) ;
2012-07-18 19:16:16 +00:00
}
2012-07-17 20:04:39 +00:00
2012-08-30 17:43:31 +00:00
/// Записать данные одного столбца.
void writeData ( const String & path , const String & name , const IDataType & type , const IColumn & column , size_t level = 0 )
{
String escaped_column_name = escapeForFileName ( name ) ;
2012-12-11 20:31:39 +00:00
size_t size = column . size ( ) ;
2012-08-30 17:43:31 +00:00
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if ( const DataTypeArray * type_arr = dynamic_cast < const DataTypeArray * > ( & type ) )
{
String size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco : : NumberFormatter : : format ( level ) ;
WriteBufferFromFile plain ( path + size_name + " .bin " , DBMS_DEFAULT_BUFFER_SIZE , flags ) ;
WriteBufferFromFile marks ( path + size_name + " .mrk " , 4096 , flags ) ;
CompressedWriteBuffer compressed ( plain ) ;
size_t prev_mark = 0 ;
2012-12-11 20:31:39 +00:00
while ( prev_mark < size )
{
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
writeIntBinary ( plain . count ( ) , marks ) ;
writeIntBinary ( compressed . offset ( ) , marks ) ;
type_arr - > serializeOffsets ( column , compressed , prev_mark , storage . index_granularity ) ;
prev_mark + = storage . index_granularity ;
}
2012-08-30 17:43:31 +00:00
}
2012-09-22 07:30:40 +00:00
2012-08-30 17:43:31 +00:00
{
WriteBufferFromFile plain ( path + escaped_column_name + " .bin " , DBMS_DEFAULT_BUFFER_SIZE , flags ) ;
WriteBufferFromFile marks ( path + escaped_column_name + " .mrk " , 4096 , flags ) ;
CompressedWriteBuffer compressed ( plain ) ;
2012-12-11 20:31:39 +00:00
// TODO Для массивов здесь б а г - засечки сериализуются неправильно.
2012-08-30 17:43:31 +00:00
size_t prev_mark = 0 ;
2012-12-11 20:31:39 +00:00
while ( prev_mark < size )
{
writeIntBinary ( plain . count ( ) , marks ) ;
writeIntBinary ( compressed . offset ( ) , marks ) ;
type . serializeBinary ( column , compressed , prev_mark , storage . index_granularity ) ;
prev_mark + = storage . index_granularity ;
}
2012-08-30 17:43:31 +00:00
}
}
2012-07-17 20:04:39 +00:00
} ;
2012-07-30 20:32:36 +00:00
/** Для записи куска, полученного слиянием нескольких других.
* Д а н н ы е у ж е о т с о р т и р о в а н ы , о т н о с я т с я к о д н о м у м е с я ц у , и п и ш у т с я в о д и н к у с к о к .
*/
class MergedBlockOutputStream : public IBlockOutputStream
{
public :
MergedBlockOutputStream ( StorageMergeTree & storage_ ,
UInt16 min_date , UInt16 max_date , UInt64 min_part_id , UInt64 max_part_id , UInt32 level )
2012-12-03 09:39:04 +00:00
: storage ( storage_ ) , marks_count ( 0 ) , index_offset ( 0 )
2012-07-30 20:32:36 +00:00
{
part_name = storage . getPartName (
Yandex : : DayNum_t ( min_date ) , Yandex : : DayNum_t ( max_date ) ,
min_part_id , max_part_id , level ) ;
part_tmp_path = storage . full_path + " tmp_ " + part_name + " / " ;
part_res_path = storage . full_path + part_name + " / " ;
Poco : : File ( part_tmp_path ) . createDirectories ( ) ;
2012-08-21 15:37:29 +00:00
index_stream = new WriteBufferFromFile ( part_tmp_path + " primary.idx " , DBMS_DEFAULT_BUFFER_SIZE , O_TRUNC | O_CREAT | O_WRONLY ) ;
2012-07-30 20:32:36 +00:00
for ( NamesAndTypesList : : const_iterator it = storage . columns - > begin ( ) ; it ! = storage . columns - > end ( ) ; + + it )
2012-08-30 17:43:31 +00:00
addStream ( it - > first , * it - > second ) ;
2012-07-30 20:32:36 +00:00
}
void write ( const Block & block )
{
size_t rows = block . rows ( ) ;
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
typedef std : : vector < const ColumnWithNameAndType * > PrimaryColumns ;
PrimaryColumns primary_columns ;
for ( size_t i = 0 , size = storage . sort_descr . size ( ) ; i < size ; + + i )
primary_columns . push_back (
! storage . sort_descr [ i ] . column_name . empty ( )
? & block . getByName ( storage . sort_descr [ i ] . column_name )
: & block . getByPosition ( storage . sort_descr [ i ] . column_number ) ) ;
for ( size_t i = index_offset ; i < rows ; i + = storage . index_granularity )
2012-12-03 08:52:58 +00:00
{
2012-07-30 20:32:36 +00:00
for ( PrimaryColumns : : const_iterator it = primary_columns . begin ( ) ; it ! = primary_columns . end ( ) ; + + it )
2012-12-03 08:52:58 +00:00
{
2012-07-30 20:32:36 +00:00
( * it ) - > type - > serializeBinary ( ( * ( * it ) - > column ) [ i ] , * index_stream ) ;
2012-12-03 08:52:58 +00:00
}
+ + marks_count ;
}
2012-07-30 20:32:36 +00:00
2012-08-30 17:43:31 +00:00
/// Теперь пишем данные.
2012-07-30 20:32:36 +00:00
for ( NamesAndTypesList : : const_iterator it = storage . columns - > begin ( ) ; it ! = storage . columns - > end ( ) ; + + it )
{
const ColumnWithNameAndType & column = block . getByName ( it - > first ) ;
2012-08-30 17:43:31 +00:00
writeData ( column . name , * column . type , * column . column ) ;
2012-07-30 20:32:36 +00:00
}
2012-07-31 18:15:51 +00:00
index_offset = rows % storage . index_granularity
? ( storage . index_granularity - rows % storage . index_granularity )
: 0 ;
2012-07-30 20:32:36 +00:00
}
void writeSuffix ( )
{
/// Заканчиваем запись.
index_stream = NULL ;
column_streams . clear ( ) ;
2012-12-03 08:52:58 +00:00
if ( marks_count = = 0 )
throw Exception ( " Empty part " , ErrorCodes : : LOGICAL_ERROR ) ;
2012-07-30 20:32:36 +00:00
/// Переименовываем кусок.
Poco : : File ( part_tmp_path ) . renameTo ( part_res_path ) ;
/// А добавление нового куска в набор (и удаление исходных кусков) сделает вызывающая сторона.
}
BlockOutputStreamPtr clone ( ) { throw Exception ( " Cannot clone MergedBlockOutputStream " , ErrorCodes : : NOT_IMPLEMENTED ) ; }
2012-12-03 08:52:58 +00:00
/// Сколько засечек уже записано.
size_t marksCount ( )
{
return marks_count ;
}
2012-07-30 20:32:36 +00:00
private :
StorageMergeTree & storage ;
String part_name ;
String part_tmp_path ;
String part_res_path ;
2012-12-03 08:52:58 +00:00
size_t marks_count ;
2012-07-30 20:32:36 +00:00
struct ColumnStream
{
ColumnStream ( const String & data_path , const std : : string & marks_path ) :
2012-08-21 15:37:29 +00:00
plain ( data_path , DBMS_DEFAULT_BUFFER_SIZE , O_TRUNC | O_CREAT | O_WRONLY ) ,
2012-07-30 20:32:36 +00:00
compressed ( plain ) ,
2012-08-21 15:37:29 +00:00
marks ( marks_path , 4096 , O_TRUNC | O_CREAT | O_WRONLY ) { }
2012-07-30 20:32:36 +00:00
WriteBufferFromFile plain ;
CompressedWriteBuffer compressed ;
WriteBufferFromFile marks ;
} ;
typedef std : : map < String , SharedPtr < ColumnStream > > ColumnStreams ;
ColumnStreams column_streams ;
SharedPtr < WriteBuffer > index_stream ;
/// Смещение до первой строчки блока, для которой надо записать индекс.
size_t index_offset ;
2012-08-30 17:43:31 +00:00
void addStream ( const String & name , const IDataType & type , size_t level = 0 )
{
String escaped_column_name = escapeForFileName ( name ) ;
/// Для массивов используются отдельные потоки для размеров.
if ( const DataTypeArray * type_arr = dynamic_cast < const DataTypeArray * > ( & type ) )
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco : : NumberFormatter : : format ( level ) ;
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco : : NumberFormatter : : format ( level ) ;
column_streams [ size_name ] = new ColumnStream (
part_tmp_path + escaped_size_name + " .bin " ,
part_tmp_path + escaped_size_name + " .mrk " ) ;
addStream ( name , * type_arr - > getNestedType ( ) , level + 1 ) ;
}
else
column_streams [ name ] = new ColumnStream (
part_tmp_path + escaped_column_name + " .bin " ,
part_tmp_path + escaped_column_name + " .mrk " ) ;
}
/// Записать данные одного столбца.
void writeData ( const String & name , const IDataType & type , const IColumn & column , size_t level = 0 )
{
2012-12-11 20:31:39 +00:00
size_t size = column . size ( ) ;
2012-08-30 17:43:31 +00:00
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if ( const DataTypeArray * type_arr = dynamic_cast < const DataTypeArray * > ( & type ) )
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco : : NumberFormatter : : format ( level ) ;
ColumnStream & stream = * column_streams [ size_name ] ;
size_t prev_mark = 0 ;
2012-12-11 20:31:39 +00:00
while ( prev_mark < size )
{
size_t limit = 0 ;
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if ( prev_mark = = 0 & & index_offset ! = 0 )
{
limit = index_offset ;
}
else
{
limit = storage . index_granularity ;
writeIntBinary ( stream . plain . count ( ) , stream . marks ) ;
writeIntBinary ( stream . compressed . offset ( ) , stream . marks ) ;
}
type_arr - > serializeOffsets ( column , stream . compressed , prev_mark , limit ) ;
prev_mark + = limit ;
}
2012-08-30 17:43:31 +00:00
}
2012-09-22 07:30:40 +00:00
2012-08-30 17:43:31 +00:00
{
ColumnStream & stream = * column_streams [ name ] ;
2012-12-11 20:31:39 +00:00
// TODO Для массивов здесь б а г - засечки сериализуются неправильно.
2012-08-30 17:43:31 +00:00
size_t prev_mark = 0 ;
2012-12-11 20:31:39 +00:00
while ( prev_mark < size )
{
size_t limit = 0 ;
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if ( prev_mark = = 0 & & index_offset ! = 0 )
{
limit = index_offset ;
}
else
{
limit = storage . index_granularity ;
writeIntBinary ( stream . plain . count ( ) , stream . marks ) ;
writeIntBinary ( stream . compressed . offset ( ) , stream . marks ) ;
}
type . serializeBinary ( column , stream . compressed , prev_mark , limit ) ;
prev_mark + = limit ;
}
2012-07-30 20:32:36 +00:00
}
}
} ;
2012-12-03 08:52:58 +00:00
/// Poco::SharedPtr handle to a MergedBlockOutputStream.
typedef Poco : : SharedPtr < MergedBlockOutputStream > MergedBlockOutputStreamPtr ;
2012-07-30 20:32:36 +00:00
2012-07-19 20:32:10 +00:00
/// Для чтения из одного куска. Для чтения сразу из многих, Storage использует сразу много таких объектов.
class MergeTreeBlockInputStream : public IProfilingBlockInputStream
{
public :
2012-07-21 06:47:17 +00:00
MergeTreeBlockInputStream ( const String & path_ , /// Путь к куску
2012-11-28 08:52:15 +00:00
size_t block_size_ , const Names & column_names_ ,
StorageMergeTree & storage_ , const StorageMergeTree : : DataPartPtr & owned_data_part_ ,
2012-12-06 09:45:09 +00:00
const MarkRanges & mark_ranges_ )
2012-07-21 06:47:17 +00:00
: path ( path_ ) , block_size ( block_size_ ) , column_names ( column_names_ ) ,
2012-08-10 20:04:34 +00:00
storage ( storage_ ) , owned_data_part ( owned_data_part_ ) ,
2012-12-06 09:45:09 +00:00
mark_ranges ( mark_ranges_ ) , current_range ( - 1 ) , rows_left_in_current_range ( 0 )
2012-11-28 08:52:15 +00:00
{
2012-12-06 09:45:09 +00:00
LOG_TRACE ( storage . log , " Reading " < < mark_ranges . size ( ) < < " ranges from part " < < owned_data_part - > name
< < " , up to " < < ( mark_ranges . back ( ) . end - mark_ranges . front ( ) . begin ) * storage . index_granularity
< < " rows starting from " < < mark_ranges . front ( ) . begin * storage . index_granularity ) ;
2012-11-28 08:52:15 +00:00
}
String getName ( ) const { return " MergeTreeBlockInputStream " ; }
BlockInputStreamPtr clone ( )
{
2012-12-06 09:45:09 +00:00
return new MergeTreeBlockInputStream ( path , block_size , column_names , storage , owned_data_part , mark_ranges ) ;
2012-11-28 08:52:15 +00:00
}
2012-12-06 09:45:09 +00:00
/// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона.
static MarkRanges markRangesFromPkRange ( const String & path ,
2012-11-28 08:52:15 +00:00
size_t marks_count ,
StorageMergeTree & storage ,
2012-12-10 10:23:10 +00:00
PKCondition & key_condition )
2012-07-21 06:47:17 +00:00
{
2012-12-06 09:45:09 +00:00
MarkRanges res ;
2012-11-28 08:52:15 +00:00
2012-07-21 06:47:17 +00:00
/// Если индекс не используется.
2012-12-05 12:44:55 +00:00
if ( key_condition . alwaysTrue ( ) )
2012-07-21 06:47:17 +00:00
{
2012-12-10 10:23:10 +00:00
res . push_back ( MarkRange ( 0 , marks_count ) ) ;
2012-07-21 06:47:17 +00:00
}
else
{
2012-12-10 10:23:10 +00:00
/// Читаем индекс.
typedef std : : vector < Row > Index ;
size_t key_size = storage . sort_descr . size ( ) ;
Index index ( marks_count , Row ( key_size ) ) ;
2012-11-28 08:52:15 +00:00
2012-07-21 06:47:17 +00:00
{
2012-12-10 10:23:10 +00:00
String index_path = path + " primary.idx " ;
ReadBufferFromFile index_file ( index_path , std : : min ( static_cast < size_t > ( DBMS_DEFAULT_BUFFER_SIZE ) , Poco : : File ( index_path ) . getSize ( ) ) ) ;
2012-12-06 09:45:09 +00:00
2012-12-10 10:23:10 +00:00
for ( size_t i = 0 ; i < marks_count ; + + i )
2012-12-06 09:45:09 +00:00
{
2012-12-10 10:23:10 +00:00
for ( size_t j = 0 ; j < key_size ; + + j )
storage . primary_key_sample . getByPosition ( j ) . type - > deserializeBinary ( index [ i ] [ j ] , index_file ) ;
2012-12-06 09:45:09 +00:00
}
2012-12-10 10:23:10 +00:00
if ( ! index_file . eof ( ) )
throw Exception ( " index file " + index_path + " is unexpectedly long " , ErrorCodes : : EXPECTED_END_OF_FILE ) ;
}
/// В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back).
/// Н а каждом шаге берем левый отрезок и проверяем, подходит ли он.
/// Если подходит, разбиваем е г о на более мелкие и кладем их в стек. Если нет - выбрасываем е г о .
/// Если отрезок уже длиной в одну засечку, добавляем е г о в ответ и выбрасываем.
std : : vector < MarkRange > ranges_stack ;
ranges_stack . push_back ( MarkRange ( 0 , marks_count ) ) ;
while ( ! ranges_stack . empty ( ) )
{
MarkRange range = ranges_stack . back ( ) ;
ranges_stack . pop_back ( ) ;
bool may_be_true ;
if ( range . end = = marks_count )
may_be_true = key_condition . mayBeTrueAfter ( index [ range . begin ] ) ;
2012-12-06 09:45:09 +00:00
else
2012-12-10 10:23:10 +00:00
may_be_true = key_condition . mayBeTrueInRange ( index [ range . begin ] , index [ range . end ] ) ;
if ( ! may_be_true )
continue ;
2012-11-28 08:52:15 +00:00
2012-12-10 10:23:10 +00:00
if ( range . end = = range . begin + 1 )
2012-07-21 06:47:17 +00:00
{
2012-12-06 09:45:09 +00:00
/// Увидели полезный промежуток между соседними засечками. Либо добавим е г о к последнему диапазону, либо начнем новый диапазон.
2012-12-10 10:23:10 +00:00
if ( res . empty ( ) | | range . begin - res . back ( ) . end > storage . min_marks_for_seek )
2012-07-21 06:47:17 +00:00
{
2012-12-10 10:23:10 +00:00
res . push_back ( range ) ;
2012-12-06 09:45:09 +00:00
}
else
{
2012-12-10 10:23:10 +00:00
res . back ( ) . end = range . end ;
}
}
else
{
/// Разбиваем отрезок и кладем результат в стек справа налево.
size_t step = ( range . end - range . begin - 1 ) / storage . settings . coarse_index_granularity + 1 ;
size_t end ;
for ( end = range . end ; end > range . begin + step ; end - = step )
{
ranges_stack . push_back ( MarkRange ( end - step , end ) ) ;
2012-07-21 06:47:17 +00:00
}
2012-12-10 10:23:10 +00:00
ranges_stack . push_back ( MarkRange ( range . begin , end ) ) ;
2012-11-20 22:48:38 +00:00
}
2012-07-21 06:47:17 +00:00
}
2012-07-31 19:08:49 +00:00
}
2012-12-06 09:45:09 +00:00
return res ;
2012-07-21 06:47:17 +00:00
}
2012-10-20 02:10:47 +00:00
protected :
2012-07-19 20:32:10 +00:00
Block readImpl ( )
{
Block res ;
2012-12-06 09:45:09 +00:00
/// Если нужно, переходим к следующему диапазону.
if ( rows_left_in_current_range = = 0 )
{
+ + current_range ;
2012-12-06 17:36:51 +00:00
if ( static_cast < size_t > ( current_range ) = = mark_ranges . size ( ) )
2012-12-06 09:45:09 +00:00
return res ;
MarkRange & range = mark_ranges [ current_range ] ;
rows_left_in_current_range = ( range . end - range . begin ) * storage . index_granularity ;
2012-07-19 20:32:10 +00:00
for ( Names : : const_iterator it = column_names . begin ( ) ; it ! = column_names . end ( ) ; + + it )
2012-12-06 09:45:09 +00:00
addStream ( * it , * storage . getDataTypeByName ( * it ) , range . begin ) ;
}
2012-07-19 20:32:10 +00:00
/// Сколько строк читать для следующего блока.
2012-12-06 09:45:09 +00:00
size_t max_rows_to_read = std : : min ( block_size , rows_left_in_current_range ) ;
2012-07-19 20:32:10 +00:00
2012-11-30 00:52:45 +00:00
/** Для некоторых столбцов файлы с данными могут отсутствовать.
* Э т о б ы в а е т д л я с т а р ы х к у с к о в , п о с л е д о б а в л е н и я н о в ы х с т о л б ц о в в с т р у к т у р у т а б л и ц ы .
*/
bool has_missing_columns = false ;
bool has_normal_columns = false ;
2012-07-19 20:32:10 +00:00
for ( Names : : const_iterator it = column_names . begin ( ) ; it ! = column_names . end ( ) ; + + it )
{
2012-11-30 00:52:45 +00:00
if ( streams . end ( ) = = streams . find ( * it ) )
{
has_missing_columns = true ;
continue ;
}
has_normal_columns = true ;
2012-07-19 20:32:10 +00:00
ColumnWithNameAndType column ;
column . name = * it ;
column . type = storage . getDataTypeByName ( * it ) ;
column . column = column . type - > createColumn ( ) ;
2012-08-30 17:43:31 +00:00
readData ( * it , * column . type , * column . column , max_rows_to_read ) ;
2012-07-19 20:32:10 +00:00
if ( column . column - > size ( ) )
res . insert ( column ) ;
}
2012-11-30 00:52:45 +00:00
if ( has_missing_columns & & ! has_normal_columns )
throw Exception ( " All requested columns are missing " , ErrorCodes : : ALL_REQUESTED_COLUMNS_ARE_MISSING ) ;
2012-07-19 20:32:10 +00:00
if ( res )
2012-11-30 00:52:45 +00:00
{
2012-12-06 09:45:09 +00:00
rows_left_in_current_range - = res . rows ( ) ;
2012-07-19 20:32:10 +00:00
2012-11-30 00:52:45 +00:00
/// Заполним столбцы, для которых нет файлов, значениями по-умолчанию.
if ( has_missing_columns )
{
for ( Names : : const_iterator it = column_names . begin ( ) ; it ! = column_names . end ( ) ; + + it )
{
if ( streams . end ( ) = = streams . find ( * it ) )
{
ColumnWithNameAndType column ;
column . name = * it ;
column . type = storage . getDataTypeByName ( * it ) ;
/** Нужно превратить константный столбец в полноценный, так как в части блоков (из других кусков),
* о н м о ж е т б ы т ь п о л н о ц е н н ы м ( а т о и н т е р п р е т а т о р м о ж е т п о с ч и т а т ь , ч т о о н к о н с т а н т н ы й в е з д е ) .
*/
column . column = dynamic_cast < IColumnConst & > ( * column . type - > createConstColumn (
res . rows ( ) , column . type - > getDefault ( ) ) ) . convertToFullColumn ( ) ;
res . insert ( column ) ;
}
}
}
}
2012-12-06 09:45:09 +00:00
if ( ! res | | rows_left_in_current_range = = 0 )
2012-07-19 20:32:10 +00:00
{
2012-12-06 09:45:09 +00:00
rows_left_in_current_range = 0 ;
2012-07-19 20:32:10 +00:00
/** Закрываем файлы (ещё до уничтожения объекта).
* Ч т о б ы п р и с о з д а н и и м н о г и х и с т о ч н и к о в , н о о д н о в р е м е н н о м ч т е н и и т о л ь к о и з н е с к о л ь к и х ,
* б у ф е р ы н е в и с е л и в п а м я т и .
*/
streams . clear ( ) ;
}
return res ;
}
private :
const String path ;
size_t block_size ;
Names column_names ;
StorageMergeTree & storage ;
2012-08-10 20:04:34 +00:00
const StorageMergeTree : : DataPartPtr owned_data_part ; /// Кусок не будет удалён, пока им владеет этот объект.
2012-12-06 09:45:09 +00:00
MarkRanges mark_ranges ; /// В каких диапазонах засечек читать.
int current_range ; /// Какой из mark_ranges сейчас читаем.
size_t rows_left_in_current_range ; /// Сколько строк уже прочитали из текущего элемента mark_ranges.
2012-07-19 20:32:10 +00:00
struct Stream
{
Stream ( const String & path_prefix , size_t mark_number )
: plain ( path_prefix + " .bin " , std : : min ( static_cast < size_t > ( DBMS_DEFAULT_BUFFER_SIZE ) , Poco : : File ( path_prefix + " .bin " ) . getSize ( ) ) ) ,
compressed ( plain )
{
if ( mark_number )
{
/// Прочитаем из файла с засечками смещение в файле с данными.
ReadBufferFromFile marks ( path_prefix + " .mrk " , MERGE_TREE_MARK_SIZE ) ;
marks . seek ( mark_number * MERGE_TREE_MARK_SIZE ) ;
size_t offset_in_compressed_file = 0 ;
size_t offset_in_decompressed_block = 0 ;
readIntBinary ( offset_in_compressed_file , marks ) ;
readIntBinary ( offset_in_decompressed_block , marks ) ;
plain . seek ( offset_in_compressed_file ) ;
compressed . next ( ) ;
compressed . position ( ) + = offset_in_decompressed_block ;
}
}
ReadBufferFromFile plain ;
CompressedReadBuffer compressed ;
} ;
typedef std : : map < std : : string , SharedPtr < Stream > > FileStreams ;
FileStreams streams ;
2012-08-30 17:43:31 +00:00
2012-12-06 09:45:09 +00:00
void addStream ( const String & name , const IDataType & type , size_t mark_number , size_t level = 0 )
2012-08-30 17:43:31 +00:00
{
String escaped_column_name = escapeForFileName ( name ) ;
2012-11-30 00:52:45 +00:00
/** Если файла с данными нет - то не будем пытаться открыть е г о .
* Э т о н у ж н о , ч т о б ы м о ж н о б ы л о д о б а в л я т ь н о в ы е с т о л б ц ы к с т р у к т у р е т а б л и ц ы б е з с о з д а н и я ф а й л о в д л я с т а р ы х к у с к о в .
*/
if ( ! Poco : : File ( path + escaped_column_name + " .bin " ) . exists ( ) )
return ;
2012-08-30 17:43:31 +00:00
/// Для массивов используются отдельные потоки для размеров.
if ( const DataTypeArray * type_arr = dynamic_cast < const DataTypeArray * > ( & type ) )
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco : : NumberFormatter : : format ( level ) ;
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco : : NumberFormatter : : format ( level ) ;
streams . insert ( std : : make_pair ( size_name , new Stream (
path + escaped_size_name ,
mark_number ) ) ) ;
2012-12-06 09:45:09 +00:00
addStream ( name , * type_arr - > getNestedType ( ) , mark_number , level + 1 ) ;
2012-08-30 17:43:31 +00:00
}
else
streams . insert ( std : : make_pair ( name , new Stream (
path + escaped_column_name ,
mark_number ) ) ) ;
}
void readData ( const String & name , const IDataType & type , IColumn & column , size_t max_rows_to_read , size_t level = 0 )
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if ( const DataTypeArray * type_arr = dynamic_cast < const DataTypeArray * > ( & type ) )
{
type_arr - > deserializeOffsets (
column ,
streams [ name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco : : NumberFormatter : : format ( level ) ] - > compressed ,
max_rows_to_read ) ;
2012-08-30 20:35:02 +00:00
if ( column . size ( ) )
readData (
name ,
* type_arr - > getNestedType ( ) ,
dynamic_cast < ColumnArray & > ( column ) . getData ( ) ,
dynamic_cast < const ColumnArray & > ( column ) . getOffsets ( ) [ column . size ( ) - 1 ] ,
level + 1 ) ;
2012-08-30 17:43:31 +00:00
}
else
type . deserializeBinary ( column , streams [ name ] - > compressed , max_rows_to_read ) ;
}
2012-07-19 20:32:10 +00:00
} ;
2012-07-17 20:04:39 +00:00
/** Creates the storage object:
  *  - stores the parameters (the primary key AST is cloned);
  *  - derives the seek / concurrent read thresholds, expressed in marks, from the settings;
  *  - creates the table directory if it does not exist;
  *  - builds the sort description and the primary key expression;
  *  - creates the pool of merging threads and loads the data parts found on disk.
  */
StorageMergeTree::StorageMergeTree(
    const String & path_, const String & name_, NamesAndTypesListPtr columns_,
    Context & context_,
    ASTPtr & primary_expr_ast_,
    const String & date_column_name_, const ASTPtr & sampling_expression_,
    size_t index_granularity_,
    const String & sign_column_,
    const StorageMergeTreeSettings & settings_)
    : path(path_), name(name_), full_path(path + escapeForFileName(name) + ' / '), columns(columns_),
    context(context_), primary_expr_ast(primary_expr_ast_->clone()),
    date_column_name(date_column_name_), sampling_expression(sampling_expression_),
    index_granularity(index_granularity_),
    sign_column(sign_column_),
    settings(settings_),
    increment(full_path + " increment.txt "), log(&Logger::get(" StorageMergeTree: " + name))
{
    /// Row thresholds from the settings converted to marks, rounding up.
    const size_t granularity_minus_one = index_granularity - 1;
    min_marks_for_seek = (settings.min_rows_for_seek + granularity_minus_one) / index_granularity;
    min_marks_for_concurrent_read = (settings.min_rows_for_concurrent_read + granularity_minus_one) / index_granularity;

    /// Create the directory if it does not exist.
    Poco::File(full_path).createDirectories();

    /// Initialize the sort description: one ascending entry per primary key element.
    ASTs & key_elements = primary_expr_ast->children;
    sort_descr.reserve(key_elements.size());
    for (ASTs::iterator element = key_elements.begin(); element != key_elements.end(); ++element)
        sort_descr.push_back(SortColumnDescription((*element)->getColumnName(), 1));

    context.setColumns(*columns);
    primary_expr = new Expression(primary_expr_ast, context);
    primary_key_sample = primary_expr->getSampleBlock();

    merge_threads = new boost::threadpool::pool(settings.merging_threads);

    loadDataParts();
}
2012-07-30 20:32:36 +00:00
/// Before the object goes away, waits until all scheduled merge tasks have finished.
StorageMergeTree::~StorageMergeTree()
{
    joinMergeThreads();
}
2012-07-18 19:44:04 +00:00
/// Returns a stream that writes blocks into this table. The `query` argument is not used.
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
{
    BlockOutputStreamPtr output = new MergeTreeBlockOutputStream(*this);
    return output;
}
2012-07-21 05:07:14 +00:00
/** Builds the input streams for reading the requested columns.
  *
  * Steps:
  *  1. Select the data parts whose date range may satisfy the date condition of the query.
  *  2. If the query has a SAMPLE clause, turn the sample size into an upper bound for the
  *     sampling expression, add it to the primary key condition and prepare a filter
  *     `sampling_expression <= limit`. A sample size greater than 1 is treated as an absolute
  *     number of rows and is converted to a relative one by a preliminary index scan.
  *  3. Find the mark ranges to read from every selected part using the primary key condition.
  *  4. Spread the ranges among `threads` streams and, when sampling, wrap each stream
  *     with the filter.
  *
  * column_names_to_return - the columns requested by the caller (the columns required by the
  *  sampling filter are additionally read when sampling is used).
  * processed_stage is not modified here.
  *
  * Throws Exception if the sample size is negative, if the sampling expression is not of an
  *  unsigned integer type, or if it is not part of the primary key.
  * Throws std::bad_cast if `query` is not an ASTSelectQuery.
  */
BlockInputStreams StorageMergeTree::read(
    const Names & column_names_to_return,
    ASTPtr query,
    QueryProcessingStage::Enum & processed_stage,
    size_t max_block_size,
    unsigned threads)
{
    PKCondition key_condition(query, context, sort_descr);
    PKCondition date_condition(query, context, SortDescription(1, SortColumnDescription(date_column_name, 1)));

    typedef std::vector<DataPartPtr> PartsList;
    PartsList parts;

    /// Select the parts that may contain data satisfying date_condition.
    {
        Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

        for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
            if (date_condition.mayBeTrueInRange(Row(1, static_cast<UInt64>((*it)->left_date)), Row(1, static_cast<UInt64>((*it)->right_date))))
                parts.push_back(*it);
    }

    /// Sampling.
    Names column_names_to_read = column_names_to_return;
    UInt64 sampling_column_value_limit = 0;
    typedef Poco::SharedPtr<ASTFunction> ASTFunctionPtr;
    ASTFunctionPtr filter_function;
    ExpressionPtr filter_expression;

    /// A reference cast is used so that a query of an unexpected type results in std::bad_cast
    ///  rather than in a null pointer dereference.
    ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*query);

    if (select.sample_size)
    {
        double size = boost::apply_visitor(FieldVisitorConvertToNumber<double>(),
            dynamic_cast<ASTLiteral &>(*select.sample_size).value);

        if (size < 0)
            throw Exception(" Negative sample size ", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        if (size > 1)
        {
            /// The sample size is an absolute number of rows; convert it to a relative one.
            size_t requested_count = boost::apply_visitor(FieldVisitorConvertToNumber<UInt64>(), dynamic_cast<ASTLiteral &>(*select.sample_size).value);

            /// Find out how many rows we would read without sampling.
            LOG_DEBUG(log, " Preliminary index scan with condition: " << key_condition.toString());
            size_t total_count = 0;
            for (size_t i = 0; i < parts.size(); ++i)
            {
                DataPartPtr & part = parts[i];
                MarkRanges ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + ' / ',
                    part->size,
                    *this,
                    key_condition);

                for (size_t j = 0; j < ranges.size(); ++j)
                    total_count += ranges[j].end - ranges[j].begin;
            }
            total_count *= index_granularity;

            /// With nothing to read any sample is "everything"; do not divide by zero.
            if (total_count == 0)
                size = 1.;
            else
                size = std::min(1., static_cast<double>(requested_count) / total_count);

            LOG_DEBUG(log, " Selected relative sample size: " << size);
        }

        /// The maximum value of the sampling expression is determined by its type.
        UInt64 sampling_column_max = 0;
        DataTypePtr type = Expression(sampling_expression, context).getReturnTypes()[0];

        if (type->getName() == " UInt64 ")
            sampling_column_max = std::numeric_limits<UInt64>::max();
        else if (type->getName() == " UInt32 ")
            sampling_column_max = std::numeric_limits<UInt32>::max();
        else if (type->getName() == " UInt16 ")
            sampling_column_max = std::numeric_limits<UInt16>::max();
        else if (type->getName() == " UInt8 ")
            sampling_column_max = std::numeric_limits<UInt8>::max();
        else
            throw Exception(" Invalid sampling column type in storage parameters: " + type->getName() + " . Must be unsigned integer type. ", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

        /** Upper bound for the value of the sampling expression.
          * For a full sample take the exact maximum: the maximum of UInt64 is not representable
          *  in double (it is rounded up to 2^64), and converting such a product back to UInt64
          *  is undefined behaviour.
          * For size < 1 the product is always strictly less than 2^64, so the conversion is safe.
          */
        if (size < 1)
            sampling_column_value_limit = static_cast<UInt64>(size * sampling_column_max);
        else
            sampling_column_value_limit = sampling_column_max;

        /// Add the condition to cut off something more during the second pass over the index.
        if (!key_condition.addCondition(sampling_expression->getColumnName(),
            Range::RightBounded(sampling_column_value_limit, true)))
            throw Exception(" Sampling column not in primary key ", ErrorCodes::ILLEGAL_COLUMN);

        /// Filtering expression: sampling_expression <= sampling_column_value_limit
        ASTPtr filter_function_args = new ASTExpressionList;
        filter_function_args->children.push_back(sampling_expression);
        filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit));

        filter_function = new ASTFunction;
        filter_function->name = " lessOrEquals ";
        filter_function->arguments = filter_function_args;
        filter_function->children.push_back(filter_function->arguments);

        filter_expression = new Expression(filter_function, context);

        /// The columns needed by the filter must be read too; keep the list sorted and free of duplicates.
        std::vector<String> add_columns = filter_expression->getRequiredColumns();
        column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
        std::sort(column_names_to_read.begin(), column_names_to_read.end());
        column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
    }

    LOG_DEBUG(log, " Key condition: " << key_condition.toString());
    LOG_DEBUG(log, " Date condition: " << date_condition.toString());

    RangesInDataParts parts_with_ranges;

    /// Find out which range to read from each part.
    size_t sum_marks = 0;
    size_t sum_ranges = 0;
    for (size_t i = 0; i < parts.size(); ++i)
    {
        DataPartPtr & part = parts[i];
        RangesInDataPart ranges(part);
        ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + ' / ',
            part->size,
            *this,
            key_condition);

        /// Parts with nothing to read are dropped.
        if (!ranges.ranges.empty())
        {
            parts_with_ranges.push_back(ranges);

            sum_ranges += ranges.ranges.size();
            for (size_t j = 0; j < ranges.ranges.size(); ++j)
            {
                sum_marks += ranges.ranges[j].end - ranges.ranges[j].begin;
            }
        }
    }

    LOG_DEBUG(log, " Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
        << sum_marks << " marks to read from " << sum_ranges << " ranges ");

    BlockInputStreams res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names_to_read, max_block_size);

    /// When sampling, evaluate the filter expression over each stream and filter by its result.
    if (select.sample_size)
    {
        for (size_t i = 0; i < res.size(); ++i)
        {
            BlockInputStreamPtr original_stream = res[i];
            BlockInputStreamPtr expression_stream = new ExpressionBlockInputStream(original_stream, filter_expression);
            BlockInputStreamPtr filter_stream = new FilterBlockInputStream(expression_stream, filter_function->getColumnName());
            res[i] = filter_stream;
        }
    }

    return res;
}
/** Distributes the marks approximately evenly among the threads.
  * Returns at most `threads` streams; a stream that reads from several parts is a concatenation
  *  of per-part streams. Returns an empty list if there are no marks to read.
  * Throws Exception (LOGICAL_ERROR) if the marks could not be distributed.
  */
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size)
{
    /// Shuffle the parts, just in case.
    std::random_shuffle(parts.begin(), parts.end());

    /// Count the marks in every part.
    std::vector<size_t> marks_per_part(parts.size());
    size_t total_marks = 0;
    for (size_t part_idx = 0; part_idx < parts.size(); ++part_idx)
    {
        /// Let the ranges be listed from right to left, so that the leftmost range can be dropped with pop_back().
        std::reverse(parts[part_idx].ranges.begin(), parts[part_idx].ranges.end());

        size_t marks_in_this_part = 0;
        for (size_t range_idx = 0; range_idx < parts[part_idx].ranges.size(); ++range_idx)
            marks_in_this_part += parts[part_idx].ranges[range_idx].end - parts[part_idx].ranges[range_idx].begin;

        marks_per_part[part_idx] = marks_in_this_part;
        total_marks += marks_in_this_part;
    }

    BlockInputStreams result;

    if (total_marks == 0)
        return result;

    /// Every thread gets at least this many marks (the total divided by the number of threads, rounded up).
    const size_t marks_per_thread = (total_marks - 1) / threads + 1;

    for (size_t thread_idx = 0; thread_idx < threads && !parts.empty(); ++thread_idx)
    {
        size_t marks_wanted = marks_per_thread;
        BlockInputStreams thread_streams;

        /// Loop over the parts.
        while (marks_wanted > 0 && !parts.empty())
        {
            RangesInDataPart & current_part = parts.back();
            size_t & marks_left_in_part = marks_per_part.back();

            /// Do not take too few rows from a part.
            if (marks_left_in_part >= min_marks_for_concurrent_read &&
                marks_wanted < min_marks_for_concurrent_read)
                marks_wanted = min_marks_for_concurrent_read;

            /// Do not leave too few rows in a part.
            if (marks_left_in_part > marks_wanted &&
                marks_left_in_part - marks_wanted < min_marks_for_concurrent_read)
                marks_wanted = marks_left_in_part;

            if (marks_left_in_part <= marks_wanted)
            {
                /// The part is small enough - take it as a whole.

                /// Restore the order of the ranges.
                std::reverse(current_part.ranges.begin(), current_part.ranges.end());

                thread_streams.push_back(new MergeTreeBlockInputStream(full_path + current_part.data_part->name + ' / ',
                    max_block_size, column_names, *this,
                    current_part.data_part, current_part.ranges));

                marks_wanted -= marks_left_in_part;
                parts.pop_back();
                marks_per_part.pop_back();
            }
            else
            {
                /// Take only the leftmost marks of the part; the rest is left for the following threads.
                MarkRanges taken_ranges;

                /// Loop over the ranges of the part.
                while (marks_wanted > 0)
                {
                    if (current_part.ranges.empty())
                        throw Exception(" Unexpected end of ranges while spreading marks among threads ", ErrorCodes::LOGICAL_ERROR);

                    MarkRange & leftmost_range = current_part.ranges.back();
                    const size_t range_length = leftmost_range.end - leftmost_range.begin;
                    const size_t marks_taken = std::min(range_length, marks_wanted);

                    taken_ranges.push_back(MarkRange(leftmost_range.begin, leftmost_range.begin + marks_taken));
                    leftmost_range.begin += marks_taken;
                    marks_left_in_part -= marks_taken;
                    marks_wanted -= marks_taken;

                    /// The range is exhausted.
                    if (leftmost_range.begin == leftmost_range.end)
                        current_part.ranges.pop_back();
                }

                thread_streams.push_back(new MergeTreeBlockInputStream(full_path + current_part.data_part->name + ' / ',
                    max_block_size, column_names, *this,
                    current_part.data_part, taken_ranges));
            }
        }

        /// A single stream is used as is; several streams are read one after another.
        if (thread_streams.size() == 1)
            result.push_back(thread_streams[0]);
        else
            result.push_back(new ConcatBlockInputStream(thread_streams));
    }

    if (!parts.empty())
        throw Exception(" Couldn't spread marks among threads ", ErrorCodes::LOGICAL_ERROR);

    return result;
}
2012-07-19 20:32:10 +00:00
/** Builds the name of the directory of a part from the date range, the range of ids and the level.
  * The five numbers are written in that order, separated by the same character literal.
  */
String StorageMergeTree::getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
{
    Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();

    /// The directory name of a part looks like: YYYYMMDD_YYYYMMDD_N_N_L.
    const unsigned left_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
    const unsigned right_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(right_date));

    String part_name;

    /// The write buffer lives in its own scope and is destroyed before part_name is returned.
    {
        WriteBufferFromString out(part_name);

        writeIntText(left_date_id, out);
        writeChar(' _ ', out);
        writeIntText(right_date_id, out);
        writeChar(' _ ', out);
        writeIntText(left_id, out);
        writeChar(' _ ', out);
        writeIntText(right_id, out);
        writeChar(' _ ', out);
        writeIntText(level, out);
    }

    return part_name;
}
2012-07-19 20:32:10 +00:00
void StorageMergeTree : : loadDataParts ( )
{
LOG_DEBUG ( log , " Loading data parts " ) ;
2012-08-10 20:04:34 +00:00
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
2012-07-19 20:32:10 +00:00
Yandex : : DateLUTSingleton & date_lut = Yandex : : DateLUTSingleton : : instance ( ) ;
2012-08-10 20:04:34 +00:00
data_parts . clear ( ) ;
2012-07-19 20:32:10 +00:00
static Poco : : RegularExpression file_name_regexp ( " ^( \\ d{8})_( \\ d{8})_( \\ d+)_( \\ d+)_( \\ d+) " ) ;
Poco : : DirectoryIterator end ;
Poco : : RegularExpression : : MatchVec matches ;
for ( Poco : : DirectoryIterator it ( full_path ) ; it ! = end ; + + it )
{
std : : string file_name = it . name ( ) ;
if ( ! ( file_name_regexp . match ( file_name , 0 , matches ) & & 6 = = matches . size ( ) ) )
continue ;
2012-07-31 20:03:53 +00:00
DataPartPtr part = new DataPart ( * this ) ;
2012-07-23 06:23:29 +00:00
part - > left_date = date_lut . toDayNum ( Yandex : : OrderedIdentifier2Date ( file_name . substr ( matches [ 1 ] . offset , matches [ 1 ] . length ) ) ) ;
part - > right_date = date_lut . toDayNum ( Yandex : : OrderedIdentifier2Date ( file_name . substr ( matches [ 2 ] . offset , matches [ 2 ] . length ) ) ) ;
part - > left = Poco : : NumberParser : : parseUnsigned64 ( file_name . substr ( matches [ 3 ] . offset , matches [ 3 ] . length ) ) ;
part - > right = Poco : : NumberParser : : parseUnsigned64 ( file_name . substr ( matches [ 4 ] . offset , matches [ 4 ] . length ) ) ;
part - > level = Poco : : NumberParser : : parseUnsigned ( file_name . substr ( matches [ 5 ] . offset , matches [ 5 ] . length ) ) ;
part - > name = file_name ;
2012-07-19 20:32:10 +00:00
/// Размер - в количестве засечек.
2012-07-23 06:23:29 +00:00
part - > size = Poco : : File ( full_path + file_name + " / " + escapeForFileName ( columns - > front ( ) . first ) + " .mrk " ) . getSize ( )
2012-07-19 20:32:10 +00:00
/ MERGE_TREE_MARK_SIZE ;
2012-07-23 06:23:29 +00:00
part - > modification_time = it - > getLastModified ( ) . epochTime ( ) ;
2012-07-19 20:32:10 +00:00
2012-08-31 20:38:05 +00:00
part - > left_month = date_lut . toFirstDayNumOfMonth ( part - > left_date ) ;
part - > right_month = date_lut . toFirstDayNumOfMonth ( part - > right_date ) ;
2012-07-19 20:32:10 +00:00
2012-08-10 20:04:34 +00:00
data_parts . insert ( part ) ;
2012-07-19 20:32:10 +00:00
}
2012-08-10 20:04:34 +00:00
all_data_parts = data_parts ;
2012-07-31 20:03:53 +00:00
2012-07-31 20:13:14 +00:00
/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
* н о п о к а к и м - т о п р и ч и н а м о с т а л и с ь л е ж а т ь в ф а й л о в о й с и с т е м е .
* У д а л е н и е ф а й л о в б у д е т п р о и з в е д е н о п о т о м в м е т о д е clearOldParts .
*/
2012-08-10 20:04:34 +00:00
if ( data_parts . size ( ) > = 2 )
2012-07-31 20:13:14 +00:00
{
2012-08-10 20:04:34 +00:00
DataParts : : iterator prev_jt = data_parts . begin ( ) ;
2012-08-07 20:37:45 +00:00
DataParts : : iterator curr_jt = prev_jt ;
+ + curr_jt ;
2012-08-10 20:04:34 +00:00
while ( curr_jt ! = data_parts . end ( ) )
2012-07-31 20:13:14 +00:00
{
2012-08-07 20:37:45 +00:00
/// Куски данных за разные месяцы рассматривать не будем
if ( ( * curr_jt ) - > left_month ! = ( * curr_jt ) - > right_month
| | ( * curr_jt ) - > right_month ! = ( * prev_jt ) - > left_month
| | ( * prev_jt ) - > left_month ! = ( * prev_jt ) - > right_month )
{
+ + prev_jt ;
+ + curr_jt ;
continue ;
}
2012-07-31 20:13:14 +00:00
2012-08-07 20:37:45 +00:00
if ( ( * curr_jt ) - > contains ( * * prev_jt ) )
{
LOG_WARNING ( log , " Part " < < ( * curr_jt ) - > name < < " contains " < < ( * prev_jt ) - > name ) ;
2012-08-10 20:04:34 +00:00
data_parts . erase ( prev_jt ) ;
2012-08-07 20:37:45 +00:00
prev_jt = curr_jt ;
+ + curr_jt ;
}
else if ( ( * prev_jt ) - > contains ( * * curr_jt ) )
{
LOG_WARNING ( log , " Part " < < ( * prev_jt ) - > name < < " contains " < < ( * curr_jt ) - > name ) ;
2012-08-10 20:04:34 +00:00
data_parts . erase ( curr_jt + + ) ;
2012-08-07 20:37:45 +00:00
}
else
{
+ + prev_jt ;
+ + curr_jt ;
}
2012-07-31 20:13:14 +00:00
}
}
2012-07-31 20:03:53 +00:00
2012-08-10 20:04:34 +00:00
LOG_DEBUG ( log , " Loaded data parts ( " < < data_parts . size ( ) < < " items) " ) ;
2012-07-19 20:32:10 +00:00
}
2012-07-23 06:23:29 +00:00
void StorageMergeTree : : clearOldParts ( )
{
Poco : : ScopedTry < Poco : : FastMutex > lock ;
/// Если метод уже вызван из другого потока (или если all_data_parts прямо сейчас меняют), то можно ничего не делать.
if ( ! lock . lock ( & all_data_parts_mutex ) )
{
LOG_TRACE ( log , " Already clearing or modifying old parts " ) ;
return ;
}
LOG_TRACE ( log , " Clearing old parts " ) ;
for ( DataParts : : iterator it = all_data_parts . begin ( ) ; it ! = all_data_parts . end ( ) ; )
{
2012-07-31 18:25:16 +00:00
int ref_count = it - > referenceCount ( ) ;
LOG_TRACE ( log , ( * it ) - > name < < " : ref_count = " < < ref_count ) ;
2012-08-10 20:04:34 +00:00
if ( ref_count = = 1 ) /// После этого ref_count не может увеличиться.
2012-07-23 06:23:29 +00:00
{
LOG_DEBUG ( log , " Removing part " < < ( * it ) - > name ) ;
( * it ) - > remove ( ) ;
all_data_parts . erase ( it + + ) ;
}
else
+ + it ;
}
}
2012-09-10 19:05:06 +00:00
void StorageMergeTree : : merge ( size_t iterations , bool async )
2012-07-23 06:23:29 +00:00
{
2012-11-28 08:52:15 +00:00
bool while_can = false ;
if ( iterations = = 0 ) {
while_can = true ;
iterations = settings . merging_threads ;
2012-08-01 20:08:59 +00:00
}
2012-11-28 17:17:17 +00:00
for ( size_t i = 0 ; i < iterations ; + + i )
2012-11-28 08:52:15 +00:00
merge_threads - > schedule ( boost : : bind ( & StorageMergeTree : : mergeThread , this , while_can ) ) ;
if ( ! async )
joinMergeThreads ( ) ;
2012-08-13 19:13:11 +00:00
}
2012-11-28 08:52:15 +00:00
void StorageMergeTree : : mergeThread ( bool while_can )
2012-08-13 19:13:11 +00:00
{
try
2012-07-23 06:23:29 +00:00
{
2012-11-28 08:52:15 +00:00
std : : vector < DataPartPtr > parts ;
while ( selectPartsToMerge ( parts ) )
2012-08-13 19:13:11 +00:00
{
2012-11-28 08:52:15 +00:00
mergeParts ( parts ) ;
2012-08-13 19:13:11 +00:00
/// Удаляем старые куски.
2012-11-28 08:52:15 +00:00
parts . clear ( ) ;
2012-08-13 19:13:11 +00:00
clearOldParts ( ) ;
2012-11-28 08:52:15 +00:00
if ( ! while_can )
break ;
2012-08-13 19:13:11 +00:00
}
}
catch ( const Exception & e )
{
LOG_ERROR ( log , " Code: " < < e . code ( ) < < " . " < < e . displayText ( ) < < std : : endl
< < std : : endl
< < " Stack trace: " < < std : : endl
< < e . getStackTrace ( ) . toString ( ) ) ;
}
catch ( const Poco : : Exception & e )
{
LOG_ERROR ( log , " Poco::Exception: " < < e . code ( ) < < " . " < < e . displayText ( ) ) ;
}
catch ( const std : : exception & e )
{
LOG_ERROR ( log , " std::exception: " < < e . what ( ) ) ;
}
catch ( . . . )
{
LOG_ERROR ( log , " Unknown exception " ) ;
}
2012-07-23 06:23:29 +00:00
}
2012-11-28 08:52:15 +00:00
void StorageMergeTree : : joinMergeThreads ( )
{
LOG_DEBUG ( log , " Waiting for merge thread to finish. " ) ;
merge_threads - > wait ( ) ;
}
2012-11-29 10:50:17 +00:00
/// Выбираем отрезок из не более чем max_parts_to_merge_at_once кусков так, чтобы максимальный размер был меньше чем в max_size_ratio_to_merge_parts раз больше суммы остальных.
/// Это обеспечивает в худшем случае время O(n log n) на все слияния, независимо от выбора сливаемых кусков, порядка слияния и добавления.
2012-11-29 17:43:23 +00:00
/// При max_parts_to_merge_at_once >= log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts),
/// несложно доказать, что всегда будет что сливать, пока количество кусков больше
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts)*(количество кусков размером больше max_rows_to_merge_parts).
2012-11-29 12:24:08 +00:00
/// Дальше эвристики.
/// Будем выбирать максимальный по включению подходящий отрезок.
/// Из всех таких выбираем отрезок с минимальным максимумом размера.
/// Из всех таких выбираем отрезок с минимальным минимумом размера.
/// Из всех таких выбираем отрезок с максимальной длиной.
2012-11-28 08:52:15 +00:00
bool StorageMergeTree : : selectPartsToMerge ( std : : vector < DataPartPtr > & parts )
2012-07-23 06:23:29 +00:00
{
LOG_DEBUG ( log , " Selecting parts to merge " ) ;
2012-08-10 20:04:34 +00:00
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
2012-07-23 06:23:29 +00:00
2012-11-29 11:32:29 +00:00
size_t min_max = - 1U ;
size_t min_min = - 1U ;
int max_len = 0 ;
2012-11-29 10:50:17 +00:00
DataParts : : iterator best_begin ;
bool found = false ;
2012-11-29 12:24:08 +00:00
/// Сколько кусков, начиная с текущего, можно включить в валидный отрезок, начинающийся левее текущего куска.
/// Нужно для определения максимальности по включению.
2012-11-29 12:26:34 +00:00
int max_count_from_left = 0 ;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Левый конец отрезка.
2012-11-29 10:50:17 +00:00
for ( DataParts : : iterator it = data_parts . begin ( ) ; it ! = data_parts . end ( ) ; + + it )
2012-07-23 06:23:29 +00:00
{
2012-11-29 12:24:08 +00:00
const DataPartPtr & first_part = * it ;
2012-11-29 12:26:34 +00:00
max_count_from_left = std : : max ( 0 , max_count_from_left - 1 ) ;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Кусок не занят и достаточно мал.
if ( first_part - > currently_merging | |
first_part - > size * index_granularity > settings . max_rows_to_merge_parts )
continue ;
/// Кусок в одном месяце.
if ( first_part - > left_month ! = first_part - > right_month )
{
LOG_WARNING ( log , " Part " < < first_part - > name < < " spans more than one month " ) ;
2012-11-29 10:50:17 +00:00
continue ;
2012-11-29 12:24:08 +00:00
}
/// Самый длинный валидный отрезок, начинающийся здесь.
2012-11-29 16:39:29 +00:00
size_t cur_longest_max = - 1U ;
size_t cur_longest_min = - 1U ;
2012-11-29 12:24:08 +00:00
int cur_longest_len = 0 ;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Текущий отрезок, не обязательно валидный.
size_t cur_max = first_part - > size ;
size_t cur_min = first_part - > size ;
size_t cur_sum = first_part - > size ;
2012-11-29 10:50:17 +00:00
int cur_len = 1 ;
2012-11-29 12:24:08 +00:00
Yandex : : DayNum_t month = first_part - > left_month ;
UInt64 cur_id = first_part - > right ;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Правый конец отрезка.
2012-11-29 10:50:17 +00:00
DataParts : : iterator jt = it ;
2012-11-29 11:32:29 +00:00
for ( + + jt ; jt ! = data_parts . end ( ) & & cur_len < static_cast < int > ( settings . max_parts_to_merge_at_once ) ; + + jt )
2012-07-23 06:23:29 +00:00
{
2012-11-29 12:24:08 +00:00
const DataPartPtr & last_part = * jt ;
/// Кусок не занят, достаточно мал и в одном правильном месяце.
if ( last_part - > currently_merging | |
last_part - > size * index_granularity > settings . max_rows_to_merge_parts | |
last_part - > left_month ! = last_part - > right_month | |
last_part - > left_month ! = month )
2012-11-29 10:50:17 +00:00
break ;
2012-11-29 12:24:08 +00:00
/// Кусок правее предыдущего.
2012-11-30 00:52:45 +00:00
if ( last_part - > left < cur_id )
2012-11-29 12:24:08 +00:00
{
LOG_WARNING ( log , " Part " < < last_part - > name < < " intersects previous part " ) ;
break ;
}
cur_max = std : : max ( cur_max , last_part - > size ) ;
cur_min = std : : min ( cur_min , last_part - > size ) ;
cur_sum + = last_part - > size ;
2012-11-29 10:50:17 +00:00
+ + cur_len ;
2012-11-29 12:24:08 +00:00
cur_id = last_part - > right ;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Если отрезок валидный, то он самый длинный валидный, начинающийся тут.
2012-11-29 10:50:17 +00:00
if ( cur_len > = 2 & &
2012-11-29 12:24:08 +00:00
static_cast < double > ( cur_max ) / ( cur_sum - cur_max ) < settings . max_size_ratio_to_merge_parts )
{
cur_longest_max = cur_max ;
cur_longest_min = cur_min ;
cur_longest_len = cur_len ;
}
}
2012-11-29 12:26:34 +00:00
/// Это максимальный по включению валидный отрезок.
2012-11-29 12:24:08 +00:00
if ( cur_longest_len > max_count_from_left )
{
max_count_from_left = cur_longest_len ;
if ( ! found | |
std : : make_pair ( std : : make_pair ( cur_longest_max , cur_longest_min ) , - cur_longest_len ) <
std : : make_pair ( std : : make_pair ( min_max , min_min ) , - max_len ) )
2012-11-29 10:50:17 +00:00
{
found = true ;
2012-11-29 12:24:08 +00:00
min_max = cur_longest_max ;
min_min = cur_longest_min ;
max_len = cur_longest_len ;
2012-11-29 10:50:17 +00:00
best_begin = it ;
}
2012-07-23 06:23:29 +00:00
}
}
2012-11-29 10:50:17 +00:00
if ( found )
2012-07-23 06:23:29 +00:00
{
2012-11-29 10:50:17 +00:00
parts . clear ( ) ;
DataParts : : iterator it = best_begin ;
for ( int i = 0 ; i < max_len ; + + i )
2012-07-23 06:23:29 +00:00
{
2012-11-29 10:50:17 +00:00
parts . push_back ( * it ) ;
2012-11-29 11:48:27 +00:00
parts . back ( ) - > currently_merging = true ;
2012-11-29 10:50:17 +00:00
+ + it ;
2012-07-23 06:23:29 +00:00
}
2012-11-29 10:50:17 +00:00
2012-11-30 02:01:02 +00:00
LOG_DEBUG ( log , " Selected " < < parts . size ( ) < < " parts from " < < parts . front ( ) - > name < < " to " < < parts . back ( ) - > name ) ;
2012-07-23 06:23:29 +00:00
}
2012-11-29 10:50:17 +00:00
else
2012-07-23 06:23:29 +00:00
{
2012-11-29 10:50:17 +00:00
LOG_DEBUG ( log , " No parts to merge " ) ;
2012-07-23 06:23:29 +00:00
}
2012-11-29 10:50:17 +00:00
return found ;
2012-07-23 06:23:29 +00:00
}
2012-11-28 08:52:15 +00:00
void StorageMergeTree : : mergeParts ( std : : vector < DataPartPtr > parts )
2012-07-23 06:23:29 +00:00
{
2012-11-30 02:01:02 +00:00
LOG_DEBUG ( log , " Merging " < < parts . size ( ) < < " parts: from " < < parts . front ( ) - > name < < " to " < < parts . back ( ) - > name ) ;
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
Names all_column_names ;
for ( NamesAndTypesList : : const_iterator it = columns - > begin ( ) ; it ! = columns - > end ( ) ; + + it )
all_column_names . push_back ( it - > first ) ;
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
Yandex : : DateLUTSingleton & date_lut = Yandex : : DateLUTSingleton : : instance ( ) ;
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
StorageMergeTree : : DataPartPtr new_data_part = new DataPart ( * this ) ;
2012-12-06 12:51:15 +00:00
new_data_part - > left_date = std : : numeric_limits < UInt16 > : : max ( ) ;
new_data_part - > right_date = std : : numeric_limits < UInt16 > : : min ( ) ;
2012-11-30 02:01:02 +00:00
new_data_part - > left = parts . front ( ) - > left ;
2012-11-28 08:52:15 +00:00
new_data_part - > right = parts . back ( ) - > right ;
new_data_part - > level = 0 ;
for ( size_t i = 0 ; i < parts . size ( ) ; + + i )
{
new_data_part - > level = std : : max ( new_data_part - > level , parts [ i ] - > level ) ;
2012-12-06 12:51:15 +00:00
new_data_part - > left_date = std : : min ( new_data_part - > left_date , parts [ i ] - > left_date ) ;
new_data_part - > right_date = std : : max ( new_data_part - > right_date , parts [ i ] - > right_date ) ;
2012-11-28 08:52:15 +00:00
}
+ + new_data_part - > level ;
2012-08-13 19:13:11 +00:00
new_data_part - > name = getPartName (
new_data_part - > left_date , new_data_part - > right_date , new_data_part - > left , new_data_part - > right , new_data_part - > level ) ;
2012-08-31 20:38:05 +00:00
new_data_part - > left_month = date_lut . toFirstDayNumOfMonth ( new_data_part - > left_date ) ;
new_data_part - > right_month = date_lut . toFirstDayNumOfMonth ( new_data_part - > right_date ) ;
2012-08-13 19:13:11 +00:00
2012-11-29 16:39:29 +00:00
/** Читаем из всех кусков, сливаем и пишем в новый.
2012-08-13 19:13:11 +00:00
* П о п у т н о в ы ч и с л я е м в ы р а ж е н и е д л я с о р т и р о в к и .
*/
BlockInputStreams src_streams ;
2012-07-30 20:32:36 +00:00
2012-11-28 08:52:15 +00:00
for ( size_t i = 0 ; i < parts . size ( ) ; + + i )
{
2012-12-06 09:45:09 +00:00
MarkRanges ranges ( 1 , MarkRange ( 0 , parts [ i ] - > size ) ) ;
2012-11-28 08:52:15 +00:00
src_streams . push_back ( new ExpressionBlockInputStream ( new MergeTreeBlockInputStream (
2012-12-06 09:45:09 +00:00
full_path + parts [ i ] - > name + ' / ' , DEFAULT_BLOCK_SIZE , all_column_names , * this , parts [ i ] , ranges ) , primary_expr ) ) ;
2012-11-28 08:52:15 +00:00
}
2012-07-30 20:32:36 +00:00
2012-11-30 00:52:45 +00:00
BlockInputStreamPtr merged_stream = sign_column . empty ( )
? new MergingSortedBlockInputStream ( src_streams , sort_descr , DEFAULT_BLOCK_SIZE )
: new CollapsingSortedBlockInputStream ( src_streams , sort_descr , sign_column , DEFAULT_BLOCK_SIZE ) ;
2012-08-13 20:16:06 +00:00
2012-12-03 08:52:58 +00:00
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream ( * this ,
2012-11-28 08:52:15 +00:00
new_data_part - > left_date , new_data_part - > right_date , new_data_part - > left , new_data_part - > right , new_data_part - > level ) ;
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
copyData ( * merged_stream , * to ) ;
2012-07-31 20:03:53 +00:00
2012-12-03 08:52:58 +00:00
new_data_part - > size = to - > marksCount ( ) ;
2012-08-13 19:13:11 +00:00
new_data_part - > modification_time = time ( 0 ) ;
2012-07-31 17:07:20 +00:00
2012-08-13 19:13:11 +00:00
{
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
2012-07-31 17:07:20 +00:00
2012-08-13 19:13:11 +00:00
/// Добавляем новый кусок в набор.
2012-11-28 08:52:15 +00:00
for ( size_t i = 0 ; i < parts . size ( ) ; + + i )
{
if ( data_parts . end ( ) = = data_parts . find ( parts [ i ] ) )
throw Exception ( " Logical error: cannot find data part " + parts [ i ] - > name + " in list " , ErrorCodes : : LOGICAL_ERROR ) ;
}
2012-07-31 17:07:20 +00:00
2012-08-13 19:13:11 +00:00
data_parts . insert ( new_data_part ) ;
all_data_parts . insert ( new_data_part ) ;
2012-11-28 08:52:15 +00:00
for ( size_t i = 0 ; i < parts . size ( ) ; + + i )
{
data_parts . erase ( data_parts . find ( parts [ i ] ) ) ;
}
2012-07-31 17:07:20 +00:00
}
2012-08-13 19:13:11 +00:00
2012-11-30 02:01:02 +00:00
LOG_TRACE ( log , " Merged " < < parts . size ( ) < < " parts: from " < < parts . front ( ) - > name < < " to " < < parts . back ( ) - > name ) ;
2012-07-23 06:23:29 +00:00
}
2012-08-16 18:17:01 +00:00
void StorageMergeTree : : drop ( )
{
2012-11-28 08:52:15 +00:00
joinMergeThreads ( ) ;
2012-08-16 18:17:01 +00:00
Poco : : ScopedLock < Poco : : FastMutex > lock ( data_parts_mutex ) ;
Poco : : ScopedLock < Poco : : FastMutex > lock_all ( all_data_parts_mutex ) ;
data_parts . clear ( ) ;
all_data_parts . clear ( ) ;
Poco : : File ( full_path ) . remove ( true ) ;
}
2012-07-17 20:04:39 +00:00
}