diff --git a/src/Analyzer/FunctionNode.h b/src/Analyzer/FunctionNode.h index a3212b3fb07..6819232b4be 100644 --- a/src/Analyzer/FunctionNode.h +++ b/src/Analyzer/FunctionNode.h @@ -125,7 +125,8 @@ public: { if (kind != FunctionKind::ORDINARY) throw Exception(ErrorCodes::LOGICAL_ERROR, - "Function node with name '{}' is not resolved as ordinary function"); + "Function node with name '{}' is not resolved as ordinary function", + function_name); return std::static_pointer_cast(function); } diff --git a/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp b/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp index 0f5ccb3ae06..1a83d93c87d 100644 --- a/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp +++ b/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp @@ -16,9 +16,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -108,6 +110,18 @@ void replaceStorageInQueryTree(QueryTreeNodePtr & query_tree, const ContextPtr & } auto replacement_table_expression = std::make_shared(storage, context); + std::optional table_expression_modifiers; + + if (auto * table_node = table_expression_to_replace->as()) + table_expression_modifiers = table_node->getTableExpressionModifiers(); + else if (auto * table_function_node = table_expression_to_replace->as()) + table_expression_modifiers = table_function_node->getTableExpressionModifiers(); + else if (auto * identifier_node = table_expression_to_replace->as()) + table_expression_modifiers = identifier_node->getTableExpressionModifiers(); + + if (table_expression_modifiers) + replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers); + query_tree = query_tree->cloneAndReplace(table_expression_to_replace, std::move(replacement_table_expression)); } @@ -132,11 +146,6 @@ QueryTreeNodePtr buildQueryTreeAndRunPasses(const ASTPtr & query, return query_tree; } -PlannerConfiguration buildPlannerConfiguration(const SelectQueryOptions & select_query_options) -{ - return {.only_analyze = select_query_options.only_analyze}; -} - } InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer( @@ -147,7 +156,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer( , context(buildContext(context_, select_query_options_)) , select_query_options(select_query_options_) , query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, nullptr /*storage*/)) - , planner(query_tree, select_query_options, buildPlannerConfiguration(select_query_options)) + , planner(query_tree, select_query_options) { } @@ -160,7 +169,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer( , context(buildContext(context_, select_query_options_)) , select_query_options(select_query_options_) , query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, storage_)) - , planner(query_tree, select_query_options, buildPlannerConfiguration(select_query_options)) + , planner(query_tree, select_query_options) { } @@ -172,7 +181,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer( , context(buildContext(context_, select_query_options_)) , select_query_options(select_query_options_) , query_tree(query_tree_) - , planner(query_tree, select_query_options, buildPlannerConfiguration(select_query_options)) + , planner(query_tree, select_query_options) { } diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index f84e4db4924..a4714c05a23 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -979,34 +979,28 @@ PlannerContextPtr buildPlannerContext(const QueryTreeNodePtr & query_tree_node, } Planner::Planner(const QueryTreeNodePtr & query_tree_, - const SelectQueryOptions & select_query_options_, - PlannerConfiguration planner_configuration_) + const SelectQueryOptions & select_query_options_) : query_tree(query_tree_) , select_query_options(select_query_options_) , planner_context(buildPlannerContext(query_tree, select_query_options, std::make_shared())) - , planner_configuration(std::move(planner_configuration_)) { } Planner::Planner(const QueryTreeNodePtr & query_tree_, const SelectQueryOptions & select_query_options_, - GlobalPlannerContextPtr global_planner_context_, - PlannerConfiguration planner_configuration_) + GlobalPlannerContextPtr global_planner_context_) : query_tree(query_tree_) , select_query_options(select_query_options_) , planner_context(buildPlannerContext(query_tree_, select_query_options, std::move(global_planner_context_))) - , planner_configuration(std::move(planner_configuration_)) { } Planner::Planner(const QueryTreeNodePtr & query_tree_, const SelectQueryOptions & select_query_options_, - PlannerContextPtr planner_context_, - PlannerConfiguration planner_configuration_) + PlannerContextPtr planner_context_) : query_tree(query_tree_) , select_query_options(select_query_options_) , planner_context(std::move(planner_context_)) - , planner_configuration(std::move(planner_configuration_)) { } @@ -1015,7 +1009,7 @@ void Planner::buildQueryPlanIfNeeded() if (query_plan.isInitialized()) return; - if (query_tree->as()) + if (query_tree->getNodeType() == QueryTreeNodeType::UNION) buildPlanForUnionNode(); else buildPlanForQueryNode(); @@ -1174,7 +1168,7 @@ void Planner::buildPlanForQueryNode() QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; - if (planner_configuration.only_analyze) + if (select_query_options.only_analyze) { Block join_tree_block; diff --git a/src/Planner/Planner.h b/src/Planner/Planner.h index a87880eab6b..4427e8cc504 100644 --- a/src/Planner/Planner.h +++ b/src/Planner/Planner.h @@ -16,30 +16,22 @@ using GlobalPlannerContextPtr = std::shared_ptr; class PlannerContext; using PlannerContextPtr = std::shared_ptr; -struct PlannerConfiguration -{ - bool only_analyze = false; -}; - class Planner { public: /// Initialize planner with query tree after analysis phase Planner(const QueryTreeNodePtr & query_tree_, - const SelectQueryOptions & select_query_options_, - PlannerConfiguration planner_configuration_ = {}); + const SelectQueryOptions & select_query_options_); /// Initialize planner with query tree after query analysis phase and global planner context Planner(const QueryTreeNodePtr & query_tree_, const SelectQueryOptions & select_query_options_, - GlobalPlannerContextPtr global_planner_context_, - PlannerConfiguration planner_configuration_ = {}); + GlobalPlannerContextPtr global_planner_context_); /// Initialize planner with query tree after query analysis phase and planner context Planner(const QueryTreeNodePtr & query_tree_, const SelectQueryOptions & select_query_options_, - PlannerContextPtr planner_context_, - PlannerConfiguration planner_configuration_ = {}); + PlannerContextPtr planner_context_); const QueryPlan & getQueryPlan() const { @@ -69,7 +61,6 @@ private: QueryPlan query_plan; SelectQueryOptions select_query_options; PlannerContextPtr planner_context; - PlannerConfiguration planner_configuration; StorageLimitsList storage_limits; }; diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 0a6d38a15be..0c74479615a 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -370,9 +370,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl else { Planner planner(select_query_info.query_tree, - SelectQueryOptions(from_stage), - select_query_info.planner_context, - PlannerConfiguration{.only_analyze = true}); + SelectQueryOptions(from_stage).analyze(), + select_query_info.planner_context); planner.buildQueryPlanIfNeeded(); auto expected_header = planner.getQueryPlan().getCurrentDataStream().header; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index cafb9c1abe8..22ebd401377 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -736,9 +736,7 @@ void StorageDistributed::read( query_ast = queryNodeToSelectQuery(query_tree_with_replaced_distributed_table); - Planner planner(query_tree_with_replaced_distributed_table, - SelectQueryOptions(processed_stage), - PlannerConfiguration{.only_analyze = true}); + Planner planner(query_tree_with_replaced_distributed_table, SelectQueryOptions(processed_stage).analyze()); planner.buildQueryPlanIfNeeded(); header = planner.getQueryPlan().getCurrentDataStream().header; diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 3f31dbac311..af7bd96cd0b 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -533,23 +533,34 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer { const auto & [database_name, storage, storage_lock, table_name] = storage_with_lock_and_name; const StorageID current_storage_id = storage->getStorageID(); - bool is_storage_merge_engine = storage->as(); SelectQueryInfo modified_query_info = query_info; if (modified_query_info.table_expression) { auto replacement_table_expression = std::make_shared(storage, storage_lock, storage_snapshot); + if (query_info.table_expression_modifiers) + replacement_table_expression->setTableExpressionModifiers(*query_info.table_expression_modifiers); + modified_query_info.query_tree = modified_query_info.query_tree->cloneAndReplace(modified_query_info.table_expression, replacement_table_expression); modified_query_info.table_expression = replacement_table_expression; + modified_query_info.planner_context->getOrCreateTableExpressionData(replacement_table_expression); - if (!is_storage_merge_engine) - { - std::unordered_map column_name_to_node; + auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals(); + if (storage_snapshot->storage.supportsSubcolumns()) + get_column_options.withSubcolumns(); + + std::unordered_map column_name_to_node; + + if (!storage_snapshot->tryGetColumn(get_column_options, "_table")) column_name_to_node.emplace("_table", std::make_shared(current_storage_id.table_name)); + + if (!storage_snapshot->tryGetColumn(get_column_options, "_database")) column_name_to_node.emplace("_database", std::make_shared(current_storage_id.database_name)); + if (!column_name_to_node.empty()) + { replaceColumns(modified_query_info.query_tree, replacement_table_expression, column_name_to_node); @@ -559,6 +570,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer } else { + bool is_storage_merge_engine = storage->as(); modified_query_info.query = query_info.query->clone(); /// Original query could contain JOIN but we need only the first joined table and its columns. @@ -585,7 +597,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( const Block & header, const Aliases & aliases, const StorageWithLockAndName & storage_with_lock, - Names & real_column_names, + Names real_column_names, ContextMutablePtr modified_context, size_t streams_num, bool concat_streams) @@ -604,13 +616,82 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( modified_select.setFinal(); } - if (modified_context->getSettingsRef().allow_experimental_analyzer) + bool allow_experimental_analyzer = modified_context->getSettingsRef().allow_experimental_analyzer; + + auto storage_stage = storage->getQueryProcessingStage(modified_context, + QueryProcessingStage::Complete, + storage_snapshot, + modified_query_info); + if (processed_stage <= storage_stage || (allow_experimental_analyzer && processed_stage == QueryProcessingStage::FetchColumns)) { - InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query, - modified_context, - SelectQueryOptions(processed_stage).ignoreProjections(), - storage); - builder = std::make_unique(interpreter.buildQueryPipeline()); + /// If there are only virtual columns in query, you must request at least one other column. + if (real_column_names.empty()) + real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name); + + QueryPlan plan; + + StorageView * view = dynamic_cast(storage.get()); + if (!view || allow_experimental_analyzer) + { + storage->read(plan, + real_column_names, + storage_snapshot, + modified_query_info, + modified_context, + processed_stage, + max_block_size, + UInt32(streams_num)); + } + else + { + /// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read. + /// The most intuitive way is to use InterpreterSelectQuery. + + /// Intercept the settings + modified_context->setSetting("max_threads", streams_num); + modified_context->setSetting("max_streams_to_max_threads_ratio", 1); + modified_context->setSetting("max_block_size", max_block_size); + + InterpreterSelectQuery interpreter(modified_query_info.query, + modified_context, + storage, + view->getInMemoryMetadataPtr(), + SelectQueryOptions(processed_stage)); + interpreter.buildQueryPlan(plan); + } + + if (!plan.isInitialized()) + return {}; + + if (auto * read_from_merge_tree = typeid_cast(plan.getRootNode()->step.get())) + read_from_merge_tree->addFilterNodes(added_filter_nodes); + + builder = plan.buildQueryPipeline( + QueryPlanOptimizationSettings::fromContext(modified_context), + BuildQueryPipelineSettings::fromContext(modified_context)); + } + else if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns)) + { + /// Maximum permissible parallelism is streams_num + modified_context->setSetting("max_threads", streams_num); + modified_context->setSetting("max_streams_to_max_threads_ratio", 1); + + if (allow_experimental_analyzer) + { + InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query_tree, + modified_context, + SelectQueryOptions(processed_stage).ignoreProjections()); + builder = std::make_unique(interpreter.buildQueryPipeline()); + } + else + { + modified_select.replaceDatabaseAndTable(database_name, table_name); + /// TODO: Find a way to support projections for StorageMerge + InterpreterSelectQuery interpreter{modified_query_info.query, + modified_context, + SelectQueryOptions(processed_stage).ignoreProjections()}; + builder = std::make_unique(interpreter.buildQueryPipeline()); + } /** Materialization is needed, since from distributed storage the constants come materialized. * If you do not do this, different types (Const and non-Const) columns will be produced in different threads, @@ -618,75 +699,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( */ builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared(stream_header); }); } - else - { - auto storage_stage - = storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, storage_snapshot, modified_query_info); - if (processed_stage <= storage_stage) - { - /// If there are only virtual columns in query, you must request at least one other column. - if (real_column_names.empty()) - real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name); - - QueryPlan plan; - if (StorageView * view = dynamic_cast(storage.get())) - { - /// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read. - /// The most intuitive way is to use InterpreterSelectQuery. - - /// Intercept the settings - modified_context->setSetting("max_threads", streams_num); - modified_context->setSetting("max_streams_to_max_threads_ratio", 1); - modified_context->setSetting("max_block_size", max_block_size); - - InterpreterSelectQuery( - modified_query_info.query, modified_context, storage, view->getInMemoryMetadataPtr(), SelectQueryOptions(processed_stage)) - .buildQueryPlan(plan); - } - else - { - storage->read( - plan, - real_column_names, - storage_snapshot, - modified_query_info, - modified_context, - processed_stage, - max_block_size, - UInt32(streams_num)); - } - - if (!plan.isInitialized()) - return {}; - - if (auto * read_from_merge_tree = typeid_cast(plan.getRootNode()->step.get())) - read_from_merge_tree->addFilterNodes(added_filter_nodes); - - builder = plan.buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(modified_context), - BuildQueryPipelineSettings::fromContext(modified_context)); - } - else if (processed_stage > storage_stage) - { - modified_select.replaceDatabaseAndTable(database_name, table_name); - - /// Maximum permissible parallelism is streams_num - modified_context->setSetting("max_threads", streams_num); - modified_context->setSetting("max_streams_to_max_threads_ratio", 1); - - /// TODO: Find a way to support projections for StorageMerge - InterpreterSelectQuery interpreter{ - modified_query_info.query, modified_context, SelectQueryOptions(processed_stage).ignoreProjections()}; - - builder = std::make_unique(interpreter.buildQueryPipeline()); - - /** Materialization is needed, since from distributed storage the constants come materialized. - * If you do not do this, different types (Const and non-Const) columns will be produced in different threads, - * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same. - */ - builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared(stream_header); }); - } - } if (builder->initialized()) { @@ -739,7 +751,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( /// Subordinary tables could have different but convertible types, like numeric types of different width. /// We must return streams with structure equals to structure of Merge table. - convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder); + convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage); } return builder; @@ -914,7 +926,8 @@ void ReadFromMerge::convertingSourceStream( const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases, ContextPtr local_context, - QueryPipelineBuilder & builder) + QueryPipelineBuilder & builder, + const QueryProcessingStage::Enum & processed_stage) { Block before_block_header = builder.getHeader(); @@ -940,7 +953,7 @@ void ReadFromMerge::convertingSourceStream( ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name; - if (local_context->getSettingsRef().allow_experimental_analyzer) + if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns) convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position; auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(), diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index 0d34052c7cd..05098b6df51 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -197,15 +197,18 @@ private: const Block & header, const Aliases & aliases, const StorageWithLockAndName & storage_with_lock, - Names & real_column_names, + Names real_column_names, ContextMutablePtr modified_context, size_t streams_num, bool concat_streams = false); static void convertingSourceStream( - const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases, + const Block & header, + const StorageMetadataPtr & metadata_snapshot, + const Aliases & aliases, ContextPtr context, - QueryPipelineBuilder & builder); + QueryPipelineBuilder & builder, + const QueryProcessingStage::Enum & processed_stage); }; }