From cd912074a5a94dc80daf4104942e0a9d3ed26e45 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Thu, 14 Mar 2024 18:29:07 +0100 Subject: [PATCH] Fixup --- src/DataTypes/ObjectUtils.cpp | 14 ++++++-------- src/DataTypes/ObjectUtils.h | 4 +--- .../ClusterProxy/SelectStreamFactory.cpp | 19 ++++++++++++------- .../ClusterProxy/SelectStreamFactory.h | 7 +++++-- src/Processors/QueryPlan/ReadFromRemote.cpp | 6 +++--- .../01455_opentelemetry_distributed.reference | 4 ++-- 6 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/DataTypes/ObjectUtils.cpp b/src/DataTypes/ObjectUtils.cpp index ccfa0a28f13..cdd95da6c00 100644 --- a/src/DataTypes/ObjectUtils.cpp +++ b/src/DataTypes/ObjectUtils.cpp @@ -959,24 +959,22 @@ void replaceMissedSubcolumnsByConstants( /// @expected_columns and @available_columns contain descriptions /// of extended Object columns. -MissingObjectList replaceMissedSubcolumnsByConstants( +bool replaceMissedSubcolumnsByConstants( const ColumnsDescription & expected_columns, const ColumnsDescription & available_columns, QueryTreeNodePtr & query, const ContextPtr & context [[maybe_unused]]) { - MissingObjectList missed_list; + bool has_missing_objects = false; NamesAndTypes missed_names_types = calculateMissedSubcolumns(expected_columns, available_columns); if (missed_names_types.empty()) - return missed_list; + return has_missing_objects; auto * query_node = query->as(); if (!query_node) - return missed_list; - - missed_list.reserve(missed_names_types.size()); + return has_missing_objects; auto table_expression = extractLeftTableExpression(query_node->getJoinTree()); @@ -987,12 +985,12 @@ MissingObjectList replaceMissedSubcolumnsByConstants( constant->setAlias(table_expression->getAlias() + "." + name); column_name_to_node[name] = buildCastFunction(constant, type, context); - missed_list.push_back({ constant->getValueStringRepresentation() + "_" + constant->getResultType()->getName(), table_expression->getAlias() + "." + name }); + has_missing_objects = true; } replaceColumns(query, table_expression, column_name_to_node); - return missed_list; + return has_missing_objects; } Field FieldVisitorReplaceScalars::operator()(const Array & x) const diff --git a/src/DataTypes/ObjectUtils.h b/src/DataTypes/ObjectUtils.h index 6ef19baf5ae..7b171056f06 100644 --- a/src/DataTypes/ObjectUtils.h +++ b/src/DataTypes/ObjectUtils.h @@ -100,9 +100,7 @@ void replaceMissedSubcolumnsByConstants( const ColumnsDescription & available_columns, ASTPtr query); -using MissingObjectList = std::vector>; - -MissingObjectList replaceMissedSubcolumnsByConstants( +bool replaceMissedSubcolumnsByConstants( const ColumnsDescription & expected_columns, const ColumnsDescription & available_columns, QueryTreeNodePtr & query, diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 4fccd83c8c0..ab301e01d0a 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -130,6 +130,7 @@ void SelectStreamFactory::createForShard( createForShardImpl( shard_info, query_ast, + {}, main_table, table_func_ptr, std::move(context), @@ -143,6 +144,7 @@ void SelectStreamFactory::createForShard( void SelectStreamFactory::createForShardImpl( const Cluster::ShardInfo & shard_info, const ASTPtr & query_ast, + const QueryTreeNodePtr & query_tree, const StorageID & main_table, const ASTPtr & table_func_ptr, ContextPtr context, @@ -151,13 +153,13 @@ void SelectStreamFactory::createForShardImpl( UInt32 shard_count, bool parallel_replicas_enabled, AdditionalShardFilterGenerator shard_filter_generator, - MissingObjectList missed_list) + bool has_missing_objects) { auto emplace_local_stream = [&]() { Block shard_header; if (context->getSettingsRef().allow_experimental_analyzer) - shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, context, SelectQueryOptions(processed_stage).analyze()); + shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree, context, SelectQueryOptions(processed_stage).analyze()); else shard_header = header; @@ -169,15 +171,16 @@ void SelectStreamFactory::createForShardImpl( { Block shard_header; if (context->getSettingsRef().allow_experimental_analyzer) - shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, context, SelectQueryOptions(processed_stage).analyze()); + shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree, context, SelectQueryOptions(processed_stage).analyze()); else shard_header = header; remote_shards.emplace_back(Shard{ .query = query_ast, + .query_tree = query_tree, .main_table = main_table, .header = shard_header, - .missing_object_list = std::move(missed_list), + .has_missing_objects = has_missing_objects, .shard_info = shard_info, .lazy = lazy, .local_delay = local_delay, @@ -300,15 +303,17 @@ void SelectStreamFactory::createForShard( auto it = objects_by_shard.find(shard_info.shard_num); QueryTreeNodePtr modified_query = query_tree; - MissingObjectList missed_list; + + bool has_missing_objects = false; if (it != objects_by_shard.end()) - missed_list = replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, modified_query, context); + has_missing_objects = replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, modified_query, context); auto query_ast = queryNodeToDistributedSelectQuery(modified_query); createForShardImpl( shard_info, query_ast, + modified_query, main_table, table_func_ptr, std::move(context), @@ -317,7 +322,7 @@ void SelectStreamFactory::createForShard( shard_count, parallel_replicas_enabled, std::move(shard_filter_generator), - std::move(missed_list)); + has_missing_objects); } diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index 61694830b3d..760281284fd 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -54,11 +54,13 @@ public: { /// Query and header may be changed depending on shard. ASTPtr query; + QueryTreeNodePtr query_tree; + /// Used to check the table existence on remote node StorageID main_table; Block header; - MissingObjectList missing_object_list; + bool has_missing_objects = false; Cluster::ShardInfo shard_info; @@ -110,6 +112,7 @@ private: void createForShardImpl( const Cluster::ShardInfo & shard_info, const ASTPtr & query_ast, + const QueryTreeNodePtr & query_tree, const StorageID & main_table, const ASTPtr & table_func_ptr, ContextPtr context, @@ -118,7 +121,7 @@ private: UInt32 shard_count, bool parallel_replicas_enabled, AdditionalShardFilterGenerator shard_filter_generator, - MissingObjectList missed_list = {}); + bool has_missing_objects = false); }; } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 72848a37f6e..8c455883ab2 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -218,7 +218,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream }; pipes.emplace_back(createDelayedPipe(shard.header, lazily_create_stream, add_totals, add_extremes)); - addConvertingActions(pipes.back(), output_stream->header, !shard.missing_object_list.empty()); + addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects); } void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard) @@ -299,7 +299,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact pipes.emplace_back( createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); - addConvertingActions(pipes.back(), output_stream->header, !shard.missing_object_list.empty()); + addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects); } } else @@ -328,7 +328,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact pipes.emplace_back( createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); - addConvertingActions(pipes.back(), output_stream->header, !shard.missing_object_list.empty()); + addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects); } } diff --git a/tests/queries/0_stateless/01455_opentelemetry_distributed.reference b/tests/queries/0_stateless/01455_opentelemetry_distributed.reference index 2920b387aa2..b04a3a5ea82 100644 --- a/tests/queries/0_stateless/01455_opentelemetry_distributed.reference +++ b/tests/queries/0_stateless/01455_opentelemetry_distributed.reference @@ -1,9 +1,9 @@ ===http=== {"query":"select 1 from remote('127.0.0.2', system, one) settings allow_experimental_analyzer = 1 format Null\n","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1} {"query":"DESC TABLE system.one","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1} -{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1`","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1} +{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1` SETTINGS allow_experimental_analyzer = 1","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1} {"query":"DESC TABLE system.one","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1} -{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1`","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1} +{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1` SETTINGS allow_experimental_analyzer = 1","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1} {"query":"select 1 from remote('127.0.0.2', system, one) settings allow_experimental_analyzer = 1 format Null\n","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1} {"total spans":"3","unique spans":"3","unique non-zero parent spans":"3"} {"initial query spans with proper parent":"2"}