Replace distributed_insert_replicas_preferences with distributed_insert_skip_read_only_replicas

v2: fix test
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2024-03-14 15:12:46 +01:00
parent 63f92b2c16
commit c7b919b19e
14 changed files with 77 additions and 61 deletions

View File

@ -2817,16 +2817,16 @@ Possible values:
Default value: 0.
## distributed_insert_replicas_preferences {#distributed_insert_replicas_preferences}
## distributed_insert_skip_read_only_replicas {#distributed_insert_skip_read_only_replicas}
Adds ability to change replica preferences for INSERT queries.
Enables skipping read-only replicas for INSERT queries into Distributed.
Possible values:
- `no_preferences` - no preferences
- `prefer_non_read_only` - prefer non read-only replicas for INSERT into Distributed (but note, that they will not be excluded completely, so if you have only read-only replicas it will still try to INSERT into them, with respect to `distributed_replica_max_ignored_errors`).
- 0 — INSERT was as usual, if it will go to read-only replica it will fail
- 1 — Initiator will skip read-only replicas before sending data to shards.
Default value: `prefer_non_read_only`
Default value: `0`
## distributed_foreground_insert {#distributed_foreground_insert}

View File

@ -8,7 +8,6 @@ namespace ProfileEvents
extern const Event DistributedConnectionUsable;
extern const Event DistributedConnectionMissingTable;
extern const Event DistributedConnectionStaleReplica;
extern const Event DistributedConnectionReadOnlyReplica;
}
namespace DB
@ -75,7 +74,6 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
{
result.is_readonly = true;
LOG_TRACE(log, "Table {}.{} is readonly on server {}", table_to_check->database, table_to_check->table, result.entry->getDescription());
ProfileEvents::increment(ProfileEvents::DistributedConnectionReadOnlyReplica);
}
const UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;

View File

@ -158,6 +158,21 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyCheckedForInsert(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check)
{
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); };
return getManyImpl(settings, pool_mode, try_get_entry,
/*skip_unavailable_endpoints=*/ std::nullopt,
/*priority_func=*/ {},
settings.distributed_insert_skip_read_only_replicas);
}
ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings & settings)
{
const size_t offset = settings.load_balancing_first_offset % nested_pools.size();
@ -171,7 +186,8 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
GetPriorityForLoadBalancing::Func priority_func,
bool skip_read_only_replicas)
{
if (nested_pools.empty())
throw DB::Exception(
@ -203,7 +219,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value;
bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries.value;
return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func);
return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, skip_read_only_replicas, try_get_entry, priority_func);
}
ConnectionPoolWithFailover::TryResult
@ -254,14 +270,4 @@ ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPrior
return Base::getShuffledPools(max_ignored_errors, priority_func, use_slowdown_count);
}
void sortConnectionPoolByNonReadOnlyReplicas(std::vector<ConnectionPoolWithFailover::TryResult> & results)
{
auto comparator = [&](const auto & left, const auto & right)
{
return std::forward_as_tuple(left.is_readonly, !left.is_up_to_date, left.delay)
< std::forward_as_tuple(right.is_readonly, !right.is_up_to_date, right.delay);
};
std::stable_sort(results.begin(), results.end(), comparator);
}
}

View File

@ -77,6 +77,12 @@ public:
AsyncCallback async_callback = {},
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
/// The same as getManyChecked(), but respects distributed_insert_skip_read_only_replicas setting.
std::vector<TryResult> getManyCheckedForInsert(
const ConnectionTimeouts & timeouts,
const Settings & settings,
PoolMode pool_mode,
const QualifiedTableName & table_to_check);
struct NestedPoolStatus
{
@ -107,7 +113,8 @@ private:
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
GetPriorityForLoadBalancing::Func priority_func = {},
bool skip_read_only_replicas = false);
/// Try to get a connection from the pool and check that it is good.
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
@ -125,8 +132,6 @@ private:
GetPriorityForLoadBalancing get_priority_load_balancing;
};
void sortConnectionPoolByNonReadOnlyReplicas(std::vector<ConnectionPoolWithFailover::TryResult> & results);
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
using ConnectionPoolWithFailoverPtrs = std::vector<ConnectionPoolWithFailoverPtr>;

View File

