Merge pull request #58993 from ClickHouse/revert-58992-revert_flaky

Revive: Parallel replicas custom key: skip unavailable replicas
This commit is contained in:
Igor Nikonov 2024-01-20 19:44:56 +01:00 committed by GitHub
commit 4be068c73b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 646 additions and 187 deletions

View File

@ -118,18 +118,18 @@ ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
return result;
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback);
};
{ return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback); };
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
std::vector<Entry> entries;
entries.reserve(results.size());
@ -153,17 +153,17 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const ConnectionTimeouts & timeouts,
const Settings & settings, PoolMode pool_mode,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback);
};
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback); };
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
}
ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings & settings)
@ -178,10 +178,12 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
const Settings & settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints)
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
if (nested_pools.empty())
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
throw DB::Exception(
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
if (!skip_unavailable_endpoints.has_value())
@ -203,14 +205,13 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
else
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown pool allocation mode");
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries.value;
return Base::getMany(min_entries, max_entries, max_tries,
max_ignored_errors, fallback_to_stale_replicas,
try_get_entry, get_priority);
return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func);
}
ConnectionPoolWithFailover::TryResult
@ -251,11 +252,14 @@ ConnectionPoolWithFailover::tryGetEntry(
return result;
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFailover::getShuffledPools(const Settings & settings)
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool>
ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPriorityForLoadBalancing::Func priority_func)
{
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
if (!priority_func)
priority_func = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
return Base::getShuffledPools(max_ignored_errors, get_priority);
return Base::getShuffledPools(max_ignored_errors, priority_func);
}
}

View File

@ -54,10 +54,13 @@ public:
/** Allocates up to the specified number of connections to work.
* Connections provide access to different replicas of one shard.
*/
std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
const Settings & settings, PoolMode pool_mode,
std::vector<Entry> getMany(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
@ -74,7 +77,8 @@ public:
PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
struct NestedPoolStatus
{
@ -87,7 +91,7 @@ public:
using Status = std::vector<NestedPoolStatus>;
Status getStatus() const;
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings);
std::vector<Base::ShuffledPool> getShuffledPools(const Settings & settings, GetPriorityFunc priority_func = {});
size_t getMaxErrorCup() const { return Base::max_error_cap; }
@ -96,13 +100,16 @@ public:
Base::updateSharedErrorCounts(shuffled_pools);
}
size_t getPoolSize() const { return Base::getPoolSize(); }
private:
/// Get the values of relevant settings and call Base::getMany()
std::vector<TryResult> getManyImpl(
const Settings & settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints = std::nullopt);
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
/// Try to get a connection from the pool and check that it is good.
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
@ -115,7 +122,7 @@ private:
const QualifiedTableName * table_to_check = nullptr,
AsyncCallback async_callback = {});
GetPriorityFunc makeGetPriorityFunc(const Settings & settings);
GetPriorityForLoadBalancing::Func makeGetPriorityFunc(const Settings & settings);
GetPriorityForLoadBalancing get_priority_load_balancing;
};

View File

@ -28,7 +28,8 @@ HedgedConnections::HedgedConnections(
const ThrottlerPtr & throttler_,
PoolMode pool_mode,
std::shared_ptr<QualifiedTableName> table_to_check_,
AsyncCallback async_callback)
AsyncCallback async_callback,
GetPriorityForLoadBalancing::Func priority_func)
: hedged_connections_factory(
pool_,
context_->getSettingsRef(),
@ -37,7 +38,8 @@ HedgedConnections::HedgedConnections(
context_->getSettingsRef().fallback_to_stale_replicas_for_distributed_queries.value,
context_->getSettingsRef().max_parallel_replicas.value,
context_->getSettingsRef().skip_unavailable_shards.value,
table_to_check_)
table_to_check_,
priority_func)
, context(std::move(context_))
, settings(context->getSettingsRef())
, throttler(throttler_)

View File

