WIP on different JSONs on shards

This commit is contained in:
Dmitry Novik 2024-02-28 15:30:06 +01:00
parent 524a2ca72d
commit 0de2d766fa
12 changed files with 325 additions and 65 deletions

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Analyzer/IdentifierNode.h>
#include <Common/assert_cast.h>
@ -56,13 +57,18 @@ void IdentifierNode::updateTreeHashImpl(HashState & state) const
QueryTreeNodePtr IdentifierNode::cloneImpl() const
{
return std::make_shared<IdentifierNode>(identifier);
auto result = std::make_shared<IdentifierNode>(identifier);
result->use_parts_for_to_ast = use_parts_for_to_ast;
return result;
}
ASTPtr IdentifierNode::toASTImpl(const ConvertToASTOptions & /* options */) const
{
auto identifier_parts = identifier.getParts();
return std::make_shared<ASTIdentifier>(std::move(identifier_parts));
if (use_parts_for_to_ast)
return std::make_shared<ASTIdentifier>(std::move(identifier_parts));
else
return std::make_shared<ASTIdentifier>(identifier.getFullName());
}
}

View File

@ -52,6 +52,11 @@ public:
void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;
void useFullNameInToAST()
{
use_parts_for_to_ast = false;
}
protected:
bool isEqualImpl(const IQueryTreeNode & rhs) const override;
@ -64,6 +69,7 @@ protected:
private:
Identifier identifier;
std::optional<TableExpressionModifiers> table_expression_modifiers;
bool use_parts_for_to_ast = false;
static constexpr size_t children_size = 0;
};

View File

