2017-04-01 09:19:00 +00:00
# include <DataStreams/ExpressionBlockInputStream.h>
# include <DataStreams/FilterBlockInputStream.h>
# include <DataStreams/LimitBlockInputStream.h>
# include <DataStreams/LimitByBlockInputStream.h>
# include <DataStreams/PartialSortingBlockInputStream.h>
# include <DataStreams/MergeSortingBlockInputStream.h>
# include <DataStreams/MergingSortedBlockInputStream.h>
# include <DataStreams/AggregatingBlockInputStream.h>
# include <DataStreams/MergingAggregatedBlockInputStream.h>
# include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
# include <DataStreams/AsynchronousBlockInputStream.h>
# include <DataStreams/UnionBlockInputStream.h>
# include <DataStreams/ParallelAggregatingBlockInputStream.h>
# include <DataStreams/DistinctBlockInputStream.h>
# include <DataStreams/NullBlockInputStream.h>
# include <DataStreams/TotalsHavingBlockInputStream.h>
# include <DataStreams/copyData.h>
# include <DataStreams/CreatingSetsBlockInputStream.h>
# include <DataStreams/MaterializingBlockInputStream.h>
# include <DataStreams/ConcatBlockInputStream.h>
# include <Parsers/ASTSelectQuery.h>
2018-02-26 06:12:59 +00:00
# include <Parsers/ASTSelectWithUnionQuery.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTIdentifier.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTLiteral.h>
# include <Parsers/ASTOrderByElement.h>
# include <Parsers/ASTTablesInSelectQuery.h>
# include <Interpreters/InterpreterSelectQuery.h>
2018-02-25 06:34:20 +00:00
# include <Interpreters/InterpreterSelectWithUnionQuery.h>
2017-04-01 09:19:00 +00:00
# include <Interpreters/InterpreterSetQuery.h>
# include <Interpreters/ExpressionAnalyzer.h>
2017-05-25 01:12:41 +00:00
# include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
2017-04-01 09:19:00 +00:00
# include <Storages/IStorage.h>
2017-05-25 01:12:41 +00:00
# include <Storages/StorageMergeTree.h>
# include <Storages/StorageReplicatedMergeTree.h>
2017-04-01 09:19:00 +00:00
# include <TableFunctions/ITableFunction.h>
# include <TableFunctions/TableFunctionFactory.h>
# include <Core/Field.h>
2017-11-20 06:01:05 +00:00
# include <Columns/Collator.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2014-01-28 19:26:39 +00:00
2015-10-29 15:14:19 +00:00
2011-08-28 05:13:24 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
/// Error codes used by this translation unit.
/// They are only declared here (`extern`); the definitions are provided elsewhere.
namespace ErrorCodes
{
    extern const int ILLEGAL_FINAL;
    extern const int ILLEGAL_PREWHERE;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int SAMPLING_NOT_SUPPORTED;
    extern const int THERE_IS_NO_COLUMN;
    extern const int TOO_DEEP_SUBQUERIES;
    extern const int TOO_MANY_COLUMNS;
}
2018-02-26 21:00:42 +00:00
/** Interprets a SELECT for which neither a prepared input stream nor a storage is supplied:
  * both are passed as nullptr to the full constructor, all other arguments are forwarded unchanged.
  */
InterpreterSelectQuery::InterpreterSelectQuery(
    const ASTPtr & query_ptr_,
    const Context & context_,
    const Names & required_result_column_names,
    QueryProcessingStage::Enum to_stage_,
    size_t subquery_depth_,
    bool only_analyze_)
    : InterpreterSelectQuery(
        query_ptr_,
        context_,
        /* input_ = */ nullptr,
        /* storage_ = */ nullptr,
        required_result_column_names,
        to_stage_,
        subquery_depth_,
        only_analyze_)
{
}
2018-07-17 13:09:33 +00:00
/** Interprets a SELECT that reads from an already prepared input stream.
  * Delegates to the full constructor with no storage, an empty list of required result columns
  * and subquery depth 0.
  */
InterpreterSelectQuery::InterpreterSelectQuery(
    const ASTPtr & query_ptr_,
    const Context & context_,
    const BlockInputStreamPtr & input_,
    QueryProcessingStage::Enum to_stage_,
    bool only_analyze_)
    : InterpreterSelectQuery(
        query_ptr_,
        context_,
        input_,
        /* storage_ = */ nullptr,
        Names{},
        to_stage_,
        /* subquery_depth_ = */ 0,
        only_analyze_)
{
}
2018-02-26 21:00:42 +00:00
2018-07-18 12:17:48 +00:00
/** Interprets a SELECT that reads from an explicitly given storage.
  * Delegates to the full constructor with no prepared input, an empty list of required result columns
  * and subquery depth 0.
  */
InterpreterSelectQuery::InterpreterSelectQuery(
    const ASTPtr & query_ptr_,
    const Context & context_,
    const StoragePtr & storage_,
    QueryProcessingStage::Enum to_stage_,
    bool only_analyze_)
    : InterpreterSelectQuery(
        query_ptr_,
        context_,
        /* input_ = */ nullptr,
        storage_,
        Names{},
        to_stage_,
        /* subquery_depth_ = */ 0,
        only_analyze_)
{
}
2018-07-19 13:36:21 +00:00
/// The destructor is compiler-generated but defined in this translation unit.
/// NOTE(review): presumably so that members held via std::unique_ptr are destroyed where their types are complete - confirm against the header.
InterpreterSelectQuery::~InterpreterSelectQuery() = default;
/** Builds the context that is used to interpret a subquery.
  * It is a copy of `context` with adjusted settings:
  *  - the limits on the maximum size of the result are reset, because the result of a subquery
  *    is not the result of the entire query;
  *  - the calculation of extremes is switched off.
  */
static Context getSubqueryContext(const Context & context)
{
    Settings settings_for_subquery = context.getSettings();

    settings_for_subquery.max_result_rows = 0;
    settings_for_subquery.max_result_bytes = 0;

    /// Calculating extremes in a subquery makes no sense and is not needed
    /// (otherwise the extremes of the subquery could be taken for those of the whole query).
    settings_for_subquery.extremes = 0;

    Context result = context;
    result.setSettings(settings_for_subquery);
    return result;
}
2016-01-11 21:46:36 +00:00
2018-07-17 13:09:33 +00:00
/** The full constructor; the other constructors delegate to it.
  *
  * Steps, in order:
  *  1. clone the query and make sure the context has a query context;
  *  2. apply settings and check the subquery depth limit;
  *  3. determine the data source: prepared `input_`, a subquery, a table function, or a table
  *     (unless `storage_` was passed explicitly), and take a structure lock on the storage if there is one;
  *  4. create the ExpressionAnalyzer and, unless only analyzing, validate SAMPLE / FINAL / PREWHERE
  *     and register external tables found by the analyzer;
  *  5. compute `required_columns`, `source_header` and, via a dry run of the pipeline, `result_header`.
  *
  * Throws TOO_DEEP_SUBQUERIES, SAMPLING_NOT_SUPPORTED, ILLEGAL_FINAL or ILLEGAL_PREWHERE on the corresponding violations.
  */
