This commit is contained in:
Dmitry Novik 2024-03-14 18:29:07 +01:00
parent 90669ef487
commit cd912074a5
6 changed files with 29 additions and 25 deletions

View File

@ -959,24 +959,22 @@ void replaceMissedSubcolumnsByConstants(
/// @expected_columns and @available_columns contain descriptions
/// of extended Object columns.
MissingObjectList replaceMissedSubcolumnsByConstants(
bool replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
QueryTreeNodePtr & query,
const ContextPtr & context [[maybe_unused]])
{
MissingObjectList missed_list;
bool has_missing_objects = false;
NamesAndTypes missed_names_types = calculateMissedSubcolumns(expected_columns, available_columns);
if (missed_names_types.empty())
return missed_list;
return has_missing_objects;
auto * query_node = query->as<QueryNode>();
if (!query_node)
return missed_list;
missed_list.reserve(missed_names_types.size());
return has_missing_objects;
auto table_expression = extractLeftTableExpression(query_node->getJoinTree());
@ -987,12 +985,12 @@ MissingObjectList replaceMissedSubcolumnsByConstants(
constant->setAlias(table_expression->getAlias() + "." + name);
column_name_to_node[name] = buildCastFunction(constant, type, context);
missed_list.push_back({ constant->getValueStringRepresentation() + "_" + constant->getResultType()->getName(), table_expression->getAlias() + "." + name });
has_missing_objects = true;
}
replaceColumns(query, table_expression, column_name_to_node);
return missed_list;
return has_missing_objects;
}
Field FieldVisitorReplaceScalars::operator()(const Array & x) const

View File

@ -100,9 +100,7 @@ void replaceMissedSubcolumnsByConstants(
const ColumnsDescription & available_columns,
ASTPtr query);
using MissingObjectList = std::vector<std::pair<String, String>>;
MissingObjectList replaceMissedSubcolumnsByConstants(
bool replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
QueryTreeNodePtr & query,

View File

@ -130,6 +130,7 @@ void SelectStreamFactory::createForShard(
createForShardImpl(
shard_info,
query_ast,
{},
main_table,
table_func_ptr,
std::move(context),
@ -143,6 +144,7 @@ void SelectStreamFactory::createForShard(
void SelectStreamFactory::createForShardImpl(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
@ -151,13 +153,13 @@ void SelectStreamFactory::createForShardImpl(
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator,
MissingObjectList missed_list)
bool has_missing_objects)
{
auto emplace_local_stream = [&]()
{
Block shard_header;
if (context->getSettingsRef().allow_experimental_analyzer)
shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, context, SelectQueryOptions(processed_stage).analyze());
shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree, context, SelectQueryOptions(processed_stage).analyze());
else
shard_header = header;
@ -169,15 +171,16 @@ void SelectStreamFactory::createForShardImpl(
{
Block shard_header;
if (context->getSettingsRef().allow_experimental_analyzer)
shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, context, SelectQueryOptions(processed_stage).analyze());
shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree, context, SelectQueryOptions(processed_stage).analyze());
else
shard_header = header;
remote_shards.emplace_back(Shard{
.query = query_ast,
.query_tree = query_tree,
.main_table = main_table,
.header = shard_header,
.missing_object_list = std::move(missed_list),
.has_missing_objects = has_missing_objects,
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
@ -300,15 +303,17 @@ void SelectStreamFactory::createForShard(
auto it = objects_by_shard.find(shard_info.shard_num);
QueryTreeNodePtr modified_query = query_tree;
MissingObjectList missed_list;
bool has_missing_objects = false;
if (it != objects_by_shard.end())
missed_list = replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, modified_query, context);
has_missing_objects = replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, modified_query, context);
auto query_ast = queryNodeToDistributedSelectQuery(modified_query);
createForShardImpl(
shard_info,
query_ast,
modified_query,
main_table,
table_func_ptr,
std::move(context),
@ -317,7 +322,7 @@ void SelectStreamFactory::createForShard(
shard_count,
parallel_replicas_enabled,
std::move(shard_filter_generator),
std::move(missed_list));
has_missing_objects);
}

View File

@ -54,11 +54,13 @@ public:
{
/// Query and header may be changed depending on shard.
ASTPtr query;
QueryTreeNodePtr query_tree;
/// Used to check the table existence on remote node
StorageID main_table;
Block header;
MissingObjectList missing_object_list;
bool has_missing_objects = false;
Cluster::ShardInfo shard_info;
@ -110,6 +112,7 @@ private:
void createForShardImpl(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
@ -118,7 +121,7 @@ private:
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator,
MissingObjectList missed_list = {});
bool has_missing_objects = false);
};
}

View File

@ -218,7 +218,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
};
pipes.emplace_back(createDelayedPipe(shard.header, lazily_create_stream, add_totals, add_extremes));
addConvertingActions(pipes.back(), output_stream->header, !shard.missing_object_list.empty());
addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects);
}
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
@ -299,7 +299,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header, !shard.missing_object_list.empty());
addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects);
}
}
else
@ -328,7 +328,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header, !shard.missing_object_list.empty());
addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects);
}
}

View File

@ -1,9 +1,9 @@
===http===
{"query":"select 1 from remote('127.0.0.2', system, one) settings allow_experimental_analyzer = 1 format Null\n","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1}
{"query":"DESC TABLE system.one","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1}
{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1`","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1}
{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1` SETTINGS allow_experimental_analyzer = 1","status":"QueryFinish","tracestate":"some custom state","sorted_by_start_time":1}
{"query":"DESC TABLE system.one","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1}
{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1`","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1}
{"query":"SELECT 1 AS `1` FROM `system`.`one` AS `__table1` SETTINGS allow_experimental_analyzer = 1","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1}
{"query":"select 1 from remote('127.0.0.2', system, one) settings allow_experimental_analyzer = 1 format Null\n","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1}
{"total spans":"3","unique spans":"3","unique non-zero parent spans":"3"}
{"initial query spans with proper parent":"2"}