2017-03-12 19:18:07 +00:00
# include <boost/rational.hpp> /// For calculations related to sampling coefficients.
2017-11-20 04:15:43 +00:00
# include <optional>
2015-11-19 21:34:53 +00:00
2019-01-07 12:51:14 +00:00
# include <Poco/File.h>
2017-11-24 13:55:31 +00:00
# include <Common/FieldVisitors.h>
2017-04-01 09:19:00 +00:00
# include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
2019-10-01 16:50:08 +00:00
# include <Storages/MergeTree/MergeTreeSelectProcessor.h>
# include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
2017-04-01 09:19:00 +00:00
# include <Storages/MergeTree/MergeTreeReadPool.h>
2019-10-01 16:50:08 +00:00
# include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
2019-01-17 12:11:36 +00:00
# include <Storages/MergeTree/MergeTreeIndices.h>
2019-01-07 12:51:14 +00:00
# include <Storages/MergeTree/MergeTreeIndexReader.h>
2018-04-20 00:20:36 +00:00
# include <Storages/MergeTree/KeyCondition.h>
2019-12-10 23:18:24 +00:00
# include <Storages/ReadInOrderOptimizer.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTIdentifier.h>
2018-10-29 19:04:28 +00:00
# include <Parsers/ASTLiteral.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTSampleRatio.h>
2019-08-14 14:06:16 +00:00
# include <Interpreters/ExpressionAnalyzer.h>
2020-05-20 20:16:32 +00:00
# include <Interpreters/Context.h>
2017-12-01 20:38:50 +00:00
/// Allows __uint128_t to be used as a template parameter for boost::rational.
2017-12-04 16:12:56 +00:00
// https://stackoverflow.com/questions/41198673/uint128-t-not-working-with-clang-and-libstdc
2017-12-25 17:16:29 +00:00
# if !defined(__GLIBCXX_BITSIZE_INT_N_0) && defined(__SIZEOF_INT128__)
2017-12-01 20:38:50 +00:00
/// Hand-written std::numeric_limits specialization for __uint128_t.
/// Only the members listed below are provided; it describes an unsigned, 128-bit, radix-2 integer type.
namespace std
{
template < >
struct numeric_limits < __uint128_t >
{
static constexpr bool is_specialized = true ;
static constexpr bool is_signed = false ;
static constexpr bool is_integer = true ;
static constexpr int radix = 2 ;
2017-12-09 17:16:24 +00:00
/// Number of radix-2 digits representable without change: all 128 bits, since the type is unsigned.
static constexpr int digits = 128 ;
2017-12-01 20:38:50 +00:00
/// Smallest value of the type.
static constexpr __uint128_t min ( ) { return 0 ; } // used in boost 1.65.1+
2018-07-16 13:44:24 +00:00
/// Largest value of the type: unsigned wrap-around of `0 - 1` yields the all-ones bit pattern.
static constexpr __uint128_t max ( ) { return __uint128_t ( 0 ) - 1 ; } // used in boost 1.68.0+
2017-12-01 20:38:50 +00:00
} ;
}
2017-12-04 13:03:40 +00:00
# endif
2017-12-01 20:38:50 +00:00
2017-04-01 09:19:00 +00:00
# include <DataStreams/CollapsingFinalBlockInputStream.h>
# include <DataStreams/CreatingSetsBlockInputStream.h>
2019-05-25 11:09:23 +00:00
# include <DataStreams/ReverseBlockInputStream.h>
2017-04-01 09:19:00 +00:00
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeEnum.h>
2020-04-22 13:52:07 +00:00
# include <DataTypes/DataTypesNumber.h>
# include <Processors/ConcatProcessor.h>
# include <Processors/Executors/TreeExecutorBlockInputStream.h>
2020-04-14 10:04:49 +00:00
# include <Processors/Merges/AggregatingSortedTransform.h>
2020-04-22 13:52:07 +00:00
# include <Processors/Merges/CollapsingSortedTransform.h>
# include <Processors/Merges/MergingSortedTransform.h>
2020-04-14 10:04:49 +00:00
# include <Processors/Merges/ReplacingSortedTransform.h>
2020-04-22 13:52:07 +00:00
# include <Processors/Merges/SummingSortedTransform.h>
2020-04-14 10:04:49 +00:00
# include <Processors/Merges/VersionedCollapsingTransform.h>
2019-10-01 16:50:08 +00:00
# include <Processors/Sources/SourceFromInputStream.h>
2020-04-22 13:52:07 +00:00
# include <Processors/Transforms/AddingConstColumnTransform.h>
# include <Processors/Transforms/AddingSelectorTransform.h>
# include <Processors/Transforms/CopyTransform.h>
# include <Processors/Transforms/ExpressionTransform.h>
# include <Processors/Transforms/FilterTransform.h>
# include <Processors/Transforms/ReverseTransform.h>
# include <Storages/VirtualColumnUtils.h>
2015-02-03 14:37:35 +00:00
2016-10-24 02:02:37 +00:00
/// Profile counters updated by this file; they are only declared here (`extern`), not defined.
/// Each one is incremented after the parts, ranges and marks to read have been selected.
namespace ProfileEvents
{
    extern const Event SelectedMarks;   /// Incremented by the total number of marks to read.
    extern const Event SelectedParts;   /// Incremented by the number of parts that still have ranges to read.
    extern const Event SelectedRanges;  /// Incremented by the total number of mark ranges to read.
}
2014-03-13 12:48:07 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
/// Error codes referenced by this file; they are only declared here (`extern`), not defined.
namespace ErrorCodes
{
    extern const int ARGUMENT_OUT_OF_BOUND;              /// Thrown for a negative or otherwise incorrect SAMPLE size/offset.
    extern const int ILLEGAL_COLUMN;                     /// Thrown when the sampling column is not in the primary key.
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;  /// Thrown when the sampling column is not an unsigned integer type.
    extern const int INDEX_NOT_USED;                     /// Thrown when a `force_*` setting is set but the index cannot be used.
    extern const int LOGICAL_ERROR;
}
2018-10-17 03:13:00 +00:00
/// Binds the executor to `data_` and obtains the Poco logger whose name is
/// the log name reported by `data_` followed by the SelectExecutor suffix.
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
    : data(data_)
    , log(&Poco::Logger::get(data_.getLogName() + " (SelectExecutor) "))
{
}
2015-11-18 21:37:28 +00:00
2017-03-13 18:02:45 +00:00
/// Construct a block consisting only of possible values of virtual columns
2016-03-05 03:17:11 +00:00
static Block getBlockWithPartColumn ( const MergeTreeData : : DataPartsVector & parts )
2014-07-28 10:36:11 +00:00
{
2017-12-15 21:11:24 +00:00
auto column = ColumnString : : create ( ) ;
2014-07-28 10:36:11 +00:00
2017-04-01 07:20:54 +00:00
for ( const auto & part : parts )
2017-12-15 21:11:24 +00:00
column - > insert ( part - > name ) ;
2014-07-28 10:36:11 +00:00
2017-12-15 21:11:24 +00:00
return Block { ColumnWithTypeAndName ( std : : move ( column ) , std : : make_shared < DataTypeString > ( ) , " _part " ) } ;
2014-07-28 10:36:11 +00:00
}
2015-11-18 21:37:28 +00:00
size_t MergeTreeDataSelectExecutor : : getApproximateTotalRowsToRead (
2018-04-20 00:20:36 +00:00
const MergeTreeData : : DataPartsVector & parts , const KeyCondition & key_condition , const Settings & settings ) const
2015-11-18 21:37:28 +00:00
{
2019-03-25 13:55:24 +00:00
size_t rows_count = 0 ;
2017-04-01 07:20:54 +00:00
/// We will find out how many rows we would have read without sampling.
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Preliminary index scan with condition: {} " , key_condition . toString ( ) ) ;
2017-04-01 07:20:54 +00:00
2020-03-09 01:59:08 +00:00
for ( const auto & part : parts )
2017-04-01 07:20:54 +00:00
{
2019-03-25 13:55:24 +00:00
MarkRanges ranges = markRangesFromPKRange ( part , key_condition , settings ) ;
2017-04-01 07:20:54 +00:00
/** In order to get a lower bound on the number of rows that match the condition on PK,
* consider only guaranteed full marks .
* That is , do not take into account the first and last marks , which may be incomplete .
*/
2020-03-09 01:59:08 +00:00
for ( const auto & range : ranges )
if ( range . end - range . begin > 2 )
rows_count + = part - > index_granularity . getRowsCountInRange ( { range . begin + 1 , range . end - 1 } ) ;
2019-03-25 13:55:24 +00:00
2017-04-01 07:20:54 +00:00
}
2019-03-25 13:55:24 +00:00
return rows_count ;
2015-11-18 21:37:28 +00:00
}
2015-11-19 21:34:53 +00:00
/// Exact rational number over ASTSampleRatio::BigNum; used in this file for sample sizes, sample offsets
/// and the bounds derived from them, so that the arithmetic is not subject to floating-point rounding.
using RelativeSize = boost : : rational < ASTSampleRatio : : BigNum > ;
2019-12-15 06:34:43 +00:00
static std : : string toString ( const RelativeSize & x )
2015-11-19 21:34:53 +00:00
{
2017-07-28 20:41:51 +00:00
return ASTSampleRatio : : toString ( x . numerator ( ) ) + " / " + ASTSampleRatio : : toString ( x . denominator ( ) ) ;
2015-11-19 21:34:53 +00:00
}
2015-11-18 21:37:28 +00:00
2017-03-12 19:18:07 +00:00
/// Converts sample size to an approximate number of rows (ex. `SAMPLE 1000000`) to relative value (ex. `SAMPLE 0.1`).
2015-11-18 21:37:28 +00:00
static RelativeSize convertAbsoluteSampleSizeToRelative ( const ASTPtr & node , size_t approx_total_rows )
{
2017-04-01 07:20:54 +00:00
if ( approx_total_rows = = 0 )
return 1 ;
2015-11-18 21:37:28 +00:00
2019-03-15 16:14:13 +00:00
const auto & node_sample = node - > as < ASTSampleRatio & > ( ) ;
2015-11-19 21:34:53 +00:00
2017-04-01 07:20:54 +00:00
auto absolute_sample_size = node_sample . ratio . numerator / node_sample . ratio . denominator ;
2017-07-28 20:41:51 +00:00
return std : : min ( RelativeSize ( 1 ) , RelativeSize ( absolute_sample_size ) / RelativeSize ( approx_total_rows ) ) ;
2015-11-18 21:37:28 +00:00
}
2019-09-13 15:41:09 +00:00
/// Reads from all parts returned by `data.getDataPartsVector()`.
/// Every other argument is forwarded unchanged to `readFromParts`, whose result is returned as is.
Pipes MergeTreeDataSelectExecutor::read(
    const Names & column_names_to_return,
    const SelectQueryInfo & query_info,
    const Context & context,
    const UInt64 max_block_size,
    const unsigned num_streams,
    const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
    auto all_parts = data.getDataPartsVector();

    return readFromParts(
        std::move(all_parts),
        column_names_to_return,
        query_info,
        context,
        max_block_size,
        num_streams,
        max_block_numbers_to_read);
}
2017-04-01 07:20:54 +00:00
2019-09-13 15:41:09 +00:00
Pipes MergeTreeDataSelectExecutor : : readFromParts (
2018-07-18 12:17:48 +00:00
MergeTreeData : : DataPartsVector parts ,
const Names & column_names_to_return ,
const SelectQueryInfo & query_info ,
const Context & context ,
2019-02-10 16:55:12 +00:00
const UInt64 max_block_size ,
2018-07-18 12:17:48 +00:00
const unsigned num_streams ,
2018-10-10 16:20:15 +00:00
const PartitionIdToMaxBlock * max_block_numbers_to_read ) const
2018-07-18 12:17:48 +00:00
{
size_t part_index = 0 ;
2017-04-01 07:20:54 +00:00
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
2017-08-29 14:08:09 +00:00
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
2017-04-01 07:20:54 +00:00
Names virt_column_names ;
Names real_column_names ;
bool part_column_queried = false ;
bool sample_factor_column_queried = false ;
Float64 used_sample_factor = 1 ;
for ( const String & name : column_names_to_return )
{
if ( name = = " _part " )
{
part_column_queried = true ;
virt_column_names . push_back ( name ) ;
}
else if ( name = = " _part_index " )
{
virt_column_names . push_back ( name ) ;
}
2018-09-10 09:53:13 +00:00
else if ( name = = " _partition_id " )
{
virt_column_names . push_back ( name ) ;
}
2017-04-01 07:20:54 +00:00
else if ( name = = " _sample_factor " )
{
sample_factor_column_queried = true ;
virt_column_names . push_back ( name ) ;
}
else
{
real_column_names . push_back ( name ) ;
}
}
2018-03-13 15:00:28 +00:00
NamesAndTypesList available_real_columns = data . getColumns ( ) . getAllPhysical ( ) ;
2017-04-01 07:20:54 +00:00
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
if ( real_column_names . empty ( ) )
real_column_names . push_back ( ExpressionActions : : getSmallestColumn ( available_real_columns ) ) ;
/// If `_part` virtual column is requested, we try to use it as an index.
Block virtual_columns_block = getBlockWithPartColumn ( parts ) ;
if ( part_column_queried )
2017-07-15 03:48:36 +00:00
VirtualColumnUtils : : filterBlockWithQuery ( query_info . query , virtual_columns_block , context ) ;
2017-04-01 07:20:54 +00:00
std : : multiset < String > part_values = VirtualColumnUtils : : extractSingleValueFromBlock < String > ( virtual_columns_block , " _part " ) ;
data . check ( real_column_names ) ;
PKCondition: infer index use with pk subexpression
By default only constraints explicitly matching
primary key expression (or expression wrapped in
a monotonic function) are eligible for part and
range selection. So for example, if index is:
(toStartOfHour(dt), UserID)
Then a query such as this resorts to full scan:
SELECT count() FROM t WHERE dt = now()
Intuitively, only parts with toStartOfHour(now())
could be selected, but it is less trivial to prove.
The primary key currently can be wrapped in a chain
of monotonic functions, so following would work:
toStartOfHour(dt) = toStartOfHour(now()) AND dt = now()
It must, however, be stated explicitly. To infer it
we would have to know the inverse function
and prove that the inverse function is monotonic
on the given interval. This is not practical, as
there is no inverse function that, for example, undoes
rounding: rounding isn’t strictly monotonic.
There are, however, functions that don’t transform
the output range and preserve monotonicity over the
complete input range, such as rounding or casts
to the same or a wider numeric type. For these there is
no need to find an inverse function, as no check for monotonicity over an arbitrary interval is needed,
which makes this optimisation possible.
2017-07-06 05:39:05 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2020-05-20 18:11:38 +00:00
const auto & primary_key = data . getPrimaryKey ( ) ;
2020-05-21 19:46:03 +00:00
Names primary_key_columns = primary_key . column_names ;
2017-04-01 07:20:54 +00:00
2020-05-21 19:46:03 +00:00
KeyCondition key_condition ( query_info , context , primary_key_columns , primary_key . expression ) ;
2017-05-24 21:06:29 +00:00
2017-04-01 07:20:54 +00:00
if ( settings . force_primary_key & & key_condition . alwaysUnknownOrTrue ( ) )
{
std : : stringstream exception_message ;
exception_message < < " Primary key ( " ;
2018-10-11 14:53:23 +00:00
for ( size_t i = 0 , size = primary_key_columns . size ( ) ; i < size ; + + i )
exception_message < < ( i = = 0 ? " " : " , " ) < < primary_key_columns [ i ] ;
2017-04-01 07:20:54 +00:00
exception_message < < " ) is not used and setting 'force_primary_key' is set. " ;
throw Exception ( exception_message . str ( ) , ErrorCodes : : INDEX_NOT_USED ) ;
}
2018-04-20 00:20:36 +00:00
std : : optional < KeyCondition > minmax_idx_condition ;
2017-09-01 20:33:17 +00:00
if ( data . minmax_idx_expr )
{
2018-11-08 17:28:52 +00:00
minmax_idx_condition . emplace ( query_info , context , data . minmax_idx_columns , data . minmax_idx_expr ) ;
2017-08-21 15:35:29 +00:00
2017-09-01 20:33:17 +00:00
if ( settings . force_index_by_date & & minmax_idx_condition - > alwaysUnknownOrTrue ( ) )
2017-09-08 13:17:38 +00:00
{
String msg = " MinMax index by columns ( " ;
bool first = true ;
for ( const String & col : data . minmax_idx_columns )
{
if ( first )
first = false ;
else
msg + = " , " ;
msg + = col ;
}
msg + = " ) is not used and setting 'force_index_by_date' is set " ;
throw Exception ( msg , ErrorCodes : : INDEX_NOT_USED ) ;
}
2017-09-01 20:33:17 +00:00
}
2017-04-01 07:20:54 +00:00
2017-08-21 15:35:29 +00:00
/// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
2017-04-01 07:20:54 +00:00
/// as well as `max_block_number_to_read`.
{
auto prev_parts = parts ;
parts . clear ( ) ;
for ( const auto & part : prev_parts )
{
if ( part_values . find ( part - > name ) = = part_values . end ( ) )
continue ;
2018-05-23 19:34:37 +00:00
if ( part - > isEmpty ( ) )
continue ;
2020-03-10 14:56:55 +00:00
if ( minmax_idx_condition & & ! minmax_idx_condition - > checkInHyperrectangle (
part - > minmax_idx . hyperrectangle , data . minmax_idx_column_types ) . can_be_true )
2017-04-01 07:20:54 +00:00
continue ;
2018-10-10 16:20:15 +00:00
if ( max_block_numbers_to_read )
2018-09-24 09:53:28 +00:00
{
2018-10-10 16:20:15 +00:00
auto blocks_iterator = max_block_numbers_to_read - > find ( part - > info . partition_id ) ;
if ( blocks_iterator = = max_block_numbers_to_read - > end ( ) | | part - > info . max_block > blocks_iterator - > second )
2018-09-24 09:53:28 +00:00
continue ;
}
2017-04-01 07:20:54 +00:00
parts . push_back ( part ) ;
}
}
/// Sampling.
Names column_names_to_read = real_column_names ;
std : : shared_ptr < ASTFunction > filter_function ;
ExpressionActionsPtr filter_expression ;
RelativeSize relative_sample_size = 0 ;
RelativeSize relative_sample_offset = 0 ;
2019-03-15 16:14:13 +00:00
const auto & select = query_info . query - > as < ASTSelectQuery & > ( ) ;
2017-04-01 07:20:54 +00:00
2020-03-23 02:12:31 +00:00
auto select_sample_size = select . sampleSize ( ) ;
auto select_sample_offset = select . sampleOffset ( ) ;
2017-04-01 07:20:54 +00:00
if ( select_sample_size )
{
relative_sample_size . assign (
2019-03-15 16:14:13 +00:00
select_sample_size - > as < ASTSampleRatio & > ( ) . ratio . numerator ,
select_sample_size - > as < ASTSampleRatio & > ( ) . ratio . denominator ) ;
2017-04-01 07:20:54 +00:00
if ( relative_sample_size < 0 )
throw Exception ( " Negative sample size " , ErrorCodes : : ARGUMENT_OUT_OF_BOUND ) ;
relative_sample_offset = 0 ;
if ( select_sample_offset )
relative_sample_offset . assign (
2019-03-15 16:14:13 +00:00
select_sample_offset - > as < ASTSampleRatio & > ( ) . ratio . numerator ,
select_sample_offset - > as < ASTSampleRatio & > ( ) . ratio . denominator ) ;
2017-04-01 07:20:54 +00:00
if ( relative_sample_offset < 0 )
throw Exception ( " Negative sample offset " , ErrorCodes : : ARGUMENT_OUT_OF_BOUND ) ;
/// Convert absolute value of the sampling (in form `SAMPLE 1000000` - how many rows to read) into the relative `SAMPLE 0.1` (how much data to read).
size_t approx_total_rows = 0 ;
if ( relative_sample_size > 1 | | relative_sample_offset > 1 )
approx_total_rows = getApproximateTotalRowsToRead ( parts , key_condition , settings ) ;
if ( relative_sample_size > 1 )
{
relative_sample_size = convertAbsoluteSampleSizeToRelative ( select_sample_size , approx_total_rows ) ;
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Selected relative sample size: {} " , toString ( relative_sample_size ) ) ;
2017-04-01 07:20:54 +00:00
}
/// SAMPLE 1 is the same as the absence of SAMPLE.
2017-07-28 20:41:51 +00:00
if ( relative_sample_size = = RelativeSize ( 1 ) )
2017-04-01 07:20:54 +00:00
relative_sample_size = 0 ;
2018-08-10 04:02:56 +00:00
if ( relative_sample_offset > 0 & & RelativeSize ( 0 ) = = relative_sample_size )
2017-04-01 07:20:54 +00:00
throw Exception ( " Sampling offset is incorrect because no sampling " , ErrorCodes : : ARGUMENT_OUT_OF_BOUND ) ;
if ( relative_sample_offset > 1 )
{
relative_sample_offset = convertAbsoluteSampleSizeToRelative ( select_sample_offset , approx_total_rows ) ;
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Selected relative sample offset: {} " , toString ( relative_sample_offset ) ) ;
2017-04-01 07:20:54 +00:00
}
}
/** Which range of sampling key values do I need to read?
* First , in the whole range ( " universe " ) we select the interval
* of relative ` relative_sample_size ` size , offset from the beginning by ` relative_sample_offset ` .
*
* Example : SAMPLE 0.4 OFFSET 0.3
*
* [ - - - - - - * * * * * * * * - - - - - - ]
* ^ - offset
* < - - - - - - > - size
*
* If the interval passes through the end of the universe , then cut its right side .
*
* Example : SAMPLE 0.4 OFFSET 0.8
*
* [ - - - - - - - - - - - - - - - - * * * * ]
* ^ - offset
* < - - - - - - > - size
*
* Next , if the ` parallel_replicas_count ` , ` parallel_replica_offset ` settings are set ,
* then it is necessary to break the received interval into pieces of the number ` parallel_replicas_count ` ,
* and select a piece with the number ` parallel_replica_offset ` ( from zero ) .
*
* Example : SAMPLE 0.4 OFFSET 0.3 , parallel_replicas_count = 2 , parallel_replica_offset = 1
*
* [ - - - - - - - - - - * * * * - - - - - - ]
* ^ - offset
* < - - - - - - > - size
* < - - > < - - > - pieces for different ` parallel_replica_offset ` , select the second one .
*
* It is very important that the intervals for different ` parallel_replica_offset ` cover the entire range without gaps and overlaps .
* It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0 , . . . OFFSET 0.9 and similar decimals .
*/
2019-03-09 14:30:55 +00:00
bool use_sampling = relative_sample_size > 0 | | ( settings . parallel_replicas_count > 1 & & data . supportsSampling ( ) ) ;
2017-04-01 07:20:54 +00:00
bool no_data = false ; /// There is nothing left after sampling.
if ( use_sampling )
{
2018-08-10 04:02:56 +00:00
if ( sample_factor_column_queried & & relative_sample_size ! = RelativeSize ( 0 ) )
2017-04-01 07:20:54 +00:00
used_sample_factor = 1.0 / boost : : rational_cast < Float64 > ( relative_sample_size ) ;
RelativeSize size_of_universum = 0 ;
2020-05-20 15:16:39 +00:00
const auto & sampling_key = data . getSamplingKey ( ) ;
DataTypePtr sampling_column_type = sampling_key . data_types [ 0 ] ;
2017-04-01 07:20:54 +00:00
2019-11-25 13:35:28 +00:00
if ( typeid_cast < const DataTypeUInt64 * > ( sampling_column_type . get ( ) ) )
2017-07-28 20:41:51 +00:00
size_of_universum = RelativeSize ( std : : numeric_limits < UInt64 > : : max ( ) ) + RelativeSize ( 1 ) ;
2019-11-25 13:35:28 +00:00
else if ( typeid_cast < const DataTypeUInt32 * > ( sampling_column_type . get ( ) ) )
2017-07-28 20:41:51 +00:00
size_of_universum = RelativeSize ( std : : numeric_limits < UInt32 > : : max ( ) ) + RelativeSize ( 1 ) ;
2019-11-25 13:35:28 +00:00
else if ( typeid_cast < const DataTypeUInt16 * > ( sampling_column_type . get ( ) ) )
2017-07-28 20:41:51 +00:00
size_of_universum = RelativeSize ( std : : numeric_limits < UInt16 > : : max ( ) ) + RelativeSize ( 1 ) ;
2019-11-25 13:35:28 +00:00
else if ( typeid_cast < const DataTypeUInt8 * > ( sampling_column_type . get ( ) ) )
2017-07-28 20:41:51 +00:00
size_of_universum = RelativeSize ( std : : numeric_limits < UInt8 > : : max ( ) ) + RelativeSize ( 1 ) ;
2017-04-01 07:20:54 +00:00
else
2019-11-25 13:35:28 +00:00
throw Exception ( " Invalid sampling column type in storage parameters: " + sampling_column_type - > getName ( ) + " . Must be unsigned integer type. " ,
2017-04-01 07:20:54 +00:00
ErrorCodes : : ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER ) ;
if ( settings . parallel_replicas_count > 1 )
{
2017-07-28 20:41:51 +00:00
if ( relative_sample_size = = RelativeSize ( 0 ) )
2017-04-01 07:20:54 +00:00
relative_sample_size = 1 ;
2019-08-13 11:24:18 +00:00
relative_sample_size / = settings . parallel_replicas_count . value ;
relative_sample_offset + = relative_sample_size * RelativeSize ( settings . parallel_replica_offset . value ) ;
2017-04-01 07:20:54 +00:00
}
2017-07-28 20:41:51 +00:00
if ( relative_sample_offset > = RelativeSize ( 1 ) )
2017-04-01 07:20:54 +00:00
no_data = true ;
/// Calculate the half-interval of `[lower, upper)` column values.
bool has_lower_limit = false ;
bool has_upper_limit = false ;
RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum ;
RelativeSize upper_limit_rational = ( relative_sample_offset + relative_sample_size ) * size_of_universum ;
UInt64 lower = boost : : rational_cast < ASTSampleRatio : : BigNum > ( lower_limit_rational ) ;
UInt64 upper = boost : : rational_cast < ASTSampleRatio : : BigNum > ( upper_limit_rational ) ;
if ( lower > 0 )
has_lower_limit = true ;
if ( upper_limit_rational < size_of_universum )
has_upper_limit = true ;
/*std::cerr << std::fixed << std::setprecision(100)
< < " relative_sample_size: " < < relative_sample_size < < " \n "
< < " relative_sample_offset: " < < relative_sample_offset < < " \n "
< < " lower_limit_float: " < < lower_limit_rational < < " \n "
< < " upper_limit_float: " < < upper_limit_rational < < " \n "
< < " lower: " < < lower < < " \n "
< < " upper: " < < upper < < " \n " ; */
if ( ( has_upper_limit & & upper = = 0 )
| | ( has_lower_limit & & has_upper_limit & & lower = = upper ) )
no_data = true ;
if ( no_data | | ( ! has_lower_limit & & ! has_upper_limit ) )
{
use_sampling = false ;
}
else
{
/// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed.
std : : shared_ptr < ASTFunction > lower_function ;
std : : shared_ptr < ASTFunction > upper_function ;
2019-11-25 12:05:29 +00:00
/// If sample and final are used together no need to calculate sampling expression twice.
/// The first time it was calculated for final, because sample key is a part of the PK.
/// So, assume that we already have calculated column.
ASTPtr sampling_key_ast = data . getSamplingKeyAST ( ) ;
2020-05-20 15:16:39 +00:00
2019-11-25 12:05:29 +00:00
if ( select . final ( ) )
2019-11-25 12:49:05 +00:00
{
2020-05-21 19:46:03 +00:00
sampling_key_ast = std : : make_shared < ASTIdentifier > ( sampling_key . column_names [ 0 ] ) ;
2019-11-25 12:49:05 +00:00
/// We do spoil available_real_columns here, but it is not used later.
2020-05-21 19:46:03 +00:00
available_real_columns . emplace_back ( sampling_key . column_names [ 0 ] , std : : move ( sampling_column_type ) ) ;
2019-11-25 12:49:05 +00:00
}
2019-11-25 12:05:29 +00:00
2017-04-01 07:20:54 +00:00
if ( has_lower_limit )
{
2020-05-21 19:46:03 +00:00
if ( ! key_condition . addCondition ( sampling_key . column_names [ 0 ] , Range : : createLeftBounded ( lower , true ) ) )
2017-04-01 07:20:54 +00:00
throw Exception ( " Sampling column not in primary key " , ErrorCodes : : ILLEGAL_COLUMN ) ;
ASTPtr args = std : : make_shared < ASTExpressionList > ( ) ;
2019-11-25 12:05:29 +00:00
args - > children . push_back ( sampling_key_ast ) ;
2018-02-26 03:37:08 +00:00
args - > children . push_back ( std : : make_shared < ASTLiteral > ( lower ) ) ;
2017-04-01 07:20:54 +00:00
lower_function = std : : make_shared < ASTFunction > ( ) ;
lower_function - > name = " greaterOrEquals " ;
lower_function - > arguments = args ;
lower_function - > children . push_back ( lower_function - > arguments ) ;
filter_function = lower_function ;
}
if ( has_upper_limit )
{
2020-05-21 19:46:03 +00:00
if ( ! key_condition . addCondition ( sampling_key . column_names [ 0 ] , Range : : createRightBounded ( upper , false ) ) )
2017-04-01 07:20:54 +00:00
throw Exception ( " Sampling column not in primary key " , ErrorCodes : : ILLEGAL_COLUMN ) ;
ASTPtr args = std : : make_shared < ASTExpressionList > ( ) ;
2019-11-25 12:05:29 +00:00
args - > children . push_back ( sampling_key_ast ) ;
2018-02-26 03:37:08 +00:00
args - > children . push_back ( std : : make_shared < ASTLiteral > ( upper ) ) ;
2017-04-01 07:20:54 +00:00
upper_function = std : : make_shared < ASTFunction > ( ) ;
upper_function - > name = " less " ;
upper_function - > arguments = args ;
upper_function - > children . push_back ( upper_function - > arguments ) ;
filter_function = upper_function ;
}
if ( has_lower_limit & & has_upper_limit )
{
ASTPtr args = std : : make_shared < ASTExpressionList > ( ) ;
args - > children . push_back ( lower_function ) ;
args - > children . push_back ( upper_function ) ;
filter_function = std : : make_shared < ASTFunction > ( ) ;
filter_function - > name = " and " ;
filter_function - > arguments = args ;
filter_function - > children . push_back ( filter_function - > arguments ) ;
}
2018-11-08 15:43:14 +00:00
ASTPtr query = filter_function ;
2019-01-09 16:16:59 +00:00
auto syntax_result = SyntaxAnalyzer ( context ) . analyze ( query , available_real_columns ) ;
2018-11-08 17:28:52 +00:00
filter_expression = ExpressionAnalyzer ( filter_function , syntax_result , context ) . getActions ( false ) ;
2018-02-15 21:31:09 +00:00
2019-11-25 12:05:29 +00:00
if ( ! select . final ( ) )
{
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
/// Skip this if final was used, because such columns were already added from PK.
std : : vector < String > add_columns = filter_expression - > getRequiredColumns ( ) ;
column_names_to_read . insert ( column_names_to_read . end ( ) , add_columns . begin ( ) , add_columns . end ( ) ) ;
std : : sort ( column_names_to_read . begin ( ) , column_names_to_read . end ( ) ) ;
column_names_to_read . erase ( std : : unique ( column_names_to_read . begin ( ) , column_names_to_read . end ( ) ) ,
column_names_to_read . end ( ) ) ;
}
2017-04-01 07:20:54 +00:00
}
}
if ( no_data )
{
LOG_DEBUG ( log , " Sampling yields no data. " ) ;
return { } ;
}
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Key condition: {} " , key_condition . toString ( ) ) ;
2017-09-01 20:33:17 +00:00
if ( minmax_idx_condition )
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " MinMax index condition: {} " , minmax_idx_condition - > toString ( ) ) ;
2017-04-01 07:20:54 +00:00
/// PREWHERE
String prewhere_column ;
2019-04-09 14:22:35 +00:00
if ( select . prewhere ( ) )
prewhere_column = select . prewhere ( ) - > getColumnName ( ) ;
2017-04-01 07:20:54 +00:00
RangesInDataParts parts_with_ranges ;
2019-06-19 15:30:48 +00:00
std : : vector < std : : pair < MergeTreeIndexPtr , MergeTreeIndexConditionPtr > > useful_indices ;
2020-05-28 12:37:05 +00:00
2020-05-28 12:47:17 +00:00
for ( const auto & index : data . getIndices ( ) )
2019-02-05 15:22:47 +00:00
{
2020-05-28 12:37:05 +00:00
auto index_helper = MergeTreeIndexFactory : : instance ( ) . get ( index ) ;
auto condition = index_helper - > createIndexCondition ( query_info , context ) ;
2019-02-05 15:22:47 +00:00
if ( ! condition - > alwaysUnknownOrTrue ( ) )
2020-05-28 12:37:05 +00:00
useful_indices . emplace_back ( index_helper , condition ) ;
2019-02-05 15:22:47 +00:00
}
2017-04-01 07:20:54 +00:00
/// Let's find what range to read from each part.
size_t sum_marks = 0 ;
size_t sum_ranges = 0 ;
for ( auto & part : parts )
{
2017-12-25 14:56:32 +00:00
RangesInDataPart ranges ( part , part_index + + ) ;
2017-04-01 07:20:54 +00:00
2018-02-19 17:31:30 +00:00
if ( data . hasPrimaryKey ( ) )
2019-03-25 13:55:24 +00:00
ranges . ranges = markRangesFromPKRange ( part , key_condition , settings ) ;
2017-04-01 07:20:54 +00:00
else
2019-11-05 17:42:35 +00:00
{
size_t total_marks_count = part - > getMarksCount ( ) ;
if ( total_marks_count )
{
if ( part - > index_granularity . hasFinalMark ( ) )
- - total_marks_count ;
ranges . ranges = MarkRanges { MarkRange { 0 , total_marks_count } } ;
}
}
2017-04-01 07:20:54 +00:00
2019-02-05 15:22:47 +00:00
for ( const auto & index_and_condition : useful_indices )
ranges . ranges = filterMarksUsingIndex (
index_and_condition . first , index_and_condition . second , part , ranges . ranges , settings ) ;
2017-04-01 07:20:54 +00:00
if ( ! ranges . ranges . empty ( ) )
{
parts_with_ranges . push_back ( ranges ) ;
sum_ranges + = ranges . ranges . size ( ) ;
2019-07-18 14:41:11 +00:00
sum_marks + = ranges . getMarksCount ( ) ;
2017-04-01 07:20:54 +00:00
}
}
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Selected {} parts by date, {} parts by key, {} marks to read from {} ranges " , parts . size ( ) , parts_with_ranges . size ( ) , sum_marks , sum_ranges ) ;
2017-04-01 07:20:54 +00:00
if ( parts_with_ranges . empty ( ) )
return { } ;
ProfileEvents : : increment ( ProfileEvents : : SelectedParts , parts_with_ranges . size ( ) ) ;
ProfileEvents : : increment ( ProfileEvents : : SelectedRanges , sum_ranges ) ;
ProfileEvents : : increment ( ProfileEvents : : SelectedMarks , sum_marks ) ;
2019-09-13 15:41:09 +00:00
Pipes res ;
2017-04-01 07:20:54 +00:00
2019-12-18 16:41:11 +00:00
MergeTreeReaderSettings reader_settings =
2019-10-10 16:30:30 +00:00
{
. min_bytes_to_use_direct_io = settings . min_bytes_to_use_direct_io ,
. max_read_buffer_size = settings . max_read_buffer_size ,
. save_marks_in_cache = true
} ;
2020-05-12 18:22:58 +00:00
/// Projection, that needed to drop columns, which have appeared by execution
/// of some extra expressions, and to allow execute the same expressions later.
/// NOTE: It may lead to double computation of expressions.
ExpressionActionsPtr result_projection ;
2017-04-01 07:20:54 +00:00
if ( select . final ( ) )
{
2018-10-11 14:53:23 +00:00
/// Add columns needed to calculate the sorting expression and the sign.
2020-05-20 18:11:38 +00:00
std : : vector < String > add_columns = data . getColumnsRequiredForSortingKey ( ) ;
2017-04-01 07:20:54 +00:00
column_names_to_read . insert ( column_names_to_read . end ( ) , add_columns . begin ( ) , add_columns . end ( ) ) ;
if ( ! data . merging_params . sign_column . empty ( ) )
column_names_to_read . push_back ( data . merging_params . sign_column ) ;
if ( ! data . merging_params . version_column . empty ( ) )
column_names_to_read . push_back ( data . merging_params . version_column ) ;
std : : sort ( column_names_to_read . begin ( ) , column_names_to_read . end ( ) ) ;
column_names_to_read . erase ( std : : unique ( column_names_to_read . begin ( ) , column_names_to_read . end ( ) ) , column_names_to_read . end ( ) ) ;
2017-06-02 15:54:39 +00:00
res = spreadMarkRangesAmongStreamsFinal (
2017-11-24 23:03:58 +00:00
std : : move ( parts_with_ranges ) ,
2020-04-22 13:52:07 +00:00
num_streams ,
2017-04-01 07:20:54 +00:00
column_names_to_read ,
max_block_size ,
settings . use_uncompressed_cache ,
2019-04-17 21:20:51 +00:00
query_info ,
2017-04-01 07:20:54 +00:00
virt_column_names ,
2019-10-10 16:30:30 +00:00
settings ,
2020-05-12 18:22:58 +00:00
reader_settings ,
result_projection ) ;
2017-04-01 07:20:54 +00:00
}
2019-11-15 14:03:42 +00:00
else if ( settings . optimize_read_in_order & & query_info . input_sorting_info )
2019-05-18 12:21:40 +00:00
{
2019-11-15 14:03:42 +00:00
size_t prefix_size = query_info . input_sorting_info - > order_key_prefix_descr . size ( ) ;
2020-05-21 19:46:03 +00:00
auto order_key_prefix_ast = data . getSortingKey ( ) . expression_list_ast - > clone ( ) ;
2019-07-28 00:41:26 +00:00
order_key_prefix_ast - > children . resize ( prefix_size ) ;
auto syntax_result = SyntaxAnalyzer ( context ) . analyze ( order_key_prefix_ast , data . getColumns ( ) . getAllPhysical ( ) ) ;
auto sorting_key_prefix_expr = ExpressionAnalyzer ( order_key_prefix_ast , syntax_result , context ) . getActions ( false ) ;
2019-07-28 01:16:56 +00:00
res = spreadMarkRangesAmongStreamsWithOrder (
2019-05-18 12:21:40 +00:00
std : : move ( parts_with_ranges ) ,
2019-07-18 14:41:11 +00:00
num_streams ,
2019-05-18 12:21:40 +00:00
column_names_to_read ,
max_block_size ,
settings . use_uncompressed_cache ,
query_info ,
2019-07-28 00:41:26 +00:00
sorting_key_prefix_expr ,
2017-04-01 07:20:54 +00:00
virt_column_names ,
2019-10-10 16:30:30 +00:00
settings ,
2020-05-12 18:22:58 +00:00
reader_settings ,
result_projection ) ;
2017-04-01 07:20:54 +00:00
}
else
{
2017-06-02 15:54:39 +00:00
res = spreadMarkRangesAmongStreams (
2017-11-24 23:03:58 +00:00
std : : move ( parts_with_ranges ) ,
2017-06-02 15:54:39 +00:00
num_streams ,
2017-04-01 07:20:54 +00:00
column_names_to_read ,
max_block_size ,
settings . use_uncompressed_cache ,
2019-04-17 21:20:51 +00:00
query_info ,
2017-04-01 07:20:54 +00:00
virt_column_names ,
2019-10-10 16:30:30 +00:00
settings ,
reader_settings ) ;
2017-04-01 07:20:54 +00:00
}
if ( use_sampling )
2019-10-01 16:50:08 +00:00
{
for ( auto & pipe : res )
2019-10-20 09:12:42 +00:00
pipe . addSimpleTransform ( std : : make_shared < FilterTransform > (
pipe . getHeader ( ) , filter_expression , filter_function - > getColumnName ( ) , false ) ) ;
2019-10-01 16:50:08 +00:00
}
2017-04-01 07:20:54 +00:00
2020-05-12 18:22:58 +00:00
if ( result_projection )
{
for ( auto & pipe : res )
pipe . addSimpleTransform ( std : : make_shared < ExpressionTransform > (
pipe . getHeader ( ) , result_projection ) ) ;
}
2017-04-01 07:20:54 +00:00
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if ( sample_factor_column_queried )
2019-10-01 16:50:08 +00:00
{
for ( auto & pipe : res )
2019-10-20 09:12:42 +00:00
pipe . addSimpleTransform ( std : : make_shared < AddingConstColumnTransform < Float64 > > (
pipe . getHeader ( ) , std : : make_shared < DataTypeFloat64 > ( ) , used_sample_factor , " _sample_factor " ) ) ;
2019-10-01 16:50:08 +00:00
}
2017-04-01 07:20:54 +00:00
2018-10-04 08:58:19 +00:00
if ( query_info . prewhere_info & & query_info . prewhere_info - > remove_columns_actions )
2019-10-01 16:50:08 +00:00
{
for ( auto & pipe : res )
2019-10-20 09:12:42 +00:00
pipe . addSimpleTransform ( std : : make_shared < ExpressionTransform > (
pipe . getHeader ( ) , query_info . prewhere_info - > remove_columns_actions ) ) ;
2019-10-01 16:50:08 +00:00
}
2018-09-07 15:13:08 +00:00
2017-04-01 07:20:54 +00:00
return res ;
2014-03-13 12:48:07 +00:00
}
2019-04-01 12:10:32 +00:00
namespace
{

/// Converts a "rows or bytes" threshold into a number of marks, rounding up.
///
/// If bytes_granularity is zero the result is ceil(rows_setting / rows_granularity),
/// otherwise it is ceil(bytes_setting / bytes_granularity).
///
/// The ceiling is computed as "quotient plus one if there is a remainder" rather than
/// (value + granularity - 1) / granularity: the latter wraps around for values close
/// to the maximum of size_t and would silently produce a very small number of marks.
///
/// The granularity that is actually used must be non-zero.
size_t roundRowsOrBytesToMarks(
    size_t rows_setting,
    size_t bytes_setting,
    size_t rows_granularity,
    size_t bytes_granularity)
{
    const bool use_rows = (bytes_granularity == 0);
    const size_t value = use_rows ? rows_setting : bytes_setting;
    const size_t granularity = use_rows ? rows_granularity : bytes_granularity;

    return value / granularity + (value % granularity != 0 ? 1 : 0);
}

}
2015-02-15 04:16:11 +00:00
2019-09-13 15:41:09 +00:00
/// Creates pipes that read the given mark ranges of the given parts without any ordering guarantees
/// between streams.
///
/// - If there are no marks to read, an empty result is returned.
/// - If num_streams > 1, a shared MergeTreeReadPool is created and one
///   MergeTreeThreadSelectBlockInputProcessor per stream reads from it. The number of streams
///   is reduced when there is little data and few parts.
/// - Otherwise one MergeTreeSelectProcessor is created per part; if there is more than one,
///   they are glued together with a ConcatProcessor so that parts are read one after another.
///
/// The uncompressed cache is switched off when the total number of marks exceeds the threshold
/// derived from merge_tree_max_rows_to_use_cache / merge_tree_max_bytes_to_use_cache.
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
    RangesInDataParts && parts,
    size_t num_streams,
    const Names & column_names,
    UInt64 max_block_size,
    bool use_uncompressed_cache,
    const SelectQueryInfo & query_info,
    const Names & virt_columns,
    const Settings & settings,
    const MergeTreeReaderSettings & reader_settings) const
{
    const auto data_settings = data.getSettings();

    /// Accumulate totals over all parts. Per-part mark counts are not needed here,
    /// so no per-part array is kept.
    size_t sum_marks = 0;
    size_t total_rows = 0;
    size_t adaptive_parts = 0;

    for (size_t i = 0; i < parts.size(); ++i)
    {
        total_rows += parts[i].getRowsCount();
        sum_marks += parts[i].getMarksCount();

        if (parts[i].data_part->index_granularity_info.is_adaptive)
            ++adaptive_parts;
    }

    /// Thresholds are measured in bytes only if the majority of parts have adaptive granularity.
    size_t index_granularity_bytes = 0;
    if (adaptive_parts > parts.size() / 2)
        index_granularity_bytes = data_settings->index_granularity_bytes;

    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
        settings.merge_tree_max_rows_to_use_cache,
        settings.merge_tree_max_bytes_to_use_cache,
        data_settings->index_granularity,
        index_granularity_bytes);

    const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
        settings.merge_tree_min_rows_for_concurrent_read,
        settings.merge_tree_min_bytes_for_concurrent_read,
        data_settings->index_granularity,
        index_granularity_bytes);

    if (sum_marks > max_marks_to_use_cache)
        use_uncompressed_cache = false;

    Pipes res;
    if (0 == sum_marks)
        return res;

    if (num_streams > 1)
    {
        /// Parallel query execution.

        /// Reduce the number of num_streams if the data is small.
        if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
            num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());

        MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
            num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, query_info.prewhere_info, true,
            column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);

        /// Let's estimate total number of rows for progress bar.
        LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);

        res.reserve(num_streams);
        for (size_t i = 0; i < num_streams; ++i)
        {
            auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
                i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
                settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
                query_info.prewhere_info, reader_settings, virt_columns);

            /// Set the approximate number of rows for the first source only,
            /// so that the total is not reported once per stream.
            if (i == 0)
                source->addTotalRowsApprox(total_rows);

            res.emplace_back(std::move(source));
        }
    }
    else
    {
        /// Sequential query execution.

        res.reserve(parts.size());
        for (const auto & part : parts)
        {
            auto source = std::make_shared<MergeTreeSelectProcessor>(
                data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
                query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query);

            res.emplace_back(std::move(source));
        }

        /// Use ConcatProcessor to concat sources together.
        /// It is needed to read in parts order (and so in PK order) if single thread is used.
        if (res.size() > 1)
        {
            auto concat = std::make_shared<ConcatProcessor>(res.front().getHeader(), res.size());
            Pipe pipe(std::move(res), std::move(concat));
            res = Pipes();
            res.emplace_back(std::move(pipe));
        }
    }

    return res;
}
2020-05-06 22:27:35 +00:00
/// Builds expression actions over the columns of the pipe's current header that consist of
/// a single "project" action keeping exactly the header's column names.
/// Callers use it to get rid of columns that later transforms add to the pipe.
static ExpressionActionsPtr createProjection(const Pipe & pipe, const MergeTreeData & data)
{
    const auto & pipe_header = pipe.getHeader();

    auto actions = std::make_shared<ExpressionActions>(pipe_header.getNamesAndTypesList(), data.global_context);
    actions->add(ExpressionAction::project(pipe_header.getNames()));

    return actions;
}
2019-09-13 15:41:09 +00:00
/// Creates pipes that read the given mark ranges so that every resulting stream produces rows
/// ordered by a prefix of the sorting key (forward if input_sorting_info->direction == 1,
/// otherwise reversed).
///
/// Marks are distributed among at most num_streams streams, taking parts (or sub-ranges of a part)
/// from the back of `parts`. A stream that received several parts merges them with
/// MergingSortedTransform after applying `sorting_key_prefix_expr`; in that case `out_projection`
/// is set to a projection onto the columns the pipe had before that expression was applied.
/// A stream that received a single part is returned as is.
///
/// Returns an empty result if there are no marks to read.
/// Throws LOGICAL_ERROR if a part runs out of ranges while marks are still expected from it.
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
    RangesInDataParts && parts,
    size_t num_streams,
    const Names & column_names,
    UInt64 max_block_size,
    bool use_uncompressed_cache,
    const SelectQueryInfo & query_info,
    const ExpressionActionsPtr & sorting_key_prefix_expr,
    const Names & virt_columns,
    const Settings & settings,
    const MergeTreeReaderSettings & reader_settings,
    ExpressionActionsPtr & out_projection) const
{
    size_t sum_marks = 0;
    const InputSortingInfoPtr & input_sorting_info = query_info.input_sorting_info;
    size_t adaptive_parts = 0;

    /// Remaining number of marks for each part; kept in sync with `parts` (same order, same back).
    std::vector<size_t> sum_marks_in_parts(parts.size());
    const auto data_settings = data.getSettings();

    for (size_t i = 0; i < parts.size(); ++i)
    {
        sum_marks_in_parts[i] = parts[i].getMarksCount();
        sum_marks += sum_marks_in_parts[i];

        if (parts[i].data_part->index_granularity_info.is_adaptive)
            ++adaptive_parts;
    }

    /// Thresholds are measured in bytes only if the majority of parts have adaptive granularity.
    size_t index_granularity_bytes = 0;
    if (adaptive_parts > parts.size() / 2)
        index_granularity_bytes = data_settings->index_granularity_bytes;

    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
        settings.merge_tree_max_rows_to_use_cache,
        settings.merge_tree_max_bytes_to_use_cache,
        data_settings->index_granularity,
        index_granularity_bytes);

    const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
        settings.merge_tree_min_rows_for_concurrent_read,
        settings.merge_tree_min_bytes_for_concurrent_read,
        data_settings->index_granularity,
        index_granularity_bytes);

    if (sum_marks > max_marks_to_use_cache)
        use_uncompressed_cache = false;

    Pipes res;

    if (sum_marks == 0)
        return res;

    /// Let's split ranges to avoid reading much data.
    /// The size of the produced ranges starts from one mark and doubles up to the number of marks
    /// that corresponds to max_block_size.
    auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction)
    {
        MarkRanges new_ranges;
        const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity;
        size_t marks_in_range = 1;

        if (direction == 1)
        {
            /// Split first few ranges to avoid reading much data.
            bool splitted = false;
            for (auto range : ranges)
            {
                while (!splitted && range.begin + marks_in_range < range.end)
                {
                    new_ranges.emplace_back(range.begin, range.begin + marks_in_range);
                    range.begin += marks_in_range;
                    marks_in_range *= 2;

                    if (marks_in_range > max_marks_in_range)
                        splitted = true;
                }
                new_ranges.emplace_back(range.begin, range.end);
            }
        }
        else
        {
            /// Split all ranges to avoid reading much data, because we have to
            /// store whole range in memory to reverse it.
            for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
            {
                auto range = *it;
                while (range.begin + marks_in_range < range.end)
                {
                    new_ranges.emplace_front(range.end - marks_in_range, range.end);
                    range.end -= marks_in_range;
                    marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
                }
                new_ranges.emplace_front(range.begin, range.end);
            }
        }

        return new_ranges;
    };

    const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;

    for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
    {
        size_t need_marks = min_marks_per_stream;

        Pipes pipes;

        /// Loop over parts.
        /// We will iteratively take part or some subrange of a part from the back
        /// and assign a stream to read from it.
        while (need_marks > 0 && !parts.empty())
        {
            /// The element is removed right away, so it can be moved out instead of copied.
            RangesInDataPart part = std::move(parts.back());
            parts.pop_back();

            size_t & marks_in_part = sum_marks_in_parts.back();

            /// We will not take too few rows from a part.
            if (marks_in_part >= min_marks_for_concurrent_read &&
                need_marks < min_marks_for_concurrent_read)
                need_marks = min_marks_for_concurrent_read;

            /// Do not leave too few rows in the part.
            if (marks_in_part > need_marks &&
                marks_in_part - need_marks < min_marks_for_concurrent_read)
                need_marks = marks_in_part;

            MarkRanges ranges_to_get_from_part;

            /// We take the whole part if it is small enough.
            if (marks_in_part <= need_marks)
            {
                /// `part.ranges` is not used below in this case, only `data_part` and `part_index_in_query`.
                ranges_to_get_from_part = std::move(part.ranges);

                need_marks -= marks_in_part;
                sum_marks_in_parts.pop_back();
            }
            else
            {
                /// Loop through ranges in part. Take enough ranges to cover "need_marks".
                while (need_marks > 0)
                {
                    if (part.ranges.empty())
                        throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);

                    MarkRange & range = part.ranges.front();

                    const size_t marks_in_range = range.end - range.begin;
                    const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);

                    ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
                    range.begin += marks_to_get_from_range;
                    marks_in_part -= marks_to_get_from_range;
                    need_marks -= marks_to_get_from_range;

                    if (range.begin == range.end)
                        part.ranges.pop_front();
                }

                /// The rest of the part goes back to be picked up by the next stream.
                /// A copy is required here: `part` is still used below.
                parts.emplace_back(part);
            }

            ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_sorting_info->direction);

            if (input_sorting_info->direction == 1)
            {
                pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
                    data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                    settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
                    use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
                    virt_columns, part.part_index_in_query));
            }
            else
            {
                pipes.emplace_back(std::make_shared<MergeTreeReverseSelectProcessor>(
                    data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                    settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
                    use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
                    virt_columns, part.part_index_in_query));

                pipes.back().addSimpleTransform(std::make_shared<ReverseTransform>(pipes.back().getHeader()));
            }
        }

        if (pipes.size() > 1)
        {
            SortDescription sort_description;
            for (size_t j = 0; j < input_sorting_info->order_key_prefix_descr.size(); ++j)
                sort_description.emplace_back(data.getSortingKey().column_names[j],
                    input_sorting_info->direction, 1);

            /// Drop temporary columns, added by 'sorting_key_prefix_expr'
            out_projection = createProjection(pipes.back(), data);
            for (auto & pipe : pipes)
                pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));

            auto merging_sorted = std::make_shared<MergingSortedTransform>(
                pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);

            res.emplace_back(std::move(pipes), std::move(merging_sorted));
        }
        else
            res.emplace_back(std::move(pipes.front()));
    }

    return res;
}
2019-09-13 15:41:09 +00:00
/// Creates pipes for reading with FINAL: every part is read by its own MergeTreeSelectProcessor,
/// the sorting key expression is calculated, and the sources are merged by the merging transform
/// that corresponds to the table's merging mode.
///
/// `out_projection` is set (if not set already) to a projection onto the columns of the first
/// source before the sorting key expression is applied.
///
/// If, after limiting num_streams by max_final_threads, at most one stream is left or the sort
/// description is empty, everything is merged into a single pipe. Otherwise every source is
/// followed by an AddingSelectorTransform and a CopyTransform with num_streams outputs, and
/// num_streams merging transforms are created, each connected to one output of every copier.
///
/// Throws LOGICAL_ERROR for the Graphite merging mode.
/// `parts` must not be empty: the header of the first pipe is accessed unconditionally.
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
    RangesInDataParts && parts,
    size_t num_streams,
    const Names & column_names,
    UInt64 max_block_size,
    bool use_uncompressed_cache,
    const SelectQueryInfo & query_info,
    const Names & virt_columns,
    const Settings & settings,
    const MergeTreeReaderSettings & reader_settings,
    ExpressionActionsPtr & out_projection) const
{
    const auto data_settings = data.getSettings();
    size_t sum_marks = 0;
    size_t adaptive_parts = 0;

    for (const auto & part : parts)
    {
        for (const auto & range : part.ranges)
            sum_marks += range.end - range.begin;

        if (part.data_part->index_granularity_info.is_adaptive)
            ++adaptive_parts;
    }

    /// Use the bytes-based threshold only if the majority of parts have adaptive granularity,
    /// the same rule as in spreadMarkRangesAmongStreams and spreadMarkRangesAmongStreamsWithOrder.
    /// (A non-strict comparison would treat a single non-adaptive part as adaptive: 0 >= 1 / 2.)
    size_t index_granularity_bytes = 0;
    if (adaptive_parts > parts.size() / 2)
        index_granularity_bytes = data_settings->index_granularity_bytes;

    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
        settings.merge_tree_max_rows_to_use_cache,
        settings.merge_tree_max_bytes_to_use_cache,
        data_settings->index_granularity,
        index_granularity_bytes);

    if (sum_marks > max_marks_to_use_cache)
        use_uncompressed_cache = false;

    Pipes pipes;

    for (const auto & part : parts)
    {
        auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
            data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
            settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
            query_info.prewhere_info, true, reader_settings,
            virt_columns, part.part_index_in_query);

        Pipe pipe(std::move(source_processor));

        /// Drop temporary columns, added by 'sorting_key_expr'
        if (!out_projection)
            out_projection = createProjection(pipe, data);

        pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), data.getSortingKey().expression));
        pipes.emplace_back(std::move(pipe));
    }

    Names sort_columns = data.getSortingKeyColumns();
    SortDescription sort_description;
    size_t sort_columns_size = sort_columns.size();
    sort_description.reserve(sort_columns_size);

    Block header = pipes.at(0).getHeader();
    for (size_t i = 0; i < sort_columns_size; ++i)
        sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);

    /// Creates a new merging transform for the table's merging mode with pipes.size() inputs.
    auto get_merging_processor = [&]() -> MergingTransformPtr
    {
        switch (data.merging_params.mode)
        {
            case MergeTreeData::MergingParams::Ordinary:
            {
                return std::make_shared<MergingSortedTransform>(header, pipes.size(),
                    sort_description, max_block_size);
            }

            case MergeTreeData::MergingParams::Collapsing:
                return std::make_shared<CollapsingSortedTransform>(header, pipes.size(),
                    sort_description, data.merging_params.sign_column, true, max_block_size);

            case MergeTreeData::MergingParams::Summing:
                return std::make_shared<SummingSortedTransform>(header, pipes.size(),
                    sort_description, data.merging_params.columns_to_sum, max_block_size);

            case MergeTreeData::MergingParams::Aggregating:
                return std::make_shared<AggregatingSortedTransform>(header, pipes.size(),
                    sort_description, max_block_size);

            case MergeTreeData::MergingParams::Replacing:
                return std::make_shared<ReplacingSortedTransform>(header, pipes.size(),
                    sort_description, data.merging_params.version_column, max_block_size);

            case MergeTreeData::MergingParams::VersionedCollapsing:
                return std::make_shared<VersionedCollapsingTransform>(header, pipes.size(),
                    sort_description, data.merging_params.sign_column, max_block_size);

            case MergeTreeData::MergingParams::Graphite:
                throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
        }

        __builtin_unreachable();
    };

    if (num_streams > settings.max_final_threads)
        num_streams = settings.max_final_threads;

    /// Single-threaded merge of all sources.
    if (num_streams <= 1 || sort_description.empty())
    {
        Pipe pipe(std::move(pipes), get_merging_processor());
        pipes = Pipes();
        pipes.emplace_back(std::move(pipe));
        return pipes;
    }

    /// Positions of the sorting key columns in the header; passed to the selector transforms.
    ColumnNumbers key_columns;
    key_columns.reserve(sort_description.size());

    for (auto & desc : sort_description)
    {
        if (!desc.column_name.empty())
            key_columns.push_back(header.getPositionByName(desc.column_name));
        else
            key_columns.emplace_back(desc.column_number);
    }

    Processors selectors;
    Processors copiers;
    selectors.reserve(pipes.size());
    copiers.reserve(pipes.size());

    for (auto & pipe : pipes)
    {
        auto selector = std::make_shared<AddingSelectorTransform>(pipe.getHeader(), num_streams, key_columns);
        auto copier = std::make_shared<CopyTransform>(pipe.getHeader(), num_streams);
        connect(pipe.getPort(), selector->getInputPort());
        connect(selector->getOutputPort(), copier->getInputPort());
        selectors.emplace_back(std::move(selector));
        copiers.emplace_back(std::move(copier));
    }

    Processors merges;
    std::vector<InputPorts::iterator> input_ports;
    merges.reserve(num_streams);
    input_ports.reserve(num_streams);

    for (size_t i = 0; i < num_streams; ++i)
    {
        auto merge = get_merging_processor();
        merge->setSelectorPosition(i);
        input_ports.emplace_back(merge->getInputs().begin());
        merges.emplace_back(std::move(merge));
    }

    /// Connect outputs of i-th splitter with i-th input port of every merge.
    for (auto & resize : copiers)
    {
        size_t input_num = 0;
        for (auto & output : resize->getOutputs())
        {
            connect(output, *input_ports[input_num]);
            ++input_ports[input_num];
            ++input_num;
        }
    }

    /// Collect processors of the source pipes: the pipes themselves are replaced below.
    Processors processors;
    for (auto & pipe : pipes)
    {
        auto pipe_processors = std::move(pipe).detachProcessors();
        processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end());
    }

    pipes.clear();
    pipes.reserve(num_streams);
    for (auto & merge : merges)
        pipes.emplace_back(&merge->getOutputs().front());

    /// All the intermediate processors are attached to the first resulting pipe.
    pipes.front().addProcessors(processors);
    pipes.front().addProcessors(selectors);
    pipes.front().addProcessors(copiers);
    pipes.front().addProcessors(merges);

    return pipes;
}
2016-03-27 11:37:25 +00:00
2016-03-05 03:17:11 +00:00
void MergeTreeDataSelectExecutor : : createPositiveSignCondition (
2017-04-01 07:20:54 +00:00
ExpressionActionsPtr & out_expression , String & out_column , const Context & context ) const
2014-03-13 12:48:07 +00:00
{
2017-04-01 07:20:54 +00:00
auto function = std : : make_shared < ASTFunction > ( ) ;
auto arguments = std : : make_shared < ASTExpressionList > ( ) ;
2018-02-26 03:37:08 +00:00
auto sign = std : : make_shared < ASTIdentifier > ( data . merging_params . sign_column ) ;
2018-10-22 08:54:54 +00:00
auto one = std : : make_shared < ASTLiteral > ( 1 ) ;
2014-03-13 12:48:07 +00:00
2017-04-01 07:20:54 +00:00
function - > name = " equals " ;
function - > arguments = arguments ;
function - > children . push_back ( arguments ) ;
2014-03-13 12:48:07 +00:00
2017-04-01 07:20:54 +00:00
arguments - > children . push_back ( sign ) ;
arguments - > children . push_back ( one ) ;
2014-03-13 12:48:07 +00:00
2018-11-08 15:43:14 +00:00
ASTPtr query = function ;
2019-01-09 16:16:59 +00:00
auto syntax_result = SyntaxAnalyzer ( context ) . analyze ( query , data . getColumns ( ) . getAllPhysical ( ) ) ;
2018-11-08 17:28:52 +00:00
out_expression = ExpressionAnalyzer ( query , syntax_result , context ) . getActions ( false ) ;
2017-04-01 07:20:54 +00:00
out_column = function - > getColumnName ( ) ;
2014-03-13 12:48:07 +00:00
}
2016-03-27 11:37:25 +00:00
2017-01-20 02:22:18 +00:00
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.
2017-03-12 19:18:07 +00:00
/// In other words, it removes subranges from whole range, that definitely could not contain required keys.
2015-11-29 11:58:44 +00:00
MarkRanges MergeTreeDataSelectExecutor : : markRangesFromPKRange (
2019-03-25 13:55:24 +00:00
const MergeTreeData : : DataPartPtr & part , const KeyCondition & key_condition , const Settings & settings ) const
2014-03-13 12:48:07 +00:00
{
2017-04-01 07:20:54 +00:00
MarkRanges res ;
2019-03-25 13:55:24 +00:00
size_t marks_count = part - > index_granularity . getMarksCount ( ) ;
const auto & index = part - > index ;
2018-05-23 19:34:37 +00:00
if ( marks_count = = 0 )
return res ;
2017-04-01 07:20:54 +00:00
2019-06-18 12:54:27 +00:00
bool has_final_mark = part - > index_granularity . hasFinalMark ( ) ;
2017-04-01 07:20:54 +00:00
/// If index is not used.
if ( key_condition . alwaysUnknownOrTrue ( ) )
{
2019-06-18 12:54:27 +00:00
if ( has_final_mark )
res . push_back ( MarkRange ( 0 , marks_count - 1 ) ) ;
else
res . push_back ( MarkRange ( 0 , marks_count ) ) ;
2017-04-01 07:20:54 +00:00
}
else
{
2018-05-23 19:34:37 +00:00
size_t used_key_size = key_condition . getMaxKeyColumn ( ) + 1 ;
2019-04-01 11:09:30 +00:00
size_t min_marks_for_seek = roundRowsOrBytesToMarks (
settings . merge_tree_min_rows_for_seek ,
settings . merge_tree_min_bytes_for_seek ,
2019-06-19 10:07:56 +00:00
part - > index_granularity_info . fixed_index_granularity ,
part - > index_granularity_info . index_granularity_bytes ) ;
2018-05-23 19:34:37 +00:00
2017-04-01 07:20:54 +00:00
/** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back).
* At each step , take the left segment and check if it fits .
* If fits , split it into smaller ones and put them on the stack . If not , discard it .
* If the segment is already of one mark length , add it to response and discard it .
*/
2020-04-02 17:27:07 +00:00
std : : vector < MarkRange > ranges_stack = { { 0 , marks_count } } ;
2020-04-06 10:36:56 +00:00
std : : function < void ( size_t , size_t , FieldRef & ) > create_field_ref ;
2020-04-08 02:54:08 +00:00
/// If there are no monotonic functions, there is no need to save block reference.
2020-04-06 10:36:56 +00:00
/// Passing explicit field to FieldRef allows to optimize ranges and shows better performance.
2020-05-20 18:11:38 +00:00
const auto & primary_key = data . getPrimaryKey ( ) ;
2020-04-02 17:27:07 +00:00
if ( key_condition . hasMonotonicFunctionsChain ( ) )
2020-04-06 10:36:56 +00:00
{
auto index_block = std : : make_shared < Block > ( ) ;
for ( size_t i = 0 ; i < used_key_size ; + + i )
2020-05-21 19:46:03 +00:00
index_block - > insert ( { index [ i ] , primary_key . data_types [ i ] , primary_key . column_names [ i ] } ) ;
2020-04-06 10:36:56 +00:00
create_field_ref = [ index_block ] ( size_t row , size_t column , FieldRef & field )
{
field = { index_block . get ( ) , row , column } ;
} ;
}
2020-04-02 17:27:07 +00:00
else
2020-04-06 10:36:56 +00:00
{
create_field_ref = [ & index ] ( size_t row , size_t column , FieldRef & field )
{
index [ column ] - > get ( row , field ) ;
} ;
}
2017-04-01 07:20:54 +00:00
2018-04-20 00:20:36 +00:00
/// NOTE Creating temporary Field objects to pass to KeyCondition.
2020-04-02 17:27:07 +00:00
std : : vector < FieldRef > index_left ( used_key_size ) ;
std : : vector < FieldRef > index_right ( used_key_size ) ;
2017-04-01 07:20:54 +00:00
while ( ! ranges_stack . empty ( ) )
{
MarkRange range = ranges_stack . back ( ) ;
ranges_stack . pop_back ( ) ;
bool may_be_true ;
2019-06-18 12:54:27 +00:00
if ( range . end = = marks_count & & ! has_final_mark )
2017-04-01 07:20:54 +00:00
{
for ( size_t i = 0 ; i < used_key_size ; + + i )
2020-04-06 10:36:56 +00:00
create_field_ref ( range . begin , i , index_left [ i ] ) ;
2017-04-01 07:20:54 +00:00
2019-09-24 01:29:26 +00:00
may_be_true = key_condition . mayBeTrueAfter (
2020-05-20 18:11:38 +00:00
used_key_size , index_left . data ( ) , primary_key . data_types ) ;
2017-04-01 07:20:54 +00:00
}
else
{
2019-06-18 12:54:27 +00:00
if ( has_final_mark & & range . end = = marks_count )
range . end - = 1 ; /// Remove final empty mark. It's useful only for primary key condition.
2017-04-01 07:20:54 +00:00
for ( size_t i = 0 ; i < used_key_size ; + + i )
{
2020-04-06 10:36:56 +00:00
create_field_ref ( range . begin , i , index_left [ i ] ) ;
create_field_ref ( range . end , i , index_right [ i ] ) ;
2017-04-01 07:20:54 +00:00
}
2019-09-24 01:29:26 +00:00
may_be_true = key_condition . mayBeTrueInRange (
2020-05-20 18:11:38 +00:00
used_key_size , index_left . data ( ) , index_right . data ( ) , primary_key . data_types ) ;
2017-04-01 07:20:54 +00:00
}
if ( ! may_be_true )
continue ;
if ( range . end = = range . begin + 1 )
{
/// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range.
if ( res . empty ( ) | | range . begin - res . back ( ) . end > min_marks_for_seek )
res . push_back ( range ) ;
else
res . back ( ) . end = range . end ;
}
else
{
/// Break the segment and put the result on the stack from right to left.
size_t step = ( range . end - range . begin - 1 ) / settings . merge_tree_coarse_index_granularity + 1 ;
size_t end ;
for ( end = range . end ; end > range . begin + step ; end - = step )
2020-04-02 17:27:07 +00:00
ranges_stack . emplace_back ( end - step , end ) ;
2017-04-01 07:20:54 +00:00
2020-04-02 17:27:07 +00:00
ranges_stack . emplace_back ( range . begin , end ) ;
2017-04-01 07:20:54 +00:00
}
}
}
return res ;
2014-03-13 12:48:07 +00:00
}
2019-01-08 19:41:36 +00:00
/// Narrows the given mark ranges of a part with the help of a data skipping index.
///
/// index_helper - the skipping index; provides the index file name, its name for logging and its granularity.
/// condition    - index condition evaluated against each index granule.
/// part         - data part whose marks are being filtered.
/// ranges       - candidate mark ranges to filter.
/// settings     - supplies merge_tree_min_rows_for_seek / merge_tree_min_bytes_for_seek, which define
///                the largest gap (in marks) across which two surviving ranges are still glued together.
///
/// Returns `ranges` unchanged if the index file does not exist for this part; otherwise the
/// sub-ranges whose index granules may satisfy the condition.
MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
    MergeTreeIndexPtr index_helper,
    MergeTreeIndexConditionPtr condition,
    MergeTreeData::DataPartPtr part,
    const MarkRanges & ranges,
    const Settings & settings) const
{
    if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index_helper->getFileName() + ".idx"))
    {
        LOG_DEBUG(log, "File for index {} does not exist. Skipping it.", backQuote(index_helper->index.name));
        return ranges;
    }

    /// Number of data marks covered by a single index granule.
    auto index_granularity = index_helper->index.granularity;

    const size_t min_marks_for_seek = roundRowsOrBytesToMarks(
        settings.merge_tree_min_rows_for_seek,
        settings.merge_tree_min_bytes_for_seek,
        part->index_granularity_info.fixed_index_granularity,
        part->index_granularity_info.index_granularity_bytes);

    size_t granules_dropped = 0;

    size_t marks_count = part->getMarksCount();
    /// 1 if the part has a final mark, 0 otherwise; the final mark is excluded from the count below.
    size_t final_mark = part->index_granularity.hasFinalMark();
    /// Number of index granules: data marks (without the final one) divided by index granularity, rounded up.
    size_t index_marks_count = (marks_count - final_mark + index_granularity - 1) / index_granularity;

    MergeTreeIndexReader reader(
        index_helper, part,
        index_marks_count,
        ranges);

    MarkRanges res;

    /// Some granules can cover two or more ranges,
    /// this variable is stored to avoid reading the same granule twice.
    MergeTreeIndexGranulePtr granule = nullptr;
    size_t last_index_mark = 0;
    for (const auto & range : ranges)
    {
        /// Index granules overlapping this data range: begin rounded down, end rounded up.
        MarkRange index_range(
            range.begin / index_granularity,
            (range.end + index_granularity - 1) / index_granularity);

        /// Seek unless the granule read last for the previous range is exactly the one this range starts with.
        if (last_index_mark != index_range.begin || !granule)
            reader.seek(index_range.begin);

        for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
        {
            /// Reuse the already loaded granule only for the first index mark of the range
            /// and only when it is the granule the previous range ended on.
            if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
                granule = reader.read();

            /// Part of the data range that falls into the current index granule.
            MarkRange data_range(
                std::max(range.begin, index_mark * index_granularity),
                std::min(range.end, (index_mark + 1) * index_granularity));

            if (!condition->mayBeTrueOnGranule(granule))
            {
                ++granules_dropped;
                continue;
            }

            /// Start a new range only if the gap to the previous surviving range is larger than
            /// min_marks_for_seek; otherwise extend the previous range over the gap.
            /// The gap must be computed as (begin of new) - (end of previous): both values are unsigned,
            /// and the reversed subtraction wraps around to a huge number for every non-zero gap,
            /// which made the threshold ineffective.
            if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
                res.push_back(data_range);
            else
                res.back().end = data_range.end;
        }

        last_index_mark = index_range.end - 1;
    }

    LOG_DEBUG(log, "Index {} has dropped {} granules.", backQuote(index_helper->index.name), granules_dropped);

    return res;
}
2019-04-01 11:09:30 +00:00
2019-01-07 12:51:14 +00:00
2014-03-13 12:48:07 +00:00
}