Merge pull request #60497 from ClickHouse/analyzer-distr-json

Analyzer: Support different ObjectJSON on shards
This commit is contained in:
Dmitry Novik 2024-03-21 16:25:30 +01:00 committed by GitHub
commit e4209218d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 428 additions and 114 deletions

View File

@ -99,6 +99,11 @@ public:
return settings_changes;
}
void clearSettingsChanges()
{
settings_changes.clear();
}
/// Returns true if query node is subquery, false otherwise
bool isSubquery() const
{

View File

@ -1,3 +1,8 @@
#include <memory>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/Utils.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/DataTypeObject.h>
#include <DataTypes/DataTypeNothing.h>
@ -907,10 +912,10 @@ static void addConstantToWithClause(const ASTPtr & query, const String & column_
/// @expected_columns and @available_columns contain descriptions
/// of extended Object columns.
void replaceMissedSubcolumnsByConstants(
NamesAndTypes calculateMissedSubcolumns(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
ASTPtr query)
const ColumnsDescription & available_columns
)
{
NamesAndTypes missed_names_types;
@ -947,6 +952,18 @@ void replaceMissedSubcolumnsByConstants(
[](const auto & lhs, const auto & rhs) { return lhs.name < rhs.name; });
}
return missed_names_types;
}
/// @expected_columns and @available_columns contain descriptions
/// of extended Object columns.
void replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
ASTPtr query)
{
NamesAndTypes missed_names_types = calculateMissedSubcolumns(expected_columns, available_columns);
if (missed_names_types.empty())
return;
@ -959,6 +976,42 @@ void replaceMissedSubcolumnsByConstants(
addConstantToWithClause(query, name, type);
}
/// @expected_columns and @available_columns contain descriptions
/// of extended Object columns.
bool replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
QueryTreeNodePtr & query,
const ContextPtr & context [[maybe_unused]])
{
bool has_missing_objects = false;
NamesAndTypes missed_names_types = calculateMissedSubcolumns(expected_columns, available_columns);
if (missed_names_types.empty())
return has_missing_objects;
auto * query_node = query->as<QueryNode>();
if (!query_node)
return has_missing_objects;
auto table_expression = extractLeftTableExpression(query_node->getJoinTree());
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
for (const auto & [name, type] : missed_names_types)
{
auto constant = std::make_shared<ConstantNode>(type->getDefault(), type);
constant->setAlias(table_expression->getAlias() + "." + name);
column_name_to_node[name] = buildCastFunction(constant, type, context);
has_missing_objects = true;
}
replaceColumns(query, table_expression, column_name_to_node);
return has_missing_objects;
}
Field FieldVisitorReplaceScalars::operator()(const Array & x) const
{
if (num_dimensions_to_keep == 0)

View File

@ -15,6 +15,9 @@ struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
class ColumnsDescription;
class IQueryTreeNode;
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
/// Returns number of dimensions in Array type. 0 if type is not array.
size_t getNumberOfDimensions(const IDataType & type);
@ -98,6 +101,12 @@ void replaceMissedSubcolumnsByConstants(
const ColumnsDescription & available_columns,
ASTPtr query);
bool replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
QueryTreeNodePtr & query,
const ContextPtr & context);
/// Visitor that keeps @num_dimensions_to_keep dimensions in arrays
/// and replaces all scalars or nested arrays to @replacement at that level.
class FieldVisitorReplaceScalars : public StaticVisitor<Field>

View File