@ -30,6 +30,7 @@ namespace ProfileEvents
{
extern const Event DistributedConnectionFailTry;
extern const Event DistributedConnectionFailAtAll;
extern const Event DistributedConnectionReadOnlyReplica;
}
/// This class provides a pool with fault tolerance. It is used for pooling of connections to replicated DB.
@ -112,6 +113,7 @@ public:
size_t min_entries, size_t max_entries, size_t max_tries,
size_t max_ignored_errors,
bool fallback_to_stale_replicas,
bool skip_read_only_replicas,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority);
@ -200,8 +202,12 @@ PoolWithFailoverBase<TNestedPool>::get(size_t max_ignored_errors, bool fallback_
const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
std::vector<TryResult> results = getMany(
1 /* min entries */, 1 /* max entries */, 1 /* max tries */,
max_ignored_errors, fallback_to_stale_replicas,
/* min_entries= */ 1,
/* max_entries= */ 1,
/* max_tries= */ 1,
max_ignored_errors,
fallback_to_stale_replicas,
/* skip_read_only_replicas= */ false,
try_get_entry, get_priority);
if (results.empty() || results[0].entry.isNull())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
@ -215,6 +221,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries, size_t max_tries,
size_t max_ignored_errors,
bool fallback_to_stale_replicas,
bool skip_read_only_replicas,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority)
{
@ -266,9 +273,14 @@ PoolWithFailoverBase<TNestedPool>::getMany(
++entries_count;
if (result.is_usable)
{
++usable_count;
if (result.is_up_to_date)
++up_to_date_count;
if (!skip_read_only_replicas || !result.is_readonly)
{
++usable_count;
if (result.is_up_to_date)
++up_to_date_count;
}
else
ProfileEvents::increment(ProfileEvents::DistributedConnectionReadOnlyReplica);
}
}
else
@ -291,7 +303,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
throw DB::NetException(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"All connection tries failed. Log: \n\n{}\n", fail_messages);
std::erase_if(try_results, [](const TryResult & r) { return r.entry.isNull() || !r.is_usable; });
std::erase_if(try_results, [&](const TryResult & r) { return r.entry.isNull() || !r.is_usable || (skip_read_only_replicas && r.is_readonly); });
/// Sort so that preferred items are near the beginning.
std::stable_sort(

View File

@ -156,7 +156,7 @@
M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.") \
M(DistributedConnectionMissingTable, "Number of times we rejected a replica from a distributed query, because it did not contain a table needed for the query.") \
M(DistributedConnectionStaleReplica, "Number of times we rejected a replica from a distributed query, because some table needed for a query had replication lag higher than the configured threshold.") \
M(DistributedConnectionReadOnlyReplica, "Number of times we got a read-only replica for distributed INSERT query.") \
M(DistributedConnectionReadOnlyReplica, "Number of times INSERT into Distributed prefer different replica due to current was read-only") \
M(DistributedConnectionFailAtAll, "Total count when distributed connection fails after all retries finished.") \
\
M(HedgedRequestsChangeReplica, "Total count when timeout for changing replica expired in hedged requests.") \

View File

@ -136,7 +136,7 @@ class IColumn;
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams, and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\
M(ReplicasPreferences, distributed_insert_replicas_preferences, ReplicasPreferences::PREFER_NON_READ_ONLY, "PREFER_NON_READ_ONLY - INSERT query into Distributed will try to prefer non-read-only replicas. Note, that this is only about the order, read-only replicas will not be excluded completely", 0) \
M(Bool, distributed_insert_skip_read_only_replicas, false, "If true, INSERT into Distributed will skip read-only replicas.", 0) \
M(Bool, distributed_foreground_insert, false, "If setting is enabled, insert query into distributed waits until data are sent to all nodes in a cluster. \n\nEnables or disables synchronous data insertion into a `Distributed` table.\n\nBy default, when inserting data into a Distributed table, the ClickHouse server sends data to cluster nodes in the background. When `distributed_foreground_insert` = 1, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).", 0) ALIAS(insert_distributed_sync) \
M(UInt64, distributed_background_insert_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) ALIAS(insert_distributed_timeout) \
M(Milliseconds, distributed_background_insert_sleep_time_ms, 100, "Sleep time for background INSERTs into Distributed, in case of any errors delay grows exponentially.", 0) ALIAS(distributed_directory_monitor_sleep_time_ms) \

View File

@ -136,7 +136,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"async_insert_busy_timeout_max_ms", 200, 200, "The minimum value of the asynchronous insert timeout in milliseconds; async_insert_busy_timeout_ms is aliased to async_insert_busy_timeout_max_ms"},
{"async_insert_busy_timeout_increase_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout increases"},
{"async_insert_busy_timeout_decrease_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases"},
{"distributed_insert_replicas_preferences", "no_preferences", "prefer_non_read_only", "Control preferred replicas for INSERT into Distributed"},
{"distributed_insert_skip_read_only_replicas", false, true, "If true, INSERT into Distributed will skip read-only replicas"},
{"format_template_row_format", "", "", "Template row format string can be set directly in query"},
{"format_template_resultset_format", "", "", "Template result set format string can be set in query"},
{"split_parts_ranges_into_intersecting_and_non_intersecting_final", true, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"},

View File

@ -79,10 +79,6 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P
{"global", DistributedProductMode::GLOBAL},
{"allow", DistributedProductMode::ALLOW}})
IMPLEMENT_SETTING_ENUM(ReplicasPreferences, ErrorCodes::BAD_ARGUMENTS,
{{"no_preferences", ReplicasPreferences::NO_PREFERENCES},
{"prefer_non_read_only", ReplicasPreferences::PREFER_NON_READ_ONLY}})
IMPLEMENT_SETTING_ENUM(QueryCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS,
{{"throw", QueryCacheNondeterministicFunctionHandling::Throw},

View File

@ -174,14 +174,6 @@ enum class DistributedProductMode
DECLARE_SETTING_ENUM(DistributedProductMode)
enum class ReplicasPreferences
{
NO_PREFERENCES = 0,
/// Prefer non read-only replicas (they will not be excluded completely).
PREFER_NON_READ_ONLY,
};
DECLARE_SETTING_ENUM(ReplicasPreferences)
/// How the query cache handles queries with non-deterministic functions, e.g. now()
enum class QueryCacheNondeterministicFunctionHandling
{

View File

@ -232,10 +232,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = parent.pool->getManyChecked(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
if (distributed_header.insert_settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY)
sortConnectionPoolByNonReadOnlyReplicas(result);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
connection = std::move(result.front().entry);
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
@ -293,10 +290,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett
parent.storage.getContext()->getOpenTelemetrySpanLog());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = parent.pool->getManyChecked(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
if (distributed_header.insert_settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY)
sortConnectionPoolByNonReadOnlyReplicas(result);
auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;

View File

@ -412,10 +412,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path,
insert_settings.applyChanges(settings_changes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings);
auto result = pool->getManyChecked(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
if (distributed_header.insert_settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY)
sortConnectionPoolByNonReadOnlyReplicas(result);
auto result = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
auto connection = std::move(result.front().entry);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",

View File

@ -372,9 +372,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
/// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
/// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
/// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
auto results = shard_info.pool->getManyChecked(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
if (settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY)
sortConnectionPoolByNonReadOnlyReplicas(results);
auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
job.connection_entry = std::move(results.front().entry);
}
else

View File

@ -29,14 +29,32 @@ system stop distributed sends dist;
create table dist_batch as shard_0.data engine=Distributed('test_cluster_two_replicas_different_databases_internal_replication', '', 'data') settings background_insert_batch=1;
system stop distributed sends dist_batch;
create table dist_single_no_internal_replication as shard_0.data engine=Distributed('default', 'shard_1', 'data');
system stop distributed sends dist_single_no_internal_replication;
create table dist_single as shard_0.data engine=Distributed('test_cluster_two_replicas_different_databases_internal_replication', 'shard_1', 'data');
system stop distributed sends dist_single;
set prefer_localhost_replica=0;
set insert_deduplicate=0;
-- replica is readonly, avoid too many retries
set insert_keeper_max_retries=3;
-- and disable the fault injection to avoid failures
set insert_keeper_fault_injection_probability=0;
-- for internal_replicatio==false, distributed_insert_skip_read_only_replicas does not changes anything
insert into dist_single_no_internal_replication settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=0 values (0); -- { serverError TABLE_IS_READ_ONLY }
insert into dist_single_no_internal_replication settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values (0); -- { serverError TABLE_IS_READ_ONLY }
-- for internal_replicatio==true, it does
insert into dist_single settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=0 values (0); -- { serverError TABLE_IS_READ_ONLY }
insert into dist_single settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values (0); -- { serverError ALL_CONNECTION_TRIES_FAILED }
-- min_insert_block_size_rows is not enough, since replica will be selected
-- before, so we need to perform INSERT multiple times to ensure that at least
-- once read-only replica should be selected (if it wasn't not filtered out)
{% for i in range(1, 30) %}
insert into dist settings distributed_foreground_insert=1 values ({{ i }});
insert into dist settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values ({{ i }});
{% endfor %}
-- cannot check for background inserts, so only here
system flush logs;
@ -51,12 +69,12 @@ where
and Settings['distributed_foreground_insert'] = '1';
{% for i in range(1, 30) %}
insert into dist settings distributed_foreground_insert=0 values ({{ i }});
insert into dist settings distributed_foreground_insert=0, distributed_insert_skip_read_only_replicas=1 values ({{ i }});
system flush distributed dist;
{% endfor %}
{% for i in range(1, 30) %}
insert into dist_batch settings distributed_foreground_insert=0 values ({{ i }});
insert into dist_batch settings distributed_foreground_insert=0, distributed_insert_skip_read_only_replicas=1 values ({{ i }});
system flush distributed dist_batch;
{% endfor %}