Parallel replicas custom key: skip unavailable replicas

Igor Nikonov 2023-11-26 21:38:49 +00:00
parent 5769a88b92
commit 774347d231
13 changed files with 159 additions and 60 deletions
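
In short: instead of turning replicas into virtual shards in StorageDistributed and rewriting the query per shard in ClusterProxy::executeQuery, the custom-key filter is now applied in ReadFromRemote::addPipe. Cluster::ShardInfo keeps the shard's replica Addresses (addShard now takes the addresses by value so they can be moved into ShardInfo), addPipe clones the query once per replica address, ANDs the generated custom-key filter into each clone's WHERE clause, and sends every clone through the shard's failover pool with PoolMode::GET_MANY. Since every replica holds the full data, the slice generated for an unavailable replica is simply served by whichever replica the pool connects to. A rough sketch of the rewritten per-replica queries, assuming a hypothetical table t, custom key cityHash64(y), three replicas, and the 'default' filter type (modulo by the replica count):

-- Illustration only (hypothetical table t); the actual filter AST is produced by getCustomKeyFilterForParallelReplica.
SELECT y, count() FROM t WHERE cityHash64(y) % 3 = 0 GROUP BY y;  -- slice for replica 1
SELECT y, count() FROM t WHERE cityHash64(y) % 3 = 1 GROUP BY y;  -- slice for replica 2
SELECT y, count() FROM t WHERE cityHash64(y) % 3 = 2 GROUP BY y;  -- slice for replica 3

Because none of these queries is pinned to a particular host, the unreachable 127.0.0.11:1234 replica added to the test cluster below no longer makes the query fail.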

View File

@ -560,7 +560,7 @@ Cluster::Cluster(
initMisc();
}
void Cluster::addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
void Cluster::addShard(const Settings & settings, Addresses addresses, bool treat_local_as_remote, UInt32 current_shard_num,
ShardInfoInsertPathForInternalReplication && insert_paths, UInt32 weight, bool internal_replication)
{
Addresses shard_local_addresses;
@ -596,7 +596,8 @@ void Cluster::addShard(const Settings & settings, Addresses && addresses, bool t
std::move(shard_local_addresses),
std::move(shard_pool),
std::move(all_replicas_pools),
internal_replication
internal_replication,
std::move(addresses),
});
}

View File

@ -222,6 +222,7 @@ public:
/// Connection pool for each replica, contains nullptr for local replicas
ConnectionPoolPtrs per_replica_pools;
bool has_internal_replication = false;
Addresses addresses;
};
using ShardsInfo = std::vector<ShardInfo>;
@ -291,7 +292,7 @@ private:
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
void addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
void addShard(const Settings & settings, Addresses addresses, bool treat_local_as_remote, UInt32 current_shard_num,
ShardInfoInsertPathForInternalReplication && insert_paths = {}, UInt32 weight = 1, bool internal_replication = false);
/// Inter-server secret

View File

@ -117,7 +117,8 @@ void SelectStreamFactory::createForShard(
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled)
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator)
{
auto it = objects_by_shard.find(shard_info.shard_num);
if (it != objects_by_shard.end())
@ -139,6 +140,7 @@ void SelectStreamFactory::createForShard(
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
.shard_filter_generator = shard_filter_generator,
});
};

View File

@ -40,6 +40,7 @@ ASTPtr rewriteSelectQuery(
ASTPtr table_function_ptr = nullptr);
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
class SelectStreamFactory
{
@ -59,6 +60,7 @@ public:
/// (When there is a local replica with big delay).
bool lazy = false;
time_t local_delay = 0;
AdditionalShardFilterGenerator shard_filter_generator;
};
using Shards = std::vector<Shard>;
@ -78,7 +80,8 @@ public:
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled);
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
struct ShardPlans
{

View File

@ -235,17 +235,17 @@ void executeQuery(
if (shard_filter_generator)
{
auto shard_filter = shard_filter_generator(shard_info.shard_num);
if (shard_filter)
{
auto & select_query = query_ast_for_shard->as<ASTSelectQuery &>();
auto where_expression = select_query.where();
if (where_expression)
shard_filter = makeASTFunction("and", where_expression, shard_filter);
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}
// auto shard_filter = shard_filter_generator(shard_info.shard_num);
// if (shard_filter)
// {
// auto & select_query = query_ast_for_shard->as<ASTSelectQuery &>();
//
// auto where_expression = select_query.where();
// if (where_expression)
// shard_filter = makeASTFunction("and", where_expression, shard_filter);
//
// select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
// }
}
// decide for each shard if parallel reading from replicas should be enabled
@ -256,7 +256,7 @@ void executeQuery(
stream_factory.createForShard(shard_info,
query_ast_for_shard, main_table, table_func_ptr,
new_context, plans, remote_shards, static_cast<UInt32>(shards),
parallel_replicas_enabled);
parallel_replicas_enabled, shard_filter_generator);
}
if (!remote_shards.empty())
@ -280,7 +280,7 @@ void executeQuery(
log,
shards,
query_info.storage_limits,
not_optimized_cluster->getName());
not_optimized_cluster);
read_from_remote->setStepDescription("Read from remote replica");
plan->addStep(std::move(read_from_remote));

View File

@ -59,7 +59,7 @@ void executeQuery(
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
AdditionalShardFilterGenerator shard_filter_generator = {});
AdditionalShardFilterGenerator shard_filter_generator);
void executeQueryWithParallelReplicas(

View File

@ -34,7 +34,7 @@ ASTPtr getCustomKeyFilterForParallelReplica(
const ColumnsDescription & columns,
const ContextPtr & context)
{
assert(replicas_count > 1);
chassert(replicas_count > 1);
if (filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT)
{
// first we do modulo with replica count

View File

@ -18,6 +18,7 @@
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Parsers/ASTFunction.h>
#include <boost/algorithm/string/join.hpp>
@ -104,7 +105,7 @@ ReadFromRemote::ReadFromRemote(
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_)
ClusterPtr cluster_)
: ISourceStep(DataStream{.header = std::move(header_)})
, shards(std::move(shards_))
, stage(stage_)
@ -117,7 +118,7 @@ ReadFromRemote::ReadFromRemote(
, storage_limits(std::move(storage_limits_))
, log(log_)
, shard_count(shard_count_)
, cluster_name(cluster_name_)
, cluster(cluster_)
{
}
@ -231,13 +232,12 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
add_extremes = context->getSettingsRef().extremes;
}
String query_string = formattedAST(shard.query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
if (context->canUseParallelReplicas())
{
const auto cluster_name = cluster->getName();
if (context->getSettingsRef().cluster_for_parallel_replicas.changed)
{
const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas;
@ -254,29 +254,71 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
context->setSetting("cluster_for_parallel_replicas", cluster_name);
}
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
if (context->canUseParallelReplicas())
/// parallel replicas custom key case
if (shard.shard_filter_generator)
{
// When doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
// establish a connection to one replica on the shard; that replica will instantiate a coordinator to manage parallel reading from the replicas on the shard.
// The coordinator will return the query result for the shard.
// Only one coordinator per shard is necessary, therefore PoolMode::GET_ONE is used to establish only one connection per shard.
// Using PoolMode::GET_MANY for this mode can lead to the instantiation of several coordinators (depending on the max_parallel_replicas setting),
// and each of them would execute parallel reading from replicas, so the query result would be multiplied by the number of created coordinators.
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
LOG_DEBUG(log, "Number of replicas {}", shard.shard_info.addresses.size());
for (size_t i = 0; i < shard.shard_info.addresses.size(); ++i)
{
const auto & address = shard.shard_info.addresses[i];
LOG_DEBUG(log, "Creating pipe for replica {}", address.toString());
auto query = shard.query->clone();
auto & select_query = query->as<ASTSelectQuery &>();
auto shard_filter = shard.shard_filter_generator(i + 1);
if (shard_filter)
{
auto where_expression = select_query.where();
if (where_expression)
shard_filter = makeASTFunction("and", where_expression, shard_filter);
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}
const String query_string = formattedAST(query);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
}
}
else
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
{
const String query_string = formattedAST(shard.query);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
if (context->canUseParallelReplicas())
{
// When doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
// establish a connection to one replica on the shard; that replica will instantiate a coordinator to manage parallel reading from the replicas on the shard.
// The coordinator will return the query result for the shard.
// Only one coordinator per shard is necessary, therefore PoolMode::GET_ONE is used to establish only one connection per shard.
// Using PoolMode::GET_MANY for this mode can lead to the instantiation of several coordinators (depending on the max_parallel_replicas setting),
// and each of them would execute parallel reading from replicas, so the query result would be multiplied by the number of created coordinators.
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
}
else
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
}
}
void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)

View File

@ -38,7 +38,7 @@ public:
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_);
ClusterPtr cluster_);
String getName() const override { return "ReadFromRemote"; }
@ -59,7 +59,7 @@ private:
std::shared_ptr<const StorageLimitsList> storage_limits;
Poco::Logger * log;
UInt32 shard_count;
const String cluster_name;
ClusterPtr cluster;
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);

View File

@ -431,7 +431,8 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
if (query_info.use_custom_key)
{
LOG_INFO(log, "Single shard cluster used with custom_key, transforming replicas into virtual shards");
query_info.cluster = cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
// query_info.cluster = cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
query_info.cluster = cluster;
}
else
{
@ -879,30 +880,30 @@ void StorageDistributed::read(
storage_snapshot,
processed_stage);
auto settings = local_context->getSettingsRef();
const auto & settings = local_context->getSettingsRef();
ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator;
if (query_info.use_custom_key)
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context))
{
if (query_info.getCluster()->getShardCount() == 1)
{
// we are reading from single shard with multiple replicas but didn't transform replicas
// into virtual shards with custom_key set
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replicas weren't transformed into virtual shards");
}
// if (query_info.getCluster()->getShardCount() == 1)
// {
// // we are reading from single shard with multiple replicas but didn't transform replicas
// // into virtual shards with custom_key set
// throw Exception(ErrorCodes::LOGICAL_ERROR, "Replicas weren't transformed into virtual shards");
// }
additional_shard_filter_generator =
[&, my_custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr
chassert(query_info.getCluster()->getShardsInfo().size() == 1);
additional_shard_filter_generator
= [my_custom_key_ast = std::move(custom_key_ast),
column_description = this->getInMemoryMetadataPtr()->columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
context = local_context,
replica_count = query_info.getCluster()->getShardsInfo().front().addresses.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
shard_count,
shard_num - 1,
my_custom_key_ast,
settings.parallel_replicas_custom_key_filter_type,
this->getInMemoryMetadataPtr()->columns,
local_context);
replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context);
};
}
}

View File

@ -94,6 +94,24 @@
</replica>
</shard>
</test_cluster_two_shards_different_databases_with_local>
<parallel_replicas_custom_key_unavailable_replica>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
<!-- Unavailable replica -->
<replica>
<host>127.0.0.11</host>
<port>1234</port>
</replica>
</shard>
</parallel_replicas_custom_key_unavailable_replica>
<parallel_replicas>
<shard>
<internal_replication>false</internal_replication>

View File

@ -0,0 +1,8 @@
0 250
1 250
2 250
3 250
0 250
1 250
2 250
3 250

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS 02918_parallel_replicas;
CREATE TABLE 02918_parallel_replicas (x String, y Int32) ENGINE = MergeTree ORDER BY cityHash64(x);
INSERT INTO 02918_parallel_replicas SELECT toString(number), number % 4 FROM numbers(1000);
SET async_socket_for_remote=0;
SET async_query_sending_for_remote=0;
SELECT y, count()
FROM cluster(parallel_replicas_custom_key_unavailable_replica, currentDatabase(), 02918_parallel_replicas)
-- FROM cluster(test_cluster_one_shard_two_replicas, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default';
SELECT y, count()
FROM cluster(parallel_replicas_custom_key_unavailable_replica, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='range';
DROP TABLE 02918_parallel_replicas;
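
A quick sanity check of the .reference numbers above, runnable on any ClickHouse server without the cluster setup: the test inserts 1000 rows with y = number % 4, so each of the two SELECTs should return 250 rows per group.

-- Reproduces the expected per-group counts (250 each for y = 0..3).
SELECT number % 4 AS y, count() FROM numbers(1000) GROUP BY y ORDER BY y;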