@ -5,6 +5,9 @@
#include <Common/checkStackSize.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Planner/Utils.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
@ -124,18 +127,54 @@ void SelectStreamFactory::createForShard(
if (it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);
createForShardImpl(
shard_info,
query_ast,
{},
main_table,
table_func_ptr,
std::move(context),
local_plans,
remote_shards,
shard_count,
parallel_replicas_enabled,
std::move(shard_filter_generator));
}
void SelectStreamFactory::createForShardImpl(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator,
bool has_missing_objects)
{
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count));
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, has_missing_objects));
};
auto emplace_remote_stream = [&](bool lazy = false, time_t local_delay = 0)
{
Block shard_header;
if (context->getSettingsRef().allow_experimental_analyzer)
shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree, context, SelectQueryOptions(processed_stage).analyze());
else
shard_header = header;
remote_shards.emplace_back(Shard{
.query = query_ast,
.query_tree = query_tree,
.main_table = main_table,
.header = header,
.header = shard_header,
.has_missing_objects = has_missing_objects,
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
@ -243,6 +282,44 @@ void SelectStreamFactory::createForShard(
emplace_remote_stream();
}
void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator)
{
auto it = objects_by_shard.find(shard_info.shard_num);
QueryTreeNodePtr modified_query = query_tree;
bool has_missing_objects = false;
if (it != objects_by_shard.end())
has_missing_objects = replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, modified_query, context);
auto query_ast = queryNodeToDistributedSelectQuery(modified_query);
createForShardImpl(
shard_info,
query_ast,
modified_query,
main_table,
table_func_ptr,
std::move(context),
local_plans,
remote_shards,
shard_count,
parallel_replicas_enabled,
std::move(shard_filter_generator),
has_missing_objects);
}
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Analyzer/IQueryTreeNode.h>
#include <Client/ConnectionPool.h>
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Cluster.h>
@ -50,10 +51,14 @@ public:
{
/// Query and header may be changed depending on shard.
ASTPtr query;
QueryTreeNodePtr query_tree;
/// Used to check the table existence on remote node
StorageID main_table;
Block header;
bool has_missing_objects = false;
Cluster::ShardInfo shard_info;
/// If we connect to replicas lazily.
@ -83,10 +88,37 @@ public:
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
void createForShard(
const Cluster::ShardInfo & shard_info,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
const Block header;
const ColumnsDescriptionByShardNum objects_by_shard;
const StorageSnapshotPtr storage_snapshot;
QueryProcessingStage::Enum processed_stage;
private:
void createForShardImpl(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator,
bool has_missing_objects = false);
};
}

View File

@ -204,12 +204,10 @@ void executeQuery(
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
LoggerPtr log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator)
{
@ -218,6 +216,8 @@ void executeQuery(
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception(ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH, "Maximum distributed depth exceeded");
const ClusterPtr & not_optimized_cluster = query_info.cluster;
std::vector<QueryPlanPtr> plans;
SelectStreamFactory::Shards remote_shards;
@ -237,40 +237,81 @@ void executeQuery(
new_context->increaseDistributedDepth();
const size_t shards = cluster->getShardCount();
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
if (context->getSettingsRef().allow_experimental_analyzer)
{
const auto & shard_info = cluster->getShardsInfo()[i];
ASTPtr query_ast_for_shard = query_ast->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
const auto & shard_info = cluster->getShardsInfo()[i];
auto query_for_shard = query_info.query_tree->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
};
optimizeShardingKeyRewriteIn(query_for_shard, std::move(visitor_data), new_context);
}
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseTaskBasedParallelReplicas();
stream_factory.createForShard(
shard_info,
not_optimized_cluster->getSlotToShard(),
};
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
query_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled,
shard_filter_generator);
}
}
else
{
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
{
const auto & shard_info = cluster->getShardsInfo()[i];
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseTaskBasedParallelReplicas();
ASTPtr query_ast_for_shard = query_info.query->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
};
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
}
stream_factory.createForShard(
shard_info,
query_ast_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled,
shard_filter_generator);
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseTaskBasedParallelReplicas();
stream_factory.createForShard(
shard_info,
query_ast_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled,
shard_filter_generator);
}
}
if (!remote_shards.empty())

View File

@ -58,12 +58,10 @@ void executeQuery(
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
LoggerPtr log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator);

View File

@ -1,11 +1,18 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
namespace
{
@ -119,4 +126,66 @@ void OptimizeShardingKeyRewriteInMatcher::visit(ASTFunction & function, Data & d
}
}
class OptimizeShardingKeyRewriteIn : public InDepthQueryTreeVisitorWithContext<OptimizeShardingKeyRewriteIn>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<OptimizeShardingKeyRewriteIn>;
OptimizeShardingKeyRewriteIn(OptimizeShardingKeyRewriteInVisitor::Data data_, ContextPtr context)
: Base(std::move(context))
, data(std::move(data_))
{}
void enterImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node || function_node->getFunctionName() != "in")
return;
auto & arguments = function_node->getArguments().getNodes();
auto * column = arguments[0]->as<ColumnNode>();
if (!column)
return;
auto name = column->getColumnName();
if (!data.sharding_key_expr->getRequiredColumnsWithTypes().contains(column->getColumnName()))
return;
if (auto * constant = arguments[1]->as<ConstantNode>())
{
if (isTuple(constant->getResultType()))
{
const auto & tuple = constant->getValue().get<Tuple &>();
Tuple new_tuple;
new_tuple.reserve(tuple.size());
for (const auto & child : tuple)
{
if (shardContains(child, name, data))
new_tuple.push_back(child);
}
if (new_tuple.empty())
new_tuple.push_back(tuple.back());
if (new_tuple.size() == tuple.size())
return;
arguments[1] = std::make_shared<ConstantNode>(new_tuple);
rerunFunctionResolve(function_node, getContext());
}
}
}
OptimizeShardingKeyRewriteInVisitor::Data data;
};
void optimizeShardingKeyRewriteIn(QueryTreeNodePtr & node, OptimizeShardingKeyRewriteInVisitor::Data data, ContextPtr context)
{
OptimizeShardingKeyRewriteIn visitor(std::move(data), std::move(context));
visitor.visit(node);
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Analyzer/IQueryTreeNode.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/Cluster.h>
@ -44,4 +45,6 @@ struct OptimizeShardingKeyRewriteInMatcher
using OptimizeShardingKeyRewriteInVisitor = InDepthNodeVisitor<OptimizeShardingKeyRewriteInMatcher, true>;
void optimizeShardingKeyRewriteIn(QueryTreeNodePtr & node, OptimizeShardingKeyRewriteInVisitor::Data data, ContextPtr context);
}