InterpreterSelectQuery::InterpreterSelectQuery(
    const ASTPtr & query_ptr_,
    const Context & context_,
    const BlockInputStreamPtr & input_,
    const StoragePtr & storage_,
    const Names & required_result_column_names,
    QueryProcessingStage::Enum to_stage_,
    size_t subquery_depth_,
    bool only_analyze_)
    : query_ptr(query_ptr_->clone())     /// Note: the query is cloned because it will be modified during analysis.
    , query(typeid_cast<ASTSelectQuery &>(*query_ptr))
    , context(context_)
    , to_stage(to_stage_)
    , subquery_depth(subquery_depth_)
    , only_analyze(only_analyze_)
    , storage(storage_)
    , input(input_)
    , log(&Logger::get(" InterpreterSelectQuery "))
{
    /// If the (copied) context has no query context yet, make it its own query context.
    if (!context.hasQueryContext())
        context.setQueryContext(context);

    initSettings();

    const Settings & settings = context.getSettingsRef();

    /// A zero max_subquery_depth disables the check.
    if (settings.max_subquery_depth && subquery_depth > settings.max_subquery_depth)
        throw Exception(
            " Too deep subqueries. Maximum: " + settings.max_subquery_depth.toString(),
            ErrorCodes::TOO_DEEP_SUBQUERIES);

    max_streams = settings.max_threads;

    const auto & table_expression = query.table();

    if (input)
    {
        /// Read from prepared input.
        source_header = input->getHeader();
    }
    else if (table_expression && typeid_cast<const ASTSelectWithUnionQuery *>(table_expression.get()))
    {
        /// Read from subquery.
        /// `required_columns` has not been calculated at this point; it is assigned further below.
        interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
            table_expression,
            getSubqueryContext(context),
            required_columns,
            QueryProcessingStage::Complete,
            subquery_depth + 1,
            only_analyze);

        source_header = interpreter_subquery->getSampleBlock();
    }
    else if (!storage)
    {
        if (table_expression && typeid_cast<const ASTFunction *>(table_expression.get()))
        {
            /// Read from table function.
            storage = context.getQueryContext().executeTableFunction(table_expression);
        }
        else
        {
            /// Read from table. Even without table expression (implicit SELECT ... FROM system.one).
            String database_name;
            String table_name;

            getDatabaseAndTableNames(database_name, table_name);

            storage = context.getTable(database_name, table_name);
        }
    }

    if (storage)
        table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);

    query_analyzer = std::make_unique<ExpressionAnalyzer>(
        query_ptr,
        context,
        storage,
        source_header.getNamesAndTypesList(),
        required_result_column_names,
        subquery_depth,
        !only_analyze);

    if (!only_analyze)
    {
        /// SAMPLE, FINAL and PREWHERE are only possible when reading directly from a storage that supports them.
        const bool reads_from_storage = !input && storage;

        if (query.sample_size() && (!reads_from_storage || !storage->supportsSampling()))
            throw Exception(" Illegal SAMPLE: table doesn't support sampling ", ErrorCodes::SAMPLING_NOT_SUPPORTED);

        if (query.final() && (!reads_from_storage || !storage->supportsFinal()))
            throw Exception(
                reads_from_storage ? " Storage " + storage->getName() + " doesn't support FINAL " : " Illegal FINAL ",
                ErrorCodes::ILLEGAL_FINAL);

        if (query.prewhere_expression && (!reads_from_storage || !storage->supportsPrewhere()))
            throw Exception(
                reads_from_storage ? " Storage " + storage->getName() + " doesn't support PREWHERE " : " Illegal PREWHERE ",
                ErrorCodes::ILLEGAL_PREWHERE);

        /// Save the new temporary tables in the query context
        for (const auto & external_table : query_analyzer->getExternalTables())
            if (!context.tryGetExternalTable(external_table.first))
                context.addExternalTable(external_table.first, external_table.second);
    }

    /// If there is an aggregation in the outer query, WITH TOTALS is ignored in the subquery.
    if (interpreter_subquery && query_analyzer->hasAggregation())
        interpreter_subquery->ignoreWithTotals();

    required_columns = query_analyzer->getRequiredSourceColumns();

    if (storage)
        source_header = storage->getSampleBlockForColumns(required_columns);

    /// Calculate structure of the result: build the pipeline in dry-run mode and take the header of its first stream.
    {
        Pipeline pipeline;
        executeImpl(pipeline, input, true);
        result_header = pipeline.firstStream()->getHeader();
    }
}
2017-12-22 18:30:42 +00:00
2012-08-20 19:21:04 +00:00
void InterpreterSelectQuery : : getDatabaseAndTableNames ( String & database_name , String & table_name )
2011-08-28 05:13:24 +00:00
{
2017-04-01 07:20:54 +00:00
auto query_database = query . database ( ) ;
auto query_table = query . table ( ) ;
2017-04-02 17:37:49 +00:00
/** If the table is not specified - use the table `system.one`.
* If the database is not specified - use the current database .
2017-04-01 07:20:54 +00:00
*/
if ( query_database )
database_name = typeid_cast < ASTIdentifier & > ( * query_database ) . name ;
if ( query_table )
table_name = typeid_cast < ASTIdentifier & > ( * query_table ) . name ;
if ( ! query_table )
{
database_name = " system " ;
table_name = " one " ;
}
else if ( ! query_database )
{
if ( context . tryGetTable ( " " , table_name ) )
database_name = " " ;
else
database_name = context . getCurrentDatabase ( ) ;
}
2012-08-20 19:21:04 +00:00
}
2011-08-28 05:13:24 +00:00
2011-10-30 05:19:41 +00:00
/// Returns the structure of the query result (a copy of `result_header`, which is calculated in the constructor).
Block InterpreterSelectQuery::getSampleBlock()
{
    return result_header;
}
2015-06-18 02:11:05 +00:00
/** Builds the query pipeline, unites its streams with executeUnion
  * and returns the first stream of the pipeline as the `in` stream of the BlockIO.
  * In `only_analyze` mode the pipeline is built as a dry run.
  */
BlockIO InterpreterSelectQuery::execute()
{
    Pipeline pipeline;

    executeImpl(pipeline, input, only_analyze);
    executeUnion(pipeline);

    BlockIO result;
    result.in = pipeline.firstStream();
    return result;
}
2018-02-25 00:50:53 +00:00
/** Builds the query pipeline and returns all of its streams as they are,
  * without uniting them (in contrast to execute(), which calls executeUnion).
  */
BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
{
    Pipeline pipeline;

    executeImpl(pipeline, input, only_analyze);

    return pipeline.streams;
}
2018-02-23 06:00:48 +00:00
2018-07-19 13:36:21 +00:00
/** Composes the chain of expression actions for the query and remembers the steps that are needed later.
  *
  * from_stage - the stage up to which the data has already been processed by the source;
  * dry_run    - passed (OR-ed with the stage conditions) as the second argument of the append* calls.
  *              NOTE(review): the exact meaning of that argument is defined by ExpressionAnalyzer - confirm there.
  *
  * Returns the AnalysisResult: stage flags, has_* flags, the actions before each step and the subqueries for sets.
  */
InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run)
{
    AnalysisResult res;

    /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
    res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
        && to_stage >= QueryProcessingStage::WithMergeableState;

    /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
    res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
        && to_stage > QueryProcessingStage::WithMergeableState;

    /// Flags for the parts of the query that belong to the first / second stage:
    /// true when this is a dry run or the corresponding stage is not performed here.
    const bool no_first_stage = dry_run || !res.first_stage;
    const bool no_second_stage = dry_run || !res.second_stage;

    /** First we compose a chain of actions and remember the necessary steps from it.
      * Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
      * throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
      */
    {
        ExpressionActionsChain chain;

        res.need_aggregate = query_analyzer->hasAggregation();

        query_analyzer->appendArrayJoin(chain, no_first_stage);

        if (query_analyzer->appendJoin(chain, no_first_stage))
        {
            res.has_join = true;
            res.before_join = chain.getLastActions();
            chain.addStep();
        }

        if (query_analyzer->appendWhere(chain, no_first_stage))
        {
            res.has_where = true;
            res.before_where = chain.getLastActions();
            chain.addStep();
        }

        if (res.need_aggregate)
        {
            query_analyzer->appendGroupBy(chain, no_first_stage);
            query_analyzer->appendAggregateFunctionsArguments(chain, no_first_stage);
            res.before_aggregation = chain.getLastActions();

            /// Everything after the aggregation is built as a new chain.
            chain.finalize();
            chain.clear();

            if (query_analyzer->appendHaving(chain, no_second_stage))
            {
                res.has_having = true;
                res.before_having = chain.getLastActions();
                chain.addStep();
            }
        }

        /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
        const bool no_select_stage = res.need_aggregate ? no_second_stage : no_first_stage;

        query_analyzer->appendSelect(chain, no_select_stage);
        res.selected_columns = chain.getLastStep().required_output;
        res.has_order_by = query_analyzer->appendOrderBy(chain, no_select_stage);
        res.before_order_and_select = chain.getLastActions();
        chain.addStep();

        if (query_analyzer->appendLimitBy(chain, no_second_stage))
        {
            res.has_limit_by = true;
            res.before_limit_by = chain.getLastActions();
            chain.addStep();
        }

        query_analyzer->appendProjectResult(chain);
        res.final_projection = chain.getLastActions();

        chain.finalize();
        chain.clear();
    }

    /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
    if (res.has_where)
        res.before_where->prependProjectInput();
    if (res.has_having)
        res.before_having->prependProjectInput();

    res.subqueries_for_sets = query_analyzer->getSubqueriesForSets();

    return res;
}
2018-02-28 01:29:55 +00:00
void InterpreterSelectQuery : : executeImpl ( Pipeline & pipeline , const BlockInputStreamPtr & input , bool dry_run )
2011-08-28 05:13:24 +00:00
{
2018-02-25 06:34:20 +00:00
if ( input )
pipeline . streams . push_back ( input ) ;
2017-04-02 17:37:49 +00:00
/** Streams of data. When the query is executed in parallel, we have several data streams.
* If there is no GROUP BY , then perform all operations before ORDER BY and LIMIT in parallel , then
* if there is an ORDER BY , then glue the streams using UnionBlockInputStream , and then MergeSortingBlockInputStream ,
* if not , then glue it using UnionBlockInputStream ,
* then apply LIMIT .
* If there is GROUP BY , then we will perform all operations up to GROUP BY , inclusive , in parallel ;
* a parallel GROUP BY will glue streams into one ,
* then perform the remaining operations with one resulting stream .
2017-04-01 07:20:54 +00:00
*/
2018-07-19 13:36:21 +00:00
AnalysisResult expressions ;
if ( dry_run )
{
pipeline . streams . emplace_back ( std : : make_shared < NullBlockInputStream > ( source_header ) ) ;
expressions = analyzeExpressions ( QueryProcessingStage : : FetchColumns , true ) ;
}
else
{
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
QueryProcessingStage : : Enum from_stage = executeFetchColumns ( pipeline ) ;
2018-03-16 02:08:31 +00:00
2018-07-19 13:36:21 +00:00
if ( from_stage = = QueryProcessingStage : : WithMergeableState & & to_stage = = QueryProcessingStage : : WithMergeableState )
throw Exception ( " Distributed on Distributed is not supported " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2017-04-01 07:20:54 +00:00
2018-03-01 01:25:06 +00:00
LOG_TRACE ( log , QueryProcessingStage : : toString ( from_stage ) < < " -> " < < QueryProcessingStage : : toString ( to_stage ) ) ;
2017-04-01 07:20:54 +00:00
2018-07-19 13:36:21 +00:00
expressions = analyzeExpressions ( from_stage , false ) ;
}
2018-02-23 06:00:48 +00:00
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2017-04-01 07:20:54 +00:00
if ( to_stage > QueryProcessingStage : : FetchColumns )
{
2017-04-02 17:37:49 +00:00
/// Now we will compose block streams that perform the necessary actions.
2017-04-01 07:20:54 +00:00
2017-04-02 17:37:49 +00:00
/// Do I need to aggregate in a separate row rows that have not passed max_rows_to_group_by.
2017-04-01 07:20:54 +00:00
bool aggregate_overflow_row =
2018-02-23 06:00:48 +00:00
expressions . need_aggregate & &
2017-04-01 07:20:54 +00:00
query . group_by_with_totals & &
2018-03-11 00:15:26 +00:00
settings . max_rows_to_group_by & &
settings . group_by_overflow_mode = = OverflowMode : : ANY & &
2017-04-01 07:20:54 +00:00
settings . totals_mode ! = TotalsMode : : AFTER_HAVING_EXCLUSIVE ;
2017-04-02 17:37:49 +00:00
/// Do I need to immediately finalize the aggregate functions after the aggregation?
2017-04-01 07:20:54 +00:00
bool aggregate_final =
2018-02-23 06:00:48 +00:00
expressions . need_aggregate & &
2017-04-01 07:20:54 +00:00
to_stage > QueryProcessingStage : : WithMergeableState & &
! query . group_by_with_totals ;
2018-02-23 06:00:48 +00:00
if ( expressions . first_stage )
2017-04-01 07:20:54 +00:00
{
2018-02-23 06:00:48 +00:00
if ( expressions . has_join )
2018-02-21 08:16:01 +00:00
{
const ASTTableJoin & join = static_cast < const ASTTableJoin & > ( * query . join ( ) - > table_join ) ;
if ( join . kind = = ASTTableJoin : : Kind : : Full | | join . kind = = ASTTableJoin : : Kind : : Right )
2018-02-23 06:00:48 +00:00
pipeline . stream_with_non_joined_data = expressions . before_join - > createStreamWithNonJoinedDataIfFullOrRightJoin (
2018-02-21 08:16:01 +00:00
pipeline . firstStream ( ) - > getHeader ( ) , settings . max_block_size ) ;
2018-02-21 03:26:06 +00:00
for ( auto & stream : pipeline . streams ) /// Applies to all sources except stream_with_non_joined_data.
2018-02-23 06:00:48 +00:00
stream = std : : make_shared < ExpressionBlockInputStream > ( stream , expressions . before_join ) ;
2018-02-21 08:16:01 +00:00
}
2017-04-01 07:20:54 +00:00
2018-02-23 06:00:48 +00:00
if ( expressions . has_where )
executeWhere ( pipeline , expressions . before_where ) ;
2017-04-01 07:20:54 +00:00
2018-02-23 06:00:48 +00:00
if ( expressions . need_aggregate )
executeAggregation ( pipeline , expressions . before_aggregation , aggregate_overflow_row , aggregate_final ) ;
2017-04-01 07:20:54 +00:00
else
{
2018-02-23 06:00:48 +00:00
executeExpression ( pipeline , expressions . before_order_and_select ) ;
executeDistinct ( pipeline , true , expressions . selected_columns ) ;
2017-04-01 07:20:54 +00:00
}
2017-04-02 17:37:49 +00:00
/** For distributed query processing,
* if no GROUP , HAVING set ,
* but there is an ORDER or LIMIT ,
* then we will perform the preliminary sorting and LIMIT on the remote server .
2017-04-01 07:20:54 +00:00
*/
2018-02-23 06:00:48 +00:00
if ( ! expressions . second_stage & & ! expressions . need_aggregate & & ! expressions . has_having )
2017-04-01 07:20:54 +00:00
{
2018-02-23 06:00:48 +00:00
if ( expressions . has_order_by )
2018-02-21 03:26:06 +00:00
executeOrder ( pipeline ) ;
2017-04-01 07:20:54 +00:00
2018-02-23 06:00:48 +00:00
if ( expressions . has_order_by & & query . limit_length )
executeDistinct ( pipeline , false , expressions . selected_columns ) ;
2017-04-01 07:20:54 +00:00
if ( query . limit_length )
2018-02-21 03:26:06 +00:00
executePreLimit ( pipeline ) ;
2017-04-01 07:20:54 +00:00
}
}
2018-02-23 06:00:48 +00:00
if ( expressions . second_stage )
2017-04-01 07:20:54 +00:00
{
2018-02-25 06:34:20 +00:00
bool need_second_distinct_pass = false ;
bool need_merge_streams = false ;
2017-04-01 07:20:54 +00:00
2018-02-23 06:00:48 +00:00
if ( expressions . need_aggregate )
2017-04-01 07:20:54 +00:00
{
2017-04-02 17:37:49 +00:00
/// If you need to combine aggregated results from multiple servers
2018-02-23 06:00:48 +00:00
if ( ! expressions . first_stage )
2018-02-21 03:26:06 +00:00
executeMergeAggregated ( pipeline , aggregate_overflow_row , aggregate_final ) ;
2017-04-01 07:20:54 +00:00
if ( ! aggregate_final )
2018-02-23 06:00:48 +00:00
executeTotalsAndHaving ( pipeline , expressions . has_having , expressions . before_having , aggregate_overflow_row ) ;
else if ( expressions . has_having )
executeHaving ( pipeline , expressions . before_having ) ;
2017-04-01 07:20:54 +00:00
2018-02-23 06:00:48 +00:00
executeExpression ( pipeline , expressions . before_order_and_select ) ;
executeDistinct ( pipeline , true , expressions . selected_columns ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
need_second_distinct_pass = query . distinct & & pipeline . hasMoreThanOneStream ( ) ;
2017-04-01 07:20:54 +00:00
}
else
{
2018-02-21 03:26:06 +00:00
need_second_distinct_pass = query . distinct & & pipeline . hasMoreThanOneStream ( ) ;
2017-04-01 07:20:54 +00:00
if ( query . group_by_with_totals & & ! aggregate_final )
2018-02-21 03:26:06 +00:00
executeTotalsAndHaving ( pipeline , false , nullptr , aggregate_overflow_row ) ;
2017-04-01 07:20:54 +00:00
}
2018-02-23 06:00:48 +00:00
if ( expressions . has_order_by )
2017-04-01 07:20:54 +00:00
{
2017-04-02 17:37:49 +00:00
/** If there is an ORDER BY for distributed query processing,
* but there is no aggregation , then on the remote servers ORDER BY was made
* - therefore , we merge the sorted streams from remote servers .
2017-04-01 07:20:54 +00:00
*/
2018-02-23 06:00:48 +00:00
if ( ! expressions . first_stage & & ! expressions . need_aggregate & & ! ( query . group_by_with_totals & & ! aggregate_final ) )
2018-02-21 03:26:06 +00:00
executeMergeSorted ( pipeline ) ;
2017-04-02 17:37:49 +00:00
else /// Otherwise, just sort.
2018-02-21 03:26:06 +00:00
executeOrder ( pipeline ) ;
2017-04-01 07:20:54 +00:00
}
2017-04-02 17:37:49 +00:00
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
2018-02-23 06:00:48 +00:00
* limiting the number of rows in each up to ` offset + limit ` .
*/
2018-03-01 06:07:04 +00:00
if ( query . limit_length & & pipeline . hasMoreThanOneStream ( ) & & ! query . distinct & & ! expressions . has_limit_by & & ! settings . extremes )
2018-02-25 06:34:20 +00:00
{
2018-02-21 03:26:06 +00:00
executePreLimit ( pipeline ) ;
2018-02-25 06:34:20 +00:00
}
2017-04-01 07:20:54 +00:00
2018-02-25 06:34:20 +00:00
if ( need_second_distinct_pass
| | query . limit_length
| | query . limit_by_expression_list
| | pipeline . stream_with_non_joined_data )
{
need_merge_streams = true ;
}
2017-04-01 07:20:54 +00:00
2018-02-25 06:34:20 +00:00
if ( need_merge_streams )
2018-02-21 03:26:06 +00:00
executeUnion ( pipeline ) ;
2017-04-01 07:20:54 +00:00
2018-03-01 01:25:06 +00:00
/** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams .
*/
if ( need_second_distinct_pass )
2018-03-01 05:42:44 +00:00
executeDistinct ( pipeline , false , expressions . selected_columns ) ;
2017-04-01 07:20:54 +00:00
2018-03-01 06:07:04 +00:00
if ( expressions . has_limit_by )
2018-03-01 05:24:56 +00:00
{
executeExpression ( pipeline , expressions . before_limit_by ) ;
executeLimitBy ( pipeline ) ;
}
2018-03-01 01:25:06 +00:00
/** We must do projection after DISTINCT because projection may remove some columns.
*/
executeProjection ( pipeline , expressions . final_projection ) ;
/** Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
*/
executeExtremes ( pipeline ) ;
executeLimit ( pipeline ) ;
2017-04-01 07:20:54 +00:00
}
}
2018-02-23 06:00:48 +00:00
if ( ! expressions . subqueries_for_sets . empty ( ) )
executeSubqueriesInSetsAndJoins ( pipeline , expressions . subqueries_for_sets ) ;
2012-05-09 13:12:38 +00:00
}
2011-08-28 05:13:24 +00:00
2012-05-09 13:12:38 +00:00
/** Extracts the LIMIT length and offset of the query into `length` and `offset`.
  * Both are set to 0 first; the offset is read only if a LIMIT length is present.
  * The values are taken from ASTLiteral nodes as UInt64 (via typeid_cast and safeGet).
  */
static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, size_t & offset)
{
    length = 0;
    offset = 0;

    if (!query.limit_length)
        return;

    length = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*query.limit_length).value);

    if (!query.limit_offset)
        return;

    offset = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*query.limit_offset).value);
}
2018-07-19 13:36:21 +00:00
QueryProcessingStage : : Enum InterpreterSelectQuery : : executeFetchColumns ( Pipeline & pipeline )
2012-05-09 13:12:38 +00:00
{
2017-04-02 17:37:49 +00:00
/// Actions to calculate ALIAS if required.
2017-04-01 07:20:54 +00:00
ExpressionActionsPtr alias_actions ;
2017-04-02 17:37:49 +00:00
/// Are ALIAS columns required for query execution?
2017-04-01 07:20:54 +00:00
auto alias_columns_required = false ;
2018-03-13 14:18:11 +00:00
if ( storage & & ! storage - > getColumns ( ) . aliases . empty ( ) )
2017-04-01 07:20:54 +00:00
{
2018-03-13 14:18:11 +00:00
const auto & column_defaults = storage - > getColumns ( ) . defaults ;
2017-04-01 07:20:54 +00:00
for ( const auto & column : required_columns )
{
2018-03-13 14:18:11 +00:00
const auto default_it = column_defaults . find ( column ) ;
if ( default_it ! = std : : end ( column_defaults ) & & default_it - > second . kind = = ColumnDefaultKind : : Alias )
2017-04-01 07:20:54 +00:00
{
alias_columns_required = true ;
break ;
}
}
if ( alias_columns_required )
{
2017-04-02 17:37:49 +00:00
/// We will create an expression to return all the requested columns, with the calculation of the required ALIAS columns.
2017-04-01 07:20:54 +00:00
auto required_columns_expr_list = std : : make_shared < ASTExpressionList > ( ) ;
for ( const auto & column : required_columns )
{
2018-03-13 14:18:11 +00:00
const auto default_it = column_defaults . find ( column ) ;
if ( default_it ! = std : : end ( column_defaults ) & & default_it - > second . kind = = ColumnDefaultKind : : Alias )
2017-04-01 07:20:54 +00:00
required_columns_expr_list - > children . emplace_back ( setAlias ( default_it - > second . expression - > clone ( ) , column ) ) ;
else
2018-02-26 03:37:08 +00:00
required_columns_expr_list - > children . emplace_back ( std : : make_shared < ASTIdentifier > ( column ) ) ;
2017-04-01 07:20:54 +00:00
}
2018-02-28 01:29:55 +00:00
alias_actions = ExpressionAnalyzer ( required_columns_expr_list , context , storage ) . getActions ( true ) ;
2017-04-01 07:20:54 +00:00
2017-04-02 17:37:49 +00:00
/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
2017-04-01 07:20:54 +00:00
required_columns = alias_actions - > getRequiredColumns ( ) ;
}
}
2017-05-24 20:25:01 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
/// Limitation on the number of columns to read.
2018-07-19 13:36:21 +00:00
/// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
if ( ! only_analyze & & settings . max_columns_to_read & & required_columns . size ( ) > settings . max_columns_to_read )
2017-05-24 20:25:01 +00:00
throw Exception ( " Limit for number of columns to read exceeded. "
" Requested: " + toString ( required_columns . size ( ) )
2018-03-11 00:15:26 +00:00
+ " , maximum: " + settings . max_columns_to_read . toString ( ) ,
2018-03-09 23:23:15 +00:00
ErrorCodes : : TOO_MANY_COLUMNS ) ;
2017-05-24 20:25:01 +00:00
size_t limit_length = 0 ;
size_t limit_offset = 0 ;
getLimitLengthAndOffset ( query , limit_length , limit_offset ) ;
2017-04-02 17:37:49 +00:00
/** With distributed query processing, almost no computations are done in the threads,
* but wait and receive data from remote servers .
* If we have 20 remote servers , and max_threads = 8 , then it would not be very good
* connect and ask only 8 servers at a time .
* To simultaneously query more remote servers ,
* instead of max_threads , max_distributed_connections is used .
2017-04-01 07:20:54 +00:00
*/
bool is_remote = false ;
if ( storage & & storage - > isRemote ( ) )
{
is_remote = true ;
2017-05-24 20:25:01 +00:00
max_streams = settings . max_distributed_connections ;
2017-04-01 07:20:54 +00:00
}
2018-02-25 06:34:20 +00:00
size_t max_block_size = settings . max_block_size ;
2017-04-02 17:37:49 +00:00
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY but LIMIT is specified, and limit + offset < max_block_size,
* then as the block size we will use limit + offset ( not to read more from the table than requested ) ,
* and also set the number of threads to 1.
2017-04-01 07:20:54 +00:00
*/
if ( ! query . distinct
& & ! query . prewhere_expression
& & ! query . where_expression
& & ! query . group_expression_list
& & ! query . having_expression
& & ! query . order_expression_list
& & ! query . limit_by_expression_list
& & query . limit_length
& & ! query_analyzer - > hasAggregation ( )
2018-02-25 06:34:20 +00:00
& & limit_length + limit_offset < max_block_size )
2017-04-01 07:20:54 +00:00
{
2017-05-24 20:25:01 +00:00
max_block_size = limit_length + limit_offset ;
max_streams = 1 ;
2017-04-01 07:20:54 +00:00
}
QueryProcessingStage : : Enum from_stage = QueryProcessingStage : : FetchColumns ;
2018-02-21 06:25:21 +00:00
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
if ( ! pipeline . streams . empty ( ) )
{
/// Prepared input.
}
else if ( interpreter_subquery )
2017-04-01 07:20:54 +00:00
{
2018-02-21 06:25:21 +00:00
/// Subquery.
2018-03-01 08:46:59 +00:00
2018-07-19 13:36:21 +00:00
/// If we need less number of columns that subquery have - update the interpreter.
if ( required_columns . size ( ) < source_header . columns ( ) )
{
interpreter_subquery = std : : make_unique < InterpreterSelectWithUnionQuery > (
query . table ( ) , getSubqueryContext ( context ) , required_columns , QueryProcessingStage : : Complete , subquery_depth + 1 , only_analyze ) ;
if ( query_analyzer - > hasAggregation ( ) )
interpreter_subquery - > ignoreWithTotals ( ) ;
}
pipeline . streams = interpreter_subquery - > executeWithMultipleStreams ( ) ;
2018-02-21 06:25:21 +00:00
}
else if ( storage )
{
/// Table.
2017-04-01 07:20:54 +00:00
if ( max_streams = = 0 )
throw Exception ( " Logical error: zero number of streams requested " , ErrorCodes : : LOGICAL_ERROR ) ;
2017-04-02 17:37:49 +00:00
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
2017-04-01 07:20:54 +00:00
if ( max_streams > 1 & & ! is_remote )
max_streams * = settings . max_streams_to_max_threads_ratio ;
2018-04-20 19:14:04 +00:00
query_analyzer - > makeSetsForIndex ( ) ;
2017-07-15 03:48:36 +00:00
SelectQueryInfo query_info ;
query_info . query = query_ptr ;
query_info . sets = query_analyzer - > getPreparedSets ( ) ;
2017-05-25 01:12:41 +00:00
/// PREWHERE optimization
{
auto optimize_prewhere = [ & ] ( auto & merge_tree )
{
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
2017-06-15 14:07:31 +00:00
if ( settings . optimize_move_to_prewhere & & query . where_expression & & ! query . prewhere_expression & & ! query . final ( ) )
2017-07-15 03:48:36 +00:00
MergeTreeWhereOptimizer { query_info , context , merge_tree . getData ( ) , required_columns , log } ;
2017-05-25 01:12:41 +00:00
} ;
2017-11-04 16:46:14 +00:00
if ( const StorageMergeTree * merge_tree = dynamic_cast < const StorageMergeTree * > ( storage . get ( ) ) )
2017-05-25 01:12:41 +00:00
optimize_prewhere ( * merge_tree ) ;
2017-11-04 16:46:14 +00:00
else if ( const StorageReplicatedMergeTree * merge_tree = dynamic_cast < const StorageReplicatedMergeTree * > ( storage . get ( ) ) )
2017-05-25 01:12:41 +00:00
optimize_prewhere ( * merge_tree ) ;
}
2018-07-19 13:36:21 +00:00
pipeline . streams = storage - > read ( required_columns , query_info , context , from_stage , max_block_size , max_streams ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
if ( pipeline . streams . empty ( ) )
2018-03-15 16:22:43 +00:00
pipeline . streams . emplace_back ( std : : make_shared < NullBlockInputStream > ( storage - > getSampleBlockForColumns ( required_columns ) ) ) ;
2018-01-07 00:35:44 +00:00
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream - > addTableLock ( table_lock ) ;
} ) ;
2018-04-02 18:01:25 +00:00
/// Set the limits and quota for reading data, the speed and time of the query.
2017-04-01 07:20:54 +00:00
{
2018-02-21 06:25:21 +00:00
IProfilingBlockInputStream : : LocalLimits limits ;
limits . mode = IProfilingBlockInputStream : : LIMITS_TOTAL ;
2018-03-11 00:15:26 +00:00
limits . size_limits = SizeLimits ( settings . max_rows_to_read , settings . max_bytes_to_read , settings . read_overflow_mode ) ;
limits . max_execution_time = settings . max_execution_time ;
limits . timeout_overflow_mode = settings . timeout_overflow_mode ;
2018-04-02 18:01:25 +00:00
/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
* because the initiating server has a summary of the execution of the request on all servers .
*
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
* additionally on each remote server , because these limits are checked per block of data processed ,
* and remote servers may process way more blocks of data than are received by initiator .
*/
if ( to_stage = = QueryProcessingStage : : Complete )
{
limits . min_execution_speed = settings . min_execution_speed ;
limits . timeout_before_checking_execution_speed = settings . timeout_before_checking_execution_speed ;
}
2018-02-21 06:25:21 +00:00
QuotaForIntervals & quota = context . getQuota ( ) ;
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
2018-02-21 06:25:21 +00:00
if ( IProfilingBlockInputStream * p_stream = dynamic_cast < IProfilingBlockInputStream * > ( stream . get ( ) ) )
{
p_stream - > setLimits ( limits ) ;
2018-04-02 18:01:25 +00:00
if ( to_stage = = QueryProcessingStage : : Complete )
p_stream - > setQuota ( quota ) ;
2018-02-21 06:25:21 +00:00
}
} ) ;
}
2017-04-01 07:20:54 +00:00
}
2018-02-21 06:25:21 +00:00
else
throw Exception ( " Logical error in InterpreterSelectQuery: nowhere to read " , ErrorCodes : : LOGICAL_ERROR ) ;
2017-04-01 07:20:54 +00:00
2018-02-26 21:00:42 +00:00
/// Aliases in table declaration.
2018-02-27 19:00:55 +00:00
if ( from_stage = = QueryProcessingStage : : FetchColumns & & alias_actions )
2018-02-26 21:00:42 +00:00
{
pipeline . transform ( [ & ] ( auto & stream )
{
stream = std : : make_shared < ExpressionBlockInputStream > ( stream , alias_actions ) ;
} ) ;
}
2017-04-01 07:20:54 +00:00
return from_stage ;
2012-05-09 13:12:38 +00:00
}
2011-08-28 05:13:24 +00:00
2012-05-09 13:12:38 +00:00
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeWhere ( Pipeline & pipeline , const ExpressionActionsPtr & expression )
2012-08-27 05:13:14 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream = std : : make_shared < FilterBlockInputStream > ( stream , expression , query . where_expression - > getColumnName ( ) ) ;
} ) ;
2012-08-27 05:13:14 +00:00
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeAggregation ( Pipeline & pipeline , const ExpressionActionsPtr & expression , bool overflow_row , bool final )
2012-05-09 13:12:38 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream = std : : make_shared < ExpressionBlockInputStream > ( stream , expression ) ;
} ) ;
Names key_names ;
AggregateDescriptions aggregates ;
query_analyzer - > getAggregateInfo ( key_names , aggregates ) ;
2018-02-21 03:26:06 +00:00
Block header = pipeline . firstStream ( ) - > getHeader ( ) ;
2018-01-06 18:10:44 +00:00
ColumnNumbers keys ;
for ( const auto & name : key_names )
keys . push_back ( header . getPositionByName ( name ) ) ;
for ( auto & descr : aggregates )
if ( descr . arguments . empty ( ) )
for ( const auto & name : descr . argument_names )
descr . arguments . push_back ( header . getPositionByName ( name ) ) ;
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2017-04-02 17:37:49 +00:00
/** Two-level aggregation is useful in two cases:
2017-07-04 12:46:31 +00:00
* 1. Parallel aggregation is done , and the results should be merged in parallel .
* 2. An aggregation is done with store of temporary data on the disk , and they need to be merged in a memory efficient way .
2017-04-01 07:20:54 +00:00
*/
2018-03-11 00:15:26 +00:00
bool allow_to_use_two_level_group_by = pipeline . streams . size ( ) > 1 | | settings . max_bytes_before_external_group_by ! = 0 ;
2017-04-01 07:20:54 +00:00
2018-01-06 18:10:44 +00:00
Aggregator : : Params params ( header , keys , aggregates ,
2018-03-11 00:15:26 +00:00
overflow_row , settings . max_rows_to_group_by , settings . group_by_overflow_mode ,
2017-04-01 07:20:54 +00:00
settings . compile ? & context . getCompiler ( ) : nullptr , settings . min_count_to_compile ,
allow_to_use_two_level_group_by ? settings . group_by_two_level_threshold : SettingUInt64 ( 0 ) ,
allow_to_use_two_level_group_by ? settings . group_by_two_level_threshold_bytes : SettingUInt64 ( 0 ) ,
2018-03-11 00:15:26 +00:00
settings . max_bytes_before_external_group_by , settings . empty_result_for_aggregation_by_empty_set ,
2018-02-18 05:35:48 +00:00
context . getTemporaryPath ( ) ) ;
2017-04-01 07:20:54 +00:00
2017-04-02 17:37:49 +00:00
/// If there are several sources, then we perform parallel aggregation
2018-02-21 03:26:06 +00:00
if ( pipeline . streams . size ( ) > 1 )
2017-04-01 07:20:54 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < ParallelAggregatingBlockInputStream > (
pipeline . streams , pipeline . stream_with_non_joined_data , params , final ,
2017-07-03 21:04:10 +00:00
max_streams ,
2017-04-01 07:20:54 +00:00
settings . aggregation_memory_efficient_merge_threads
2017-07-03 21:04:10 +00:00
? static_cast < size_t > ( settings . aggregation_memory_efficient_merge_threads )
2017-07-25 19:43:23 +00:00
: static_cast < size_t > ( settings . max_threads ) ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
pipeline . stream_with_non_joined_data = nullptr ;
pipeline . streams . resize ( 1 ) ;
2017-04-01 07:20:54 +00:00
}
else
{
BlockInputStreams inputs ;
2018-02-21 03:26:06 +00:00
if ( ! pipeline . streams . empty ( ) )
inputs . push_back ( pipeline . firstStream ( ) ) ;
2017-04-01 07:20:54 +00:00
else
2018-02-21 03:26:06 +00:00
pipeline . streams . resize ( 1 ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
if ( pipeline . stream_with_non_joined_data )
inputs . push_back ( pipeline . stream_with_non_joined_data ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < AggregatingBlockInputStream > ( std : : make_shared < ConcatBlockInputStream > ( inputs ) , params , final ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
pipeline . stream_with_non_joined_data = nullptr ;
2017-04-01 07:20:54 +00:00
}
2012-05-09 13:12:38 +00:00
}
2011-11-06 22:00:39 +00:00
2012-05-09 13:12:38 +00:00
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeMergeAggregated ( Pipeline & pipeline , bool overflow_row , bool final )
2012-05-30 01:38:02 +00:00
{
2017-04-01 07:20:54 +00:00
Names key_names ;
AggregateDescriptions aggregates ;
query_analyzer - > getAggregateInfo ( key_names , aggregates ) ;
2018-02-21 03:26:06 +00:00
Block header = pipeline . firstStream ( ) - > getHeader ( ) ;
2018-01-07 00:35:44 +00:00
2018-01-06 18:10:44 +00:00
ColumnNumbers keys ;
for ( const auto & name : key_names )
keys . push_back ( header . getPositionByName ( name ) ) ;
2017-04-02 17:37:49 +00:00
/** There are two modes of distributed aggregation.
2017-04-01 07:20:54 +00:00
*
2017-04-02 17:37:49 +00:00
* 1. In different threads read from the remote servers blocks .
* Save all the blocks in the RAM . Merge blocks .
* If the aggregation is two - level - parallelize to the number of buckets .
2017-04-01 07:20:54 +00:00
*
2017-04-02 17:37:49 +00:00
* 2. In one thread , read blocks from different servers in order .
* RAM stores only one block from each server .
* If the aggregation is a two - level aggregation , we consistently merge the blocks of each next level .
2017-04-01 07:20:54 +00:00
*
2017-04-02 17:37:49 +00:00
* The second option consumes less memory ( up to 256 times less )
* in the case of two - level aggregation , which is used for large results after GROUP BY ,
* but it can work more slowly .
2017-04-01 07:20:54 +00:00
*/
2018-01-06 18:10:44 +00:00
Aggregator : : Params params ( header , keys , aggregates , overflow_row ) ;
2017-04-01 07:20:54 +00:00
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2017-04-01 07:20:54 +00:00
if ( ! settings . distributed_aggregation_memory_efficient )
{
2017-04-02 17:37:49 +00:00
/// We union several sources into one, parallelizing the work.
2018-02-21 03:26:06 +00:00
executeUnion ( pipeline ) ;
2017-04-01 07:20:54 +00:00
2017-04-02 17:37:49 +00:00
/// Now merge the aggregated blocks
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < MergingAggregatedBlockInputStream > ( pipeline . firstStream ( ) , params , final , settings . max_threads ) ;
2017-04-01 07:20:54 +00:00
}
else
{
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < MergingAggregatedMemoryEfficientBlockInputStream > ( pipeline . streams , params , final ,
2017-07-03 21:04:10 +00:00
max_streams ,
2017-04-01 07:20:54 +00:00
settings . aggregation_memory_efficient_merge_threads
2017-07-25 19:43:23 +00:00
? static_cast < size_t > ( settings . aggregation_memory_efficient_merge_threads )
: static_cast < size_t > ( settings . max_threads ) ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
pipeline . streams . resize ( 1 ) ;
2017-04-01 07:20:54 +00:00
}
2012-05-09 13:12:38 +00:00
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeHaving ( Pipeline & pipeline , const ExpressionActionsPtr & expression )
2012-05-09 13:12:38 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream = std : : make_shared < FilterBlockInputStream > ( stream , expression , query . having_expression - > getColumnName ( ) ) ;
} ) ;
2012-05-09 13:12:38 +00:00
}
2011-09-25 03:37:09 +00:00
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeTotalsAndHaving ( Pipeline & pipeline , bool has_having , const ExpressionActionsPtr & expression , bool overflow_row )
2014-02-27 12:49:21 +00:00
{
2018-02-21 03:26:06 +00:00
executeUnion ( pipeline ) ;
2014-12-19 12:48:09 +00:00
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < TotalsHavingBlockInputStream > (
pipeline . firstStream ( ) , overflow_row , expression ,
2017-04-01 07:20:54 +00:00
has_having ? query . having_expression - > getColumnName ( ) : " " , settings . totals_mode , settings . totals_auto_threshold ) ;
2014-02-27 12:49:21 +00:00
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeExpression ( Pipeline & pipeline , const ExpressionActionsPtr & expression )
2012-05-09 13:12:38 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream = std : : make_shared < ExpressionBlockInputStream > ( stream , expression ) ;
} ) ;
2012-05-09 13:12:38 +00:00
}
2015-01-18 08:27:28 +00:00
/** Builds the sort description from the ORDER BY list of the query.
  * One entry is produced per ORDER BY element: column name of its first child,
  * direction, nulls direction and (if COLLATE is specified) a collator.
  * The query must have a non-null order_expression_list.
  */
