2013-04-24 10:31:32 +00:00
# pragma once
2013-05-03 10:20:53 +00:00
# include <DB/DataStreams/IProfilingBlockInputStream.h>
2013-04-24 10:31:32 +00:00
# include <DB/Storages/StorageMergeTree.h>
2013-05-03 10:20:53 +00:00
# include <DB/Storages/MergeTree/PKCondition.h>
2013-04-24 10:31:32 +00:00
2013-11-26 11:55:11 +00:00
# include <DB/Storages/MergeTree/MergeTreeReader.h>
2013-09-08 05:53:10 +00:00
2013-04-24 10:31:32 +00:00
namespace DB
{
/// Для чтения из одного куска. Для чтения сразу из многих, Storage использует сразу много таких объектов.
class MergeTreeBlockInputStream : public IProfilingBlockInputStream
{
public :
/// Параметры storage_ и owned_storage разделены, чтобы можно было сделать поток, не владеющий своим storage
/// (например, поток, сливаящий куски). В таком случае сам storage должен следить, чтобы не удалить данные, пока их читают.
MergeTreeBlockInputStream ( const String & path_ , /// Путь к куску
2013-09-08 05:53:10 +00:00
size_t block_size_ , const Names & column_names_ ,
StorageMergeTree & storage_ , const StorageMergeTree : : DataPartPtr & owned_data_part_ ,
2013-11-28 13:16:46 +00:00
const MarkRanges & mark_ranges_ , StoragePtr owned_storage , bool use_uncompressed_cache_ ,
2013-12-02 12:15:39 +00:00
ExpressionActionsPtr prewhere_actions_ , String prewhere_column_ )
2013-09-08 05:53:10 +00:00
: IProfilingBlockInputStream ( owned_storage ) ,
path ( path_ ) , block_size ( block_size_ ) , column_names ( column_names_ ) ,
storage ( storage_ ) , owned_data_part ( owned_data_part_ ) ,
2013-11-26 11:55:11 +00:00
all_mark_ranges ( mark_ranges_ ) , remaining_mark_ranges ( mark_ranges_ ) ,
2013-12-02 12:15:39 +00:00
use_uncompressed_cache ( use_uncompressed_cache_ ) ,
prewhere_actions ( prewhere_actions_ ) , prewhere_column ( prewhere_column_ )
2013-04-24 10:31:32 +00:00
{
2013-11-26 11:55:11 +00:00
std : : reverse ( remaining_mark_ranges . begin ( ) , remaining_mark_ranges . end ( ) ) ;
2013-12-02 12:15:39 +00:00
if ( prewhere_actions ) {
pre_column_names = prewhere_actions - > getRequiredColumns ( ) ;
if ( pre_column_names . empty ( ) )
pre_column_names . push_back ( column_names [ 0 ] ) ;
NameSet pre_name_set ( pre_column_names . begin ( ) , pre_column_names . end ( ) ) ;
2013-12-05 13:07:55 +00:00
/// Если выражение в PREWHERE - не столбец таблицы, не нужно отдавать наружу столбец с ним
/// (от storage ожидают получить только столбцы таблицы).
remove_prewhere_column = ! pre_name_set . count ( prewhere_column ) ;
2013-12-02 12:15:39 +00:00
Names post_column_names ;
for ( size_t i = 0 ; i < column_names . size ( ) ; + + i )
{
if ( ! pre_name_set . count ( column_names [ i ] ) )
post_column_names . push_back ( column_names [ i ] ) ;
}
column_names = post_column_names ;
}
column_name_set . insert ( column_names . begin ( ) , column_names . end ( ) ) ;
2013-11-26 11:55:11 +00:00
LOG_TRACE ( storage . log , " Reading " < < all_mark_ranges . size ( ) < < " ranges from part " < < owned_data_part - > name
< < " , up to " < < ( all_mark_ranges . back ( ) . end - all_mark_ranges . front ( ) . begin ) * storage . index_granularity
< < " rows starting from " < < all_mark_ranges . front ( ) . begin * storage . index_granularity ) ;
2013-04-24 10:31:32 +00:00
}
String getName ( ) const { return " MergeTreeBlockInputStream " ; }
2013-05-03 10:20:53 +00:00
String getID ( ) const
{
std : : stringstream res ;
res < < " MergeTree( " < < owned_storage - > getTableName ( ) < < " , " < < path < < " , columns " ;
for ( size_t i = 0 ; i < column_names . size ( ) ; + + i )
res < < " , " < < column_names [ i ] ;
res < < " , marks " ;
2013-11-26 11:55:11 +00:00
for ( size_t i = 0 ; i < all_mark_ranges . size ( ) ; + + i )
res < < " , " < < all_mark_ranges [ i ] . begin < < " , " < < all_mark_ranges [ i ] . end ;
2013-05-03 10:20:53 +00:00
res < < " ) " ;
return res . str ( ) ;
}
2013-04-24 10:31:32 +00:00
/// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона.
2013-12-09 00:29:24 +00:00
static MarkRanges markRangesFromPkRange (
const StorageMergeTree : : DataPart : : Index & index ,
StorageMergeTree & storage ,
PKCondition & key_condition )
2013-04-24 10:31:32 +00:00
{
MarkRanges res ;
2013-12-09 00:29:24 +00:00
size_t key_size = storage . sort_descr . size ( ) ;
size_t marks_count = index . size ( ) / key_size ;
2013-04-24 10:31:32 +00:00
/// Если индекс не используется.
if ( key_condition . alwaysTrue ( ) )
{
res . push_back ( MarkRange ( 0 , marks_count ) ) ;
}
else
{
2013-12-09 00:29:24 +00:00
/** В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back).
* Н а к а ж д о м ш а г е б е р е м л е в ы й о т р е з о к и п р о в е р я е м , п о д х о д и т л и о н .
* Е с л и п о д х о д и т , р а з б и в а е м е г о н а б о л е е м е л к и е и к л а д е м и х в с т е к . Е с л и н е т - в ы б р а с ы в а е м е г о .
* Е с л и о т р е з о к у ж е д л и н о й в о д н у з а с е ч к у , д о б а в л я е м е г о в о т в е т и в ы б р а с ы в а е м .
*/
2013-04-24 10:31:32 +00:00
std : : vector < MarkRange > ranges_stack ;
ranges_stack . push_back ( MarkRange ( 0 , marks_count ) ) ;
while ( ! ranges_stack . empty ( ) )
{
MarkRange range = ranges_stack . back ( ) ;
ranges_stack . pop_back ( ) ;
bool may_be_true ;
if ( range . end = = marks_count )
2013-12-09 00:29:24 +00:00
may_be_true = key_condition . mayBeTrueAfter ( & index [ range . begin * key_size ] ) ;
2013-04-24 10:31:32 +00:00
else
2013-12-09 00:29:24 +00:00
may_be_true = key_condition . mayBeTrueInRange ( & index [ range . begin * key_size ] , & index [ range . end * key_size ] ) ;
2013-12-11 20:44:06 +00:00
2013-04-24 10:31:32 +00:00
if ( ! may_be_true )
continue ;
if ( range . end = = range . begin + 1 )
{
/// Увидели полезный промежуток между соседними засечками. Либо добавим е г о к последнему диапазону, либо начнем новый диапазон.
if ( res . empty ( ) | | range . begin - res . back ( ) . end > storage . min_marks_for_seek )
res . push_back ( range ) ;
else
res . back ( ) . end = range . end ;
}
else
{
/// Разбиваем отрезок и кладем результат в стек справа налево.
size_t step = ( range . end - range . begin - 1 ) / storage . settings . coarse_index_granularity + 1 ;
size_t end ;
2013-12-09 00:29:24 +00:00
2013-04-24 10:31:32 +00:00
for ( end = range . end ; end > range . begin + step ; end - = step )
ranges_stack . push_back ( MarkRange ( end - step , end ) ) ;
2013-12-09 00:29:24 +00:00
2013-04-24 10:31:32 +00:00
ranges_stack . push_back ( MarkRange ( range . begin , end ) ) ;
}
}
}
return res ;
}
protected :
Block readImpl ( )
{
Block res ;
2013-11-26 11:55:11 +00:00
if ( remaining_mark_ranges . empty ( ) )
return res ;
if ( ! reader )
2013-04-24 10:31:32 +00:00
{
2013-11-26 11:55:11 +00:00
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage . context . getUncompressedCache ( ) : NULL ;
reader = new MergeTreeReader ( path , column_names , uncompressed_cache , storage ) ;
2013-12-02 12:15:39 +00:00
if ( prewhere_actions )
pre_reader = new MergeTreeReader ( path , pre_column_names , uncompressed_cache , storage ) ;
2013-04-24 10:31:32 +00:00
}
2013-11-26 11:55:11 +00:00
2013-12-02 12:15:39 +00:00
if ( prewhere_actions )
2013-04-24 10:31:32 +00:00
{
2013-12-02 12:15:39 +00:00
do
{
/// Прочитаем полный блок столбцов, нужных для вычисления выражения в PREWHERE.
size_t space_left = std : : max ( 1LU , block_size / storage . index_granularity ) ;
MarkRanges ranges_to_read ;
while ( ! remaining_mark_ranges . empty ( ) & & space_left )
{
MarkRange & range = remaining_mark_ranges . back ( ) ;
size_t marks_to_read = std : : min ( range . end - range . begin , space_left ) ;
pre_reader - > readRange ( range . begin , range . begin + marks_to_read , res ) ;
ranges_to_read . push_back ( MarkRange ( range . begin , range . begin + marks_to_read ) ) ;
space_left - = marks_to_read ;
range . begin + = marks_to_read ;
if ( range . begin = = range . end )
remaining_mark_ranges . pop_back ( ) ;
}
pre_reader - > fillMissingColumns ( res ) ;
/// Вычислим выражение в PREWHERE.
prewhere_actions - > execute ( res ) ;
ColumnPtr column = res . getByName ( prewhere_column ) . column ;
2013-12-05 13:07:55 +00:00
if ( remove_prewhere_column )
res . erase ( prewhere_column ) ;
2013-12-02 12:15:39 +00:00
/** Если фильтр - константа (например, написано PREWHERE 1),
* т о л и б о в е р н ё м п у с т о й б л о к , л и б о в е р н ё м б л о к б е з и з м е н е н и й .
*/
if ( ColumnConstUInt8 * column_const = dynamic_cast < ColumnConstUInt8 * > ( & * column ) )
{
if ( ! column_const - > getData ( ) )
{
res . clear ( ) ;
return res ;
}
for ( size_t i = 0 ; i < ranges_to_read . size ( ) ; + + i ) {
const MarkRange & range = ranges_to_read [ i ] ;
reader - > readRange ( range . begin , range . end , res ) ;
}
}
else if ( ColumnUInt8 * column_vec = dynamic_cast < ColumnUInt8 * > ( & * column ) )
{
size_t index_granularity = storage . index_granularity ;
2013-06-25 12:19:10 +00:00
2013-12-02 12:15:39 +00:00
const IColumn : : Filter & pre_filter = column_vec - > getData ( ) ;
IColumn : : Filter post_filter ( pre_filter . size ( ) ) ;
2013-11-11 05:35:58 +00:00
2013-12-02 12:15:39 +00:00
/// Прочитаем в нужных отрезках остальные столбцы и составим для них свой фильтр.
size_t pre_filter_pos = 0 ;
size_t post_filter_pos = 0 ;
for ( size_t i = 0 ; i < ranges_to_read . size ( ) ; + + i ) {
const MarkRange & range = ranges_to_read [ i ] ;
size_t begin = range . begin ;
size_t pre_filter_begin_pos = pre_filter_pos ;
for ( size_t mark = range . begin ; mark < = range . end ; + + mark )
{
UInt8 nonzero = 0 ;
if ( mark ! = range . end )
{
2013-12-05 12:41:47 +00:00
size_t limit = std : : min ( pre_filter . size ( ) , pre_filter_pos + index_granularity ) ;
for ( size_t row = pre_filter_pos ; row < limit ; + + row )
nonzero | = pre_filter [ row ] ;
2013-12-02 12:15:39 +00:00
}
if ( ! nonzero )
{
if ( mark > begin )
{
memcpy (
& post_filter [ post_filter_pos ] ,
& pre_filter [ pre_filter_begin_pos ] ,
pre_filter_pos - pre_filter_begin_pos ) ;
post_filter_pos + = pre_filter_pos - pre_filter_begin_pos ;
reader - > readRange ( begin , mark , res ) ;
}
begin = mark + 1 ;
pre_filter_begin_pos = std : : min ( pre_filter_pos + index_granularity , pre_filter . size ( ) ) ;
}
pre_filter_pos = std : : min ( pre_filter_pos + index_granularity , pre_filter . size ( ) ) ;
}
}
if ( ! post_filter_pos )
{
res . clear ( ) ;
continue ;
}
post_filter . resize ( post_filter_pos ) ;
/// Отфильтруем столбцы, относящиеся к PREWHERE, используя pre_filter,
/// остальные столбцы - используя post_filter.
size_t rows = 0 ;
for ( size_t i = 0 ; i < res . columns ( ) ; + + i )
{
ColumnWithNameAndType & column = res . getByPosition ( i ) ;
2013-12-06 09:02:19 +00:00
if ( column . name = = prewhere_column & & res . columns ( ) > 1 )
2013-12-02 12:15:39 +00:00
continue ;
column . column = column . column - > filter ( column_name_set . count ( column . name ) ? post_filter : pre_filter ) ;
rows = column . column - > size ( ) ;
}
/// Заменим столбец с о значением условия из PREWHERE на константу.
2013-12-05 13:07:55 +00:00
if ( ! remove_prewhere_column )
res . getByName ( prewhere_column ) . column = new ColumnConstUInt8 ( rows , 1 ) ;
2013-12-02 12:15:39 +00:00
}
else
throw Exception ( " Illegal type " + column - > getName ( ) + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8. " , ErrorCodes : : ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER ) ;
reader - > fillMissingColumns ( res ) ;
}
2013-12-05 12:41:47 +00:00
while ( ! remaining_mark_ranges . empty ( ) & & ! res & & ! isCancelled ( ) ) ;
2013-04-24 10:31:32 +00:00
}
2013-12-02 12:15:39 +00:00
else
{
size_t space_left = std : : max ( 1LU , block_size / storage . index_granularity ) ;
while ( ! remaining_mark_ranges . empty ( ) & & space_left )
{
MarkRange & range = remaining_mark_ranges . back ( ) ;
size_t marks_to_read = std : : min ( range . end - range . begin , space_left ) ;
reader - > readRange ( range . begin , range . begin + marks_to_read , res ) ;
2013-11-11 05:35:58 +00:00
2013-12-02 12:15:39 +00:00
space_left - = marks_to_read ;
range . begin + = marks_to_read ;
if ( range . begin = = range . end )
remaining_mark_ranges . pop_back ( ) ;
}
reader - > fillMissingColumns ( res ) ;
}
2013-11-26 11:55:11 +00:00
if ( remaining_mark_ranges . empty ( ) )
2013-04-24 10:31:32 +00:00
{
/** Закрываем файлы (ещё до уничтожения объекта).
* Ч т о б ы п р и с о з д а н и и м н о г и х и с т о ч н и к о в , н о о д н о в р е м е н н о м ч т е н и и т о л ь к о и з н е с к о л ь к и х ,
* б у ф е р ы н е в и с е л и в п а м я т и .
*/
2013-11-26 11:55:11 +00:00
reader = NULL ;
2013-04-24 10:31:32 +00:00
}
2013-11-26 11:55:11 +00:00
2013-04-24 10:31:32 +00:00
return res ;
}
private :
const String path ;
size_t block_size ;
Names column_names ;
2013-12-02 12:15:39 +00:00
NameSet column_name_set ;
Names pre_column_names ;
2013-04-24 10:31:32 +00:00
StorageMergeTree & storage ;
const StorageMergeTree : : DataPartPtr owned_data_part ; /// Кусок не будет удалён, пока им владеет этот объект.
2013-11-26 11:55:11 +00:00
MarkRanges all_mark_ranges ; /// В каких диапазонах засечек читать. В порядке возрастания номеров.
MarkRanges remaining_mark_ranges ; /// В каких диапазонах засечек еще не прочли.
/// В порядке убывания номеров, чтобы можно было выбрасывать из конца.
2013-09-08 05:53:10 +00:00
bool use_uncompressed_cache ;
2013-11-26 11:55:11 +00:00
Poco : : SharedPtr < MergeTreeReader > reader ;
2013-12-02 12:15:39 +00:00
Poco : : SharedPtr < MergeTreeReader > pre_reader ;
2013-11-28 13:16:46 +00:00
ExpressionActionsPtr prewhere_actions ;
String prewhere_column ;
2013-12-05 13:07:55 +00:00
bool remove_prewhere_column ;
2013-04-24 10:31:32 +00:00
} ;
}