View File

@ -14,12 +14,14 @@ namespace DB
namespace
{
void addConvertingActions(QueryPlan & plan, const Block & header)
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
{
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
return;
auto get_converting_dag = [](const Block & block_, const Block & header_)
auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name;
auto get_converting_dag = [mode](const Block & block_, const Block & header_)
{
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
@ -27,7 +29,7 @@ void addConvertingActions(QueryPlan & plan, const Block & header)
return ActionsDAG::makeConvertingActions(
block_.getColumnsWithTypeAndName(),
header_.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
mode,
true);
};
@ -44,7 +46,8 @@ std::unique_ptr<QueryPlan> createLocalPlan(
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t shard_num,
size_t shard_count)
size_t shard_count,
bool has_missing_objects)
{
checkStackSize();
@ -74,7 +77,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
interpreter.buildQueryPlan(*query_plan);
}
addConvertingActions(*query_plan, header);
addConvertingActions(*query_plan, header, has_missing_objects);
return query_plan;
}

View File

@ -18,5 +18,6 @@ std::unique_ptr<QueryPlan> createLocalPlan(
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t shard_num,
size_t shard_count);
size_t shard_count,
bool has_missing_objects);
}

View File

@ -11,6 +11,7 @@
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Interpreters/ActionsDAG.h>
#include <Common/logger_useful.h>
#include <Common/checkStackSize.h>
@ -31,12 +32,14 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static void addConvertingActions(Pipe & pipe, const Block & header)
static void addConvertingActions(Pipe & pipe, const Block & header, bool use_positions_to_match = false)
{
if (blocksHaveEqualStructure(pipe.getHeader(), header))
return;
auto get_converting_dag = [](const Block & block_, const Block & header_)
auto match_mode = use_positions_to_match ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name;
auto get_converting_dag = [mode = match_mode](const Block & block_, const Block & header_)
{
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
@ -44,10 +47,13 @@ static void addConvertingActions(Pipe & pipe, const Block & header)
return ActionsDAG::makeConvertingActions(
block_.getColumnsWithTypeAndName(),
header_.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
mode,
true);
};
if (use_positions_to_match)
pipe.addSimpleTransform([](const Block & stream_header) { return std::make_shared<MaterializingTransform>(stream_header); });
auto convert_actions = std::make_shared<ExpressionActions>(get_converting_dag(pipe.getHeader(), header));
pipe.addSimpleTransform([&](const Block & cur_header, Pipe::StreamType) -> ProcessorPtr
{
@ -188,7 +194,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
if (try_results.empty() || local_delay < max_remote_delay)
{
auto plan = createLocalPlan(
query, header, my_context, my_stage, my_shard.shard_info.shard_num, my_shard_count);
query, header, my_context, my_stage, my_shard.shard_info.shard_num, my_shard_count, my_shard.has_missing_objects);
return std::move(*plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(my_context),
@ -216,7 +222,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
};
pipes.emplace_back(createDelayedPipe(shard.header, lazily_create_stream, add_totals, add_extremes));
addConvertingActions(pipes.back(), output_stream->header);
addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects);
}
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
@ -281,7 +287,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool,
query_string,
output_stream->header,
shard.header,
context,
throttler,
scalars,
@ -297,7 +303,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects);
}
}
else
@ -305,7 +311,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
const String query_string = formattedAST(shard.query);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
if (context->canUseTaskBasedParallelReplicas())
@ -326,7 +332,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
addConvertingActions(pipes.back(), output_stream->header, shard.has_missing_objects);
}
}