@ -70,13 +70,15 @@ public:
size_t index;
};
HedgedConnections(const ConnectionPoolWithFailoverPtr & pool_,
HedgedConnections(
const ConnectionPoolWithFailoverPtr & pool_,
ContextPtr context_,
const ConnectionTimeouts & timeouts_,
const ThrottlerPtr & throttler,
PoolMode pool_mode,
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
AsyncCallback async_callback = {});
AsyncCallback async_callback = {},
GetPriorityForLoadBalancing::Func priority_func = {});
void sendScalarsData(Scalars & data) override;

View File

@ -29,7 +29,8 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
bool fallback_to_stale_replicas_,
UInt64 max_parallel_replicas_,
bool skip_unavailable_shards_,
std::shared_ptr<QualifiedTableName> table_to_check_)
std::shared_ptr<QualifiedTableName> table_to_check_,
GetPriorityForLoadBalancing::Func priority_func)
: pool(pool_)
, timeouts(timeouts_)
, table_to_check(table_to_check_)
@ -39,7 +40,7 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
, max_parallel_replicas(max_parallel_replicas_)
, skip_unavailable_shards(skip_unavailable_shards_)
{
shuffled_pools = pool->getShuffledPools(settings_);
shuffled_pools = pool->getShuffledPools(settings_, priority_func);
for (auto shuffled_pool : shuffled_pools)
replicas.emplace_back(std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
}
@ -323,8 +324,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnect
else
{
ShuffledPool & shuffled_pool = shuffled_pools[index];
LOG_WARNING(
log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
LOG_INFO(log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);

View File

@ -53,7 +53,8 @@ public:
bool fallback_to_stale_replicas_,
UInt64 max_parallel_replicas_,
bool skip_unavailable_shards_,
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);
std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr,
GetPriorityForLoadBalancing::Func priority_func = {});
/// Create and return active connections according to pool_mode.
std::vector<Connection *> getManyConnections(PoolMode pool_mode, AsyncCallback async_callback = {});

View File

@ -9,7 +9,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
std::function<Priority(size_t index)> GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const
GetPriorityForLoadBalancing::Func
GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const
{
std::function<Priority(size_t index)> get_priority;
switch (load_balance)
@ -33,19 +34,26 @@ std::function<Priority(size_t index)> GetPriorityForLoadBalancing::getPriorityFu
get_priority = [offset](size_t i) { return i != offset ? Priority{1} : Priority{0}; };
break;
case LoadBalancing::ROUND_ROBIN:
if (last_used >= pool_size)
last_used = 0;
auto local_last_used = last_used % pool_size;
++last_used;
/* Consider pool_size equals to 5
* last_used = 1 -> get_priority: 0 1 2 3 4
* last_used = 2 -> get_priority: 4 0 1 2 3
* last_used = 3 -> get_priority: 4 3 0 1 2
* ...
* */
get_priority = [this, pool_size](size_t i)
// Example: pool_size = 5
// | local_last_used | i=0 | i=1 | i=2 | i=3 | i=4 |
// | 0 | 4 | 0 | 1 | 2 | 3 |
// | 1 | 3 | 4 | 0 | 1 | 2 |
// | 2 | 2 | 3 | 4 | 0 | 1 |
// | 3 | 1 | 2 | 3 | 4 | 0 |
// | 4 | 0 | 1 | 2 | 3 | 4 |
get_priority = [pool_size, local_last_used](size_t i)
{
++i; // To make `i` indexing start with 1 instead of 0 as `last_used` does
return Priority{static_cast<Int64>(i < last_used ? pool_size - i : i - last_used)};
size_t priority = pool_size - 1;
if (i < local_last_used)
priority = pool_size - 1 - (local_last_used - i);
if (i > local_last_used)
priority = i - local_last_used - 1;
return Priority{static_cast<Int64>(priority)};
};
break;
}

View File

@ -8,7 +8,12 @@ namespace DB
class GetPriorityForLoadBalancing
{
public:
explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_) : load_balancing(load_balancing_) {}
using Func = std::function<Priority(size_t index)>;
explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_, size_t last_used_ = 0)
: load_balancing(load_balancing_), last_used(last_used_)
{
}
GetPriorityForLoadBalancing() = default;
bool operator == (const GetPriorityForLoadBalancing & other) const
@ -23,7 +28,7 @@ public:
return !(*this == other);
}
std::function<Priority(size_t index)> getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
Func getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
std::vector<size_t> hostname_prefix_distance; /// Prefix distances from name of this host to the names of hosts of pools.
std::vector<size_t> hostname_levenshtein_distance; /// Levenshtein Distances from name of this host to the names of hosts of pools.

