2022-09-15 14:09:30 +00:00
# include <Planner/PlannerJoinTree.h>
2023-03-20 10:29:27 +00:00
# include <Common/scope_guard_safe.h>
# include <Columns/ColumnAggregateFunction.h>
2022-09-15 14:09:30 +00:00
# include <DataTypes/DataTypeString.h>
2023-03-01 20:19:51 +00:00
# include <DataTypes/DataTypeAggregateFunction.h>
2023-02-24 12:46:09 +00:00
# include <DataTypes/DataTypeLowCardinality.h>
2022-09-15 14:09:30 +00:00
# include <Functions/FunctionFactory.h>
2023-03-20 10:29:27 +00:00
# include <AggregateFunctions/AggregateFunctionCount.h>
2022-10-12 14:16:01 +00:00
# include <Access/Common/AccessFlags.h>
# include <Access/ContextAccess.h>
2022-09-15 14:09:30 +00:00
# include <Storages/IStorage.h>
2023-07-11 11:57:19 +00:00
# include <Storages/MergeTree/MergeTreeData.h>
2022-09-28 11:20:24 +00:00
# include <Storages/StorageDictionary.h>
2023-03-27 13:07:24 +00:00
# include <Storages/StorageDistributed.h>
2022-09-15 14:09:30 +00:00
2022-12-23 17:45:28 +00:00
# include <Analyzer/ConstantNode.h>
2022-09-15 14:09:30 +00:00
# include <Analyzer/ColumnNode.h>
2023-03-20 10:29:27 +00:00
# include <Analyzer/FunctionNode.h>
2022-09-15 14:09:30 +00:00
# include <Analyzer/TableNode.h>
# include <Analyzer/TableFunctionNode.h>
# include <Analyzer/QueryNode.h>
# include <Analyzer/UnionNode.h>
# include <Analyzer/JoinNode.h>
# include <Analyzer/ArrayJoinNode.h>
2022-12-23 17:45:28 +00:00
# include <Analyzer/Utils.h>
2023-03-01 20:19:51 +00:00
# include <Analyzer/AggregationUtils.h>
2023-03-20 10:29:27 +00:00
# include <Analyzer/Passes/QueryAnalysisPass.h>
# include <Analyzer/QueryTreeBuilder.h>
2022-09-15 14:09:30 +00:00
2023-04-11 14:08:28 +00:00
# include <Parsers/ExpressionListParsers.h>
# include <Parsers/parseQuery.h>
2022-09-15 14:09:30 +00:00
# include <Processors/Sources/NullSource.h>
2022-10-18 11:35:04 +00:00
# include <Processors/QueryPlan/SortingStep.h>
# include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
2022-09-15 14:09:30 +00:00
# include <Processors/QueryPlan/ReadFromPreparedSource.h>
# include <Processors/QueryPlan/ExpressionStep.h>
2023-03-20 10:29:27 +00:00
# include <Processors/QueryPlan/FilterStep.h>
2022-09-15 14:09:30 +00:00
# include <Processors/QueryPlan/JoinStep.h>
# include <Processors/QueryPlan/ArrayJoinStep.h>
2023-03-01 20:19:51 +00:00
# include <Processors/Sources/SourceFromSingleChunk.h>
2022-09-15 14:09:30 +00:00
2023-08-24 10:32:13 +00:00
# include <Storages/StorageDummy.h>
2023-07-11 11:57:19 +00:00
# include <Interpreters/ArrayJoinAction.h>
2022-09-15 14:09:30 +00:00
# include <Interpreters/Context.h>
2023-07-11 11:57:19 +00:00
# include <Interpreters/HashJoin.h>
2022-09-15 14:09:30 +00:00
# include <Interpreters/IJoin.h>
2023-07-11 11:57:19 +00:00
# include <Interpreters/InterpreterSelectQuery.h>
2022-09-15 14:09:30 +00:00
# include <Interpreters/TableJoin.h>
2023-03-27 13:07:24 +00:00
# include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
2022-09-15 14:09:30 +00:00
2023-01-11 11:54:28 +00:00
# include <Planner/CollectColumnIdentifiers.h>
2022-09-15 14:09:30 +00:00
# include <Planner/Planner.h>
# include <Planner/PlannerJoins.h>
# include <Planner/PlannerActionsVisitor.h>
# include <Planner/Utils.h>
2023-03-20 10:29:27 +00:00
# include <Planner/CollectSets.h>
# include <Planner/CollectTableExpressionData.h>
2023-02-28 11:35:36 +00:00
2022-09-15 14:09:30 +00:00
namespace DB
{
2022-09-27 15:04:03 +00:00
namespace ErrorCodes
{
extern const int INVALID_JOIN_ON_EXPRESSION ;
2022-10-04 20:45:47 +00:00
extern const int LOGICAL_ERROR ;
extern const int NOT_IMPLEMENTED ;
extern const int SYNTAX_ERROR ;
2022-10-12 14:16:01 +00:00
extern const int ACCESS_DENIED ;
2022-12-23 17:45:28 +00:00
extern const int PARAMETER_OUT_OF_BOUND ;
extern const int TOO_MANY_COLUMNS ;
extern const int UNSUPPORTED_METHOD ;
2023-03-20 10:29:27 +00:00
extern const int BAD_ARGUMENTS ;
2022-09-27 15:04:03 +00:00
}
2022-09-15 14:09:30 +00:00
namespace
{
2022-10-12 14:16:01 +00:00
/// Check if current user has privileges to SELECT columns from table
void checkAccessRights ( const TableNode & table_node , const Names & column_names , const ContextPtr & query_context )
{
2023-08-24 10:54:11 +00:00
/// StorageDummy is created on preliminary stage, ignore access check for it.
2023-08-24 10:32:13 +00:00
if ( typeid_cast < const StorageDummy * > ( table_node . getStorage ( ) . get ( ) ) )
return ;
2022-10-12 14:16:01 +00:00
const auto & storage_id = table_node . getStorageID ( ) ;
const auto & storage_snapshot = table_node . getStorageSnapshot ( ) ;
if ( column_names . empty ( ) )
{
/** For a trivial queries like "SELECT count() FROM table", "SELECT 1 FROM table" access is granted if at least
* one table column is accessible .
*/
auto access = query_context - > getAccess ( ) ;
for ( const auto & column : storage_snapshot - > metadata - > getColumns ( ) )
{
if ( access - > isGranted ( AccessType : : SELECT , storage_id . database_name , storage_id . table_name , column . name ) )
return ;
}
throw Exception ( ErrorCodes : : ACCESS_DENIED ,
2023-08-06 12:48:20 +00:00
" {}: Not enough privileges. To execute this query, it's necessary to have the grant SELECT for at least one column on {} " ,
2022-10-12 14:16:01 +00:00
query_context - > getUserName ( ) ,
storage_id . getFullTableName ( ) ) ;
}
2023-06-05 11:02:23 +00:00
// In case of cross-replication we don't know what database is used for the table.
// `storage_id.hasDatabase()` can return false only on the initiator node.
// Each shard will use the default database (in the case of cross-replication shards may have different defaults).
2023-05-22 11:54:33 +00:00
if ( storage_id . hasDatabase ( ) )
query_context - > checkAccess ( AccessType : : SELECT , storage_id , column_names ) ;
2022-10-12 14:16:01 +00:00
}
2023-08-11 23:04:08 +00:00
bool shouldIgnoreQuotaAndLimits ( const TableNode & table_node )
{
const auto & storage_id = table_node . getStorageID ( ) ;
if ( ! storage_id . hasDatabase ( ) )
return false ;
if ( storage_id . database_name = = DatabaseCatalog : : SYSTEM_DATABASE )
{
static const boost : : container : : flat_set < String > tables_ignoring_quota { " quotas " , " quota_limits " , " quota_usage " , " quotas_usage " , " one " } ;
if ( tables_ignoring_quota . count ( storage_id . table_name ) )
return true ;
}
return false ;
}
2023-01-09 10:34:47 +00:00
NameAndTypePair chooseSmallestColumnToReadFromStorage ( const StoragePtr & storage , const StorageSnapshotPtr & storage_snapshot )
{
/** We need to read at least one column to find the number of rows.
* We will find a column with minimum < compressed_size , type_size , uncompressed_size > .
* Because it is the column that is cheapest to read .
*/
class ColumnWithSize
{
public :
ColumnWithSize ( NameAndTypePair column_ , ColumnSize column_size_ )
: column ( std : : move ( column_ ) )
, compressed_size ( column_size_ . data_compressed )
, uncompressed_size ( column_size_ . data_uncompressed )
, type_size ( column . type - > haveMaximumSizeOfValue ( ) ? column . type - > getMaximumSizeOfValueInMemory ( ) : 100 )
{
}
bool operator < ( const ColumnWithSize & rhs ) const
{
return std : : tie ( compressed_size , type_size , uncompressed_size )
< std : : tie ( rhs . compressed_size , rhs . type_size , rhs . uncompressed_size ) ;
}
NameAndTypePair column ;
size_t compressed_size = 0 ;
size_t uncompressed_size = 0 ;
size_t type_size = 0 ;
} ;
std : : vector < ColumnWithSize > columns_with_sizes ;
auto column_sizes = storage - > getColumnSizes ( ) ;
auto column_names_and_types = storage_snapshot - > getColumns ( GetColumnsOptions ( GetColumnsOptions : : AllPhysical ) . withSubcolumns ( ) ) ;
if ( ! column_sizes . empty ( ) )
{
for ( auto & column_name_and_type : column_names_and_types )
{
auto it = column_sizes . find ( column_name_and_type . name ) ;
if ( it = = column_sizes . end ( ) )
continue ;
columns_with_sizes . emplace_back ( column_name_and_type , it - > second ) ;
}
}
NameAndTypePair result ;
if ( ! columns_with_sizes . empty ( ) )
result = std : : min_element ( columns_with_sizes . begin ( ) , columns_with_sizes . end ( ) ) - > column ;
else
/// If we have no information about columns sizes, choose a column of minimum size of its data type
result = ExpressionActions : : getSmallestColumn ( column_names_and_types ) ;
return result ;
}
2023-02-26 22:19:01 +00:00
bool applyTrivialCountIfPossible (
QueryPlan & query_plan ,
2023-08-22 17:01:42 +00:00
SelectQueryInfo & select_query_info ,
2023-08-24 15:05:45 +00:00
const TableNode * table_node ,
const TableFunctionNode * table_function_node ,
2023-02-27 17:37:29 +00:00
const QueryTreeNodePtr & query_tree ,
2023-06-06 16:38:32 +00:00
ContextMutablePtr & query_context ,
2023-02-26 22:19:01 +00:00
const Names & columns_names )
{
const auto & settings = query_context - > getSettingsRef ( ) ;
if ( ! settings . optimize_trivial_count_query )
return false ;
2023-08-24 15:05:45 +00:00
const auto & storage = table_node ? table_node - > getStorage ( ) : table_function_node - > getStorage ( ) ;
2023-06-23 08:44:13 +00:00
if ( ! storage - > supportsTrivialCountOptimization ( ) )
return false ;
2023-03-20 10:29:27 +00:00
auto storage_id = storage - > getStorageID ( ) ;
auto row_policy_filter = query_context - > getRowPolicyFilter ( storage_id . getDatabaseName ( ) ,
storage_id . getTableName ( ) ,
RowPolicyFilterType : : SELECT_FILTER ) ;
if ( row_policy_filter )
return { } ;
2023-08-31 13:39:18 +00:00
if ( select_query_info . additional_filter_ast )
return false ;
2023-03-20 10:29:27 +00:00
/** Transaction check here is necessary because
* MergeTree maintains total count for all parts in Active state and it simply returns that number for trivial select count ( ) from table query .
* But if we have current transaction , then we should return number of rows in current snapshot ( that may include parts in Outdated state ) ,
* so we have to use totalRowsByPartitionPredicate ( ) instead of totalRows even for trivial query
* See https : //github.com/ClickHouse/ClickHouse/pull/24258/files#r828182031
*/
if ( query_context - > getCurrentTransaction ( ) )
return false ;
2023-02-27 10:56:59 +00:00
/// can't apply if FINAL
2023-08-24 15:05:45 +00:00
if ( table_node & & table_node - > getTableExpressionModifiers ( ) . has_value ( ) & &
( table_node - > getTableExpressionModifiers ( ) - > hasFinal ( ) | | table_node - > getTableExpressionModifiers ( ) - > hasSampleSizeRatio ( ) | |
table_node - > getTableExpressionModifiers ( ) - > hasSampleOffsetRatio ( ) ) )
return false ;
else if ( table_function_node & & table_function_node - > getTableExpressionModifiers ( ) . has_value ( ) & &
( table_function_node - > getTableExpressionModifiers ( ) - > hasFinal ( ) | | table_function_node - > getTableExpressionModifiers ( ) - > hasSampleSizeRatio ( ) | |
table_function_node - > getTableExpressionModifiers ( ) - > hasSampleOffsetRatio ( ) ) )
2023-02-27 17:37:29 +00:00
return false ;
2023-03-20 10:29:27 +00:00
// TODO: It's possible to optimize count() given only partition predicates
2023-02-27 17:37:29 +00:00
auto & main_query_node = query_tree - > as < QueryNode & > ( ) ;
2023-03-20 10:29:27 +00:00
if ( main_query_node . hasGroupBy ( ) | | main_query_node . hasPrewhere ( ) | | main_query_node . hasWhere ( ) )
2023-02-27 10:56:59 +00:00
return false ;
2023-03-20 10:29:27 +00:00
if ( storage - > hasLightweightDeletedMask ( ) )
2023-02-27 17:37:29 +00:00
return false ;
2023-02-26 22:19:01 +00:00
2023-06-06 16:38:32 +00:00
if ( settings . allow_experimental_query_deduplication
2023-03-01 20:19:51 +00:00
| | settings . empty_result_for_aggregation_by_empty_set )
2023-02-26 22:19:01 +00:00
return false ;
2023-02-27 17:37:29 +00:00
QueryTreeNodes aggregates = collectAggregateFunctionNodes ( query_tree ) ;
2023-02-26 22:19:01 +00:00
if ( aggregates . size ( ) ! = 1 )
return false ;
2023-03-01 20:19:51 +00:00
const auto & function_node = aggregates . front ( ) . get ( ) - > as < const FunctionNode & > ( ) ;
chassert ( function_node . getAggregateFunction ( ) ! = nullptr ) ;
const auto * count_func = typeid_cast < const AggregateFunctionCount * > ( function_node . getAggregateFunction ( ) . get ( ) ) ;
2023-02-26 22:19:01 +00:00
if ( ! count_func )
return false ;
2023-08-22 17:01:42 +00:00
/// Some storages can optimize trivial count in read() method instead of totalRows() because it still can
/// require reading some data (but much faster than reading columns).
/// Set a special flag in query info so the storage will see it and optimize count in read() method.
select_query_info . optimize_trivial_count = true ;
2023-03-20 10:29:27 +00:00
/// Get number of rows
std : : optional < UInt64 > num_rows = storage - > totalRows ( settings ) ;
2023-02-26 22:19:01 +00:00
if ( ! num_rows )
return false ;
2023-06-06 16:38:32 +00:00
if ( settings . max_parallel_replicas > 1 )
{
if ( ! settings . parallel_replicas_custom_key . value . empty ( ) | | settings . allow_experimental_parallel_reading_from_replicas = = 0 )
return false ;
/// The query could use trivial count if it didn't use parallel replicas, so let's disable it
query_context - > setSetting ( " allow_experimental_parallel_reading_from_replicas " , Field ( 0 ) ) ;
query_context - > setSetting ( " max_parallel_replicas " , UInt64 { 0 } ) ;
LOG_TRACE ( & Poco : : Logger : : get ( " Planner " ) , " Disabling parallel replicas to be able to use a trivial count optimization " ) ;
}
2023-03-20 10:29:27 +00:00
/// Set aggregation state
2023-02-26 22:19:01 +00:00
const AggregateFunctionCount & agg_count = * count_func ;
std : : vector < char > state ( agg_count . sizeOfData ( ) ) ;
AggregateDataPtr place = state . data ( ) ;
agg_count . create ( place ) ;
2023-02-28 11:35:36 +00:00
SCOPE_EXIT_MEMORY_SAFE ( agg_count . destroy ( place ) ) ;
2023-02-26 22:19:01 +00:00
agg_count . set ( place , num_rows . value ( ) ) ;
2023-03-01 20:19:51 +00:00
auto column = ColumnAggregateFunction : : create ( function_node . getAggregateFunction ( ) ) ;
2023-02-26 22:19:01 +00:00
column - > insertFrom ( place ) ;
/// get count() argument type
DataTypes argument_types ;
argument_types . reserve ( columns_names . size ( ) ) ;
{
2023-08-24 15:05:45 +00:00
const Block source_header = table_node ? table_node - > getStorageSnapshot ( ) - > getSampleBlockForColumns ( columns_names )
: table_function_node - > getStorageSnapshot ( ) - > getSampleBlockForColumns ( columns_names ) ;
2023-02-26 22:19:01 +00:00
for ( const auto & column_name : columns_names )
argument_types . push_back ( source_header . getByName ( column_name ) . type ) ;
}
Block block_with_count {
{ std : : move ( column ) ,
2023-03-01 20:19:51 +00:00
std : : make_shared < DataTypeAggregateFunction > ( function_node . getAggregateFunction ( ) , argument_types , Array { } ) ,
2023-02-26 22:19:01 +00:00
columns_names . front ( ) } } ;
auto source = std : : make_shared < SourceFromSingleChunk > ( block_with_count ) ;
auto prepared_count = std : : make_unique < ReadFromPreparedSource > ( Pipe ( std : : move ( source ) ) ) ;
prepared_count - > setStepDescription ( " Optimized trivial count " ) ;
query_plan . addStep ( std : : move ( prepared_count ) ) ;
return true ;
}
2023-03-03 16:01:50 +00:00
void prepareBuildQueryPlanForTableExpression ( const QueryTreeNodePtr & table_expression , PlannerContextPtr & planner_context )
{
const auto & query_context = planner_context - > getQueryContext ( ) ;
const auto & settings = query_context - > getSettingsRef ( ) ;
auto & table_expression_data = planner_context - > getTableExpressionDataOrThrow ( table_expression ) ;
auto columns_names = table_expression_data . getColumnNames ( ) ;
auto * table_node = table_expression - > as < TableNode > ( ) ;
auto * table_function_node = table_expression - > as < TableFunctionNode > ( ) ;
auto * query_node = table_expression - > as < QueryNode > ( ) ;
auto * union_node = table_expression - > as < UnionNode > ( ) ;
/** The current user must have the SELECT privilege.
* We do not check access rights for table functions because they have been already checked in ITableFunction : : execute ( ) .
*/
if ( table_node )
{
auto column_names_with_aliases = columns_names ;
const auto & alias_columns_names = table_expression_data . getAliasColumnsNames ( ) ;
column_names_with_aliases . insert ( column_names_with_aliases . end ( ) , alias_columns_names . begin ( ) , alias_columns_names . end ( ) ) ;
checkAccessRights ( * table_node , column_names_with_aliases , query_context ) ;
}
if ( columns_names . empty ( ) )
{
NameAndTypePair additional_column_to_read ;
if ( table_node | | table_function_node )
{
const auto & storage = table_node ? table_node - > getStorage ( ) : table_function_node - > getStorage ( ) ;
const auto & storage_snapshot = table_node ? table_node - > getStorageSnapshot ( ) : table_function_node - > getStorageSnapshot ( ) ;
additional_column_to_read = chooseSmallestColumnToReadFromStorage ( storage , storage_snapshot ) ;
}
else if ( query_node | | union_node )
{
const auto & projection_columns = query_node ? query_node - > getProjectionColumns ( ) : union_node - > computeProjectionColumns ( ) ;
NamesAndTypesList projection_columns_list ( projection_columns . begin ( ) , projection_columns . end ( ) ) ;
additional_column_to_read = ExpressionActions : : getSmallestColumn ( projection_columns_list ) ;
}
else
{
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Expected table, table function, query or union. Actual {} " ,
table_expression - > formatASTForErrorMessage ( ) ) ;
}
auto & global_planner_context = planner_context - > getGlobalPlannerContext ( ) ;
const auto & column_identifier = global_planner_context - > createColumnIdentifier ( additional_column_to_read , table_expression ) ;
columns_names . push_back ( additional_column_to_read . name ) ;
table_expression_data . addColumn ( additional_column_to_read , column_identifier ) ;
}
/// Limitation on the number of columns to read
if ( settings . max_columns_to_read & & columns_names . size ( ) > settings . max_columns_to_read )
throw Exception ( ErrorCodes : : TOO_MANY_COLUMNS ,
" Limit for number of columns to read exceeded. Requested: {}, maximum: {} " ,
columns_names . size ( ) ,
settings . max_columns_to_read ) ;
}
2023-03-20 10:29:27 +00:00
void updatePrewhereOutputsIfNeeded ( SelectQueryInfo & table_expression_query_info ,
const Names & column_names ,
const StorageSnapshotPtr & storage_snapshot )
{
if ( ! table_expression_query_info . prewhere_info )
return ;
auto & prewhere_actions = table_expression_query_info . prewhere_info - > prewhere_actions ;
NameSet required_columns ;
if ( column_names . size ( ) = = 1 )
required_columns . insert ( column_names [ 0 ] ) ;
auto & table_expression_modifiers = table_expression_query_info . table_expression_modifiers ;
if ( table_expression_modifiers )
{
if ( table_expression_modifiers - > hasSampleSizeRatio ( ) | |
table_expression_query_info . planner_context - > getQueryContext ( ) - > getSettingsRef ( ) . parallel_replicas_count > 1 )
{
/// We evaluate sampling for Merge lazily so we need to get all the columns
if ( storage_snapshot - > storage . getName ( ) = = " Merge " )
{
const auto columns = storage_snapshot - > getMetadataForQuery ( ) - > getColumns ( ) . getAll ( ) ;
for ( const auto & column : columns )
required_columns . insert ( column . name ) ;
}
else
{
auto columns_required_for_sampling = storage_snapshot - > getMetadataForQuery ( ) - > getColumnsRequiredForSampling ( ) ;
required_columns . insert ( columns_required_for_sampling . begin ( ) , columns_required_for_sampling . end ( ) ) ;
}
}
if ( table_expression_modifiers - > hasFinal ( ) )
{
auto columns_required_for_final = storage_snapshot - > getMetadataForQuery ( ) - > getColumnsRequiredForFinal ( ) ;
required_columns . insert ( columns_required_for_final . begin ( ) , columns_required_for_final . end ( ) ) ;
}
}
std : : unordered_set < const ActionsDAG : : Node * > required_output_nodes ;
for ( const auto * input : prewhere_actions - > getInputs ( ) )
{
if ( required_columns . contains ( input - > result_name ) )
required_output_nodes . insert ( input ) ;
}
if ( required_output_nodes . empty ( ) )
return ;
auto & prewhere_outputs = prewhere_actions - > getOutputs ( ) ;
for ( const auto & output : prewhere_outputs )
{
auto required_output_node_it = required_output_nodes . find ( output ) ;
if ( required_output_node_it = = required_output_nodes . end ( ) )
continue ;
required_output_nodes . erase ( required_output_node_it ) ;
}
prewhere_outputs . insert ( prewhere_outputs . end ( ) , required_output_nodes . begin ( ) , required_output_nodes . end ( ) ) ;
}
2023-03-27 13:07:24 +00:00
FilterDAGInfo buildRowPolicyFilterIfNeeded ( const StoragePtr & storage ,
SelectQueryInfo & table_expression_query_info ,
PlannerContextPtr & planner_context )
{
auto storage_id = storage - > getStorageID ( ) ;
const auto & query_context = planner_context - > getQueryContext ( ) ;
auto row_policy_filter = query_context - > getRowPolicyFilter ( storage_id . getDatabaseName ( ) , storage_id . getTableName ( ) , RowPolicyFilterType : : SELECT_FILTER ) ;
if ( ! row_policy_filter )
return { } ;
2023-03-20 10:29:27 +00:00
2023-04-06 19:18:26 +00:00
return buildFilterInfo ( row_policy_filter - > expression , table_expression_query_info . table_expression , planner_context ) ;
2023-03-27 13:07:24 +00:00
}
FilterDAGInfo buildCustomKeyFilterIfNeeded ( const StoragePtr & storage ,
2023-03-28 07:47:37 +00:00
SelectQueryInfo & table_expression_query_info ,
PlannerContextPtr & planner_context )
2023-03-27 13:07:24 +00:00
{
const auto & query_context = planner_context - > getQueryContext ( ) ;
const auto & settings = query_context - > getSettingsRef ( ) ;
if ( settings . parallel_replicas_count < = 1 | | settings . parallel_replicas_custom_key . value . empty ( ) )
return { } ;
auto custom_key_ast = parseCustomKeyForTable ( settings . parallel_replicas_custom_key , * query_context ) ;
if ( ! custom_key_ast )
throw DB : : Exception (
ErrorCodes : : BAD_ARGUMENTS ,
2023-03-27 13:51:42 +00:00
" Parallel replicas processing with custom_key has been requested "
2023-03-27 13:07:24 +00:00
" (setting 'max_parallel_replcias'), but the table does not have custom_key defined for it "
" or it's invalid (setting 'parallel_replicas_custom_key') " ) ;
LOG_TRACE ( & Poco : : Logger : : get ( " Planner " ) , " Processing query on a replica using custom_key '{}' " , settings . parallel_replicas_custom_key . value ) ;
auto parallel_replicas_custom_filter_ast = getCustomKeyFilterForParallelReplica (
settings . parallel_replicas_count ,
settings . parallel_replica_offset ,
std : : move ( custom_key_ast ) ,
settings . parallel_replicas_custom_key_filter_type ,
* storage ,
query_context ) ;
2023-04-06 19:18:26 +00:00
return buildFilterInfo ( parallel_replicas_custom_filter_ast , table_expression_query_info . table_expression , planner_context ) ;
2023-03-20 10:29:27 +00:00
}
2023-04-04 20:33:59 +00:00
/// Apply filters from additional_table_filters setting
FilterDAGInfo buildAdditionalFiltersIfNeeded ( const StoragePtr & storage ,
const String & table_expression_alias ,
SelectQueryInfo & table_expression_query_info ,
PlannerContextPtr & planner_context )
{
const auto & query_context = planner_context - > getQueryContext ( ) ;
const auto & settings = query_context - > getSettingsRef ( ) ;
auto const & additional_filters = settings . additional_table_filters . value ;
if ( additional_filters . empty ( ) )
return { } ;
auto const & storage_id = storage - > getStorageID ( ) ;
ASTPtr additional_filter_ast ;
2023-05-09 19:19:37 +00:00
for ( const auto & additional_filter : additional_filters )
2023-04-04 20:33:59 +00:00
{
2023-05-09 19:19:37 +00:00
const auto & tuple = additional_filter . safeGet < const Tuple & > ( ) ;
2023-04-04 20:33:59 +00:00
auto const & table = tuple . at ( 0 ) . safeGet < String > ( ) ;
auto const & filter = tuple . at ( 1 ) . safeGet < String > ( ) ;
if ( table = = table_expression_alias | |
( table = = storage_id . getTableName ( ) & & query_context - > getCurrentDatabase ( ) = = storage_id . getDatabaseName ( ) ) | |
( table = = storage_id . getFullNameNotQuoted ( ) ) )
{
ParserExpression parser ;
additional_filter_ast = parseQuery (
parser , filter . data ( ) , filter . data ( ) + filter . size ( ) ,
" additional filter " , settings . max_query_size , settings . max_parser_depth ) ;
break ;
}
}
if ( ! additional_filter_ast )
return { } ;
2023-04-05 16:13:03 +00:00
table_expression_query_info . additional_filter_ast = additional_filter_ast ;
2023-04-06 19:18:26 +00:00
return buildFilterInfo ( additional_filter_ast , table_expression_query_info . table_expression , planner_context ) ;
2023-04-04 20:33:59 +00:00
}
2023-03-03 16:01:50 +00:00
JoinTreeQueryPlan buildQueryPlanForTableExpression ( QueryTreeNodePtr table_expression ,
2022-12-23 17:45:28 +00:00
const SelectQueryInfo & select_query_info ,
2022-09-15 14:09:30 +00:00
const SelectQueryOptions & select_query_options ,
2022-12-23 17:45:28 +00:00
PlannerContextPtr & planner_context ,
2023-03-03 16:01:50 +00:00
bool is_single_table_expression ,
bool wrap_read_columns_in_subquery )
2022-09-15 14:09:30 +00:00
{
2022-12-23 17:45:28 +00:00
const auto & query_context = planner_context - > getQueryContext ( ) ;
const auto & settings = query_context - > getSettingsRef ( ) ;
2023-03-03 16:01:50 +00:00
auto & table_expression_data = planner_context - > getTableExpressionDataOrThrow ( table_expression ) ;
2022-12-23 17:45:28 +00:00
QueryProcessingStage : : Enum from_stage = QueryProcessingStage : : Enum : : FetchColumns ;
2023-03-03 16:01:50 +00:00
if ( wrap_read_columns_in_subquery )
{
auto columns = table_expression_data . getColumns ( ) ;
table_expression = buildSubqueryToReadColumnsFromTableExpression ( columns , table_expression , query_context ) ;
}
2022-09-15 14:09:30 +00:00
auto * table_node = table_expression - > as < TableNode > ( ) ;
auto * table_function_node = table_expression - > as < TableFunctionNode > ( ) ;
auto * query_node = table_expression - > as < QueryNode > ( ) ;
auto * union_node = table_expression - > as < UnionNode > ( ) ;
QueryPlan query_plan ;
if ( table_node | | table_function_node )
{
const auto & storage = table_node ? table_node - > getStorage ( ) : table_function_node - > getStorage ( ) ;
const auto & storage_snapshot = table_node ? table_node - > getStorageSnapshot ( ) : table_function_node - > getStorageSnapshot ( ) ;
auto table_expression_query_info = select_query_info ;
table_expression_query_info . table_expression = table_expression ;
2023-08-24 10:32:56 +00:00
table_expression_query_info . filter_actions_dag = table_expression_data . getFilterActions ( ) ;
2022-09-15 14:09:30 +00:00
2022-12-23 17:45:28 +00:00
size_t max_streams = settings . max_threads ;
size_t max_threads_execute_query = settings . max_threads ;
/** With distributed query processing, almost no computations are done in the threads,
* but wait and receive data from remote servers .
* If we have 20 remote servers , and max_threads = 8 , then it would not be efficient to
* connect and ask only 8 servers at a time .
* To simultaneously query more remote servers ,
* instead of max_threads , max_distributed_connections is used .
*/
bool is_remote = table_expression_data . isRemote ( ) ;
if ( is_remote )
{
max_streams = settings . max_distributed_connections ;
max_threads_execute_query = settings . max_distributed_connections ;
}
UInt64 max_block_size = settings . max_block_size ;
auto & main_query_node = select_query_info . query_tree - > as < QueryNode & > ( ) ;
if ( is_single_table_expression )
{
size_t limit_length = 0 ;
if ( main_query_node . hasLimit ( ) )
{
/// Constness of limit is validated during query analysis stage
limit_length = main_query_node . getLimit ( ) - > as < ConstantNode & > ( ) . getValue ( ) . safeGet < UInt64 > ( ) ;
}
size_t limit_offset = 0 ;
if ( main_query_node . hasOffset ( ) )
{
/// Constness of offset is validated during query analysis stage
limit_offset = main_query_node . getOffset ( ) - > as < ConstantNode & > ( ) . getValue ( ) . safeGet < UInt64 > ( ) ;
}
/** If not specified DISTINCT, WHERE, GROUP BY, HAVING, ORDER BY, JOIN, LIMIT BY, LIMIT WITH TIES
* but LIMIT is specified , and limit + offset < max_block_size ,
* then as the block size we will use limit + offset ( not to read more from the table than requested ) ,
* and also set the number of threads to 1.
*/
if ( main_query_node . hasLimit ( ) & &
! main_query_node . isDistinct ( ) & &
! main_query_node . isLimitWithTies ( ) & &
! main_query_node . hasPrewhere ( ) & &
! main_query_node . hasWhere ( ) & &
select_query_info . filter_asts . empty ( ) & &
! main_query_node . hasGroupBy ( ) & &
! main_query_node . hasHaving ( ) & &
! main_query_node . hasOrderBy ( ) & &
! main_query_node . hasLimitBy ( ) & &
! select_query_info . need_aggregate & &
! select_query_info . has_window & &
limit_length < = std : : numeric_limits < UInt64 > : : max ( ) - limit_offset )
{
if ( limit_length + limit_offset < max_block_size )
{
max_block_size = std : : max < UInt64 > ( 1 , limit_length + limit_offset ) ;
max_streams = 1 ;
max_threads_execute_query = 1 ;
}
if ( limit_length + limit_offset < select_query_info . local_storage_limits . local_limits . size_limits . max_rows )
{
table_expression_query_info . limit = limit_length + limit_offset ;
}
}
if ( ! max_block_size )
throw Exception ( ErrorCodes : : PARAMETER_OUT_OF_BOUND ,
" Setting 'max_block_size' cannot be zero " ) ;
}
if ( max_streams = = 0 )
max_streams = 1 ;
2023-01-13 16:53:53 +00:00
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads
2022-12-23 17:45:28 +00:00
if ( max_streams > 1 & & ! is_remote )
max_streams = static_cast < size_t > ( max_streams * settings . max_streams_to_max_threads_ratio ) ;
2022-09-15 14:09:30 +00:00
if ( table_node )
table_expression_query_info . table_expression_modifiers = table_node - > getTableExpressionModifiers ( ) ;
else
table_expression_query_info . table_expression_modifiers = table_function_node - > getTableExpressionModifiers ( ) ;
2023-03-20 10:29:27 +00:00
bool need_rewrite_query_with_final = storage - > needRewriteQueryWithFinal ( table_expression_data . getColumnNames ( ) ) ;
2022-10-12 11:26:02 +00:00
if ( need_rewrite_query_with_final )
2022-09-15 14:09:30 +00:00
{
2022-10-12 11:26:02 +00:00
if ( table_expression_query_info . table_expression_modifiers )
{
const auto & table_expression_modifiers = table_expression_query_info . table_expression_modifiers ;
auto sample_size_ratio = table_expression_modifiers - > getSampleSizeRatio ( ) ;
auto sample_offset_ratio = table_expression_modifiers - > getSampleOffsetRatio ( ) ;
table_expression_query_info . table_expression_modifiers = TableExpressionModifiers ( true /*has_final*/ ,
sample_size_ratio ,
sample_offset_ratio ) ;
}
else
{
table_expression_query_info . table_expression_modifiers = TableExpressionModifiers ( true /*has_final*/ ,
{ } /*sample_size_ratio*/ ,
{ } /*sample_offset_ratio*/ ) ;
}
2022-09-15 14:09:30 +00:00
}
2023-03-01 20:19:51 +00:00
/// Apply trivial_count optimization if possible
2023-03-20 10:29:27 +00:00
bool is_trivial_count_applied = ! select_query_options . only_analyze & &
is_single_table_expression & &
2023-08-24 15:05:45 +00:00
( table_node | | table_function_node ) & &
2023-03-20 10:29:27 +00:00
select_query_info . has_aggregates & &
2023-08-31 13:39:18 +00:00
settings . additional_table_filters . value . empty ( ) & &
2023-08-24 15:05:45 +00:00
applyTrivialCountIfPossible ( query_plan , table_expression_query_info , table_node , table_function_node , select_query_info . query_tree , planner_context - > getMutableQueryContext ( ) , table_expression_data . getColumnNames ( ) ) ;
2022-10-12 11:26:02 +00:00
2023-02-26 22:37:06 +00:00
if ( is_trivial_count_applied )
2022-09-15 14:09:30 +00:00
{
2023-03-06 10:47:32 +00:00
from_stage = QueryProcessingStage : : WithMergeableState ;
2022-12-23 17:45:28 +00:00
}
else
{
2023-03-03 17:44:08 +00:00
if ( ! select_query_options . only_analyze )
{
2023-07-11 11:57:19 +00:00
auto storage_merge_tree = std : : dynamic_pointer_cast < MergeTreeData > ( storage ) ;
if ( storage_merge_tree & & query_context - > canUseParallelReplicasOnInitiator ( )
& & settings . parallel_replicas_min_number_of_rows_per_replica > 0 )
{
/// This is trash
/// It uses the old InterpreterSelectQuery to do the estimation of how many rows will be read
/// Ideally we should be able to use estimateNumberOfRowsToRead over the storage, but to do this
/// properly we need all the actions/filters, which aren't available yet
/// If we instead delay this check for later several things will happen:
/// * The header might be different (updatePrewhereOutputsIfNeeded)
/// * The storage will have been initiated (thus already preparing parallel replicas)
auto query_options = SelectQueryOptions (
QueryProcessingStage : : WithMergeableState ,
/* depth */ 1 ,
/* is_subquery_= */ true )
. ignoreProjections ( )
. ignoreAlias ( ) ;
InterpreterSelectQuery select (
table_expression_query_info . original_query ,
query_context ,
query_options ,
table_expression_query_info . prepared_sets ) ;
select . adjustParallelReplicasAfterAnalysis ( ) ;
planner_context - > getMutableQueryContext ( ) - > setSetting (
" allow_experimental_parallel_reading_from_replicas " ,
select . getContext ( ) - > getSettingsRef ( ) . allow_experimental_parallel_reading_from_replicas . operator Field ( ) ) ;
planner_context - > getMutableQueryContext ( ) - > setSetting (
" max_parallel_replicas " , select . getContext ( ) - > getSettingsRef ( ) . max_parallel_replicas . operator Field ( ) ) ;
}
2023-03-20 10:29:27 +00:00
const auto & prewhere_actions = table_expression_data . getPrewhereFilterActions ( ) ;
if ( prewhere_actions )
{
table_expression_query_info . prewhere_info = std : : make_shared < PrewhereInfo > ( ) ;
table_expression_query_info . prewhere_info - > prewhere_actions = prewhere_actions ;
table_expression_query_info . prewhere_info - > prewhere_column_name = prewhere_actions - > getOutputs ( ) . at ( 0 ) - > result_name ;
table_expression_query_info . prewhere_info - > remove_prewhere_column = true ;
table_expression_query_info . prewhere_info - > need_filter = true ;
}
updatePrewhereOutputsIfNeeded ( table_expression_query_info , table_expression_data . getColumnNames ( ) , storage_snapshot ) ;
2023-03-27 13:07:24 +00:00
const auto & columns_names = table_expression_data . getColumnNames ( ) ;
2023-03-20 10:29:27 +00:00
2023-03-27 13:07:24 +00:00
std : : vector < std : : pair < FilterDAGInfo , std : : string > > where_filters ;
const auto add_filter = [ & ] ( const FilterDAGInfo & filter_info , std : : string description )
2023-03-20 10:29:27 +00:00
{
2023-03-27 13:07:24 +00:00
if ( ! filter_info . actions )
return ;
2023-03-20 10:29:27 +00:00
bool is_final = table_expression_query_info . table_expression_modifiers & &
table_expression_query_info . table_expression_modifiers - > hasFinal ( ) ;
bool optimize_move_to_prewhere = settings . optimize_move_to_prewhere & & ( ! is_final | | settings . optimize_move_to_prewhere_if_final ) ;
if ( storage - > supportsPrewhere ( ) & & optimize_move_to_prewhere )
{
if ( ! table_expression_query_info . prewhere_info )
table_expression_query_info . prewhere_info = std : : make_shared < PrewhereInfo > ( ) ;
if ( ! table_expression_query_info . prewhere_info - > prewhere_actions )
{
2023-03-27 13:07:24 +00:00
table_expression_query_info . prewhere_info - > prewhere_actions = filter_info . actions ;
table_expression_query_info . prewhere_info - > prewhere_column_name = filter_info . column_name ;
table_expression_query_info . prewhere_info - > remove_prewhere_column = filter_info . do_remove_column ;
2023-03-20 10:29:27 +00:00
}
else
{
2023-03-27 13:07:24 +00:00
table_expression_query_info . prewhere_info - > row_level_filter = filter_info . actions ;
table_expression_query_info . prewhere_info - > row_level_column_name = filter_info . column_name ;
2023-03-20 10:29:27 +00:00
}
table_expression_query_info . prewhere_info - > need_filter = true ;
2023-03-27 13:07:24 +00:00
}
else
{
where_filters . emplace_back ( filter_info , std : : move ( description ) ) ;
}
} ;
auto row_policy_filter_info = buildRowPolicyFilterIfNeeded ( storage , table_expression_query_info , planner_context ) ;
add_filter ( row_policy_filter_info , " Row-level security filter " ) ;
if ( query_context - > getParallelReplicasMode ( ) = = Context : : ParallelReplicasMode : : CUSTOM_KEY )
{
if ( settings . parallel_replicas_count > 1 )
{
auto parallel_replicas_custom_key_filter_info = buildCustomKeyFilterIfNeeded ( storage , table_expression_query_info , planner_context ) ;
add_filter ( parallel_replicas_custom_key_filter_info , " Parallel replicas custom key filter " ) ;
}
else
{
2023-03-28 07:47:37 +00:00
if ( auto * distributed = typeid_cast < StorageDistributed * > ( storage . get ( ) ) ;
2023-03-27 13:07:24 +00:00
distributed & & canUseCustomKey ( settings , * distributed - > getCluster ( ) , * query_context ) )
{
table_expression_query_info . use_custom_key = true ;
planner_context - > getMutableQueryContext ( ) - > setSetting ( " distributed_group_by_no_merge " , 2 ) ;
}
2023-03-20 10:29:27 +00:00
}
}
2023-04-04 20:33:59 +00:00
const auto & table_expression_alias = table_expression - > getAlias ( ) ;
auto additional_filters_info = buildAdditionalFiltersIfNeeded ( storage , table_expression_alias , table_expression_query_info , planner_context ) ;
add_filter ( additional_filters_info , " additional filter " ) ;
2023-03-03 17:44:08 +00:00
from_stage = storage - > getQueryProcessingStage ( query_context , select_query_options . to_stage , storage_snapshot , table_expression_query_info ) ;
storage - > read ( query_plan , columns_names , storage_snapshot , table_expression_query_info , query_context , from_stage , max_block_size , max_streams ) ;
2023-03-04 17:46:40 +00:00
2023-03-27 13:07:24 +00:00
for ( const auto & filter_info_and_description : where_filters )
2023-03-20 10:29:27 +00:00
{
2023-03-27 13:07:24 +00:00
const auto & [ filter_info , description ] = filter_info_and_description ;
if ( query_plan . isInitialized ( ) & &
from_stage = = QueryProcessingStage : : FetchColumns & &
filter_info . actions )
{
auto filter_step = std : : make_unique < FilterStep > ( query_plan . getCurrentDataStream ( ) ,
filter_info . actions ,
filter_info . column_name ,
filter_info . do_remove_column ) ;
filter_step - > setStepDescription ( description ) ;
query_plan . addStep ( std : : move ( filter_step ) ) ;
}
2023-03-20 10:29:27 +00:00
}
2023-03-04 17:46:40 +00:00
if ( query_context - > hasQueryContext ( ) & & ! select_query_options . is_internal )
{
auto local_storage_id = storage - > getStorageID ( ) ;
query_context - > getQueryContext ( ) - > addQueryAccessInfo (
backQuoteIfNeed ( local_storage_id . getDatabaseName ( ) ) ,
local_storage_id . getFullTableName ( ) ,
columns_names ,
{ } ,
{ } ) ;
}
2023-03-03 17:44:08 +00:00
}
2023-02-26 22:37:06 +00:00
if ( query_plan . isInitialized ( ) )
{
/** Specify the number of threads only if it wasn't specified in storage.
*
* But in case of remote query and prefer_localhost_replica = 1 ( default )
* The inner local query ( that is done in the same process , without
* network interaction ) , it will setMaxThreads earlier and distributed
* query will not update it .
*/
if ( ! query_plan . getMaxThreads ( ) | | is_remote )
query_plan . setMaxThreads ( max_threads_execute_query ) ;
2023-05-07 04:29:04 +00:00
query_plan . setConcurrencyControl ( settings . use_concurrency_control ) ;
2023-02-26 22:37:06 +00:00
}
else
{
/// Create step which reads from empty source if storage has no data.
2023-03-20 10:29:27 +00:00
auto source_header = storage_snapshot - > getSampleBlockForColumns ( table_expression_data . getColumnNames ( ) ) ;
2023-02-26 22:37:06 +00:00
Pipe pipe ( std : : make_shared < NullSource > ( source_header ) ) ;
auto read_from_pipe = std : : make_unique < ReadFromPreparedSource > ( std : : move ( pipe ) ) ;
read_from_pipe - > setStepDescription ( " Read from NullSource " ) ;
query_plan . addStep ( std : : move ( read_from_pipe ) ) ;
}
2022-09-15 14:09:30 +00:00
}
}
else if ( query_node | | union_node )
{
2023-03-02 12:43:03 +00:00
if ( select_query_options . only_analyze )
{
auto projection_columns = query_node ? query_node - > getProjectionColumns ( ) : union_node - > computeProjectionColumns ( ) ;
Block source_header ;
for ( auto & projection_column : projection_columns )
source_header . insert ( ColumnWithTypeAndName ( projection_column . type , projection_column . name ) ) ;
Pipe pipe ( std : : make_shared < NullSource > ( source_header ) ) ;
auto read_from_pipe = std : : make_unique < ReadFromPreparedSource > ( std : : move ( pipe ) ) ;
read_from_pipe - > setStepDescription ( " Read from NullSource " ) ;
query_plan . addStep ( std : : move ( read_from_pipe ) ) ;
}
else
2023-03-01 16:10:31 +00:00
{
2023-02-24 12:46:09 +00:00
auto subquery_options = select_query_options . subquery ( ) ;
Planner subquery_planner ( table_expression , subquery_options , planner_context - > getGlobalPlannerContext ( ) ) ;
/// Propagate storage limits to subquery
subquery_planner . addStorageLimits ( * select_query_info . storage_limits ) ;
subquery_planner . buildQueryPlanIfNeeded ( ) ;
query_plan = std : : move ( subquery_planner ) . extractQueryPlan ( ) ;
2023-03-01 16:10:31 +00:00
}
2022-09-15 14:09:30 +00:00
}
else
{
2023-01-23 21:13:58 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Expected table, table function, query or union. Actual {} " ,
table_expression - > formatASTForErrorMessage ( ) ) ;
2022-09-15 14:09:30 +00:00
}
2022-12-23 17:45:28 +00:00
if ( from_stage = = QueryProcessingStage : : FetchColumns )
2022-09-15 14:09:30 +00:00
{
2022-12-23 17:45:28 +00:00
auto rename_actions_dag = std : : make_shared < ActionsDAG > ( query_plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ) ;
ActionsDAG : : NodeRawConstPtrs updated_actions_dag_outputs ;
2022-10-19 16:49:17 +00:00
2022-12-23 17:45:28 +00:00
for ( auto & output_node : rename_actions_dag - > getOutputs ( ) )
{
const auto * column_identifier = table_expression_data . getColumnIdentifierOrNull ( output_node - > result_name ) ;
if ( ! column_identifier )
continue ;
2022-09-15 14:09:30 +00:00
2022-12-23 17:45:28 +00:00
updated_actions_dag_outputs . push_back ( & rename_actions_dag - > addAlias ( * output_node , * column_identifier ) ) ;
}
2022-11-08 13:25:23 +00:00
2022-12-23 17:45:28 +00:00
rename_actions_dag - > getOutputs ( ) = std : : move ( updated_actions_dag_outputs ) ;
2022-09-15 14:09:30 +00:00
2022-12-23 17:45:28 +00:00
auto rename_step = std : : make_unique < ExpressionStep > ( query_plan . getCurrentDataStream ( ) , rename_actions_dag ) ;
rename_step - > setStepDescription ( " Change column names to column identifiers " ) ;
query_plan . addStep ( std : : move ( rename_step ) ) ;
}
2023-02-02 18:25:32 +00:00
else
{
2023-08-14 22:22:05 +00:00
SelectQueryOptions analyze_query_options = SelectQueryOptions ( from_stage ) . analyze ( ) ;
2023-02-02 18:25:32 +00:00
Planner planner ( select_query_info . query_tree ,
2023-08-14 22:22:05 +00:00
analyze_query_options ,
2023-02-15 14:03:52 +00:00
select_query_info . planner_context ) ;
2023-02-02 18:25:32 +00:00
planner . buildQueryPlanIfNeeded ( ) ;
auto expected_header = planner . getQueryPlan ( ) . getCurrentDataStream ( ) . header ;
2023-02-25 19:16:51 +00:00
if ( ! blocksHaveEqualStructure ( query_plan . getCurrentDataStream ( ) . header , expected_header ) )
{
2023-03-03 16:01:50 +00:00
materializeBlockInplace ( expected_header ) ;
2023-02-25 19:16:51 +00:00
auto rename_actions_dag = ActionsDAG : : makeConvertingActions (
query_plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ,
expected_header . getColumnsWithTypeAndName ( ) ,
ActionsDAG : : MatchColumnsMode : : Position ,
true /*ignore_constant_values*/ ) ;
auto rename_step = std : : make_unique < ExpressionStep > ( query_plan . getCurrentDataStream ( ) , std : : move ( rename_actions_dag ) ) ;
std : : string step_description = table_expression_data . isRemote ( ) ? " Change remote column names to local column names " : " Change column names " ;
rename_step - > setStepDescription ( std : : move ( step_description ) ) ;
query_plan . addStep ( std : : move ( rename_step ) ) ;
}
2023-02-02 18:25:32 +00:00
}
2022-12-23 17:45:28 +00:00
return { std : : move ( query_plan ) , from_stage } ;
2022-09-15 14:09:30 +00:00
}
2022-12-23 17:45:28 +00:00
JoinTreeQueryPlan buildQueryPlanForJoinNode ( const QueryTreeNodePtr & join_table_expression ,
JoinTreeQueryPlan left_join_tree_query_plan ,
JoinTreeQueryPlan right_join_tree_query_plan ,
2022-12-23 19:33:46 +00:00
const ColumnIdentifierSet & outer_scope_columns ,
2022-09-15 14:09:30 +00:00
PlannerContextPtr & planner_context )
{
2022-12-23 17:45:28 +00:00
auto & join_node = join_table_expression - > as < JoinNode & > ( ) ;
if ( left_join_tree_query_plan . from_stage ! = QueryProcessingStage : : FetchColumns )
throw Exception ( ErrorCodes : : UNSUPPORTED_METHOD ,
" JOIN {} left table expression expected to process query to fetch columns stage. Actual {} " ,
join_node . formatASTForErrorMessage ( ) ,
QueryProcessingStage : : toString ( left_join_tree_query_plan . from_stage ) ) ;
auto left_plan = std : : move ( left_join_tree_query_plan . query_plan ) ;
2022-09-15 14:09:30 +00:00
auto left_plan_output_columns = left_plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ;
2022-12-23 17:45:28 +00:00
if ( right_join_tree_query_plan . from_stage ! = QueryProcessingStage : : FetchColumns )
throw Exception ( ErrorCodes : : UNSUPPORTED_METHOD ,
" JOIN {} right table expression expected to process query to fetch columns stage. Actual {} " ,
join_node . formatASTForErrorMessage ( ) ,
QueryProcessingStage : : toString ( right_join_tree_query_plan . from_stage ) ) ;
2022-09-15 14:09:30 +00:00
2022-12-23 17:45:28 +00:00
auto right_plan = std : : move ( right_join_tree_query_plan . query_plan ) ;
2022-09-15 14:09:30 +00:00
auto right_plan_output_columns = right_plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ;
JoinClausesAndActions join_clauses_and_actions ;
2022-09-28 11:20:24 +00:00
JoinKind join_kind = join_node . getKind ( ) ;
2023-06-06 19:03:23 +00:00
JoinStrictness join_strictness = join_node . getStrictness ( ) ;
2022-09-15 14:09:30 +00:00
2022-11-10 18:17:10 +00:00
std : : optional < bool > join_constant ;
2023-06-06 19:03:23 +00:00
if ( join_strictness = = JoinStrictness : : All )
2022-12-23 17:45:28 +00:00
join_constant = tryExtractConstantFromJoinNode ( join_table_expression ) ;
2022-11-10 18:17:10 +00:00
2022-09-28 11:20:24 +00:00
if ( join_constant )
{
/** If there is JOIN with always true constant, we transform it to cross.
* If there is JOIN with always false constant , we do not process JOIN keys .
* It is expected by join algorithm to handle such case .
*
* Example : SELECT * FROM test_table AS t1 INNER JOIN test_table AS t2 ON 1 ;
*/
if ( * join_constant )
join_kind = JoinKind : : Cross ;
}
else if ( join_node . isOnJoinExpression ( ) )
2022-09-15 14:09:30 +00:00
{
join_clauses_and_actions = buildJoinClausesAndActions ( left_plan_output_columns ,
right_plan_output_columns ,
2022-12-23 17:45:28 +00:00
join_table_expression ,
2022-09-15 14:09:30 +00:00
planner_context ) ;
join_clauses_and_actions . left_join_expressions_actions - > projectInput ( ) ;
auto left_join_expressions_actions_step = std : : make_unique < ExpressionStep > ( left_plan . getCurrentDataStream ( ) , join_clauses_and_actions . left_join_expressions_actions ) ;
left_join_expressions_actions_step - > setStepDescription ( " JOIN actions " ) ;
left_plan . addStep ( std : : move ( left_join_expressions_actions_step ) ) ;
join_clauses_and_actions . right_join_expressions_actions - > projectInput ( ) ;
auto right_join_expressions_actions_step = std : : make_unique < ExpressionStep > ( right_plan . getCurrentDataStream ( ) , join_clauses_and_actions . right_join_expressions_actions ) ;
right_join_expressions_actions_step - > setStepDescription ( " JOIN actions " ) ;
right_plan . addStep ( std : : move ( right_join_expressions_actions_step ) ) ;
}
std : : unordered_map < ColumnIdentifier , DataTypePtr > left_plan_column_name_to_cast_type ;
std : : unordered_map < ColumnIdentifier , DataTypePtr > right_plan_column_name_to_cast_type ;
if ( join_node . isUsingJoinExpression ( ) )
{
auto & join_node_using_columns_list = join_node . getJoinExpression ( ) - > as < ListNode & > ( ) ;
for ( auto & join_node_using_node : join_node_using_columns_list . getNodes ( ) )
{
auto & join_node_using_column_node = join_node_using_node - > as < ColumnNode & > ( ) ;
auto & inner_columns_list = join_node_using_column_node . getExpressionOrThrow ( ) - > as < ListNode & > ( ) ;
auto & left_inner_column_node = inner_columns_list . getNodes ( ) . at ( 0 ) ;
auto & left_inner_column = left_inner_column_node - > as < ColumnNode & > ( ) ;
auto & right_inner_column_node = inner_columns_list . getNodes ( ) . at ( 1 ) ;
auto & right_inner_column = right_inner_column_node - > as < ColumnNode & > ( ) ;
const auto & join_node_using_column_node_type = join_node_using_column_node . getColumnType ( ) ;
if ( ! left_inner_column . getColumnType ( ) - > equals ( * join_node_using_column_node_type ) )
{
2022-10-19 16:49:17 +00:00
const auto & left_inner_column_identifier = planner_context - > getColumnNodeIdentifierOrThrow ( left_inner_column_node ) ;
2022-09-15 14:09:30 +00:00
left_plan_column_name_to_cast_type . emplace ( left_inner_column_identifier , join_node_using_column_node_type ) ;
}
if ( ! right_inner_column . getColumnType ( ) - > equals ( * join_node_using_column_node_type ) )
{
2022-10-19 16:49:17 +00:00
const auto & right_inner_column_identifier = planner_context - > getColumnNodeIdentifierOrThrow ( right_inner_column_node ) ;
2022-09-15 14:09:30 +00:00
right_plan_column_name_to_cast_type . emplace ( right_inner_column_identifier , join_node_using_column_node_type ) ;
}
}
}
auto join_cast_plan_output_nodes = [ & ] ( QueryPlan & plan_to_add_cast , std : : unordered_map < std : : string , DataTypePtr > & plan_column_name_to_cast_type )
{
auto cast_actions_dag = std : : make_shared < ActionsDAG > ( plan_to_add_cast . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ) ;
for ( auto & output_node : cast_actions_dag - > getOutputs ( ) )
{
auto it = plan_column_name_to_cast_type . find ( output_node - > result_name ) ;
if ( it = = plan_column_name_to_cast_type . end ( ) )
continue ;
const auto & cast_type = it - > second ;
2023-01-31 13:54:04 +00:00
output_node = & cast_actions_dag - > addCast ( * output_node , cast_type , output_node - > result_name ) ;
2022-09-15 14:09:30 +00:00
}
cast_actions_dag - > projectInput ( ) ;
auto cast_join_columns_step
= std : : make_unique < ExpressionStep > ( plan_to_add_cast . getCurrentDataStream ( ) , std : : move ( cast_actions_dag ) ) ;
cast_join_columns_step - > setStepDescription ( " Cast JOIN USING columns " ) ;
plan_to_add_cast . addStep ( std : : move ( cast_join_columns_step ) ) ;
} ;
if ( ! left_plan_column_name_to_cast_type . empty ( ) )
join_cast_plan_output_nodes ( left_plan , left_plan_column_name_to_cast_type ) ;
if ( ! right_plan_column_name_to_cast_type . empty ( ) )
join_cast_plan_output_nodes ( right_plan , right_plan_column_name_to_cast_type ) ;
const auto & query_context = planner_context - > getQueryContext ( ) ;
2022-10-18 11:35:04 +00:00
const auto & settings = query_context - > getSettingsRef ( ) ;
bool join_use_nulls = settings . join_use_nulls ;
2022-09-15 14:09:30 +00:00
auto to_nullable_function = FunctionFactory : : instance ( ) . get ( " toNullable " , query_context ) ;
auto join_cast_plan_columns_to_nullable = [ & ] ( QueryPlan & plan_to_add_cast )
{
auto cast_actions_dag = std : : make_shared < ActionsDAG > ( plan_to_add_cast . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ) ;
for ( auto & output_node : cast_actions_dag - > getOutputs ( ) )
{
if ( planner_context - > getGlobalPlannerContext ( ) - > hasColumnIdentifier ( output_node - > result_name ) )
2023-02-24 12:46:09 +00:00
{
DataTypePtr type_to_check = output_node - > result_type ;
if ( const auto * type_to_check_low_cardinality = typeid_cast < const DataTypeLowCardinality * > ( type_to_check . get ( ) ) )
type_to_check = type_to_check_low_cardinality - > getDictionaryType ( ) ;
if ( type_to_check - > canBeInsideNullable ( ) )
output_node = & cast_actions_dag - > addFunction ( to_nullable_function , { output_node } , output_node - > result_name ) ;
}
2022-09-15 14:09:30 +00:00
}
cast_actions_dag - > projectInput ( ) ;
auto cast_join_columns_step = std : : make_unique < ExpressionStep > ( plan_to_add_cast . getCurrentDataStream ( ) , std : : move ( cast_actions_dag ) ) ;
cast_join_columns_step - > setStepDescription ( " Cast JOIN columns to Nullable " ) ;
plan_to_add_cast . addStep ( std : : move ( cast_join_columns_step ) ) ;
} ;
if ( join_use_nulls )
{
if ( isFull ( join_kind ) )
{
join_cast_plan_columns_to_nullable ( left_plan ) ;
join_cast_plan_columns_to_nullable ( right_plan ) ;
}
else if ( isLeft ( join_kind ) )
{
join_cast_plan_columns_to_nullable ( right_plan ) ;
}
else if ( isRight ( join_kind ) )
{
join_cast_plan_columns_to_nullable ( left_plan ) ;
}
}
2023-04-05 13:42:13 +00:00
auto table_join = std : : make_shared < TableJoin > ( settings , query_context - > getGlobalTemporaryVolume ( ) ) ;
2022-09-15 14:09:30 +00:00
table_join - > getTableJoin ( ) = join_node . toASTTableJoin ( ) - > as < ASTTableJoin & > ( ) ;
2022-09-28 11:20:24 +00:00
table_join - > getTableJoin ( ) . kind = join_kind ;
if ( join_kind = = JoinKind : : Comma )
2022-10-18 11:35:04 +00:00
{
join_kind = JoinKind : : Cross ;
2022-09-15 14:09:30 +00:00
table_join - > getTableJoin ( ) . kind = JoinKind : : Cross ;
2022-10-18 11:35:04 +00:00
}
2022-09-27 15:04:03 +00:00
2022-09-28 11:20:24 +00:00
table_join - > setIsJoinWithConstant ( join_constant ! = std : : nullopt ) ;
2022-09-15 14:09:30 +00:00
if ( join_node . isOnJoinExpression ( ) )
{
const auto & join_clauses = join_clauses_and_actions . join_clauses ;
2022-09-27 15:04:03 +00:00
bool is_asof = table_join - > strictness ( ) = = JoinStrictness : : Asof ;
if ( join_clauses . size ( ) > 1 )
{
if ( is_asof )
throw Exception ( ErrorCodes : : NOT_IMPLEMENTED ,
" ASOF join {} doesn't support multiple ORs for keys in JOIN ON section " ,
join_node . formatASTForErrorMessage ( ) ) ;
}
2022-09-15 14:09:30 +00:00
auto & table_join_clauses = table_join - > getClauses ( ) ;
for ( const auto & join_clause : join_clauses )
{
table_join_clauses . emplace_back ( ) ;
auto & table_join_clause = table_join_clauses . back ( ) ;
const auto & join_clause_left_key_nodes = join_clause . getLeftKeyNodes ( ) ;
const auto & join_clause_right_key_nodes = join_clause . getRightKeyNodes ( ) ;
size_t join_clause_key_nodes_size = join_clause_left_key_nodes . size ( ) ;
assert ( join_clause_key_nodes_size = = join_clause_right_key_nodes . size ( ) ) ;
for ( size_t i = 0 ; i < join_clause_key_nodes_size ; + + i )
{
table_join_clause . key_names_left . push_back ( join_clause_left_key_nodes [ i ] - > result_name ) ;
table_join_clause . key_names_right . push_back ( join_clause_right_key_nodes [ i ] - > result_name ) ;
}
const auto & join_clause_get_left_filter_condition_nodes = join_clause . getLeftFilterConditionNodes ( ) ;
if ( ! join_clause_get_left_filter_condition_nodes . empty ( ) )
{
if ( join_clause_get_left_filter_condition_nodes . size ( ) ! = 1 )
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" JOIN {} left filter conditions size must be 1. Actual {} " ,
join_node . formatASTForErrorMessage ( ) ,
join_clause_get_left_filter_condition_nodes . size ( ) ) ;
const auto & join_clause_left_filter_condition_name = join_clause_get_left_filter_condition_nodes [ 0 ] - > result_name ;
table_join_clause . analyzer_left_filter_condition_column_name = join_clause_left_filter_condition_name ;
}
const auto & join_clause_get_right_filter_condition_nodes = join_clause . getRightFilterConditionNodes ( ) ;
if ( ! join_clause_get_right_filter_condition_nodes . empty ( ) )
{
if ( join_clause_get_right_filter_condition_nodes . size ( ) ! = 1 )
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" JOIN {} right filter conditions size must be 1. Actual {} " ,
join_node . formatASTForErrorMessage ( ) ,
join_clause_get_right_filter_condition_nodes . size ( ) ) ;
const auto & join_clause_right_filter_condition_name = join_clause_get_right_filter_condition_nodes [ 0 ] - > result_name ;
table_join_clause . analyzer_right_filter_condition_column_name = join_clause_right_filter_condition_name ;
}
2022-09-27 15:04:03 +00:00
if ( is_asof )
{
if ( ! join_clause . hasASOF ( ) )
throw Exception ( ErrorCodes : : INVALID_JOIN_ON_EXPRESSION ,
" JOIN {} no inequality in ASOF JOIN ON section. " ,
join_node . formatASTForErrorMessage ( ) ) ;
if ( table_join_clause . key_names_left . size ( ) < = 1 )
throw Exception ( ErrorCodes : : SYNTAX_ERROR ,
" JOIN {} ASOF join needs at least one equi-join column " ,
join_node . formatASTForErrorMessage ( ) ) ;
}
if ( join_clause . hasASOF ( ) )
{
const auto & asof_conditions = join_clause . getASOFConditions ( ) ;
assert ( asof_conditions . size ( ) = = 1 ) ;
const auto & asof_condition = asof_conditions [ 0 ] ;
table_join - > setAsofInequality ( asof_condition . asof_inequality ) ;
/// Execution layer of JOIN algorithms expects that ASOF keys are last JOIN keys
std : : swap ( table_join_clause . key_names_left . at ( asof_condition . key_index ) , table_join_clause . key_names_left . back ( ) ) ;
std : : swap ( table_join_clause . key_names_right . at ( asof_condition . key_index ) , table_join_clause . key_names_right . back ( ) ) ;
}
2022-09-15 14:09:30 +00:00
}
}
else if ( join_node . isUsingJoinExpression ( ) )
{
auto & table_join_clauses = table_join - > getClauses ( ) ;
table_join_clauses . emplace_back ( ) ;
auto & table_join_clause = table_join_clauses . back ( ) ;
auto & using_list = join_node . getJoinExpression ( ) - > as < ListNode & > ( ) ;
for ( auto & join_using_node : using_list . getNodes ( ) )
{
auto & join_using_column_node = join_using_node - > as < ColumnNode & > ( ) ;
2022-10-19 16:49:17 +00:00
auto & using_join_columns_list = join_using_column_node . getExpressionOrThrow ( ) - > as < ListNode & > ( ) ;
2022-09-15 14:09:30 +00:00
auto & using_join_left_join_column_node = using_join_columns_list . getNodes ( ) . at ( 0 ) ;
auto & using_join_right_join_column_node = using_join_columns_list . getNodes ( ) . at ( 1 ) ;
2022-10-19 16:49:17 +00:00
const auto & left_column_identifier = planner_context - > getColumnNodeIdentifierOrThrow ( using_join_left_join_column_node ) ;
const auto & right_column_identifier = planner_context - > getColumnNodeIdentifierOrThrow ( using_join_right_join_column_node ) ;
2022-09-15 14:09:30 +00:00
table_join_clause . key_names_left . push_back ( left_column_identifier ) ;
table_join_clause . key_names_right . push_back ( right_column_identifier ) ;
}
}
2022-10-26 16:26:26 +00:00
const Block & left_header = left_plan . getCurrentDataStream ( ) . header ;
auto left_table_names = left_header . getNames ( ) ;
2022-09-15 14:09:30 +00:00
NameSet left_table_names_set ( left_table_names . begin ( ) , left_table_names . end ( ) ) ;
auto columns_from_joined_table = right_plan . getCurrentDataStream ( ) . header . getNamesAndTypesList ( ) ;
table_join - > setColumnsFromJoinedTable ( columns_from_joined_table , left_table_names_set , " " ) ;
for ( auto & column_from_joined_table : columns_from_joined_table )
{
2023-02-18 16:06:00 +00:00
/// Add columns from joined table only if they are presented in outer scope, otherwise they can be dropped
2023-02-05 12:21:00 +00:00
if ( planner_context - > getGlobalPlannerContext ( ) - > hasColumnIdentifier ( column_from_joined_table . name ) & &
outer_scope_columns . contains ( column_from_joined_table . name ) )
2022-09-15 14:09:30 +00:00
table_join - > addJoinedColumn ( column_from_joined_table ) ;
}
2022-10-26 16:26:26 +00:00
const Block & right_header = right_plan . getCurrentDataStream ( ) . header ;
auto join_algorithm = chooseJoinAlgorithm ( table_join , join_node . getRightTableExpression ( ) , left_header , right_header , planner_context ) ;
2022-09-15 14:09:30 +00:00
2022-09-28 11:20:24 +00:00
auto result_plan = QueryPlan ( ) ;
2022-09-15 14:09:30 +00:00
2022-09-28 11:20:24 +00:00
if ( join_algorithm - > isFilled ( ) )
{
auto filled_join_step = std : : make_unique < FilledJoinStep > (
left_plan . getCurrentDataStream ( ) ,
join_algorithm ,
2022-12-23 17:45:28 +00:00
settings . max_block_size ) ;
2022-09-15 14:09:30 +00:00
2022-09-28 11:20:24 +00:00
filled_join_step - > setStepDescription ( " Filled JOIN " ) ;
left_plan . addStep ( std : : move ( filled_join_step ) ) ;
result_plan = std : : move ( left_plan ) ;
}
else
{
2022-10-18 11:35:04 +00:00
auto add_sorting = [ & ] ( QueryPlan & plan , const Names & key_names , JoinTableSide join_table_side )
{
SortDescription sort_description ;
sort_description . reserve ( key_names . size ( ) ) ;
for ( const auto & key_name : key_names )
sort_description . emplace_back ( key_name ) ;
2022-11-09 16:07:38 +00:00
SortingStep : : Settings sort_settings ( * query_context ) ;
2022-11-01 19:51:52 +00:00
2022-10-18 11:35:04 +00:00
auto sorting_step = std : : make_unique < SortingStep > (
plan . getCurrentDataStream ( ) ,
std : : move ( sort_description ) ,
0 /*limit*/ ,
2022-11-01 19:51:52 +00:00
sort_settings ,
2022-10-18 11:35:04 +00:00
settings . optimize_sorting_by_input_stream_properties ) ;
sorting_step - > setStepDescription ( fmt : : format ( " Sort {} before JOIN " , join_table_side ) ) ;
plan . addStep ( std : : move ( sorting_step ) ) ;
} ;
auto crosswise_connection = CreateSetAndFilterOnTheFlyStep : : createCrossConnection ( ) ;
auto add_create_set = [ & settings , crosswise_connection ] ( QueryPlan & plan , const Names & key_names , JoinTableSide join_table_side )
{
auto creating_set_step = std : : make_unique < CreateSetAndFilterOnTheFlyStep > (
plan . getCurrentDataStream ( ) ,
key_names ,
settings . max_rows_in_set_to_optimize_join ,
crosswise_connection ,
join_table_side ) ;
creating_set_step - > setStepDescription ( fmt : : format ( " Create set and filter {} joined stream " , join_table_side ) ) ;
auto * step_raw_ptr = creating_set_step . get ( ) ;
plan . addStep ( std : : move ( creating_set_step ) ) ;
return step_raw_ptr ;
} ;
if ( join_algorithm - > pipelineType ( ) = = JoinPipelineType : : YShaped )
{
const auto & join_clause = table_join - > getOnlyClause ( ) ;
bool kind_allows_filtering = isInner ( join_kind ) | | isLeft ( join_kind ) | | isRight ( join_kind ) ;
2023-06-01 10:52:04 +00:00
auto has_non_const = [ ] ( const Block & block , const auto & keys )
{
for ( const auto & key : keys )
{
const auto & column = block . getByName ( key ) . column ;
if ( column & & ! isColumnConst ( * column ) )
return true ;
}
return false ;
} ;
2023-10-05 12:39:17 +00:00
/// This optimization relies on the sorting that should buffer data from both streams before emitting any rows.
/// Sorting on a stream with const keys can start returning rows immediately and pipeline may stuck.
2023-06-01 10:52:04 +00:00
/// Note: it's also doesn't work with the read-in-order optimization.
/// No checks here because read in order is not applied if we have `CreateSetAndFilterOnTheFlyStep` in the pipeline between the reading and sorting steps.
bool has_non_const_keys = has_non_const ( left_plan . getCurrentDataStream ( ) . header , join_clause . key_names_left )
& & has_non_const ( right_plan . getCurrentDataStream ( ) . header , join_clause . key_names_right ) ;
if ( settings . max_rows_in_set_to_optimize_join > 0 & & kind_allows_filtering & & has_non_const_keys )
2022-10-18 11:35:04 +00:00
{
auto * left_set = add_create_set ( left_plan , join_clause . key_names_left , JoinTableSide : : Left ) ;
auto * right_set = add_create_set ( right_plan , join_clause . key_names_right , JoinTableSide : : Right ) ;
if ( isInnerOrLeft ( join_kind ) )
right_set - > setFiltering ( left_set - > getSet ( ) ) ;
if ( isInnerOrRight ( join_kind ) )
left_set - > setFiltering ( right_set - > getSet ( ) ) ;
}
add_sorting ( left_plan , join_clause . key_names_left , JoinTableSide : : Left ) ;
add_sorting ( right_plan , join_clause . key_names_right , JoinTableSide : : Right ) ;
}
2023-01-12 15:33:00 +00:00
auto join_pipeline_type = join_algorithm - > pipelineType ( ) ;
2022-09-28 11:20:24 +00:00
auto join_step = std : : make_unique < JoinStep > (
left_plan . getCurrentDataStream ( ) ,
right_plan . getCurrentDataStream ( ) ,
std : : move ( join_algorithm ) ,
2022-12-23 17:45:28 +00:00
settings . max_block_size ,
settings . max_threads ,
2022-09-28 11:20:24 +00:00
false /*optimize_read_in_order*/ ) ;
2023-01-12 15:33:00 +00:00
join_step - > setStepDescription ( fmt : : format ( " JOIN {} " , join_pipeline_type ) ) ;
2022-09-28 11:20:24 +00:00
std : : vector < QueryPlanPtr > plans ;
plans . emplace_back ( std : : make_unique < QueryPlan > ( std : : move ( left_plan ) ) ) ;
plans . emplace_back ( std : : make_unique < QueryPlan > ( std : : move ( right_plan ) ) ) ;
result_plan . unitePlans ( std : : move ( join_step ) , { std : : move ( plans ) } ) ;
}
2022-09-15 14:09:30 +00:00
auto drop_unused_columns_after_join_actions_dag = std : : make_shared < ActionsDAG > ( result_plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ) ;
2023-02-04 15:51:43 +00:00
ActionsDAG : : NodeRawConstPtrs drop_unused_columns_after_join_actions_dag_updated_outputs ;
std : : unordered_set < std : : string_view > drop_unused_columns_after_join_actions_dag_updated_outputs_names ;
2023-01-22 15:36:13 +00:00
std : : optional < size_t > first_skipped_column_node_index ;
2022-09-15 14:09:30 +00:00
2023-01-22 15:36:13 +00:00
auto & drop_unused_columns_after_join_actions_dag_outputs = drop_unused_columns_after_join_actions_dag - > getOutputs ( ) ;
size_t drop_unused_columns_after_join_actions_dag_outputs_size = drop_unused_columns_after_join_actions_dag_outputs . size ( ) ;
for ( size_t i = 0 ; i < drop_unused_columns_after_join_actions_dag_outputs_size ; + + i )
2022-09-15 14:09:30 +00:00
{
2023-01-22 15:36:13 +00:00
const auto & output = drop_unused_columns_after_join_actions_dag_outputs [ i ] ;
2022-12-23 19:33:46 +00:00
const auto & global_planner_context = planner_context - > getGlobalPlannerContext ( ) ;
2023-02-04 15:51:43 +00:00
if ( drop_unused_columns_after_join_actions_dag_updated_outputs_names . contains ( output - > result_name )
2023-01-22 15:36:13 +00:00
| | ! global_planner_context - > hasColumnIdentifier ( output - > result_name ) )
continue ;
if ( ! outer_scope_columns . contains ( output - > result_name ) )
2022-12-23 19:33:46 +00:00
{
2023-01-22 15:36:13 +00:00
if ( ! first_skipped_column_node_index )
first_skipped_column_node_index = i ;
2022-09-15 14:09:30 +00:00
continue ;
2022-12-23 19:33:46 +00:00
}
2022-09-15 14:09:30 +00:00
2023-02-04 15:51:43 +00:00
drop_unused_columns_after_join_actions_dag_updated_outputs . push_back ( output ) ;
drop_unused_columns_after_join_actions_dag_updated_outputs_names . insert ( output - > result_name ) ;
2022-09-15 14:09:30 +00:00
}
2023-01-22 15:36:13 +00:00
/** It is expected that JOIN TREE query plan will contain at least 1 column, even if there are no columns in outer scope.
*
* Example : SELECT count ( ) FROM test_table_1 AS t1 , test_table_2 AS t2 ;
*/
2023-02-04 15:51:43 +00:00
if ( drop_unused_columns_after_join_actions_dag_updated_outputs . empty ( ) & & first_skipped_column_node_index )
drop_unused_columns_after_join_actions_dag_updated_outputs . push_back ( drop_unused_columns_after_join_actions_dag_outputs [ * first_skipped_column_node_index ] ) ;
2023-01-22 15:36:13 +00:00
2023-02-04 15:51:43 +00:00
drop_unused_columns_after_join_actions_dag_outputs = std : : move ( drop_unused_columns_after_join_actions_dag_updated_outputs ) ;
2022-09-15 14:09:30 +00:00
auto drop_unused_columns_after_join_transform_step = std : : make_unique < ExpressionStep > ( result_plan . getCurrentDataStream ( ) , std : : move ( drop_unused_columns_after_join_actions_dag ) ) ;
drop_unused_columns_after_join_transform_step - > setStepDescription ( " DROP unused columns after JOIN " ) ;
result_plan . addStep ( std : : move ( drop_unused_columns_after_join_transform_step ) ) ;
2022-12-23 17:45:28 +00:00
return { std : : move ( result_plan ) , QueryProcessingStage : : FetchColumns } ;
2022-09-15 14:09:30 +00:00
}
2022-12-23 17:45:28 +00:00
JoinTreeQueryPlan buildQueryPlanForArrayJoinNode ( const QueryTreeNodePtr & array_join_table_expression ,
JoinTreeQueryPlan join_tree_query_plan ,
2023-02-04 15:51:43 +00:00
const ColumnIdentifierSet & outer_scope_columns ,
2022-09-15 14:09:30 +00:00
PlannerContextPtr & planner_context )
{
2022-12-23 17:45:28 +00:00
auto & array_join_node = array_join_table_expression - > as < ArrayJoinNode & > ( ) ;
if ( join_tree_query_plan . from_stage ! = QueryProcessingStage : : FetchColumns )
throw Exception ( ErrorCodes : : UNSUPPORTED_METHOD ,
" ARRAY JOIN {} table expression expected to process query to fetch columns stage. Actual {} " ,
array_join_node . formatASTForErrorMessage ( ) ,
QueryProcessingStage : : toString ( join_tree_query_plan . from_stage ) ) ;
auto plan = std : : move ( join_tree_query_plan . query_plan ) ;
2022-10-19 16:49:17 +00:00
auto plan_output_columns = plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ;
2022-09-15 14:09:30 +00:00
2022-10-19 16:49:17 +00:00
ActionsDAGPtr array_join_action_dag = std : : make_shared < ActionsDAG > ( plan_output_columns ) ;
2022-09-15 14:09:30 +00:00
PlannerActionsVisitor actions_visitor ( planner_context ) ;
2023-02-04 15:51:43 +00:00
std : : unordered_set < std : : string > array_join_expressions_output_nodes ;
2022-09-15 14:09:30 +00:00
2022-11-08 13:25:23 +00:00
NameSet array_join_column_names ;
2022-09-15 14:09:30 +00:00
for ( auto & array_join_expression : array_join_node . getJoinExpressions ( ) . getNodes ( ) )
{
2022-11-08 13:25:23 +00:00
const auto & array_join_column_identifier = planner_context - > getColumnNodeIdentifierOrThrow ( array_join_expression ) ;
array_join_column_names . insert ( array_join_column_identifier ) ;
2022-09-15 14:09:30 +00:00
2022-11-08 13:25:23 +00:00
auto & array_join_expression_column = array_join_expression - > as < ColumnNode & > ( ) ;
2022-09-15 14:09:30 +00:00
auto expression_dag_index_nodes = actions_visitor . visit ( array_join_action_dag , array_join_expression_column . getExpressionOrThrow ( ) ) ;
2023-02-04 15:51:43 +00:00
2022-09-15 14:09:30 +00:00
for ( auto & expression_dag_index_node : expression_dag_index_nodes )
{
2022-11-08 13:25:23 +00:00
const auto * array_join_column_node = & array_join_action_dag - > addAlias ( * expression_dag_index_node , array_join_column_identifier ) ;
2022-09-15 14:09:30 +00:00
array_join_action_dag - > getOutputs ( ) . push_back ( array_join_column_node ) ;
2023-02-04 15:51:43 +00:00
array_join_expressions_output_nodes . insert ( array_join_column_node - > result_name ) ;
2022-09-15 14:09:30 +00:00
}
}
array_join_action_dag - > projectInput ( ) ;
2022-10-19 16:49:17 +00:00
auto array_join_actions = std : : make_unique < ExpressionStep > ( plan . getCurrentDataStream ( ) , array_join_action_dag ) ;
2022-09-15 14:09:30 +00:00
array_join_actions - > setStepDescription ( " ARRAY JOIN actions " ) ;
2022-10-19 16:49:17 +00:00
plan . addStep ( std : : move ( array_join_actions ) ) ;
2022-09-15 14:09:30 +00:00
2023-02-04 15:51:43 +00:00
auto drop_unused_columns_before_array_join_actions_dag = std : : make_shared < ActionsDAG > ( plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ) ;
ActionsDAG : : NodeRawConstPtrs drop_unused_columns_before_array_join_actions_dag_updated_outputs ;
std : : unordered_set < std : : string_view > drop_unused_columns_before_array_join_actions_dag_updated_outputs_names ;
auto & drop_unused_columns_before_array_join_actions_dag_outputs = drop_unused_columns_before_array_join_actions_dag - > getOutputs ( ) ;
size_t drop_unused_columns_before_array_join_actions_dag_outputs_size = drop_unused_columns_before_array_join_actions_dag_outputs . size ( ) ;
for ( size_t i = 0 ; i < drop_unused_columns_before_array_join_actions_dag_outputs_size ; + + i )
{
const auto & output = drop_unused_columns_before_array_join_actions_dag_outputs [ i ] ;
if ( drop_unused_columns_before_array_join_actions_dag_updated_outputs_names . contains ( output - > result_name ) )
continue ;
if ( ! array_join_expressions_output_nodes . contains ( output - > result_name ) & &
! outer_scope_columns . contains ( output - > result_name ) )
continue ;
drop_unused_columns_before_array_join_actions_dag_updated_outputs . push_back ( output ) ;
drop_unused_columns_before_array_join_actions_dag_updated_outputs_names . insert ( output - > result_name ) ;
}
drop_unused_columns_before_array_join_actions_dag_outputs = std : : move ( drop_unused_columns_before_array_join_actions_dag_updated_outputs ) ;
auto drop_unused_columns_before_array_join_transform_step = std : : make_unique < ExpressionStep > ( plan . getCurrentDataStream ( ) ,
std : : move ( drop_unused_columns_before_array_join_actions_dag ) ) ;
drop_unused_columns_before_array_join_transform_step - > setStepDescription ( " DROP unused columns before ARRAY JOIN " ) ;
plan . addStep ( std : : move ( drop_unused_columns_before_array_join_transform_step ) ) ;
2022-11-08 13:25:23 +00:00
auto array_join_action = std : : make_shared < ArrayJoinAction > ( array_join_column_names , array_join_node . isLeft ( ) , planner_context - > getQueryContext ( ) ) ;
2022-10-19 16:49:17 +00:00
auto array_join_step = std : : make_unique < ArrayJoinStep > ( plan . getCurrentDataStream ( ) , std : : move ( array_join_action ) ) ;
2022-09-15 14:09:30 +00:00
array_join_step - > setStepDescription ( " ARRAY JOIN " ) ;
2022-10-19 16:49:17 +00:00
plan . addStep ( std : : move ( array_join_step ) ) ;
2022-09-15 14:09:30 +00:00
2022-12-23 17:45:28 +00:00
return { std : : move ( plan ) , QueryProcessingStage : : FetchColumns } ;
2022-09-15 14:09:30 +00:00
}
}
2022-12-23 17:45:28 +00:00
JoinTreeQueryPlan buildJoinTreeQueryPlan ( const QueryTreeNodePtr & query_node ,
const SelectQueryInfo & select_query_info ,
2023-08-11 23:04:08 +00:00
SelectQueryOptions & select_query_options ,
2022-12-23 19:33:46 +00:00
const ColumnIdentifierSet & outer_scope_columns ,
2022-09-15 14:09:30 +00:00
PlannerContextPtr & planner_context )
{
2023-03-03 16:01:50 +00:00
auto table_expressions_stack = buildTableExpressionsStack ( query_node - > as < QueryNode & > ( ) . getJoinTree ( ) ) ;
2023-01-12 15:33:00 +00:00
size_t table_expressions_stack_size = table_expressions_stack . size ( ) ;
bool is_single_table_expression = table_expressions_stack_size = = 1 ;
std : : vector < ColumnIdentifierSet > table_expressions_outer_scope_columns ( table_expressions_stack_size ) ;
ColumnIdentifierSet current_outer_scope_columns = outer_scope_columns ;
2023-08-11 23:04:08 +00:00
if ( is_single_table_expression )
{
auto * table_node = table_expressions_stack [ 0 ] - > as < TableNode > ( ) ;
if ( table_node & & shouldIgnoreQuotaAndLimits ( * table_node ) )
{
select_query_options . ignore_quota = true ;
select_query_options . ignore_limits = true ;
}
}
2023-03-03 16:01:50 +00:00
/// For each table, table function, query, union table expressions prepare before query plan build
for ( size_t i = 0 ; i < table_expressions_stack_size ; + + i )
{
const auto & table_expression = table_expressions_stack [ i ] ;
auto table_expression_type = table_expression - > getNodeType ( ) ;
if ( table_expression_type = = QueryTreeNodeType : : JOIN | |
table_expression_type = = QueryTreeNodeType : : ARRAY_JOIN )
continue ;
prepareBuildQueryPlanForTableExpression ( table_expression , planner_context ) ;
}
/** If left most table expression query plan is planned to stage that is not equal to fetch columns,
* then left most table expression is responsible for providing valid JOIN TREE part of final query plan .
*
* Examples : Distributed , LiveView , Merge storages .
*/
auto left_table_expression = table_expressions_stack . front ( ) ;
auto left_table_expression_query_plan = buildQueryPlanForTableExpression ( left_table_expression ,
select_query_info ,
select_query_options ,
planner_context ,
is_single_table_expression ,
false /*wrap_read_columns_in_subquery*/ ) ;
if ( left_table_expression_query_plan . from_stage ! = QueryProcessingStage : : FetchColumns )
return left_table_expression_query_plan ;
2023-01-13 16:53:53 +00:00
for ( Int64 i = static_cast < Int64 > ( table_expressions_stack_size ) - 1 ; i > = 0 ; - - i )
2023-01-12 15:33:00 +00:00
{
table_expressions_outer_scope_columns [ i ] = current_outer_scope_columns ;
2023-02-04 15:51:43 +00:00
auto & table_expression = table_expressions_stack [ i ] ;
auto table_expression_type = table_expression - > getNodeType ( ) ;
2023-01-12 15:33:00 +00:00
2023-02-04 15:51:43 +00:00
if ( table_expression_type = = QueryTreeNodeType : : JOIN )
collectTopLevelColumnIdentifiers ( table_expression , planner_context , current_outer_scope_columns ) ;
else if ( table_expression_type = = QueryTreeNodeType : : ARRAY_JOIN )
collectTopLevelColumnIdentifiers ( table_expression , planner_context , current_outer_scope_columns ) ;
2023-01-12 15:33:00 +00:00
}
2022-09-15 14:09:30 +00:00
2022-12-23 17:45:28 +00:00
std : : vector < JoinTreeQueryPlan > query_plans_stack ;
2023-01-12 15:33:00 +00:00
for ( size_t i = 0 ; i < table_expressions_stack_size ; + + i )
2022-09-15 14:09:30 +00:00
{
2023-01-12 15:33:00 +00:00
const auto & table_expression = table_expressions_stack [ i ] ;
2023-02-11 11:10:53 +00:00
auto table_expression_node_type = table_expression - > getNodeType ( ) ;
2023-01-12 15:33:00 +00:00
2023-02-11 11:10:53 +00:00
if ( table_expression_node_type = = QueryTreeNodeType : : ARRAY_JOIN )
2022-09-15 14:09:30 +00:00
{
2022-12-23 17:45:28 +00:00
if ( query_plans_stack . empty ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
2023-01-13 16:53:53 +00:00
" Expected at least 1 query plan on stack before ARRAY JOIN processing. Actual {} " ,
query_plans_stack . size ( ) ) ;
2022-12-23 17:45:28 +00:00
auto query_plan = std : : move ( query_plans_stack . back ( ) ) ;
query_plans_stack . back ( ) = buildQueryPlanForArrayJoinNode ( table_expression ,
std : : move ( query_plan ) ,
2023-02-04 15:51:43 +00:00
table_expressions_outer_scope_columns [ i ] ,
2022-12-23 17:45:28 +00:00
planner_context ) ;
2022-09-15 14:09:30 +00:00
}
2023-02-11 11:10:53 +00:00
else if ( table_expression_node_type = = QueryTreeNodeType : : JOIN )
2022-09-15 14:09:30 +00:00
{
2023-01-13 16:53:53 +00:00
if ( query_plans_stack . size ( ) < 2 )
2022-12-23 17:45:28 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" Expected at least 2 query plans on stack before JOIN processing. Actual {} " ,
2023-01-13 16:53:53 +00:00
query_plans_stack . size ( ) ) ;
2022-12-23 17:45:28 +00:00
auto right_query_plan = std : : move ( query_plans_stack . back ( ) ) ;
query_plans_stack . pop_back ( ) ;
auto left_query_plan = std : : move ( query_plans_stack . back ( ) ) ;
query_plans_stack . pop_back ( ) ;
query_plans_stack . push_back ( buildQueryPlanForJoinNode ( table_expression ,
std : : move ( left_query_plan ) ,
std : : move ( right_query_plan ) ,
2023-01-12 15:33:00 +00:00
table_expressions_outer_scope_columns [ i ] ,
2022-12-23 17:45:28 +00:00
planner_context ) ) ;
2022-09-15 14:09:30 +00:00
}
2022-12-23 17:45:28 +00:00
else
2022-09-15 14:09:30 +00:00
{
2023-03-03 16:01:50 +00:00
if ( table_expression = = left_table_expression )
{
2023-03-12 12:43:58 +00:00
query_plans_stack . push_back ( std : : move ( left_table_expression_query_plan ) ) ; /// NOLINT
2023-03-06 09:53:55 +00:00
left_table_expression = { } ;
2023-03-03 16:01:50 +00:00
continue ;
}
2023-02-09 17:30:57 +00:00
2023-03-03 16:01:50 +00:00
/** If table expression is remote and it is not left most table expression, we wrap read columns from such
* table expression in subquery .
*/
bool is_remote = planner_context - > getTableExpressionDataOrThrow ( table_expression ) . isRemote ( ) ;
2022-12-23 17:45:28 +00:00
query_plans_stack . push_back ( buildQueryPlanForTableExpression ( table_expression ,
select_query_info ,
select_query_options ,
planner_context ,
2023-03-03 16:01:50 +00:00
is_single_table_expression ,
is_remote /*wrap_read_columns_in_subquery*/ ) ) ;
2022-09-15 14:09:30 +00:00
}
}
2022-12-23 17:45:28 +00:00
2023-01-13 16:53:53 +00:00
if ( query_plans_stack . size ( ) ! = 1 )
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" Expected 1 query plan for JOIN TREE. Actual {} " ,
query_plans_stack . size ( ) ) ;
2022-12-23 17:45:28 +00:00
return std : : move ( query_plans_stack . back ( ) ) ;
2022-09-15 14:09:30 +00:00
}
}