View File

@ -847,7 +847,8 @@ void StorageDistributed::read(
const size_t /*num_streams*/)
{
Block header;
ASTPtr query_ast;
SelectQueryInfo modified_query_info = query_info;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
@ -855,7 +856,7 @@ void StorageDistributed::read(
if (!remote_table_function_ptr)
remote_storage_id = StorageID{remote_database, remote_table};
auto query_tree_distributed = buildQueryTreeDistributed(query_info,
auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info,
storage_snapshot,
remote_storage_id,
remote_table_function_ptr);
@ -865,20 +866,24 @@ void StorageDistributed::read(
*/
for (auto & column : header)
column.column = column.column->convertToFullColumnIfConst();
query_ast = queryNodeToDistributedSelectQuery(query_tree_distributed);
modified_query_info.query = queryNodeToDistributedSelectQuery(query_tree_distributed);
modified_query_info.query_tree = std::move(query_tree_distributed);
}
else
{
header = InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
query_ast = query_info.query;
header = InterpreterSelectQuery(modified_query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery(
local_context, query_ast,
remote_database, remote_table, remote_table_function_ptr);
if (!local_context->getSettingsRef().allow_experimental_analyzer)
{
modified_query_info.query = ClusterProxy::rewriteSelectQuery(
local_context, modified_query_info.query,
remote_database, remote_table, remote_table_function_ptr);
}
/// Return directly (with correct header) if no shard to query.
if (query_info.getCluster()->getShardsInfo().empty())
if (modified_query_info.getCluster()->getShardsInfo().empty())
{
if (local_context->getSettingsRef().allow_experimental_analyzer)
return;
@ -906,7 +911,7 @@ void StorageDistributed::read(
const auto & settings = local_context->getSettingsRef();
ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator;
if (local_context->canUseParallelReplicasCustomKey(*query_info.getCluster()))
if (local_context->canUseParallelReplicasCustomKey(*modified_query_info.getCluster()))
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context))
{
@ -915,7 +920,7 @@ void StorageDistributed::read(
column_description = this->getInMemoryMetadataPtr()->columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
context = local_context,
replica_count = query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
replica_count = modified_query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context);
@ -931,12 +936,10 @@ void StorageDistributed::read(
remote_table_function_ptr,
select_stream_factory,
log,
modified_query_ast,
local_context,
query_info,
modified_query_info,
sharding_key_expr,
sharding_key_column_name,
query_info.cluster,
distributed_settings,
additional_shard_filter_generator);

View File

@ -1,26 +1,27 @@
#include <Storages/buildQueryTreeForShard.h>
#include <Analyzer/createUniqueTableAliases.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/createUniqueTableAliases.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/Utils.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/removeGroupingFunctionSpecializations.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDummy.h>
#include <Planner/Utils.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
@ -384,6 +385,12 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex
createUniqueTableAliases(query_tree_to_modify, nullptr, planner_context->getQueryContext());
// Get rid of the settings clause so we don't send them to remote. Thus newly non-important
// settings won't break any remote parser. It's also more reasonable since the query settings
// are written into the query context and will be sent by the query pipeline.
if (auto * query_node = query_tree_to_modify->as<QueryNode>())
query_node->clearSettingsChanges();
return query_tree_to_modify;
}

View File

@ -1,2 +1 @@
test_concurrent_backups_s3/test.py::test_concurrent_backups
test_distributed_type_object/test.py::test_distributed_type_object

View File

@ -59,7 +59,14 @@ def test_distributed_type_object(started_cluster):
)
expected = TSV("120\n")
assert TSV(node1.query("SELECT sum(data.k2 * id) FROM dist_table")) == expected
assert (
TSV(
node1.query(
"SELECT sum(data.k2 * id) FROM dist_table SETTINGS optimize_arithmetic_operations_in_aggregate_functions = 0"
)
)
== expected
)
node1.query("TRUNCATE TABLE local_table")
node2.query("TRUNCATE TABLE local_table")
@ -78,10 +85,11 @@ def test_distributed_type_object(started_cluster):
3\t\t\t\tfoo"""
)
# The following query is not supported by analyzer now
assert (
TSV(
node1.query(
"SELECT id, data.k1, data.k2.k3, data.k2.k4, data.k5 FROM dist_table ORDER BY id"
"SELECT id, data.k1, data.k2.k3, data.k2.k4, data.k5 FROM dist_table ORDER BY id SETTINGS allow_experimental_analyzer = 0"
)
)
== expected

View File

@ -31,7 +31,7 @@
29 2j&S)ba?XG QuQj 17163829389637435056
3 UlI+1 14144472852965836438
=============== QUERIES EXECUTED BY PARALLEL INNER QUERY ALONE ===============
0 3 SELECT `__table1`.`key` AS `key`, `__table1`.`value1` AS `value1`, `__table1`.`value2` AS `value2`, toUInt64(min(`__table1`.`time`)) AS `start_ts` FROM `default`.`join_inner_table` AS `__table1` PREWHERE (`__table1`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table1`.`number` > _CAST(1610517366120, \'UInt64\')) GROUP BY `__table1`.`key`, `__table1`.`value1`, `__table1`.`value2` ORDER BY `__table1`.`key` ASC, `__table1`.`value1` ASC, `__table1`.`value2` ASC LIMIT _CAST(10, \'UInt64\') SETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer = 1
0 3 SELECT `__table1`.`key` AS `key`, `__table1`.`value1` AS `value1`, `__table1`.`value2` AS `value2`, toUInt64(min(`__table1`.`time`)) AS `start_ts` FROM `default`.`join_inner_table` AS `__table1` PREWHERE (`__table1`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table1`.`number` > _CAST(1610517366120, \'UInt64\')) GROUP BY `__table1`.`key`, `__table1`.`value1`, `__table1`.`value2` ORDER BY `__table1`.`key` ASC, `__table1`.`value1` ASC, `__table1`.`value2` ASC LIMIT _CAST(10, \'UInt64\')
0 3 SELECT `key`, `value1`, `value2`, toUInt64(min(`time`)) AS `start_ts` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2` ORDER BY `key` ASC, `value1` ASC, `value2` ASC LIMIT 10
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=0;
1 1 -- Parallel inner query alone\nSELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\nFROM join_inner_table\nPREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\nGROUP BY key, value1, value2\nORDER BY key, value1, value2\nLIMIT 10\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=1;

View File

@ -9,4 +9,4 @@
7885388429666205427
8124171311239967992
1 1 -- Simple query with analyzer and pure parallel replicas\nSELECT number\nFROM join_inner_table__fuzz_146_replicated\n SETTINGS\n allow_experimental_analyzer = 1,\n max_parallel_replicas = 2,\n cluster_for_parallel_replicas = \'test_cluster_one_shard_three_replicas_localhost\',\n allow_experimental_parallel_reading_from_replicas = 1;
0 2 SELECT `__table1`.`number` AS `number` FROM `default`.`join_inner_table__fuzz_146_replicated` AS `__table1` SETTINGS allow_experimental_analyzer = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = \'test_cluster_one_shard_three_replicas_localhost\', allow_experimental_parallel_reading_from_replicas = 1
0 2 SELECT `__table1`.`number` AS `number` FROM `default`.`join_inner_table__fuzz_146_replicated` AS `__table1`

View File

@ -25,8 +25,8 @@ simple (global) join with analyzer and parallel replicas
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, allow_experimental_parallel_reading_from_replicas = 2, send_logs_level = 'trace', max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, allow_experimental_parallel_reading_from_replicas = 2, send_logs_level = 'trace', max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
simple (local) join with analyzer and parallel replicas
@ -40,8 +40,8 @@ simple (local) join with analyzer and parallel replicas
4200042 4200042 4200042 -1400014
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
simple (local) join with analyzer and parallel replicas and full sorting merge join
@ -55,8 +55,8 @@ simple (local) join with analyzer and parallel replicas and full sorting merge j
4200042 4200042 4200042 -1400014
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer
@ -82,8 +82,8 @@ nested join with analyzer and parallel replicas, both local
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer and parallel replicas, both global
@ -97,11 +97,11 @@ nested join with analyzer and parallel replicas, both global
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
nested join with analyzer and parallel replicas, global + local
@ -115,11 +115,11 @@ nested join with analyzer and parallel replicas, global + local
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table3`) AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table3`) AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table3`) AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table3`) AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
nested join with analyzer and parallel replicas, both local, both full sorting merge join
@ -133,11 +133,11 @@ nested join with analyzer and parallel replicas, both local, both full sorting m
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer and parallel replicas, both local, both full sorting and hash join
@ -151,11 +151,11 @@ nested join with analyzer and parallel replicas, both local, both full sorting a
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'hash' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'hash' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer and parallel replicas, both local, both full sorting and hash join
@ -169,9 +169,9 @@ nested join with analyzer and parallel replicas, both local, both full sorting a
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'hash' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'hash' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done