View File

@ -124,7 +124,9 @@ public:
size_t max_ignored_errors,
bool fallback_to_stale_replicas,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority = GetPriorityFunc());
const GetPriorityFunc & get_priority);
size_t getPoolSize() const { return nested_pools.size(); }
protected:
@ -147,7 +149,7 @@ protected:
return std::make_tuple(shared_pool_states, nested_pools, last_error_decrease_time);
}
NestedPools nested_pools;
const NestedPools nested_pools;
const time_t decrease_error_period;
const size_t max_error_cap;

View File

@ -117,13 +117,13 @@ void SelectStreamFactory::createForShard(
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled)
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator)
{
auto it = objects_by_shard.find(shard_info.shard_num);
if (it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(
@ -139,6 +139,7 @@ void SelectStreamFactory::createForShard(
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
.shard_filter_generator = std::move(shard_filter_generator),
});
};

View File

@ -40,6 +40,7 @@ ASTPtr rewriteSelectQuery(
ASTPtr table_function_ptr = nullptr);
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
class SelectStreamFactory
{
@ -59,6 +60,7 @@ public:
/// (When there is a local replica with big delay).
bool lazy = false;
time_t local_delay = 0;
AdditionalShardFilterGenerator shard_filter_generator{};
};
using Shards = std::vector<Shard>;
@ -78,7 +80,8 @@ public:
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled);
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
const Block header;
const ColumnsDescriptionByShardNum objects_by_shard;

View File

@ -158,6 +158,13 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
new_settings.timeout_overflow_mode = settings.timeout_overflow_mode_leaf;
}
/// in case of parallel replicas custom key use round robing load balancing
/// so custom key partitions will be spread over nodes in round-robin fashion
if (context->canUseParallelReplicasCustomKey(cluster) && !settings.load_balancing.changed)
{
new_settings.load_balancing = LoadBalancing::ROUND_ROBIN;
}
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;
@ -247,21 +254,6 @@ void executeQuery(
visitor.visit(query_ast_for_shard);
}
if (shard_filter_generator)
{
auto shard_filter = shard_filter_generator(shard_info.shard_num);
if (shard_filter)
{
auto & select_query = query_ast_for_shard->as<ASTSelectQuery &>();
auto where_expression = select_query.where();
if (where_expression)
shard_filter = makeASTFunction("and", where_expression, shard_filter);
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}
}
// decide for each shard if parallel reading from replicas should be enabled
// according to settings and number of replicas declared per shard
const auto & addresses = cluster->getShardsAddresses().at(i);
@ -276,7 +268,8 @@ void executeQuery(
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled);
parallel_replicas_enabled,
shard_filter_generator);
}
if (!remote_shards.empty())

View File

@ -65,7 +65,7 @@ void executeQuery(
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator = {});
AdditionalShardFilterGenerator shard_filter_generator);
void executeQueryWithParallelReplicas(

View File

@ -5113,6 +5113,12 @@ bool Context::canUseParallelReplicasOnFollower() const
return canUseTaskBasedParallelReplicas() && getClientInfo().collaborate_with_initiator;
}
bool Context::canUseParallelReplicasCustomKey(const Cluster & cluster) const
{
return settings.max_parallel_replicas > 1 && getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY
&& cluster.getShardCount() == 1 && cluster.getShardsInfo()[0].getAllNodeCount() > 1;
}
void Context::setPreparedSetsCache(const PreparedSetsCachePtr & cache)
{
prepared_sets_cache = cache;

View File

@ -1246,6 +1246,7 @@ public:
bool canUseTaskBasedParallelReplicas() const;
bool canUseParallelReplicasOnInitiator() const;
bool canUseParallelReplicasOnFollower() const;
bool canUseParallelReplicasCustomKey(const Cluster & cluster) const;
enum class ParallelReplicasMode : uint8_t
{

View File

@ -589,9 +589,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
else if (auto * distributed = dynamic_cast<StorageDistributed *>(storage.get());
distributed && canUseCustomKey(settings, *distributed->getCluster(), *context))
distributed && context->canUseParallelReplicasCustomKey(*distributed->getCluster()))
{
query_info.use_custom_key = true;
context->setSetting("distributed_group_by_no_merge", 2);
}
}