static SortDescription getSortDescription(ASTSelectQuery & query)
{
    const auto & order_by_children = query.order_expression_list->children;

    SortDescription result;
    result.reserve(order_by_children.size());

    for (const auto & child : order_by_children)
    {
        String column_name = child->children.front()->getColumnName();
        const ASTOrderByElement & element = typeid_cast<const ASTOrderByElement &>(*child);

        /// Remains null if no collation is specified for this element.
        std::shared_ptr<Collator> collator;
        if (element.collation)
        {
            const ASTLiteral & collation_literal = typeid_cast<const ASTLiteral &>(*element.collation);
            collator = std::make_shared<Collator>(collation_literal.value.get<String>());
        }

        result.emplace_back(column_name, element.direction, element.nulls_direction, collator);
    }

    return result;
}
/** Returns the number of first rows that sorting has to produce (limit_length + limit_offset),
  * or 0 if sorting must not be limited.
  *
  * Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
  */
static size_t getLimitForSorting(ASTSelectQuery & query)
{
    if (query.distinct || query.limit_by_expression_list)
        return 0;

    size_t limit_length = 0;
    size_t limit_offset = 0;
    getLimitLengthAndOffset(query, limit_length, limit_offset);

    /// Unsigned addition wraps around silently (e.g. LIMIT 18446744073709551615 OFFSET 10 would give 9),
    /// and a wrapped value would cut the sorted result too early.
    /// If the sum does not fit into size_t, sort without a limit.
    const size_t rows_to_sort = limit_length + limit_offset;
    if (rows_to_sort < limit_length)
        return 0;

    return rows_to_sort;
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeOrder ( Pipeline & pipeline )
2015-01-18 08:27:28 +00:00
{
2017-04-01 07:20:54 +00:00
SortDescription order_descr = getSortDescription ( query ) ;
size_t limit = getLimitForSorting ( query ) ;
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
auto sorting_stream = std : : make_shared < PartialSortingBlockInputStream > ( stream , order_descr , limit ) ;
2017-04-02 17:37:49 +00:00
/// Limits on sorting
2017-04-01 07:20:54 +00:00
IProfilingBlockInputStream : : LocalLimits limits ;
limits . mode = IProfilingBlockInputStream : : LIMITS_TOTAL ;
2018-03-11 00:15:26 +00:00
limits . size_limits = SizeLimits ( settings . max_rows_to_sort , settings . max_bytes_to_sort , settings . sort_overflow_mode ) ;
2017-04-01 07:20:54 +00:00
sorting_stream - > setLimits ( limits ) ;
stream = sorting_stream ;
} ) ;
2017-04-02 17:37:49 +00:00
/// If there are several streams, we merge them into one
2018-02-21 03:26:06 +00:00
executeUnion ( pipeline ) ;
2017-04-01 07:20:54 +00:00
2017-04-02 17:37:49 +00:00
/// Merge the sorted blocks.
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < MergeSortingBlockInputStream > (
pipeline . firstStream ( ) , order_descr , settings . max_block_size , limit ,
2018-03-11 00:15:26 +00:00
settings . max_bytes_before_external_sort , context . getTemporaryPath ( ) ) ;
2013-06-03 10:18:41 +00:00
}
2012-02-27 06:28:20 +00:00
2012-07-25 20:29:22 +00:00
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeMergeSorted ( Pipeline & pipeline )
2015-01-18 08:27:28 +00:00
{
2017-04-01 07:20:54 +00:00
SortDescription order_descr = getSortDescription ( query ) ;
size_t limit = getLimitForSorting ( query ) ;
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2017-04-02 17:37:49 +00:00
/// If there are several streams, then we merge them into one
2018-02-21 03:26:06 +00:00
if ( pipeline . hasMoreThanOneStream ( ) )
2017-04-01 07:20:54 +00:00
{
2017-04-02 17:37:49 +00:00
/** MergingSortedBlockInputStream reads the sources sequentially.
* To make the data on the remote servers prepared in parallel , we wrap it in AsynchronousBlockInputStream .
2017-04-01 07:20:54 +00:00
*/
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream = std : : make_shared < AsynchronousBlockInputStream > ( stream ) ;
} ) ;
2017-04-02 17:37:49 +00:00
/// Merge the sorted sources into one sorted source.
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < MergingSortedBlockInputStream > ( pipeline . streams , order_descr , settings . max_block_size , limit ) ;
pipeline . streams . resize ( 1 ) ;
2017-04-01 07:20:54 +00:00
}
2015-01-18 08:27:28 +00:00
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeProjection ( Pipeline & pipeline , const ExpressionActionsPtr & expression )
2013-06-03 10:18:41 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream = std : : make_shared < ExpressionBlockInputStream > ( stream , expression ) ;
} ) ;
2012-05-09 13:12:38 +00:00
}
2012-02-27 06:28:20 +00:00
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeDistinct ( Pipeline & pipeline , bool before_order , Names columns )
2013-06-01 07:43:57 +00:00
{
2017-04-01 07:20:54 +00:00
if ( query . distinct )
{
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2017-04-01 07:20:54 +00:00
size_t limit_length = 0 ;
size_t limit_offset = 0 ;
getLimitLengthAndOffset ( query , limit_length , limit_offset ) ;
size_t limit_for_distinct = 0 ;
2017-04-02 17:37:49 +00:00
/// If after this stage of DISTINCT ORDER BY is not executed, then you can get no more than limit_length + limit_offset of different rows.
2017-04-01 07:20:54 +00:00
if ( ! query . order_expression_list | | ! before_order )
limit_for_distinct = limit_length + limit_offset ;
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
2018-03-11 00:15:26 +00:00
SizeLimits limits ( settings . max_rows_in_distinct , settings . max_bytes_in_distinct , settings . distinct_overflow_mode ) ;
2018-05-31 18:28:04 +00:00
stream = std : : make_shared < DistinctBlockInputStream > ( stream , limits , limit_for_distinct , columns ) ;
2017-04-01 07:20:54 +00:00
} ) ;
}
2013-06-01 07:43:57 +00:00
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeUnion ( Pipeline & pipeline )
2012-05-09 13:12:38 +00:00
{
2017-04-02 17:37:49 +00:00
/// If there are still several streams, then we combine them into one
2018-02-21 03:26:06 +00:00
if ( pipeline . hasMoreThanOneStream ( ) )
2017-04-01 07:20:54 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . firstStream ( ) = std : : make_shared < UnionBlockInputStream < > > ( pipeline . streams , pipeline . stream_with_non_joined_data , max_streams ) ;
pipeline . stream_with_non_joined_data = nullptr ;
pipeline . streams . resize ( 1 ) ;
2017-04-01 07:20:54 +00:00
}
2018-02-21 03:26:06 +00:00
else if ( pipeline . stream_with_non_joined_data )
2017-04-01 07:20:54 +00:00
{
2018-02-21 03:26:06 +00:00
pipeline . streams . push_back ( pipeline . stream_with_non_joined_data ) ;
pipeline . stream_with_non_joined_data = nullptr ;
2017-04-01 07:20:54 +00:00
}
2012-05-09 13:12:38 +00:00
}
2017-04-02 17:37:49 +00:00
/// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined.
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executePreLimit ( Pipeline & pipeline )
2012-05-09 13:12:38 +00:00
{
2017-04-01 07:20:54 +00:00
size_t limit_length = 0 ;
size_t limit_offset = 0 ;
getLimitLengthAndOffset ( query , limit_length , limit_offset ) ;
2017-04-02 17:37:49 +00:00
/// If there is LIMIT
2017-04-01 07:20:54 +00:00
if ( query . limit_length )
{
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
2017-12-21 08:14:26 +00:00
stream = std : : make_shared < LimitBlockInputStream > ( stream , limit_length + limit_offset , 0 , false ) ;
2017-04-01 07:20:54 +00:00
} ) ;
}
2012-06-24 23:17:06 +00:00
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeLimitBy ( Pipeline & pipeline )
2016-12-28 15:12:54 +00:00
{
2017-04-01 07:20:54 +00:00
if ( ! query . limit_by_value | | ! query . limit_by_expression_list )
return ;
Names columns ;
for ( const auto & elem : query . limit_by_expression_list - > children )
2018-03-01 05:24:56 +00:00
columns . emplace_back ( elem - > getColumnName ( ) ) ;
2018-02-25 06:34:20 +00:00
size_t value = safeGet < UInt64 > ( typeid_cast < ASTLiteral & > ( * query . limit_by_value ) . value ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
2018-02-25 06:34:20 +00:00
stream = std : : make_shared < LimitByBlockInputStream > ( stream , value , columns ) ;
2017-04-01 07:20:54 +00:00
} ) ;
2016-12-28 15:12:54 +00:00
}
2018-02-26 06:12:59 +00:00
bool hasWithTotalsInAnySubqueryInFromClause ( const ASTSelectQuery & query )
{
if ( query . group_by_with_totals )
return true ;
/** NOTE You can also check that the table in the subquery is distributed, and that it only looks at one shard.
* In other cases , totals will be computed on the initiating server of the query , and it is not necessary to read the data to the end .
*/
auto query_table = query . table ( ) ;
if ( query_table )
{
auto ast_union = typeid_cast < const ASTSelectWithUnionQuery * > ( query_table . get ( ) ) ;
if ( ast_union )
{
for ( const auto & elem : ast_union - > list_of_selects - > children )
if ( hasWithTotalsInAnySubqueryInFromClause ( typeid_cast < const ASTSelectQuery & > ( * elem ) ) )
return true ;
}
}
return false ;
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeLimit ( Pipeline & pipeline )
2012-06-24 23:17:06 +00:00
{
2017-04-01 07:20:54 +00:00
size_t limit_length = 0 ;
size_t limit_offset = 0 ;
getLimitLengthAndOffset ( query , limit_length , limit_offset ) ;
2017-04-02 17:37:49 +00:00
/// If there is LIMIT
2017-04-01 07:20:54 +00:00
if ( query . limit_length )
{
2017-04-02 17:37:49 +00:00
/** Rare case:
* if there is no WITH TOTALS and there is a subquery in FROM , and there is WITH TOTALS on one of the levels ,
* then when using LIMIT , you should read the data to the end , rather than cancel the query earlier ,
* because if you cancel the query , we will not get ` totals ` data from the remote server .
2017-04-01 07:20:54 +00:00
*
2017-04-02 17:37:49 +00:00
* Another case :
* if there is WITH TOTALS and there is no ORDER BY , then read the data to the end ,
* otherwise TOTALS is counted according to incomplete data .
2017-04-01 07:20:54 +00:00
*/
bool always_read_till_end = false ;
if ( query . group_by_with_totals & & ! query . order_expression_list )
always_read_till_end = true ;
2018-02-26 06:12:59 +00:00
if ( ! query . group_by_with_totals & & hasWithTotalsInAnySubqueryInFromClause ( query ) )
always_read_till_end = true ;
2017-04-01 07:20:54 +00:00
2018-02-21 03:26:06 +00:00
pipeline . transform ( [ & ] ( auto & stream )
2017-04-01 07:20:54 +00:00
{
stream = std : : make_shared < LimitBlockInputStream > ( stream , limit_length , limit_offset , always_read_till_end ) ;
} ) ;
}
2011-08-28 05:13:24 +00:00
}
2018-02-28 02:32:34 +00:00
void InterpreterSelectQuery : : executeExtremes ( Pipeline & pipeline )
{
if ( ! context . getSettingsRef ( ) . extremes )
return ;
pipeline . transform ( [ & ] ( auto & stream )
{
if ( IProfilingBlockInputStream * p_stream = dynamic_cast < IProfilingBlockInputStream * > ( stream . get ( ) ) )
p_stream - > enableExtremes ( ) ;
} ) ;
}
2018-02-21 03:26:06 +00:00
void InterpreterSelectQuery : : executeSubqueriesInSetsAndJoins ( Pipeline & pipeline , SubqueriesForSets & subqueries_for_sets )
2014-03-04 11:26:55 +00:00
{
2017-05-24 20:13:04 +00:00
const Settings & settings = context . getSettingsRef ( ) ;
2018-02-21 03:26:06 +00:00
executeUnion ( pipeline ) ;
2018-03-11 00:15:26 +00:00
pipeline . firstStream ( ) = std : : make_shared < CreatingSetsBlockInputStream > (
pipeline . firstStream ( ) , subqueries_for_sets ,
SizeLimits ( settings . max_rows_to_transfer , settings . max_bytes_to_transfer , settings . transfer_overflow_mode ) ) ;
2016-11-12 17:55:40 +00:00
}
2014-03-04 11:26:55 +00:00
2015-05-06 23:35:37 +00:00
void InterpreterSelectQuery : : ignoreWithTotals ( )
{
2017-04-01 07:20:54 +00:00
query . group_by_with_totals = false ;
2015-05-06 23:35:37 +00:00
}
2015-06-05 21:28:04 +00:00
void InterpreterSelectQuery : : initSettings ( )
{
2017-04-01 07:20:54 +00:00
if ( query . settings )
InterpreterSetQuery ( query . settings , context ) . executeForCurrentContext ( ) ;
2015-06-05 21:28:04 +00:00
}
2011-08-28 05:13:24 +00:00
}