@ -1,3 +1,4 @@
#include <memory>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/DataTypeObject.h>
#include <DataTypes/DataTypeNothing.h>
@ -20,6 +21,16 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <IO/Operators.h>
#include "Analyzer/ConstantNode.h"
#include "Analyzer/FunctionNode.h"
#include "Analyzer/IQueryTreeNode.h"
#include "Analyzer/Identifier.h"
#include "Analyzer/IdentifierNode.h"
#include "Analyzer/QueryNode.h"
#include "Analyzer/Utils.h"
#include <Functions/FunctionFactory.h>
#include <Poco/Logger.h>
#include "Common/logger_useful.h"
namespace DB
@ -888,10 +899,10 @@ static void addConstantToWithClause(const ASTPtr & query, const String & column_
/// @expected_columns and @available_columns contain descriptions
/// of extended Object columns.
void replaceMissedSubcolumnsByConstants(
NamesAndTypes calculateMissedSubcolumns(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
ASTPtr query)
const ColumnsDescription & available_columns
)
{
NamesAndTypes missed_names_types;
@ -928,6 +939,18 @@ void replaceMissedSubcolumnsByConstants(
[](const auto & lhs, const auto & rhs) { return lhs.name < rhs.name; });
}
return missed_names_types;
}
/// @expected_columns and @available_columns contain descriptions
/// of extended Object columns.
void replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
ASTPtr query)
{
NamesAndTypes missed_names_types = calculateMissedSubcolumns(expected_columns, available_columns);
if (missed_names_types.empty())
return;
@ -940,6 +963,52 @@ void replaceMissedSubcolumnsByConstants(
addConstantToWithClause(query, name, type);
}
/// @expected_columns and @available_columns contain descriptions
/// of extended Object columns.
void replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
QueryTreeNodePtr & query,
const ContextPtr & context [[maybe_unused]])
{
NamesAndTypes missed_names_types = calculateMissedSubcolumns(expected_columns, available_columns);
if (missed_names_types.empty())
return;
auto * query_node = query->as<QueryNode>();
if (!query_node)
return;
auto table_expression = extractLeftTableExpression(query_node->getJoinTree());
auto & with_nodes = query_node->getWith().getNodes();
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
for (const auto & [name, type] : missed_names_types)
{
auto constant = std::make_shared<ConstantNode>(type->getDefault(), type);
constant->setAlias(table_expression->getAlias() + name);
// auto materialize = std::make_shared<FunctionNode>("materialize");
// auto function = FunctionFactory::instance().get("materialize", context);
// materialize->getArguments().getNodes() = { constant };
// materialize->resolveAsFunction(function->build(materialize->getArgumentColumns()));
// materialize->setAlias(name);
with_nodes.push_back(constant);
auto id = std::make_shared<IdentifierNode>(Identifier(table_expression->getAlias() + name));
id->useFullNameInToAST();
column_name_to_node[name] = id;
LOG_DEBUG(&Poco::Logger::get("replaceMissedSubcolumnsByConstants"), "Name {} Expression\n{}", name, column_name_to_node[name]->dumpTree());
}
LOG_DEBUG(&Poco::Logger::get("replaceMissedSubcolumnsByConstants"), "Table expression\n{} ", table_expression->dumpTree());
replaceColumns(query, table_expression, column_name_to_node);
LOG_DEBUG(&Poco::Logger::get("replaceMissedSubcolumnsByConstants"), "Result:\n{} ", query->dumpTree());
}
Field FieldVisitorReplaceScalars::operator()(const Array & x) const
{
if (num_dimensions_to_keep == 0)

View File

@ -3,6 +3,8 @@
#include <Core/Block.h>
#include <Core/NamesAndTypes.h>
#include <Common/FieldVisitors.h>
#include "Analyzer/IQueryTreeNode.h"
#include "Interpreters/Context_fwd.h"
#include <Storages/ColumnsDescription.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
@ -14,6 +16,9 @@ namespace DB
struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
class IQueryTreeNode;
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
/// Returns number of dimensions in Array type. 0 if type is not array.
size_t getNumberOfDimensions(const IDataType & type);
@ -97,6 +102,12 @@ void replaceMissedSubcolumnsByConstants(
const ColumnsDescription & available_columns,
ASTPtr query);
void replaceMissedSubcolumnsByConstants(
const ColumnsDescription & expected_columns,
const ColumnsDescription & available_columns,
QueryTreeNodePtr & query,
const ContextPtr & context);
/// Visitor that keeps @num_dimensions_to_keep dimensions in arrays
/// and replaces all scalars or nested arrays to @replacement at that level.
class FieldVisitorReplaceScalars : public StaticVisitor<Field>

View File

@ -5,6 +5,10 @@
#include <Common/checkStackSize.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>
#include "Analyzer/IQueryTreeNode.h"
#include "Interpreters/InterpreterSelectQueryAnalyzer.h"
#include "Interpreters/SelectQueryOptions.h"
#include "Planner/Utils.h"
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
@ -124,18 +128,55 @@ void SelectStreamFactory::createForShard(
if (it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);
createForShardImpl(
shard_info,
query_ast,
main_table,
table_func_ptr,
std::move(context),
local_plans,
remote_shards,
shard_count,
parallel_replicas_enabled,
std::move(shard_filter_generator));
}
void SelectStreamFactory::createForShardImpl(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator)
{
auto emplace_local_stream = [&]()
{
Block shard_header;
if (context->getSettingsRef().allow_experimental_analyzer)
shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, context, SelectQueryOptions(processed_stage).analyze());
else
shard_header = header;
local_plans.emplace_back(createLocalPlan(
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count));
query_ast, shard_header, context, processed_stage, shard_info.shard_num, shard_count));
};
auto emplace_remote_stream = [&](bool lazy = false, time_t local_delay = 0)
{
Block shard_header;
if (context->getSettingsRef().allow_experimental_analyzer)
shard_header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, context, SelectQueryOptions(processed_stage).analyze());
else
shard_header = header;
remote_shards.emplace_back(Shard{
.query = query_ast,
.main_table = main_table,
.header = header,
.header = shard_header,
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
@ -243,6 +284,40 @@ void SelectStreamFactory::createForShard(
emplace_remote_stream();
}
void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator)
{
auto it = objects_by_shard.find(shard_info.shard_num);
QueryTreeNodePtr modified_query = query_tree;
if (it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, modified_query, context);
auto query_ast = queryNodeToDistributedSelectQuery(modified_query);
createForShardImpl(
shard_info,
query_ast,
main_table,
table_func_ptr,
std::move(context),
local_plans,
remote_shards,
shard_count,
parallel_replicas_enabled,
std::move(shard_filter_generator));
}
}
}

View File