View File

@ -20,12 +20,6 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
}
bool canUseCustomKey(const Settings & settings, const Cluster & cluster, const Context & context)
{
return settings.max_parallel_replicas > 1 && context.getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY
&& cluster.getShardCount() == 1 && cluster.getShardsInfo()[0].getAllNodeCount() > 1;
}
ASTPtr getCustomKeyFilterForParallelReplica(
size_t replicas_count,
size_t replica_num,
@ -34,7 +28,7 @@ ASTPtr getCustomKeyFilterForParallelReplica(
const ColumnsDescription & columns,
const ContextPtr & context)
{
assert(replicas_count > 1);
chassert(replicas_count > 1);
if (filter_type == ParallelReplicasCustomKeyFilterType::DEFAULT)
{
// first we do modulo with replica count

View File

@ -9,9 +9,6 @@
namespace DB
{
bool canUseCustomKey(const Settings & settings, const Cluster & cluster, const Context & context);
/// Get AST for filter created from custom_key
/// replica_num is the number of the replica for which we are generating filter starting from 0
ASTPtr getCustomKeyFilterForParallelReplica(

View File

@ -809,9 +809,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
else
{
if (auto * distributed = typeid_cast<StorageDistributed *>(storage.get());
distributed && canUseCustomKey(settings, *distributed->getCluster(), *query_context))
distributed && query_context->canUseParallelReplicasCustomKey(*distributed->getCluster()))
{
table_expression_query_info.use_custom_key = true;
planner_context->getMutableQueryContext()->setSetting("distributed_group_by_no_merge", 2);
}
}

View File

@ -18,6 +18,7 @@
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Parsers/ASTFunction.h>
#include <boost/algorithm/string/join.hpp>
@ -231,8 +232,6 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
add_extremes = context->getSettingsRef().extremes;
}
String query_string = formattedAST(shard.query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
@ -254,6 +253,57 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
context->setSetting("cluster_for_parallel_replicas", cluster_name);
}
/// parallel replicas custom key case
if (shard.shard_filter_generator)
{
for (size_t i = 0; i < shard.shard_info.per_replica_pools.size(); ++i)
{
auto query = shard.query->clone();
auto & select_query = query->as<ASTSelectQuery &>();
auto shard_filter = shard.shard_filter_generator(i + 1);
if (shard_filter)
{
auto where_expression = select_query.where();
if (where_expression)
shard_filter = makeASTFunction("and", where_expression, shard_filter);
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}
const String query_string = formattedAST(query);
if (!priority_func_factory.has_value())
priority_func_factory = GetPriorityForLoadBalancing(LoadBalancing::ROUND_ROBIN, randomSeed());
GetPriorityForLoadBalancing::Func priority_func
= priority_func_factory->getPriorityFunc(LoadBalancing::ROUND_ROBIN, 0, shard.shard_info.pool->getPoolSize());
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool,
query_string,
output_stream->header,
context,
throttler,
scalars,
external_tables,
stage,
std::nullopt,
priority_func);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
}
}
else
{
const String query_string = formattedAST(shard.query);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
@ -277,6 +327,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
pipes.emplace_back(
createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);
}
}
void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)

View File

@ -60,6 +60,7 @@ private:
Poco::Logger * log;
UInt32 shard_count;
const String cluster_name;
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);

View File

