#include <boost/bind.hpp>

#include <numeric>

#include <Poco/DirectoryIterator.h>
#include <Poco/NumberParser.h>
#include <Poco/Ext/scopedTry.h>

#include <Yandex/time2str.h>

#include <DB/Common/escapeForFileName.h>

#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>

#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnArray.h>

#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeArray.h>

#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/AddingDefaultBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/copyData.h>

#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTFunction.h>
#include <DB/Parsers/ASTLiteral.h>

#include <DB/Interpreters/sortBlock.h>

#include <DB/Storages/StorageMergeTree.h>

#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
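
/** A sketch of the on-disk layout this file produces (inferred from writePart/writeData below):
  *
  *   /path/to/table/20120701_20120731_1_5_2/   <- one "part": min-date_max-date_min-id_max-id_level
  *       primary.idx      <- primary key values for every index_granularity-th row
  *       ColumnName.bin   <- compressed column data
  *       ColumnName.mrk   <- marks: one MERGE_TREE_MARK_SIZE entry per index_granularity rows,
  *                           each a pair (offset in .bin to the compressed block, offset inside
  *                           the decompressed block)
  *
  * The example part name is hypothetical; see getPartName() for the exact format.
  */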
namespace DB
{

class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
	MergeTreeBlockOutputStream(StorageMergeTree & storage_) : storage(storage_), flags(O_TRUNC | O_CREAT | O_WRONLY)
	{
	}
	void write(const Block & block)
	{
		storage.check(block);

		Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();

		size_t rows = block.rows();
		size_t columns = block.columns();

		/// Extract the column with the date.
		const ColumnUInt16::Container_t & dates =
			dynamic_cast<const ColumnUInt16 &>(*block.getByName(storage.date_column_name).column).getData();

		/// Minimum and maximum date.
		UInt16 min_date = std::numeric_limits<UInt16>::max();
		UInt16 max_date = std::numeric_limits<UInt16>::min();
		for (ColumnUInt16::Container_t::const_iterator it = dates.begin(); it != dates.end(); ++it)
		{
			if (*it < min_date)
				min_date = *it;
			if (*it > max_date)
				max_date = *it;
		}

		/// Split into blocks by month. For each one, also compute the minimum and maximum date.
		typedef std::map<UInt16, BlockWithDateInterval> BlocksByMonth;
		BlocksByMonth blocks_by_month;

		UInt16 min_month = date_lut.toFirstDayNumOfMonth(Yandex::DayNum_t(min_date));
		UInt16 max_month = date_lut.toFirstDayNumOfMonth(Yandex::DayNum_t(max_date));

		/// The typical case: all rows fall into a single month (nothing to split).
		if (min_month == max_month)
			blocks_by_month[min_month] = BlockWithDateInterval(block, min_date, max_date);
		else
		{
			for (size_t i = 0; i < rows; ++i)
			{
				UInt16 month = date_lut.toFirstDayNumOfMonth(Yandex::DayNum_t(dates[i]));

				BlockWithDateInterval & block_for_month = blocks_by_month[month];
				if (!block_for_month.block)
					block_for_month.block = block.cloneEmpty();

				if (dates[i] < block_for_month.min_date)
					block_for_month.min_date = dates[i];
				if (dates[i] > block_for_month.max_date)
					block_for_month.max_date = dates[i];

				for (size_t j = 0; j < columns; ++j)
					block_for_month.block.getByPosition(j).column->insert((*block.getByPosition(j).column)[i]);
			}
		}

		/// Write one part per month.
		for (BlocksByMonth::iterator it = blocks_by_month.begin(); it != blocks_by_month.end(); ++it)
			writePart(it->second.block, it->second.min_date, it->second.max_date);
	}
	BlockOutputStreamPtr clone() { return new MergeTreeBlockOutputStream(storage); }

private:
	StorageMergeTree & storage;

	const int flags;

	struct BlockWithDateInterval
	{
		Block block;
		UInt16 min_date;
		UInt16 max_date;

		BlockWithDateInterval() : min_date(std::numeric_limits<UInt16>::max()), max_date(0) {}
		BlockWithDateInterval(const Block & block_, UInt16 min_date_, UInt16 max_date_)
			: block(block_), min_date(min_date_), max_date(max_date_) {}
	};
	void writePart(Block & block, UInt16 min_date, UInt16 max_date)
	{
		Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();

		size_t rows = block.rows();
		size_t columns = block.columns();
		UInt64 part_id = storage.increment.get(true);

		String part_name = storage.getPartName(
			Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date),
			part_id, part_id, 0);

		String part_tmp_path = storage.full_path + "tmp_" + part_name + "/";
		String part_res_path = storage.full_path + part_name + "/";

		Poco::File(part_tmp_path).createDirectories();

		LOG_TRACE(storage.log, "Calculating primary expression.");

		/// If some columns have to be computed for sorting, do it now.
		storage.primary_expr->execute(block);

		LOG_TRACE(storage.log, "Sorting by primary key.");

		/// Sort.
		sortBlock(block, storage.sort_descr);

		/// Finally, we can write the data to disk.
		LOG_TRACE(storage.log, "Writing index.");

		/// First write the index. The index contains the PK value for every index_granularity-th row.
		{
			WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags);

			typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
			PrimaryColumns primary_columns;

			for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i)
				primary_columns.push_back(
					!storage.sort_descr[i].column_name.empty()
						? &block.getByName(storage.sort_descr[i].column_name)
						: &block.getByPosition(storage.sort_descr[i].column_number));

			for (size_t i = 0; i < rows; i += storage.index_granularity)
				for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
					(*it)->type->serializeBinary((*(*it)->column)[i], index);
		}

		LOG_TRACE(storage.log, "Writing data.");

		for (size_t i = 0; i < columns; ++i)
		{
			const ColumnWithNameAndType & column = block.getByPosition(i);
			writeData(part_tmp_path, column.name, *column.type, *column.column);
		}

		LOG_TRACE(storage.log, "Renaming.");

		/// Rename the part.
		Poco::File(part_tmp_path).renameTo(part_res_path);

		/// Add the new part to the set.
		{
			Poco::ScopedLock<Poco::FastMutex> lock(storage.data_parts_mutex);
			Poco::ScopedLock<Poco::FastMutex> lock_all(storage.all_data_parts_mutex);

			StorageMergeTree::DataPartPtr new_data_part = new StorageMergeTree::DataPart(storage);
			new_data_part->left_date = Yandex::DayNum_t(min_date);
			new_data_part->right_date = Yandex::DayNum_t(max_date);
			new_data_part->left = part_id;
			new_data_part->right = part_id;
			new_data_part->level = 0;
			new_data_part->name = part_name;
			new_data_part->size = rows / storage.index_granularity;
			new_data_part->modification_time = time(0);
			new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
			new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);

			storage.data_parts.insert(new_data_part);
			storage.all_data_parts.insert(new_data_part);
		}

		/// If we do two merge iterations per insert, the tree of parts stays maximally compact.
		storage.merge(2);
	}
	/// Write the data of a single column.
	void writeData(const String & path, const String & name, const IDataType & type, const IColumn & column, size_t level = 0)
	{
		String escaped_column_name = escapeForFileName(name);

		/// For arrays, the sizes must be serialized first, and then the values.
		if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
		{
			String size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);

			WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
			WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags);
			CompressedWriteBuffer compressed(plain);

			size_t prev_mark = 0;
			type_arr->serializeOffsets(column, compressed,
				boost::bind(&MergeTreeBlockOutputStream::writeCallback, this,
					boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks)));
		}

		{
			WriteBufferFromFile plain(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
			WriteBufferFromFile marks(path + escaped_column_name + ".mrk", 4096, flags);
			CompressedWriteBuffer compressed(plain);

			size_t prev_mark = 0;
			type.serializeBinary(column, compressed,
				boost::bind(&MergeTreeBlockOutputStream::writeCallback, this,
					boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks)));
		}
	}
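
	/// Note that for an array column both blocks above run: serializeOffsets() writes the array
	/// sizes into a separate ARRAY_SIZES_COLUMN_NAME_SUFFIX stream, and type.serializeBinary()
	/// then writes the flattened element values into the column's own .bin file; readData() in
	/// MergeTreeBlockInputStream below uses the offsets to reconstruct the array boundaries.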
	/// Called once per index_granularity rows; writes an entry to the marks file (.mrk).
	size_t writeCallback(size_t & prev_mark,
		WriteBufferFromFile & plain,
		CompressedWriteBuffer & compressed,
		WriteBufferFromFile & marks)
	{
		/// Each mark is a pair: (offset in the file to the start of the compressed block, offset within the block).
		writeIntBinary(plain.count(), marks);
		writeIntBinary(compressed.offset(), marks);

		prev_mark += storage.index_granularity;
		return prev_mark;
	}
};
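
/** How a mark is used on the read side (see MergeTreeBlockInputStream::Stream below): to start
  * reading at row k * index_granularity, seek the .bin file to offset_in_compressed_file, start
  * decompressing that block, and skip offset_in_decompressed_block bytes. So with, say,
  * index_granularity = 8192, mark 3 positions the reader exactly at row 24576 of the column.
  */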
/** For writing a part obtained by merging several others.
  * The data is already sorted, belongs to a single month, and is written as a single part.
  */
class MergedBlockOutputStream : public IBlockOutputStream
{
public:
	MergedBlockOutputStream(StorageMergeTree & storage_,
		UInt16 min_date, UInt16 max_date, UInt64 min_part_id, UInt64 max_part_id, UInt32 level)
		: storage(storage_), index_offset(0)
	{
		part_name = storage.getPartName(
			Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date),
			min_part_id, max_part_id, level);

		part_tmp_path = storage.full_path + "tmp_" + part_name + "/";
		part_res_path = storage.full_path + part_name + "/";

		Poco::File(part_tmp_path).createDirectories();

		index_stream = new WriteBufferFromFile(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);

		for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
			addStream(it->first, *it->second);
	}
	void write(const Block & block)
	{
		size_t rows = block.rows();

		/// First write the index. The index contains the PK value for every index_granularity-th row.
		typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
		PrimaryColumns primary_columns;

		for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i)
			primary_columns.push_back(
				!storage.sort_descr[i].column_name.empty()
					? &block.getByName(storage.sort_descr[i].column_name)
					: &block.getByPosition(storage.sort_descr[i].column_number));

		for (size_t i = index_offset; i < rows; i += storage.index_granularity)
			for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
				(*it)->type->serializeBinary((*(*it)->column)[i], *index_stream);

		/// Now write the data.
		for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
		{
			const ColumnWithNameAndType & column = block.getByName(it->first);
			writeData(column.name, *column.type, *column.column);
		}

		index_offset = rows % storage.index_granularity
			? (storage.index_granularity - rows % storage.index_granularity)
			: 0;
	}
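
	/// A worked example of index_offset (values hypothetical): with index_granularity = 8192 and a
	/// 10000-row block, index entries are written at rows 0 and 8192, and index_offset becomes
	/// 8192 - 10000 % 8192 = 6384, so the first entry of the next block lands at global row 16384,
	/// which is exactly index_granularity rows after the previous entry.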
	void writeSuffix()
	{
		/// Finish the write.
		index_stream = NULL;
		column_streams.clear();

		/// Rename the part.
		Poco::File(part_tmp_path).renameTo(part_res_path);

		/// Adding the new part to the set (and removing the source parts) is done by the caller.
	}

	BlockOutputStreamPtr clone() { throw Exception("Cannot clone MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED); }

private:
	StorageMergeTree & storage;

	String part_name;
	String part_tmp_path;
	String part_res_path;

	struct ColumnStream
	{
		ColumnStream(const String & data_path, const std::string & marks_path) :
			plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY),
			compressed(plain),
			marks(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY) {}

		WriteBufferFromFile plain;
		CompressedWriteBuffer compressed;
		WriteBufferFromFile marks;
	};

	typedef std::map<String, SharedPtr<ColumnStream> > ColumnStreams;
	ColumnStreams column_streams;

	SharedPtr<WriteBuffer> index_stream;

	/// Offset to the first row of the block for which an index entry must be written.
	size_t index_offset;
	void addStream(const String & name, const IDataType & type, size_t level = 0)
	{
		String escaped_column_name = escapeForFileName(name);

		/// For arrays, separate streams are used for the sizes.
		if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
		{
			String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
			String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);

			column_streams[size_name] = new ColumnStream(
				part_tmp_path + escaped_size_name + ".bin",
				part_tmp_path + escaped_size_name + ".mrk");

			addStream(name, *type_arr->getNestedType(), level + 1);
		}
		else
			column_streams[name] = new ColumnStream(
				part_tmp_path + escaped_column_name + ".bin",
				part_tmp_path + escaped_column_name + ".mrk");
	}

	/// Write the data of a single column.
	void writeData(const String & name, const IDataType & type, const IColumn & column, size_t level = 0)
	{
		/// For arrays, the sizes must be serialized first, and then the values.
		if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
		{
			String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);

			ColumnStream & stream = *column_streams[size_name];

			size_t prev_mark = 0;
			type_arr->serializeOffsets(column, stream.compressed,
				boost::bind(&MergedBlockOutputStream::writeCallback, this,
					boost::ref(prev_mark), boost::ref(stream.plain), boost::ref(stream.compressed), boost::ref(stream.marks)));
		}

		{
			ColumnStream & stream = *column_streams[name];

			size_t prev_mark = 0;
			type.serializeBinary(column, stream.compressed,
				boost::bind(&MergedBlockOutputStream::writeCallback, this,
					boost::ref(prev_mark), boost::ref(stream.plain), boost::ref(stream.compressed), boost::ref(stream.marks)));
		}
	}

	/// Called once per index_granularity rows; writes an entry to the marks file (.mrk).
	size_t writeCallback(size_t & prev_mark,
		WriteBufferFromFile & plain,
		CompressedWriteBuffer & compressed,
		WriteBufferFromFile & marks)
	{
		/// If there is an index_offset, the first mark goes not at the very start, but after that many rows.
		if (prev_mark == 0 && index_offset != 0)
		{
			prev_mark = index_offset;
			return prev_mark;
		}

		/// Each mark is a pair: (offset in the file to the start of the compressed block, offset within the block).
		writeIntBinary(plain.count(), marks);
		writeIntBinary(compressed.offset(), marks);

		prev_mark += storage.index_granularity;
		return prev_mark;
	}
};
/** A range with open or closed ends; possibly unbounded.
  * Determines which part of the data to read, given the index.
  */
struct Range
{
	Field left;				/// the left border, if any
	Field right;			/// the right border, if any
	bool left_bounded;		/// bounded on the left
	bool right_bounded;		/// bounded on the right
	bool left_included;		/// includes the left border, if any
	bool right_included;	/// includes the right border, if any

	/// The whole set.
	Range() : left(), right(), left_bounded(false), right_bounded(false), left_included(false), right_included(false) {}

	/// A single point.
	Range(const Field & point) : left(point), right(point), left_bounded(true), right_bounded(true), left_included(true), right_included(true) {}

	/// Set the left border.
	void setLeft(const Field & point, bool included)
	{
		left = point;
		left_bounded = true;
		left_included = included;
	}

	/// Set the right border.
	void setRight(const Field & point, bool included)
	{
		right = point;
		right_bounded = true;
		right_included = included;
	}

	/// x belongs to the range.
	bool contains(const Field & x)
	{
		return !leftThan(x) && !rightThan(x);
	}

	/// x is to the left of the range.
	bool rightThan(const Field & x)
	{
		return (left_bounded
			? !(boost::apply_visitor(FieldVisitorGreater(), x, left) || (left_included && x == left))
			: false);
	}

	/// x is to the right of the range.
	bool leftThan(const Field & x)
	{
		return (right_bounded
			? !(boost::apply_visitor(FieldVisitorLess(), x, right) || (right_included && x == right))
			: false);
	}

	/// The range intersects the segment [segment_left, segment_right].
	bool intersectsSegment(const Field & segment_left, const Field & segment_right)
	{
		if (!left_bounded)
			return contains(segment_left);
		if (!right_bounded)
			return contains(segment_right);

		return (boost::apply_visitor(FieldVisitorLess(), segment_left, right) || (right_included && segment_left == right))
			&& (boost::apply_visitor(FieldVisitorGreater(), segment_right, left) || (left_included && segment_right == left));
	}

	String toString()
	{
		std::stringstream str;

		if (!left_bounded)
			str << "(-inf, ";
		else
			str << (left_included ? '[' : '(') << boost::apply_visitor(FieldVisitorToString(), left) << ", ";

		if (!right_bounded)
			str << "+inf)";
		else
			str << boost::apply_visitor(FieldVisitorToString(), right) << (right_included ? ']' : ')');

		return str.str();
	}
};
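
/** For example (a hypothetical condition): WHERE x > 10 AND x <= 20 would be turned by
  * getRangeForColumn() below into the range (10, 20] via setLeft(10, false) and
  * setRight(20, true); toString() would then print "(10, 20]".
  */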
/// For reading from a single part. To read from many parts at once, the Storage uses many such objects.
class MergeTreeBlockInputStream : public IProfilingBlockInputStream
{
public:
	MergeTreeBlockInputStream(const String & path_,	/// Path to the part
		size_t block_size_, const Names & column_names_,
		StorageMergeTree & storage_, const StorageMergeTree::DataPartPtr & owned_data_part_,
		size_t mark_number_, size_t rows_limit_)
		: path(path_), block_size(block_size_), column_names(column_names_),
		storage(storage_), owned_data_part(owned_data_part_),
		mark_number(mark_number_), rows_limit(rows_limit_), rows_read(0)
	{
	}

	String getName() const { return "MergeTreeBlockInputStream"; }

	BlockInputStreamPtr clone()
	{
		return new MergeTreeBlockInputStream(path, block_size, column_names, storage, owned_data_part, mark_number, rows_limit);
	}
	/// Get the range of marks outside of which there can be no keys from the given range.
	static void markRangeFromPkRange(const String & path,
		size_t marks_count,
		StorageMergeTree & storage,
		Row & requested_pk_prefix,
		Range & requested_pk_range,
		size_t & out_first_mark,
		size_t & out_last_mark)
	{
		size_t last_mark_in_file = (marks_count == 0) ? 0 : (marks_count - 1);

		/// If the index is not used.
		if (requested_pk_prefix.size() == 0 && !requested_pk_range.left_bounded && !requested_pk_range.right_bounded)
		{
			out_first_mark = 0;
			out_last_mark = last_mark_in_file;
		}
		else
		{
			/// Read the PK, and based on primary_prefix and primary_range determine mark_number and rows_limit.

			ssize_t min_mark_number = -1;
			ssize_t max_mark_number = -1;

			String index_path = path + "primary.idx";
			ReadBufferFromFile index(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));

			size_t prefix_size = requested_pk_prefix.size();
			Row pk(storage.sort_descr.size());
			Row pk_prefix(prefix_size);

			for (size_t current_mark_number = 0; !index.eof(); ++current_mark_number)
			{
				/// Read the next PK value.
				for (size_t i = 0, size = pk.size(); i < size; ++i)
					storage.primary_key_sample.getByPosition(i).type->deserializeBinary(pk[i], index);

				pk_prefix.assign(pk.begin(), pk.begin() + pk_prefix.size());

				if (pk_prefix < requested_pk_prefix)
				{
					min_mark_number = current_mark_number;
				}
				else if (pk_prefix == requested_pk_prefix)
				{
					if (requested_pk_range.rightThan(pk[prefix_size]))
					{
						min_mark_number = current_mark_number;
					}
					else if (requested_pk_range.leftThan(pk[prefix_size]))
					{
						max_mark_number = current_mark_number == 0 ? 0 : (current_mark_number - 1);
						break;
					}
				}
				else
				{
					max_mark_number = current_mark_number == 0 ? 0 : (current_mark_number - 1);
					break;
				}
			}

			out_first_mark = min_mark_number == -1 ? 0 : min_mark_number;
			out_last_mark = max_mark_number == -1 ? last_mark_in_file : max_mark_number;
		}
	}
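
	/// The index is coarse: primary.idx stores one PK value per index_granularity rows, so the
	/// returned mark range is conservative and may cover up to index_granularity - 1 extra rows
	/// on each side; presumably those rows are filtered out later by the WHERE evaluation of the
	/// query pipeline (outside this file).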
protected:
	Block readImpl()
	{
		Block res;

		if (rows_read == rows_limit)
			return res;

		/// If the files are not yet open, open them.
		if (streams.empty())
			for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
				addStream(*it, *storage.getDataTypeByName(*it));

		/// How many rows to read for the next block.
		size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);

		for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
		{
			ColumnWithNameAndType column;
			column.name = *it;
			column.type = storage.getDataTypeByName(*it);
			column.column = column.type->createColumn();
			readData(*it, *column.type, *column.column, max_rows_to_read);

			if (column.column->size())
				res.insert(column);
		}

		if (res)
			rows_read += res.rows();

		if (!res || rows_read == rows_limit)
		{
			/** Close the files (before the object is destroyed).
			  * So that when many sources are created, but only a few are being read from at any
			  * moment, the buffers don't hang around in memory.
			  */
			streams.clear();
		}

		return res;
	}
private:
	const String path;
	size_t block_size;
	Names column_names;
	StorageMergeTree & storage;
	const StorageMergeTree::DataPartPtr owned_data_part;	/// The part won't be removed while this object owns it.
	size_t mark_number;		/// The mark to start reading from.
	size_t rows_limit;		/// The maximum number of rows that can be read.
	size_t rows_read;

	struct Stream
	{
		Stream(const String & path_prefix, size_t mark_number)
			: plain(path_prefix + ".bin", std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path_prefix + ".bin").getSize())),
			compressed(plain)
		{
			if (mark_number)
			{
				/// Read the offset into the data file from the marks file.
				ReadBufferFromFile marks(path_prefix + ".mrk", MERGE_TREE_MARK_SIZE);
				marks.seek(mark_number * MERGE_TREE_MARK_SIZE);

				size_t offset_in_compressed_file = 0;
				size_t offset_in_decompressed_block = 0;

				readIntBinary(offset_in_compressed_file, marks);
				readIntBinary(offset_in_decompressed_block, marks);

				plain.seek(offset_in_compressed_file);
				compressed.next();
				compressed.position() += offset_in_decompressed_block;
			}
		}

		ReadBufferFromFile plain;
		CompressedReadBuffer compressed;
	};

	typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
	FileStreams streams;
	void addStream(const String & name, const IDataType & type, size_t level = 0)
	{
		String escaped_column_name = escapeForFileName(name);

		/// For arrays, separate streams are used for the sizes.
		if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
		{
			String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
			String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);

			streams.insert(std::make_pair(size_name, new Stream(
				path + escaped_size_name,
				mark_number)));

			addStream(name, *type_arr->getNestedType(), level + 1);
		}
		else
			streams.insert(std::make_pair(name, new Stream(
				path + escaped_column_name,
				mark_number)));
	}

	void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, size_t level = 0)
	{
		/// For arrays, the sizes must be deserialized first, and then the values.
		if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
		{
			type_arr->deserializeOffsets(
				column,
				streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level)]->compressed,
				max_rows_to_read);

			if (column.size())
				readData(
					name,
					*type_arr->getNestedType(),
					dynamic_cast<ColumnArray &>(column).getData(),
					dynamic_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
					level + 1);
		}
		else
			type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read);
	}
};
StorageMergeTree::StorageMergeTree(
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_, const String & date_column_name_,
	size_t index_granularity_,
	const String & sign_column_,
	const StorageMergeTreeSettings & settings_)
	: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
	context(context_), primary_expr_ast(primary_expr_ast_->clone()),
	date_column_name(date_column_name_), index_granularity(index_granularity_),
	sign_column(sign_column_),
	settings(settings_),
	increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name))
{
	/// Create the directory if it doesn't exist.
	Poco::File(full_path).createDirectories();

	/// Initialize the sort description.
	sort_descr.reserve(primary_expr_ast->children.size());
	for (ASTs::iterator it = primary_expr_ast->children.begin();
		it != primary_expr_ast->children.end();
		++it)
	{
		String name = (*it)->getColumnName();
		sort_descr.push_back(SortColumnDescription(name, 1));
	}

	context.setColumns(*columns);
	primary_expr = new Expression(primary_expr_ast, context);
	primary_key_sample = primary_expr->getSampleBlock();

	merge_threads = new boost::threadpool::pool(settings.merging_threads);

	loadDataParts();
}
StorageMergeTree::~StorageMergeTree()
{
	joinMergeThreads();
}

BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
{
	return new MergeTreeBlockOutputStream(*this);
}
/// Collects the list of relations in the conjunction of the WHERE clause, to determine whether the index can be used.
static void getRelationsFromConjunction(ASTPtr & node, ASTs & relations)
{
	if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*node))
	{
		if (func->name == "equals"
			|| func->name == "less" || func->name == "greater"
			|| func->name == "lessOrEquals" || func->name == "greaterOrEquals")
		{
			relations.push_back(node);
		}
		else if (func->name == "and")
		{
			/// Recurse.
			ASTs & args = dynamic_cast<ASTExpressionList &>(*func->arguments).children;
			getRelationsFromConjunction(args.at(0), relations);
			getRelationsFromConjunction(args.at(1), relations);
		}
	}
}
/** Get the value of a constant expression.
  * Returns false if the expression is not constant.
  */
static bool getConstant(ASTPtr & expr, Block & block_with_constants, Field & value)
{
	String column_name = expr->getColumnName();

	if (ASTLiteral * lit = dynamic_cast<ASTLiteral *>(&*expr))
	{
		/// A literal.
		value = lit->value;
		return true;
	}
	else if (block_with_constants.has(column_name) && block_with_constants.getByName(column_name).column->isConst())
	{
		/// An expression that evaluated to a constant.
		value = (*block_with_constants.getByName(column_name).column)[0];
		return true;
	}
	else
		return false;
}

/** Get the value of the constant argument of a function of the form f(name, const_expr) or f(const_expr, name).
  * block_with_constants contains the evaluated values of constant expressions.
  * Returns false if there is no such argument.
  */
static bool getConstantArgument(ASTs & args, Block & block_with_constants, Field & rhs)
{
	if (args.size() != 2)
		return false;

	return getConstant(args[0], block_with_constants, rhs)
		|| getConstant(args[1], block_with_constants, rhs);
}
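
/** For instance, for a hypothetical condition WHERE Date = toDate(now()), the subexpression
  * toDate(now()) is not a literal, but getIndexRanges() below constant-folds it into
  * block_with_constants first, so getConstant() picks its value out of the block by column name.
  */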
/// Build the range of possible values for a column, based on the WHERE clause with evaluated constant expressions.
static Range getRangeForColumn(ASTs & relations, const String & column_name, Block & block_with_constants)
{
	Range range;

	for (ASTs::iterator jt = relations.begin(); jt != relations.end(); ++jt)
	{
		ASTFunction * func = dynamic_cast<ASTFunction *>(&**jt);
		if (!func)
			continue;

		ASTs & args = dynamic_cast<ASTExpressionList &>(*func->arguments).children;
		if (args.size() != 2)
			continue;

		/// Pattern: col rel const or const rel col.
		bool inverted;
		if (column_name == args[0]->getColumnName())
			inverted = false;
		else if (column_name == args[1]->getColumnName())
			inverted = true;
		else
			continue;

		Field rhs;
		if (!getConstantArgument(args, block_with_constants, rhs))
			continue;

		if (func->name == "equals")
		{
			range = Range(rhs);
			break;
		}
		else if (func->name == "greater")
			!inverted ? range.setLeft(rhs, false) : range.setRight(rhs, false);
		else if (func->name == "greaterOrEquals")
			!inverted ? range.setLeft(rhs, true) : range.setRight(rhs, true);
		else if (func->name == "less")
			!inverted ? range.setRight(rhs, false) : range.setLeft(rhs, false);
		else if (func->name == "lessOrEquals")
			!inverted ? range.setRight(rhs, true) : range.setLeft(rhs, true);
	}

	return range;
}

/** Extract the value that the column must be equal to, based on the WHERE clause with evaluated constant expressions.
  * If there is none, returns false.
  */
static bool getEqualityForColumn(ASTs & relations, const String & column_name, Block & block_with_constants, Field & value)
{
	for (ASTs::iterator jt = relations.begin(); jt != relations.end(); ++jt)
	{
		ASTFunction * func = dynamic_cast<ASTFunction *>(&**jt);
		if (!func || func->name != "equals")
			continue;

		ASTs & args = dynamic_cast<ASTExpressionList &>(*func->arguments).children;
		if (args.size() != 2)
			continue;

		if (args[0]->getColumnName() != column_name && args[1]->getColumnName() != column_name)
			continue;

		if (getConstantArgument(args, block_with_constants, value))
			return true;
	}

	return false;
}
void StorageMergeTree::getIndexRanges(ASTPtr & query, Range & date_range, Row & primary_prefix, Range & primary_range)
{
	/** Evaluate expressions that depend only on constants,
	  * so that the index can be used if, for example, WHERE Date = toDate(now()) is written.
	  */
	Expression expr_for_constant_folding(query, context);
	Block block_with_constants;

	/// The block must contain at least one column, so that the number of rows in it is known.
	ColumnWithNameAndType dummy_column;
	dummy_column.name = "_dummy";
	dummy_column.type = new DataTypeUInt8;
	dummy_column.column = new ColumnConstUInt8(1, 0);
	block_with_constants.insert(dummy_column);

	expr_for_constant_folding.execute(block_with_constants, 0, true);

	/// Extract all relations from the conjunction of the WHERE clause.
	ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*query);
	if (select.where_expression)
	{
		ASTs relations;
		getRelationsFromConjunction(select.where_expression, relations);

		/// Look for relations that can be used for the index on the date.
		date_range = getRangeForColumn(relations, date_column_name, block_with_constants);

		/** Now look for relations that can be used for the primary key.
		  * First, find the maximal number of equality relations to a constant for the leading PK columns.
		  */
		for (SortDescription::const_iterator it = sort_descr.begin(); it != sort_descr.end(); ++it)
		{
			Field rhs;
			if (getEqualityForColumn(relations, it->column_name, block_with_constants, rhs))
				primary_prefix.push_back(rhs);
			else
				break;
		}

		/// If not all PK columns have an equality recorded, look for relations for the next PK column.
		if (primary_prefix.size() < sort_descr.size())
			primary_range = getRangeForColumn(relations, sort_descr[primary_prefix.size()].column_name, block_with_constants);
	}

	LOG_DEBUG(log, "Date range: " << date_range.toString());

	std::stringstream primary_prefix_str;
	for (Row::const_iterator it = primary_prefix.begin(); it != primary_prefix.end(); ++it)
		primary_prefix_str << (it != primary_prefix.begin() ? ", " : "") << boost::apply_visitor(FieldVisitorToString(), *it);

	LOG_DEBUG(log, "Primary key prefix: (" << primary_prefix_str.str() << ")");

	if (primary_prefix.size() < sort_descr.size())
	{
		LOG_DEBUG(log, "Primary key range for column " << sort_descr[primary_prefix.size()].column_name << ": " << primary_range.toString());
	}
}
BlockInputStreams StorageMergeTree::read(
	const Names & column_names,
	ASTPtr query,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	/// The date range.
	Range date_range;
	/// The prefix of the primary key for which equality is required. May be empty.
	Row primary_prefix;
	/// The range of the primary key column following the prefix.
	Range primary_range;

	getIndexRanges(query, date_range, primary_prefix, primary_range);

	typedef std::vector<DataPartRange> PartsRanges;
	PartsRanges parts;

	/// Select the parts that may contain data for date_range.
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

		for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
			if (date_range.intersectsSegment(static_cast<UInt64>((*it)->left_date), static_cast<UInt64>((*it)->right_date)))
				parts.push_back(DataPartRange(*it, 0, 0));
	}

	/// Determine which range to read from each part.
	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		DataPartRange & part = parts[i];

		MergeTreeBlockInputStream::markRangeFromPkRange(full_path + part.data_part->name + '/',
			part.data_part->size,
			*this,
			primary_prefix,
			primary_range,
			part.first_mark,
			part.last_mark);

		sum_marks += part.last_mark - part.first_mark + 1;
	}

	LOG_DEBUG(log, "Selected " << parts.size() << " parts, " << sum_marks << " marks to read");

	/// Divide the marks evenly among the threads, in random order.
	size_t effective_threads = std::min(static_cast<size_t>(threads), sum_marks);
	std::random_shuffle(parts.begin(), parts.end());

	BlockInputStreams res;

	size_t cur_part = 0;
	/// How many marks have already been taken from parts[cur_part].
	size_t cur_pos = 0;
	size_t marks_spread = 0;

	for (size_t i = 0; i < effective_threads && marks_spread < sum_marks; ++i)
	{
		size_t need_marks = sum_marks * (i + 1) / effective_threads - marks_spread;
		BlockInputStreams streams;

		while (need_marks > 0)
		{
			if (cur_part >= parts.size())
				throw Exception("Can't spread marks among threads", ErrorCodes::LOGICAL_ERROR);

			DataPartRange & part = parts[cur_part];
			size_t marks_left_in_part = part.last_mark - part.first_mark + 1 - cur_pos;

			if (marks_left_in_part == 0)
			{
				++cur_part;
				cur_pos = 0;
				continue;
			}

			size_t marks_to_get_from_part = std::min(marks_left_in_part, need_marks);

			/// Don't leave too few rows in the part.
			if ((marks_left_in_part - marks_to_get_from_part) * index_granularity < settings.min_rows_for_concurrent_read)
				marks_to_get_from_part = marks_left_in_part;

			streams.push_back(new MergeTreeBlockInputStream(full_path + part.data_part->name + '/',
				max_block_size, column_names, *this,
				part.data_part, part.first_mark + cur_pos,
				marks_to_get_from_part * index_granularity));

			marks_spread += marks_to_get_from_part;

			if (marks_to_get_from_part > need_marks)
				need_marks = 0;
			else
				need_marks -= marks_to_get_from_part;

			cur_pos += marks_to_get_from_part;
		}

		if (streams.size() == 1)
			res.push_back(streams[0]);
		else
			res.push_back(new ConcatBlockInputStream(streams));
	}

	if (marks_spread != sum_marks || cur_part + 1 != parts.size() || cur_pos != parts.back().last_mark - parts.back().first_mark + 1)
		throw Exception("Can't spread marks among threads", ErrorCodes::LOGICAL_ERROR);

	return res;
}
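
/** A sketch of how the spreading loop above behaves (numbers hypothetical): with sum_marks = 10
  * and effective_threads = 3, thread i is assigned sum_marks * (i + 1) / effective_threads minus
  * what was spread so far, i.e. 3, 3 and 4 marks; min_rows_for_concurrent_read can then shift
  * marks between neighbours so that no thread is left with a uselessly small tail of a part.
  */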
String StorageMergeTree::getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
{
	Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();

	/// The directory name for a part has the form: YYYYMMDD_YYYYMMDD_N_N_L.
	String res;
	{
		unsigned left_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
		unsigned right_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(right_date));

		WriteBufferFromString wb(res);

		writeIntText(left_date_id, wb);
		writeChar('_', wb);
		writeIntText(right_date_id, wb);
		writeChar('_', wb);
		writeIntText(left_id, wb);
		writeChar('_', wb);
		writeIntText(right_id, wb);
		writeChar('_', wb);
		writeIntText(level, wb);
	}

	return res;
}
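
/// For example (hypothetical values): a level-2 part covering 2012-07-01..2012-07-31 that merged
/// the ids 1 through 5 would be named "20120701_20120731_1_5_2", which is exactly what the
/// regular expression in loadDataParts() below parses back.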
void StorageMergeTree::loadDataParts()
{
	LOG_DEBUG(log, "Loading data parts");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();

	data_parts.clear();

	static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)");

	Poco::DirectoryIterator end;
	Poco::RegularExpression::MatchVec matches;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
	{
		std::string file_name = it.name();

		if (!(file_name_regexp.match(file_name, 0, matches) && 6 == matches.size()))
			continue;

		DataPartPtr part = new DataPart(*this);
		part->left_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
		part->right_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length)));
		part->left = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[3].offset, matches[3].length));
		part->right = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[4].offset, matches[4].length));
		part->level = Poco::NumberParser::parseUnsigned(file_name.substr(matches[5].offset, matches[5].length));
		part->name = file_name;

		/// The size is measured in the number of marks.
		part->size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize()
			/ MERGE_TREE_MARK_SIZE;

		part->modification_time = it->getLastModified().epochTime();

		part->left_month = date_lut.toFirstDayNumOfMonth(part->left_date);
		part->right_month = date_lut.toFirstDayNumOfMonth(part->right_date);

		data_parts.insert(part);
	}

	all_data_parts = data_parts;

	/** Remove from the set of current parts those parts that are contained in another part (parts that were merged),
	  * but for some reason were left lying in the file system.
	  * The files themselves will be removed later, in the clearOldParts method.
	  */
	if (data_parts.size() >= 2)
	{
		DataParts::iterator prev_jt = data_parts.begin();
		DataParts::iterator curr_jt = prev_jt;
		++curr_jt;
		while (curr_jt != data_parts.end())
		{
			/// Don't consider parts belonging to different months.
			if ((*curr_jt)->left_month != (*curr_jt)->right_month
				|| (*curr_jt)->right_month != (*prev_jt)->left_month
				|| (*prev_jt)->left_month != (*prev_jt)->right_month)
			{
				++prev_jt;
				++curr_jt;
				continue;
			}

			if ((*curr_jt)->contains(**prev_jt))
			{
				LOG_WARNING(log, "Part " << (*curr_jt)->name << " contains " << (*prev_jt)->name);
				data_parts.erase(prev_jt);
				prev_jt = curr_jt;
				++curr_jt;
			}
			else if ((*prev_jt)->contains(**curr_jt))
			{
				LOG_WARNING(log, "Part " << (*prev_jt)->name << " contains " << (*curr_jt)->name);
				data_parts.erase(curr_jt++);
			}
			else
			{
				++prev_jt;
				++curr_jt;
			}
		}
	}

	LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}
void StorageMergeTree::clearOldParts()
{
	Poco::ScopedTry<Poco::FastMutex> lock;

	/// If the method is already being called from another thread (or all_data_parts is being modified right now), there is nothing to do.
	if (!lock.lock(&all_data_parts_mutex))
	{
		LOG_TRACE(log, "Already clearing or modifying old parts");
		return;
	}

	LOG_TRACE(log, "Clearing old parts");
	for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
	{
		int ref_count = it->referenceCount();
		LOG_TRACE(log, (*it)->name << ": ref_count = " << ref_count);
		if (ref_count == 1)		/// After this, ref_count cannot increase.
		{
			LOG_DEBUG(log, "Removing part " << (*it)->name);
			(*it)->remove();
			all_data_parts.erase(it++);
		}
		else
			++it;
	}
}
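
/// Rationale for the ref_count == 1 check (as the inline comment above implies): the only
/// remaining reference is the one held by all_data_parts itself. A part can only gain new
/// references while it is reachable through data_parts (readers) or a running merge, and a
/// merged-away part has already left data_parts, so once the count drops to 1 it stays there.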
void StorageMergeTree::merge(size_t iterations, bool async)
{
	bool while_can = false;
	if (iterations == 0)
	{
		while_can = true;
		iterations = settings.merging_threads;
	}

	for (size_t i = 0; i < iterations; ++i)
		merge_threads->schedule(boost::bind(&StorageMergeTree::mergeThread, this, while_can));

	if (!async)
		joinMergeThreads();
}

void StorageMergeTree::mergeThread(bool while_can)
{
	try
	{
		std::vector<DataPartPtr> parts;
		while (selectPartsToMerge(parts))
		{
			mergeParts(parts);

			/// Remove old parts.
			parts.clear();
			clearOldParts();

			if (!while_can)
				break;
		}
	}
	catch (const Exception & e)
	{
		LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
			<< std::endl
			<< "Stack trace:" << std::endl
			<< e.getStackTrace().toString());
	}
	catch (const Poco::Exception & e)
	{
		LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
	}
	catch (const std::exception & e)
	{
		LOG_ERROR(log, "std::exception: " << e.what());
	}
	catch (...)
	{
		LOG_ERROR(log, "Unknown exception");
	}
}

void StorageMergeTree::joinMergeThreads()
{
	LOG_DEBUG(log, "Waiting for merge threads to finish.");
	merge_threads->wait();
}
/** Select a contiguous run of no more than max_parts_to_merge_at_once parts such that the largest part
  * is less than max_size_ratio_to_merge_parts times the sum of the others.
  * This guarantees at worst O(n log n) total merge time, regardless of the choice of merged parts,
  * of the merge order, and of the insertion order.
  * Within this constraint, we prefer parts smaller in size but greater in count:
  * among the suitable runs, choose the one with the minimal maximum size;
  * among those, choose the one with the minimal minimum size;
  * among those, choose the longest one.
  */
bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts)
{
	LOG_DEBUG(log, "Selecting parts to merge");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	size_t min_max = -1U;
	size_t min_min = -1U;
	int max_len = 0;
	DataParts::iterator best_begin;
	bool found = false;

	for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
	{
		const DataPartPtr & part = *it;

		/// The part is not busy, is small enough, and lies within a single month.
		if (part->currently_merging ||
			part->size * index_granularity > settings.max_rows_to_merge_parts ||
			part->left_month != part->right_month)
			continue;

		size_t cur_max = part->size;
		size_t cur_min = part->size;
		size_t cur_sum = part->size;
		int cur_len = 1;

		Yandex::DayNum_t month = part->left_month;
		UInt64 cur_id = part->right;

		DataParts::iterator jt = it;
		for (++jt; jt != data_parts.end() && cur_len < static_cast<int>(settings.max_parts_to_merge_at_once); ++jt)
		{
			const DataPartPtr & next_part = *jt;

			/// The part is not busy, small enough, within the same single month, and to the right of the previous one.
			if (next_part->currently_merging ||
				next_part->size * index_granularity > settings.max_rows_to_merge_parts ||
				next_part->left_month != next_part->right_month ||
				next_part->left_month != month ||
				next_part->left < cur_id)
				break;

			cur_max = std::max(cur_max, next_part->size);
			cur_min = std::min(cur_min, next_part->size);
			cur_sum += next_part->size;
			++cur_len;
			cur_id = next_part->right;

			if (found && cur_max > min_max)
				break;

			if (cur_len >= 2 &&
				static_cast<double>(cur_max) / (cur_sum - cur_max) < settings.max_size_ratio_to_merge_parts &&
				(!found ||
					std::make_pair(std::make_pair(cur_max, cur_min), -cur_len) <
					std::make_pair(std::make_pair(min_max, min_min), -max_len)))
			{
				found = true;
				min_max = cur_max;
				min_min = cur_min;
				max_len = cur_len;
				best_begin = it;
			}
		}
	}

	if (found)
	{
		parts.clear();

		DataParts::iterator it = best_begin;
		for (int i = 0; i < max_len; ++i)
		{
			parts.push_back(*it);
			++it;
		}

		LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts[0]->name << " to " << parts.back()->name);
	}
	else
	{
		LOG_DEBUG(log, "No parts to merge");
	}

	return found;
}
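
/** A hypothetical run to illustrate the ratio criterion: for consecutive sizes {3, 4, 5} we get
  * cur_max = 5 and cur_sum - cur_max = 7, so with, say, max_size_ratio_to_merge_parts = 2 the run
  * qualifies (5 / 7 < 2); a run like {100, 1} would not (100 / 1 >= 2), which is what keeps one
  * huge part from being rewritten over and over for the sake of tiny neighbours.
  */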
void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
{
	LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts[0]->name << " to " << parts.back()->name);

	Names all_column_names;
	for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
		all_column_names.push_back(it->first);

	Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();

	StorageMergeTree::DataPartPtr new_data_part = new DataPart(*this);
	new_data_part->left_date = parts[0]->left_date;
	new_data_part->right_date = parts.back()->right_date;
	new_data_part->left = parts[0]->left;
	new_data_part->right = parts.back()->right;
	new_data_part->level = 0;
	new_data_part->size = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		new_data_part->level = std::max(new_data_part->level, parts[i]->level);
		new_data_part->size += parts[i]->size;
	}
	++new_data_part->level;
	new_data_part->name = getPartName(
		new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
	new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
	new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);

	/** Read from all the parts, merge, and write into a new one.
	  * Along the way, compute the expression for sorting.
	  * Also, if some parts lack some columns, add columns with default values.
	  */
	BlockInputStreams src_streams;

	for (size_t i = 0; i < parts.size(); ++i)
	{
		src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
			full_path + parts[i]->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, parts[i], 0, std::numeric_limits<size_t>::max()), primary_expr));
	}

	BlockInputStreamPtr merged_stream = new AddingDefaultBlockInputStream(
		(sign_column.empty()
			? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE)
			: new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_BLOCK_SIZE)),
		columns);

	BlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
		new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);

	copyData(*merged_stream, *to);

	new_data_part->modification_time = time(0);

	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

		/// Check that the source parts are still in the current set, then replace them with the new part.
		for (size_t i = 0; i < parts.size(); ++i)
		{
			if (data_parts.end() == data_parts.find(parts[i]))
				throw Exception("Logical error: cannot find data part " + parts[i]->name + " in list", ErrorCodes::LOGICAL_ERROR);
		}

		data_parts.insert(new_data_part);
		all_data_parts.insert(new_data_part);

		for (size_t i = 0; i < parts.size(); ++i)
		{
			data_parts.erase(data_parts.find(parts[i]));
		}
	}

	LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts[0]->name << " to " << parts.back()->name);
}
void StorageMergeTree::drop()
{
	joinMergeThreads();

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	data_parts.clear();
	all_data_parts.clear();

	Poco::File(full_path).remove(true);
}

}