@ -7,6 +7,7 @@
#include <Parsers/IAST.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/StorageSnapshot.h>
#include "Analyzer/IQueryTreeNode.h"
namespace DB
{
@ -83,10 +84,35 @@ public:
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
void createForShard(
const Cluster::ShardInfo & shard_info,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
const Block header;
const ColumnsDescriptionByShardNum objects_by_shard;
const StorageSnapshotPtr storage_snapshot;
QueryProcessingStage::Enum processed_stage;
private:
void createForShardImpl(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
};
}

View File

@ -204,12 +204,10 @@ void executeQuery(
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
LoggerPtr log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator)
{
@ -218,6 +216,8 @@ void executeQuery(
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception(ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH, "Maximum distributed depth exceeded");
const ClusterPtr & not_optimized_cluster = query_info.cluster;
std::vector<QueryPlanPtr> plans;
SelectStreamFactory::Shards remote_shards;
@ -237,40 +237,81 @@ void executeQuery(
new_context->increaseDistributedDepth();
const size_t shards = cluster->getShardCount();
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
if (context->getSettingsRef().allow_experimental_analyzer)
{
const auto & shard_info = cluster->getShardsInfo()[i];
ASTPtr query_ast_for_shard = query_ast->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
const auto & shard_info = cluster->getShardsInfo()[i];
auto query_for_shard = query_info.query_tree->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
};
optimizeShardingKeyRewriteIn(query_for_shard, std::move(visitor_data), new_context);
}
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseTaskBasedParallelReplicas();
stream_factory.createForShard(
shard_info,
not_optimized_cluster->getSlotToShard(),
};
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
query_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled,
shard_filter_generator);
}
}
else
{
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
{
const auto & shard_info = cluster->getShardsInfo()[i];
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseTaskBasedParallelReplicas();
ASTPtr query_ast_for_shard = query_info.query->clone();
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
};
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
}
stream_factory.createForShard(
shard_info,
query_ast_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled,
shard_filter_generator);
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseTaskBasedParallelReplicas();
stream_factory.createForShard(
shard_info,
query_ast_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled,
shard_filter_generator);
}
}
if (!remote_shards.empty())

View File

@ -58,12 +58,10 @@ void executeQuery(
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
LoggerPtr log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator);

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTFunction.h>
@ -11,6 +12,7 @@
#include "Analyzer/IQueryTreeNode.h"
#include "Analyzer/InDepthQueryTreeVisitor.h"
#include "DataTypes/IDataType.h"
#include "Interpreters/Context_fwd.h"
namespace
{
@ -126,11 +128,15 @@ void OptimizeShardingKeyRewriteInMatcher::visit(ASTFunction & function, Data & d
}
class OptimizeShardingKeyRewriteIn : InDepthQueryTreeVisitorWithContext<OptimizeShardingKeyRewriteIn>
class OptimizeShardingKeyRewriteIn : public InDepthQueryTreeVisitorWithContext<OptimizeShardingKeyRewriteIn>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<OptimizeShardingKeyRewriteIn>;
using Base::Base;
OptimizeShardingKeyRewriteIn(OptimizeShardingKeyRewriteInVisitor::Data data_, ContextPtr context)
: Base(std::move(context))
, data(std::move(data_))
{}
void enterImpl(QueryTreeNodePtr & node)
{
@ -143,6 +149,8 @@ public:
if (!column)
return;
auto name = column->getColumnName();
if (!data.sharding_key_expr->getRequiredColumnsWithTypes().contains(column->getColumnName()))
return;
@ -150,17 +158,30 @@ public:
{
if (isTuple(constant->getResultType()))
{
auto & tuple = constant->getValue().get<Tuple &>();
std::erase_if(tuple, [&](auto & child)
const auto & tuple = constant->getValue().get<Tuple &>();
Tuple new_tuple;
new_tuple.reserve(tuple.size());
for (const auto & child : tuple)
{
return tuple.size() > 1 && !shardContains(child, name, data);
});
if (shardContains(child, name, data))
new_tuple.push_back(child);
}
if (new_tuple.empty())
new_tuple.push_back(tuple.back());
node = std::make_shared<ConstantNode>(new_tuple);
}
}
}
OptimizeShardingKeyRewriteInMatcher::Data data;
OptimizeShardingKeyRewriteInVisitor::Data data;
};
void optimizeShardingKeyRewriteIn(QueryTreeNodePtr & node, OptimizeShardingKeyRewriteInVisitor::Data data, ContextPtr context)
{
OptimizeShardingKeyRewriteIn visitor(std::move(data), std::move(context));
visitor.visit(node);
}
}