@ -43,13 +43,24 @@ namespace ErrorCodes
}
RemoteQueryExecutor::RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_)
const String & query_,
const Block & header_,
ContextPtr context_,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func_)
: header(header_)
, query(query_)
, context(context_)
, scalars(scalars_)
, external_tables(external_tables_)
, stage(stage_)
, extension(extension_)
{}
, priority_func(priority_func_)
{
}
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
@ -100,10 +111,16 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_, priority_func_)
{
create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections>
{
@ -117,7 +134,8 @@ RemoteQueryExecutor::RemoteQueryExecutor(
if (main_table)
table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback));
auto res = std::make_unique<HedgedConnections>(
pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback), priority_func);
if (extension && extension->replica_info)
res->setReplicaInfo(*extension->replica_info);
return res;
@ -137,14 +155,16 @@ RemoteQueryExecutor::RemoteQueryExecutor(
pool_mode,
main_table.getQualifiedName(),
std::move(async_callback),
skip_unavailable_endpoints);
skip_unavailable_endpoints,
priority_func);
connection_entries.reserve(try_results.size());
for (auto & try_result : try_results)
connection_entries.emplace_back(std::move(try_result.entry));
}
else
{
connection_entries = pool->getMany(timeouts, current_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints);
connection_entries = pool->getMany(
timeouts, current_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints, priority_func);
}
auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);

View File

@ -50,6 +50,7 @@ public:
std::shared_ptr<TaskIterator> task_iterator = nullptr;
std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator = nullptr;
std::optional<IConnections::ReplicaInfo> replica_info = {};
GetPriorityForLoadBalancing::Func priority_func;
};
/// Takes already set connection.
@ -76,9 +77,15 @@ public:
/// Takes a pool and gets one or several connections from it.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
~RemoteQueryExecutor();
@ -191,9 +198,14 @@ public:
private:
RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_);
const String & query_,
const Block & header_,
ContextPtr context_,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func = {});
Block header;
Block totals;
@ -273,6 +285,8 @@ private:
Poco::Logger * log = nullptr;
GetPriorityForLoadBalancing::Func priority_func;
/// Send all scalars to remote servers
void sendScalars();

View File

@ -176,8 +176,6 @@ struct SelectQueryInfo
///
/// Configured in StorageDistributed::getQueryProcessingStage()
ClusterPtr optimized_cluster;
/// should we use custom key with the cluster
bool use_custom_key = false;
TreeRewriterResultPtr syntax_analyzer_result;

View File

