Review fixes

Antonio Andelic 2024-07-02 16:33:06 +02:00
parent 39932938e2
commit cfa86b54ea
10 changed files with 71 additions and 39 deletions

@@ -228,6 +228,35 @@ static ThrottlerPtr getThrottler(const ContextPtr & context)
return throttler;
}
AdditionalShardFilterGenerator
getShardFilterGeneratorForCustomKey(const Cluster & cluster, ContextPtr context, const ColumnsDescription & columns)
{
if (!context->canUseParallelReplicasCustomKeyForCluster(cluster))
return {};
const auto & settings = context->getSettingsRef();
auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *context);
if (custom_key_ast == nullptr)
return {};
return [my_custom_key_ast = std::move(custom_key_ast),
column_description = columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
custom_key_range_lower = settings.parallel_replicas_custom_key_range_lower.value,
custom_key_range_upper = settings.parallel_replicas_custom_key_range_upper.value,
query_context = context,
replica_count = cluster.getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
replica_count,
replica_num - 1,
my_custom_key_ast,
{custom_key_type, custom_key_range_lower, custom_key_range_upper},
column_description,
query_context);
};
}
void executeQuery(
QueryPlan & query_plan,
@@ -239,43 +268,17 @@ void executeQuery(
LoggerPtr log,
ContextPtr context,
const SelectQueryInfo & query_info,
const ColumnsDescription & columns,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator,
bool is_remote_function)
{
const Settings & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception(ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH, "Maximum distributed depth exceeded");
ClusterProxy::AdditionalShardFilterGenerator shard_filter_generator;
if (context->canUseParallelReplicasCustomKeyForCluster(*query_info.getCluster()))
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *context))
{
shard_filter_generator =
[my_custom_key_ast = std::move(custom_key_ast),
column_description = columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
custom_key_range_lower = settings.parallel_replicas_custom_key_range_lower.value,
custom_key_range_upper = settings.parallel_replicas_custom_key_range_upper.value,
query_context = context,
replica_count = query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
replica_count,
replica_num - 1,
my_custom_key_ast,
{custom_key_type, custom_key_range_lower, custom_key_range_upper},
column_description,
query_context);
};
}
}
const ClusterPtr & not_optimized_cluster = query_info.cluster;
std::vector<QueryPlanPtr> plans;
@@ -599,6 +602,8 @@ void executeQueryWithParallelReplicasCustomKey(
ClusterProxy::SelectStreamFactory select_stream_factory
= ClusterProxy::SelectStreamFactory(header, columns_object, snapshot, processed_stage);
auto shard_filter_generator = getShardFilterGeneratorForCustomKey(*query_info.getCluster(), context, columns);
ClusterProxy::executeQuery(
query_plan,
header,
@@ -609,11 +614,11 @@
getLogger("executeQueryWithParallelReplicasCustomKey"),
context,
query_info,
columns,
/*sharding_key_expr=*/nullptr,
/*sharding_key_column_name=*/{},
/*distributed_settings=*/{},
/*is_remote_function= */ false);
shard_filter_generator,
/*is_remote_function=*/false);
}
void executeQueryWithParallelReplicasCustomKey(
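
Note on the refactor above: the extracted helper returns an AdditionalShardFilterGenerator, i.e. a callable that maps a 1-based replica number to the filter that replica should apply. Below is a minimal, self-contained sketch of that callback shape, using plain strings instead of ASTs and a hypothetical makeReplicaFilterGenerator helper; it imitates only the "default" modulo filter type, not the real getCustomKeyFilterForParallelReplica.

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

// Maps a 1-based replica number to a WHERE-clause fragment for that replica
// (the real AdditionalShardFilterGenerator returns an ASTPtr instead of a string).
using ReplicaFilterGenerator = std::function<std::string(uint64_t)>;

// Hypothetical helper mirroring the shape of getShardFilterGeneratorForCustomKey:
// capture the custom key expression and replica count, return a per-replica filter.
ReplicaFilterGenerator makeReplicaFilterGenerator(std::string custom_key_expr, uint64_t replica_count)
{
    return [key = std::move(custom_key_expr), replica_count](uint64_t replica_num) -> std::string
    {
        // replica_num is 1-based in the callback while the filter wants a 0-based index,
        // which is why the real code passes `replica_num - 1`.
        return "(" + key + ") % " + std::to_string(replica_count)
            + " = " + std::to_string(replica_num - 1);
    };
}

int main()
{
    auto generator = makeReplicaFilterGenerator("sipHash64(key)", 3);
    for (uint64_t replica = 1; replica <= 3; ++replica)
        std::cout << "replica " << replica << ": " << generator(replica) << '\n';
}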

@@ -52,6 +52,9 @@ class SelectStreamFactory;
ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table);
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
AdditionalShardFilterGenerator
getShardFilterGeneratorForCustomKey(const Cluster & cluster, ContextPtr context, const ColumnsDescription & columns);
/// Execute a distributed query, creating a query plan from which the query pipeline can be built.
/// The `stream_factory` object encapsulates the logic of creating plans for different types of queries
/// (currently SELECT and DESCRIBE).
@@ -65,10 +68,10 @@ void executeQuery(
LoggerPtr log,
ContextPtr context,
const SelectQueryInfo & query_info,
const ColumnsDescription & columns,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator,
bool is_remote_function);
void executeQueryWithParallelReplicas(

@@ -461,6 +461,9 @@ protected:
/// mutation tasks of one mutation executed against different parts of the same table.
PreparedSetsCachePtr prepared_sets_cache;
/// This is a mode of parallel replicas where we set parallel_replicas_count and parallel_replicas_offset
/// and generate specific filters on the replicas (e.g. when using parallel replicas with a sample key).
/// If we already use a different mode of parallel replicas, we want to disable this mode.
bool offset_parallel_replicas_enabled = true;
public:
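
The flag above guards the count/offset flavour of parallel replicas, where every replica keeps only its own slice of the key or sample space. A rough, self-contained sketch of such a range-style split follows; the boundary handling is a simplification and not necessarily what ClickHouse does.

#include <cstdint>
#include <iostream>
#include <limits>
#include <utility>

// Replica `offset` out of `count` is responsible for one contiguous slice
// of the hash/sample space; the other replicas filter those rows out.
std::pair<uint64_t, uint64_t> replicaRange(uint64_t count, uint64_t offset)
{
    const uint64_t width = std::numeric_limits<uint64_t>::max() / count;
    const uint64_t begin = offset * width;
    // Let the last replica also take the remainder left over by the integer division.
    const uint64_t end = (offset + 1 == count) ? std::numeric_limits<uint64_t>::max() : begin + width;
    return {begin, end};
}

int main()
{
    const uint64_t count = 3;
    for (uint64_t offset = 0; offset < count; ++offset)
    {
        const auto [begin, end] = replicaRange(count, offset);
        std::cout << "replica " << offset << " owns hashes in [" << begin << ", " << end << ")\n";
    }
}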

@@ -592,6 +592,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
"or it's invalid (settings `parallel_replicas_custom_key`)");
}
}
/// We disable prefer_localhost_replica because, if one of the replicas is local, it will create a single local plan
/// instead of executing the query with multiple replicas.
/// We can enable this setting again for custom key parallel replicas when we can generate a plan that will use both a
/// local plan and remote replicas.
else if (auto * distributed = dynamic_cast<StorageDistributed *>(storage.get());
distributed && context->canUseParallelReplicasCustomKeyForCluster(*distributed->getCluster()))
{

@@ -846,6 +846,10 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
distributed && query_context->canUseParallelReplicasCustomKeyForCluster(*distributed->getCluster()))
{
planner_context->getMutableQueryContext()->setSetting("distributed_group_by_no_merge", 2);
/// We disable prefer_localhost_replica because, if one of the replicas is local, it will create a single local plan
/// instead of executing the query with multiple replicas.
/// We can enable this setting again for custom key parallel replicas when we can generate a plan that will use both a
/// local plan and remote replicas.
planner_context->getMutableQueryContext()->setSetting("prefer_localhost_replica", Field{0});
}
}

@@ -839,7 +839,9 @@ void StorageDistributed::read(
SelectQueryInfo modified_query_info = query_info;
if (local_context->getSettingsRef().allow_experimental_analyzer)
const auto & settings = local_context->getSettingsRef();
if (settings.allow_experimental_analyzer)
{
StorageID remote_storage_id = StorageID::createEmpty();
if (!remote_table_function_ptr)
@@ -864,7 +866,6 @@ void StorageDistributed::read(
header = InterpreterSelectQuery(modified_query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
const auto & settings = local_context->getSettingsRef();
if (!settings.allow_experimental_analyzer)
{
modified_query_info.query = ClusterProxy::rewriteSelectQuery(
@@ -894,6 +895,9 @@
storage_snapshot,
processed_stage);
auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey(
*modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr()->columns);
ClusterProxy::executeQuery(
query_plan,
header,
@@ -904,10 +908,10 @@
log,
local_context,
modified_query_info,
getInMemoryMetadataPtr()->columns,
sharding_key_expr,
sharding_key_column_name,
distributed_settings,
shard_filter_generator,
/* is_remote_function= */ static_cast<bool>(owned_cluster));
/// This is a bug; it is possible only when there are no shards to query, and this is handled earlier.

@@ -242,6 +242,12 @@ void StorageMergeTree::read(
local_context);
return;
}
else
LOG_WARNING(
log,
"Parallel replicas with custom key will not be used because cluster defined by 'cluster_for_parallel_replicas' ('{}') has "
"multiple shards",
cluster->getName());
}
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower()

@@ -5490,6 +5490,12 @@ void StorageReplicatedMergeTree::read(
local_context);
return;
}
else
LOG_WARNING(
log,
"Parallel replicas with custom key will not be used because cluster defined by 'cluster_for_parallel_replicas' ('{}') has "
"multiple shards",
cluster->getName());
}
readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams);
}
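
Both read() hunks above add the same fallback: custom-key parallel replicas are attempted only when the cluster named by cluster_for_parallel_replicas has a single shard, since the per-replica filters are generated for the replicas of that one shard (replica_count is taken from the first shard's per_replica_pools). A simplified sketch of that guard with stand-in types, not the actual storage code:

#include <cstddef>
#include <iostream>
#include <vector>

// Stand-ins for the cluster structures involved; not ClickHouse's classes.
struct Shard { std::vector<int> per_replica_pools; };
struct Cluster { std::vector<Shard> shards; };

// Custom-key parallel replicas spread work across the replicas of a single shard,
// so a multi-shard cluster is rejected (the storage logs the warning shown above instead).
bool canUseCustomKeyParallelReplicas(const Cluster & cluster)
{
    return cluster.shards.size() == 1;
}

// Mirrors how the generator sizes the split:
// cluster.getShardsInfo().front().per_replica_pools.size()
size_t replicaCount(const Cluster & cluster)
{
    return cluster.shards.front().per_replica_pools.size();
}

int main()
{
    Cluster single_shard{{Shard{{0, 1, 2}}}};
    Cluster two_shards{{Shard{{0, 1}}, Shard{{0, 1}}}};
    std::cout << std::boolalpha
              << "single shard usable: " << canUseCustomKeyParallelReplicas(single_shard)
              << ", replicas: " << replicaCount(single_shard) << '\n'
              << "two shards usable: " << canUseCustomKeyParallelReplicas(two_shards) << '\n';
}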

@@ -23,12 +23,6 @@ def start_cluster():
cluster.shutdown()
def create_tables(cluster):
n1 = nodes[0]
n1.query("DROP TABLE IF EXISTS dist_table SYNC")
n1.query(f"DROP TABLE IF EXISTS test_table ON CLUSTER {cluster} SYNC")
def insert_data(table_name, row_num, all_nodes=False):
query = (
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers({row_num})"

@@ -53,11 +53,13 @@ def create_tables(cluster, table_name):
@pytest.mark.parametrize("use_hedged_requests", [1, 0])
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
@pytest.mark.parametrize("prefer_localhost_replica", [0, 1])
def test_parallel_replicas_custom_key_failover(
start_cluster,
use_hedged_requests,
custom_key,
filter_type,
prefer_localhost_replica,
):
cluster_name = "test_single_shard_multiple_replicas"
table = "test_table"
@@ -78,6 +80,7 @@ def test_parallel_replicas_custom_key_failover(
"parallel_replicas_custom_key": custom_key,
"parallel_replicas_custom_key_filter_type": filter_type,
"use_hedged_requests": use_hedged_requests,
"prefer_localhost_replica": prefer_localhost_replica,
# avoid considering replica delay on connection choice
# otherwise connections may not be distributed evenly among available nodes,
# and neither would the custom key secondary queries (we check this below)