View File

@ -2,6 +2,7 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/Cluster.h>
#include "Analyzer/IQueryTreeNode.h"
namespace DB
{
@ -44,4 +45,6 @@ struct OptimizeShardingKeyRewriteInMatcher
using OptimizeShardingKeyRewriteInVisitor = InDepthNodeVisitor<OptimizeShardingKeyRewriteInMatcher, true>;
void optimizeShardingKeyRewriteIn(QueryTreeNodePtr & node, OptimizeShardingKeyRewriteInVisitor::Data data, ContextPtr context);
}

View File

@ -216,7 +216,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
};
pipes.emplace_back(createDelayedPipe(shard.header, lazily_create_stream, add_totals, add_extremes));
addConvertingActions(pipes.back(), output_stream->header);
addConvertingActions(pipes.back(), shard.header);
}
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
@ -281,7 +281,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool,
query_string,
output_stream->header,
shard.header,
context,
throttler,
scalars,
@ -297,7 +297,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
addConvertingActions(pipes.back(), shard.header);
}
}
else
@ -305,7 +305,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
const String query_string = formattedAST(shard.query);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
if (context->canUseTaskBasedParallelReplicas())
@ -326,7 +326,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
addConvertingActions(pipes.back(), shard.header);
}
}

View File

@ -30,6 +30,7 @@
#include <Common/randomSeed.h>
#include <Common/formatReadable.h>
#include <Common/CurrentMetrics.h>
#include "Analyzer/IQueryTreeNode.h"
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -813,7 +814,8 @@ void StorageDistributed::read(
const size_t /*num_streams*/)
{
Block header;
ASTPtr query_ast;
SelectQueryInfo modified_query_info = query_info;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
@ -821,7 +823,7 @@ void StorageDistributed::read(
if (!remote_table_function_ptr)
remote_storage_id = StorageID{remote_database, remote_table};
auto query_tree_distributed = buildQueryTreeDistributed(query_info,
auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info,
storage_snapshot,
remote_storage_id,
remote_table_function_ptr);
@ -831,20 +833,24 @@ void StorageDistributed::read(
*/
for (auto & column : header)
column.column = column.column->convertToFullColumnIfConst();
query_ast = queryNodeToDistributedSelectQuery(query_tree_distributed);
modified_query_info.query = queryNodeToDistributedSelectQuery(query_tree_distributed);
modified_query_info.query_tree = std::move(query_tree_distributed);
}
else
{
header = InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
query_ast = query_info.query;
header = InterpreterSelectQuery(modified_query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery(
local_context, query_ast,
remote_database, remote_table, remote_table_function_ptr);
if (!local_context->getSettingsRef().allow_experimental_analyzer)
{
modified_query_info.query = ClusterProxy::rewriteSelectQuery(
local_context, modified_query_info.query,
remote_database, remote_table, remote_table_function_ptr);
}
/// Return directly (with correct header) if no shard to query.
if (query_info.getCluster()->getShardsInfo().empty())
if (modified_query_info.getCluster()->getShardsInfo().empty())
{
if (local_context->getSettingsRef().allow_experimental_analyzer)
return;
@ -872,7 +878,7 @@ void StorageDistributed::read(
const auto & settings = local_context->getSettingsRef();
ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator;
if (local_context->canUseParallelReplicasCustomKey(*query_info.getCluster()))
if (local_context->canUseParallelReplicasCustomKey(*modified_query_info.getCluster()))
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context))
{
@ -881,7 +887,7 @@ void StorageDistributed::read(
column_description = this->getInMemoryMetadataPtr()->columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
context = local_context,
replica_count = query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
replica_count = modified_query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context);
@ -897,12 +903,10 @@ void StorageDistributed::read(
remote_table_function_ptr,
select_stream_factory,
log,
modified_query_ast,
local_context,
query_info,
modified_query_info,
sharding_key_expr,
sharding_key_column_name,
query_info.cluster,
distributed_settings,
additional_shard_filter_generator);