@ -429,15 +429,10 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
size_t nodes = getClusterQueriedNodes(settings, cluster);
if (query_info.use_custom_key)
{
LOG_INFO(log, "Single shard cluster used with custom_key, transforming replicas into virtual shards");
query_info.cluster = cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
}
else
{
query_info.cluster = cluster;
if (!local_context->canUseParallelReplicasCustomKey(*cluster))
{
if (nodes > 1 && settings.optimize_skip_unused_shards)
{
/// Always calculate optimized cluster here, to avoid conditions during read()
@ -880,30 +875,22 @@ void StorageDistributed::read(
storage_snapshot,
processed_stage);
auto settings = local_context->getSettingsRef();
const auto & settings = local_context->getSettingsRef();
ClusterProxy::AdditionalShardFilterGenerator additional_shard_filter_generator;
if (query_info.use_custom_key)
if (local_context->canUseParallelReplicasCustomKey(*query_info.getCluster()))
{
if (auto custom_key_ast = parseCustomKeyForTable(settings.parallel_replicas_custom_key, *local_context))
{
if (query_info.getCluster()->getShardCount() == 1)
{
// we are reading from single shard with multiple replicas but didn't transform replicas
// into virtual shards with custom_key set
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replicas weren't transformed into virtual shards");
}
additional_shard_filter_generator =
[&, my_custom_key_ast = std::move(custom_key_ast), shard_count = query_info.cluster->getShardCount()](uint64_t shard_num) -> ASTPtr
[my_custom_key_ast = std::move(custom_key_ast),
column_description = this->getInMemoryMetadataPtr()->columns,
custom_key_type = settings.parallel_replicas_custom_key_filter_type.value,
context = local_context,
replica_count = query_info.getCluster()->getShardsInfo().front().per_replica_pools.size()](uint64_t replica_num) -> ASTPtr
{
return getCustomKeyFilterForParallelReplica(
shard_count,
shard_num - 1,
my_custom_key_ast,
settings.parallel_replicas_custom_key_filter_type,
this->getInMemoryMetadataPtr()->columns,
local_context);
replica_count, replica_num - 1, my_custom_key_ast, custom_key_type, column_description, context);
};
}
}

View File

@ -144,6 +144,24 @@
</replica>
</shard>
</parallel_replicas>
<test_cluster_1_shard_3_replicas_1_unavailable>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
<!-- Unavailable replica -->
<replica>
<host>127.0.0.3</host>
<port>1234</port>
</replica>
</shard>
</test_cluster_1_shard_3_replicas_1_unavailable>
<test_cluster_one_shard_three_replicas_localhost>
<shard>
<internal_replication>false</internal_replication>

View File

@ -87,8 +87,3 @@ def test_parallel_replicas_custom_key(start_cluster, cluster, custom_key, filter
node.contains_in_log("Processing query on a replica using custom_key")
for node in nodes
)
else:
# we first transform all replicas into shards and then append for each shard filter
assert n1.contains_in_log(
"Single shard cluster used with custom_key, transforming replicas into virtual shards"
)

View File

@ -0,0 +1,26 @@
<clickhouse>
<remote_servers>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>1234</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
<replica>
<host>n4</host>
<port>1234</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,122 @@
import pytest
import uuid
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"n1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
"n3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
nodes = [node1, node3]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(cluster, table_name):
node1.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
node3.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
node1.query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
)
node3.query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)"
)
# populate data
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000)"
)
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000, 1000)"
)
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(2000, 1000)"
)
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(3000, 1000)"
)
node3.query(f"SYSTEM SYNC REPLICA {table_name}")
@pytest.mark.parametrize("use_hedged_requests", [1, 0])
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
@pytest.mark.parametrize("prefer_localhost_replica", [0, 1])
def test_parallel_replicas_custom_key_failover(
start_cluster,
use_hedged_requests,
custom_key,
filter_type,
prefer_localhost_replica,
):
cluster_name = "test_single_shard_multiple_replicas"
table = "test_table"
create_tables(cluster_name, table)
expected_result = ""
for i in range(4):
expected_result += f"{i}\t1000\n"
log_comment = uuid.uuid4()
assert (
node1.query(
f"SELECT key, count() FROM cluster('{cluster_name}', currentDatabase(), test_table) GROUP BY key ORDER BY key",
settings={
"log_comment": log_comment,
"prefer_localhost_replica": prefer_localhost_replica,
"max_parallel_replicas": 4,
"parallel_replicas_custom_key": custom_key,
"parallel_replicas_custom_key_filter_type": filter_type,
"use_hedged_requests": use_hedged_requests,
# avoid considering replica delay on connection choice
# otherwise connection can be not distributed evenly among available nodes
# and so custom key secondary queries (we check it bellow)
"max_replica_delay_for_distributed_queries": 0,
},
)
== expected_result
)
for node in nodes:
node.query("system flush logs")
# the subqueries should be spread over available nodes
query_id = node1.query(
f"SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '{log_comment}' AND type = 'QueryFinish' AND initial_query_id = query_id"
)
assert query_id != ""
query_id = query_id[:-1]
if prefer_localhost_replica == 0:
assert (
node1.query(
f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1"
)
== "subqueries\t4\n"
)
# currently this assert is flaky with asan and tsan builds, disable the assert in such cases for now
# will be investigated separately
if (
not node1.is_built_with_thread_sanitizer()
and not node1.is_built_with_address_sanitizer()
):
assert (
node1.query(
f"SELECT h, count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"
)
== "n1\t3\nn3\t2\n"
)

View File

