2018-11-29 09:19:42 +00:00
# include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
2018-02-13 19:34:15 +00:00
# include <Storages/MergeTree/MergeTreeRangeReader.h>
2017-04-06 17:21:45 +00:00
# include <Storages/MergeTree/MergeTreeReader.h>
# include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
2017-12-15 20:48:46 +00:00
# include <Columns/FilterDescription.h>
2017-12-18 05:37:20 +00:00
# include <Columns/ColumnArray.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2018-11-14 11:26:44 +00:00
# include <Common/StackTrace.h>
2017-06-06 17:18:32 +00:00
# include <ext/range.h>
2018-04-06 13:58:06 +00:00
# include <DataTypes/DataTypeNothing.h>
2017-04-06 17:21:45 +00:00
namespace DB
{
namespace ErrorCodes
{
    /// Error codes are only declared here (extern); their definitions live in another translation unit.
    extern const int LOGICAL_ERROR;
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
}
2018-11-29 09:19:42 +00:00
/// Stores the reading configuration shared by all MergeTree select streams.
///
/// Every argument is copied (or, for `storage`, bound by reference) into the member of the same name;
/// no other work is done here. Tasks and readers are set up later, while reading.
MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
    const MergeTreeData & storage,
    const PrewhereInfoPtr & prewhere_info,
    UInt64 max_block_size_rows,
    UInt64 preferred_block_size_bytes,
    UInt64 preferred_max_column_in_block_size_bytes,
    UInt64 min_bytes_to_use_direct_io,
    UInt64 max_read_buffer_size,
    bool use_uncompressed_cache,
    bool save_marks_in_cache,
    const Names & virt_column_names)
    : storage(storage)
    , prewhere_info(prewhere_info)
    , max_block_size_rows(max_block_size_rows)
    , preferred_block_size_bytes(preferred_block_size_bytes)
    , preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes)
    , min_bytes_to_use_direct_io(min_bytes_to_use_direct_io)
    , max_read_buffer_size(max_read_buffer_size)
    , use_uncompressed_cache(use_uncompressed_cache)
    , save_marks_in_cache(save_marks_in_cache)
    , virt_column_names(virt_column_names)
{
}
2018-11-29 09:19:42 +00:00
/// Produces the next non-empty block, or an empty block once there are no more tasks or the stream is cancelled.
///
/// Blocks are pulled from the current task's part; when a task finishes it is dropped so that the next
/// iteration asks getNewTask() for another one. Virtual columns are appended to every non-empty block.
Block MergeTreeBaseSelectBlockInputStream::readImpl()
{
    Block result;

    for (;;)
    {
        /// Stop as soon as we have data, or when the query was cancelled.
        if (result || isCancelled())
            break;

        /// No active task and no new one available: the stream is exhausted.
        if (!task && !getNewTask())
            break;

        result = readFromPart();
        if (result)
            injectVirtualColumns(result);

        if (task->isFinished())
            task.reset();
    }

    return result;
}
2018-11-29 09:19:42 +00:00
/// Reads the next block from the part of the current task.
///
/// Lazily initializes the task's range reader. When prewhere_info is set, the main range reader is
/// chained behind a PREWHERE range reader. Then it picks how many rows to read from the size predictor
/// and the configured block-size limits, and performs the read. It also reports progress and feeds the
/// size predictor with the result.
///
/// Returns an empty block when every row read in this step was filtered out; the caller is expected to retry.
Block MergeTreeBaseSelectBlockInputStream::readFromPart()
{
    if (task->size_predictor)
        task->size_predictor->startBlock();

    const auto current_max_block_size_rows = max_block_size_rows;
    const auto current_preferred_block_size_bytes = preferred_block_size_bytes;
    const auto current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
    const auto avg_index_granularity = task->data_part->index_granularity.getAvgGranularity();
    /// Lower bound for the share of rows surviving the filter; keeps the division below finite.
    const double min_filtration_ratio = 0.00001;

    /// Recommends how many rows to read next, starting from the current position of `current_reader`.
    auto estimateNumRows = [current_preferred_block_size_bytes, current_max_block_size_rows,
        avg_index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio](
        MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader) -> UInt64
    {
        if (!current_task.size_predictor)
            return current_max_block_size_rows;

        /// Calculates number of rows will be read using preferred_block_size_bytes.
        /// Can't be less than avg_index_granularity.
        UInt64 rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
        if (!rows_to_read)
            return rows_to_read;
        rows_to_read = std::max<UInt64>(avg_index_granularity, rows_to_read);

        if (current_preferred_max_column_in_block_size_bytes)
        {
            /// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
            UInt64 rows_to_read_for_max_size_column
                = current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
            double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
            auto rows_to_read_for_max_size_column_with_filtration
                = static_cast<UInt64>(rows_to_read_for_max_size_column / filtration_ratio);

            /// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than avg_index_granularity.
            rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
        }

        const UInt64 unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
        if (unread_rows_in_current_granule >= rows_to_read)
            return rows_to_read;

        /// No average granularity to align to (this also avoids a division by zero below).
        if (!avg_index_granularity)
            return rows_to_read;

        /// Round the end of the read, measured from the start of the current granule, to the nearest
        /// multiple of avg_index_granularity.
        const UInt64 read_rows_in_current_granule = current_reader.numReadRowsInCurrentGranule();
        const UInt64 granules_to_read
            = (rows_to_read + read_rows_in_current_granule + avg_index_granularity / 2) / avg_index_granularity;
        const UInt64 rows_up_to_rounded_end = avg_index_granularity * granules_to_read;

        /// Rounding can land at or before the rows already read in this granule, e.g. when granules are
        /// smaller than the average. Subtracting would then give 0 or wrap around the unsigned type, which
        /// silently ignores the preferred block size. Fall back to the unrounded estimate in that case.
        if (rows_up_to_rounded_end <= read_rows_in_current_granule)
            return rows_to_read;

        return rows_up_to_rounded_end - read_rows_in_current_granule;
    };

    if (!task->range_reader.isInitialized())
    {
        if (prewhere_info)
        {
            if (reader->getColumns().empty())
            {
                /// The main reader has no columns of its own, so a single range reader over pre_reader
                /// evaluates PREWHERE and produces the result.
                task->range_reader = MergeTreeRangeReader(
                    pre_reader.get(), nullptr,
                    prewhere_info->alias_actions, prewhere_info->prewhere_actions,
                    &prewhere_info->prewhere_column_name, &task->ordered_names,
                    task->should_reorder, task->remove_prewhere_column, true);
            }
            else
            {
                /// Chain: optional PREWHERE range reader (over pre_reader) -> main range reader (over reader).
                MergeTreeRangeReader * pre_reader_ptr = nullptr;
                if (pre_reader != nullptr)
                {
                    task->pre_range_reader = MergeTreeRangeReader(
                        pre_reader.get(), nullptr,
                        prewhere_info->alias_actions, prewhere_info->prewhere_actions,
                        &prewhere_info->prewhere_column_name, &task->ordered_names,
                        task->should_reorder, task->remove_prewhere_column, false);
                    pre_reader_ptr = &task->pre_range_reader;
                }

                task->range_reader = MergeTreeRangeReader(
                    reader.get(), pre_reader_ptr, nullptr, nullptr,
                    nullptr, &task->ordered_names, true, false, true);
            }
        }
        else
        {
            task->range_reader = MergeTreeRangeReader(
                reader.get(), nullptr, nullptr, nullptr,
                nullptr, &task->ordered_names, task->should_reorder, false, true);
        }
    }

    UInt64 recommended_rows = estimateNumRows(*task, task->range_reader);
    /// Always make progress: read at least one row, and never more than max_block_size_rows.
    UInt64 rows_to_read = std::max(UInt64(1), std::min(current_max_block_size_rows, recommended_rows));

    auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);

    /// All rows were filtered. Repeat.
    if (read_result.block.rows() == 0)
        read_result.block.clear();

    UInt64 num_filtered_rows = read_result.numReadRows() - read_result.block.rows();

    progressImpl({ read_result.numReadRows(), read_result.numBytesRead() });

    if (task->size_predictor)
    {
        task->size_predictor->updateFilteredRowsRation(read_result.numReadRows(), num_filtered_rows);

        if (read_result.block)
            task->size_predictor->update(read_result.block);
    }

    if (read_result.block && prewhere_info && !task->remove_prewhere_column)
    {
        /// Convert const column to full here because it's cheaper to filter const column than full.
        auto & column = read_result.block.getByName(prewhere_info->prewhere_column_name);
        column.column = column.column->convertToFullColumnIfConst();
    }

    read_result.block.checkNumberOfRows();

    return read_result.block;
}
2018-11-29 09:19:42 +00:00
void MergeTreeBaseSelectBlockInputStream : : injectVirtualColumns ( Block & block ) const
2017-04-06 17:21:45 +00:00
{
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
if ( ! virt_column_names . empty ( ) )
{
2018-11-28 15:05:28 +00:00
const auto rows = block . rows ( ) ;
2017-04-06 17:21:45 +00:00
for ( const auto & virt_column_name : virt_column_names )
{
if ( virt_column_name = = " _part " )
{
2018-02-19 03:56:08 +00:00
ColumnPtr column ;
if ( rows )
column = DataTypeString ( ) . createColumnConst ( rows , task - > data_part - > name ) - > convertToFullColumnIfConst ( ) ;
else
column = DataTypeString ( ) . createColumn ( ) ;
block . insert ( { column , std : : make_shared < DataTypeString > ( ) , virt_column_name } ) ;
2017-04-06 17:21:45 +00:00
}
else if ( virt_column_name = = " _part_index " )
{
2018-02-19 03:56:08 +00:00
ColumnPtr column ;
if ( rows )
2018-10-22 08:54:54 +00:00
column = DataTypeUInt64 ( ) . createColumnConst ( rows , task - > part_index_in_query ) - > convertToFullColumnIfConst ( ) ;
2018-02-19 03:56:08 +00:00
else
column = DataTypeUInt64 ( ) . createColumn ( ) ;
block . insert ( { column , std : : make_shared < DataTypeUInt64 > ( ) , virt_column_name } ) ;
2017-04-06 17:21:45 +00:00
}
2018-09-10 09:53:13 +00:00
else if ( virt_column_name = = " _partition_id " )
{
ColumnPtr column ;
if ( rows )
column = DataTypeString ( ) . createColumnConst ( rows , task - > data_part - > info . partition_id ) - > convertToFullColumnIfConst ( ) ;
else
column = DataTypeString ( ) . createColumn ( ) ;
block . insert ( { column , std : : make_shared < DataTypeString > ( ) , virt_column_name } ) ;
}
2017-04-06 17:21:45 +00:00
}
}
}
2018-11-29 09:19:42 +00:00
void MergeTreeBaseSelectBlockInputStream : : executePrewhereActions ( Block & block , const PrewhereInfoPtr & prewhere_info )
2018-04-06 13:58:06 +00:00
{
2018-04-11 14:31:54 +00:00
if ( prewhere_info )
2018-04-06 13:58:06 +00:00
{
2018-09-03 17:24:46 +00:00
if ( prewhere_info - > alias_actions )
prewhere_info - > alias_actions - > execute ( block ) ;
2018-04-11 14:31:54 +00:00
prewhere_info - > prewhere_actions - > execute ( block ) ;
if ( prewhere_info - > remove_prewhere_column )
block . erase ( prewhere_info - > prewhere_column_name ) ;
2018-04-06 13:58:06 +00:00
2018-04-11 14:31:54 +00:00
if ( ! block )
2018-04-06 13:58:06 +00:00
block . insert ( { nullptr , std : : make_shared < DataTypeNothing > ( ) , " _nothing " } ) ;
}
}
2018-11-29 09:19:42 +00:00
/// Out-of-line defaulted destructor.
/// NOTE(review): presumably defined in the .cpp rather than the header so that member types which are only
/// forward-declared there (e.g. the readers) are complete at the point of destruction — confirm against the header.
MergeTreeBaseSelectBlockInputStream::~MergeTreeBaseSelectBlockInputStream() = default;
2017-04-06 17:21:45 +00:00
}