@ -0,0 +1,26 @@
<clickhouse>
<remote_servers>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,118 @@
import pytest
import uuid
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"n1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
"n2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
"n3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
"n4", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
nodes = [node1, node2, node3, node4]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(table_name):
for i in range(0, 4):
nodes[i].query(f"DROP TABLE IF EXISTS {table_name} SYNC")
nodes[i].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r{i+1}') ORDER BY (key)"
)
# populate data
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000)"
)
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000, 1000)"
)
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(2000, 1000)"
)
node1.query(
f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(3000, 1000)"
)
node2.query(f"SYSTEM SYNC REPLICA {table_name}")
node3.query(f"SYSTEM SYNC REPLICA {table_name}")
node4.query(f"SYSTEM SYNC REPLICA {table_name}")
@pytest.mark.parametrize("use_hedged_requests", [1, 0])
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
def test_parallel_replicas_custom_key_load_balancing(
start_cluster,
use_hedged_requests,
custom_key,
filter_type,
):
cluster_name = "test_single_shard_multiple_replicas"
table = "test_table"
create_tables(table)
expected_result = ""
for i in range(4):
expected_result += f"{i}\t1000\n"
log_comment = uuid.uuid4()
assert (
node1.query(
f"SELECT key, count() FROM cluster('{cluster_name}', currentDatabase(), test_table) GROUP BY key ORDER BY key",
settings={
"log_comment": log_comment,
"prefer_localhost_replica": 0,
"max_parallel_replicas": 4,
"parallel_replicas_custom_key": custom_key,
"parallel_replicas_custom_key_filter_type": filter_type,
"use_hedged_requests": use_hedged_requests,
# avoid considering replica delay on connection choice
# otherwise connection can be not distributed evenly among available nodes
# and so custom key secondary queries (we check it bellow)
"max_replica_delay_for_distributed_queries": 0,
},
)
== expected_result
)
for node in nodes:
node.query("system flush logs")
# the subqueries should be spread over available nodes
query_id = node1.query(
f"SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '{log_comment}' AND type = 'QueryFinish' AND initial_query_id = query_id"
)
assert query_id != ""
query_id = query_id[:-1]
assert (
node1.query(
f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1"
)
== "subqueries\t4\n"
)
# check queries per node
assert (
node1.query(
f"SELECT h, count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"
)
== "n1\t2\nn2\t1\nn3\t1\nn4\t1\n"
)

View File

@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --connections_with_failover_max_tries 10 --query "SELECT hostName() FROM remote('128.1.2.3', default.tmp)" 2>&1 | grep -o -P 'Timeout exceeded while connecting to socket|Network is unreachable|Timeout: connect timed out' | wc -l
$CLICKHOUSE_CLIENT --connections_with_failover_max_tries 10 --connect_timeout_with_failover_ms 1 --query "SELECT hostName() FROM remote('128.1.2.3', default.tmp)" 2>&1 | grep -o -P 'Timeout exceeded while connecting to socket|Network is unreachable|Timeout: connect timed out' | wc -l

View File

@ -0,0 +1,29 @@
-- { echoOn }
SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default';
0 250
1 250
2 250
3 250
SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='range';
0 250
1 250
2 250
3 250
SET use_hedged_requests=0;
SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default';
0 250
1 250
2 250
3 250

View File

@ -0,0 +1,30 @@
DROP TABLE IF EXISTS 02918_parallel_replicas;
CREATE TABLE 02918_parallel_replicas (x String, y Int32) ENGINE = MergeTree ORDER BY cityHash64(x);
INSERT INTO 02918_parallel_replicas SELECT toString(number), number % 4 FROM numbers(1000);
SET prefer_localhost_replica=0;
-- { echoOn }
SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default';
SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='range';
SET use_hedged_requests=0;
SELECT y, count()
FROM cluster(test_cluster_1_shard_3_replicas_1_unavailable, currentDatabase(), 02918_parallel_replicas)
GROUP BY y
ORDER BY y
SETTINGS max_parallel_replicas=3, parallel_replicas_custom_key='cityHash64(y)', parallel_replicas_custom_key_filter_type='default';
-- { echoOff }
DROP TABLE 02918_parallel_replicas;