diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 9f790cdeb8e..665dd17eabe 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -2817,16 +2817,16 @@ Possible values: Default value: 0. -## distributed_insert_replicas_preferences {#distributed_insert_replicas_preferences} +## distributed_insert_skip_read_only_replicas {#distributed_insert_skip_read_only_replicas} -Adds ability to change replica preferences for INSERT queries. +Enables skipping read-only replicas for INSERT queries into Distributed. Possible values: -- `no_preferences` - no preferences -- `prefer_non_read_only` - prefer non read-only replicas for INSERT into Distributed (but note, that they will not be excluded completely, so if you have only read-only replicas it will still try to INSERT into them, with respect to `distributed_replica_max_ignored_errors`). +- 0 — INSERT was as usual, if it will go to read-only replica it will fail +- 1 — Initiator will skip read-only replicas before sending data to shards. -Default value: `prefer_non_read_only` +Default value: `0` ## distributed_foreground_insert {#distributed_foreground_insert} diff --git a/src/Client/ConnectionEstablisher.cpp b/src/Client/ConnectionEstablisher.cpp index 6b1b5aa968e..303105751ad 100644 --- a/src/Client/ConnectionEstablisher.cpp +++ b/src/Client/ConnectionEstablisher.cpp @@ -8,7 +8,6 @@ namespace ProfileEvents extern const Event DistributedConnectionUsable; extern const Event DistributedConnectionMissingTable; extern const Event DistributedConnectionStaleReplica; - extern const Event DistributedConnectionReadOnlyReplica; } namespace DB @@ -75,7 +74,6 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std:: { result.is_readonly = true; LOG_TRACE(log, "Table {}.{} is readonly on server {}", table_to_check->database, table_to_check->table, result.entry->getDescription()); - ProfileEvents::increment(ProfileEvents::DistributedConnectionReadOnlyReplica); } const UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries; diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index dddb3f45ba3..11bdb6108ca 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -158,6 +158,21 @@ std::vector ConnectionPoolWithFailover::g return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func); } +std::vector ConnectionPoolWithFailover::getManyCheckedForInsert( + const ConnectionTimeouts & timeouts, + const Settings & settings, + PoolMode pool_mode, + const QualifiedTableName & table_to_check) +{ + TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message) + { return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, /*async_callback=*/ {}); }; + + return getManyImpl(settings, pool_mode, try_get_entry, + /*skip_unavailable_endpoints=*/ std::nullopt, + /*priority_func=*/ {}, + settings.distributed_insert_skip_read_only_replicas); +} + ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings & settings) { const size_t offset = settings.load_balancing_first_offset % nested_pools.size(); @@ -171,7 +186,8 @@ std::vector ConnectionPoolWithFailover::g PoolMode pool_mode, const TryGetEntryFunc & try_get_entry, std::optional skip_unavailable_endpoints, - GetPriorityForLoadBalancing::Func priority_func) + GetPriorityForLoadBalancing::Func priority_func, + bool skip_read_only_replicas) { if (nested_pools.empty()) throw DB::Exception( @@ -203,7 +219,7 @@ std::vector ConnectionPoolWithFailover::g UInt64 max_ignored_errors = settings.distributed_replica_max_ignored_errors.value; bool fallback_to_stale_replicas = settings.fallback_to_stale_replicas_for_distributed_queries.value; - return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, try_get_entry, priority_func); + return Base::getMany(min_entries, max_entries, max_tries, max_ignored_errors, fallback_to_stale_replicas, skip_read_only_replicas, try_get_entry, priority_func); } ConnectionPoolWithFailover::TryResult @@ -254,14 +270,4 @@ ConnectionPoolWithFailover::getShuffledPools(const Settings & settings, GetPrior return Base::getShuffledPools(max_ignored_errors, priority_func, use_slowdown_count); } -void sortConnectionPoolByNonReadOnlyReplicas(std::vector & results) -{ - auto comparator = [&](const auto & left, const auto & right) - { - return std::forward_as_tuple(left.is_readonly, !left.is_up_to_date, left.delay) - < std::forward_as_tuple(right.is_readonly, !right.is_up_to_date, right.delay); - }; - std::stable_sort(results.begin(), results.end(), comparator); -} - } diff --git a/src/Client/ConnectionPoolWithFailover.h b/src/Client/ConnectionPoolWithFailover.h index 7bb4291e339..a363a50244e 100644 --- a/src/Client/ConnectionPoolWithFailover.h +++ b/src/Client/ConnectionPoolWithFailover.h @@ -77,6 +77,12 @@ public: AsyncCallback async_callback = {}, std::optional skip_unavailable_endpoints = std::nullopt, GetPriorityForLoadBalancing::Func priority_func = {}); + /// The same as getManyChecked(), but respects distributed_insert_skip_read_only_replicas setting. + std::vector getManyCheckedForInsert( + const ConnectionTimeouts & timeouts, + const Settings & settings, + PoolMode pool_mode, + const QualifiedTableName & table_to_check); struct NestedPoolStatus { @@ -107,7 +113,8 @@ private: PoolMode pool_mode, const TryGetEntryFunc & try_get_entry, std::optional skip_unavailable_endpoints = std::nullopt, - GetPriorityForLoadBalancing::Func priority_func = {}); + GetPriorityForLoadBalancing::Func priority_func = {}, + bool skip_read_only_replicas = false); /// Try to get a connection from the pool and check that it is good. /// If table_to_check is not null and the check is enabled in settings, check that replication delay @@ -125,8 +132,6 @@ private: GetPriorityForLoadBalancing get_priority_load_balancing; }; -void sortConnectionPoolByNonReadOnlyReplicas(std::vector & results); - using ConnectionPoolWithFailoverPtr = std::shared_ptr; using ConnectionPoolWithFailoverPtrs = std::vector; diff --git a/src/Common/PoolWithFailoverBase.h b/src/Common/PoolWithFailoverBase.h index 46c414d34c1..157b8f37b50 100644 --- a/src/Common/PoolWithFailoverBase.h +++ b/src/Common/PoolWithFailoverBase.h @@ -30,6 +30,7 @@ namespace ProfileEvents { extern const Event DistributedConnectionFailTry; extern const Event DistributedConnectionFailAtAll; + extern const Event DistributedConnectionReadOnlyReplica; } /// This class provides a pool with fault tolerance. It is used for pooling of connections to replicated DB. @@ -112,6 +113,7 @@ public: size_t min_entries, size_t max_entries, size_t max_tries, size_t max_ignored_errors, bool fallback_to_stale_replicas, + bool skip_read_only_replicas, const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority); @@ -200,8 +202,12 @@ PoolWithFailoverBase::get(size_t max_ignored_errors, bool fallback_ const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority) { std::vector results = getMany( - 1 /* min entries */, 1 /* max entries */, 1 /* max tries */, - max_ignored_errors, fallback_to_stale_replicas, + /* min_entries= */ 1, + /* max_entries= */ 1, + /* max_tries= */ 1, + max_ignored_errors, + fallback_to_stale_replicas, + /* skip_read_only_replicas= */ false, try_get_entry, get_priority); if (results.empty() || results[0].entry.isNull()) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, @@ -215,6 +221,7 @@ PoolWithFailoverBase::getMany( size_t min_entries, size_t max_entries, size_t max_tries, size_t max_ignored_errors, bool fallback_to_stale_replicas, + bool skip_read_only_replicas, const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority) { @@ -266,9 +273,14 @@ PoolWithFailoverBase::getMany( ++entries_count; if (result.is_usable) { - ++usable_count; - if (result.is_up_to_date) - ++up_to_date_count; + if (!skip_read_only_replicas || !result.is_readonly) + { + ++usable_count; + if (result.is_up_to_date) + ++up_to_date_count; + } + else + ProfileEvents::increment(ProfileEvents::DistributedConnectionReadOnlyReplica); } } else @@ -291,7 +303,7 @@ PoolWithFailoverBase::getMany( throw DB::NetException(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "All connection tries failed. Log: \n\n{}\n", fail_messages); - std::erase_if(try_results, [](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }); + std::erase_if(try_results, [&](const TryResult & r) { return r.entry.isNull() || !r.is_usable || (skip_read_only_replicas && r.is_readonly); }); /// Sort so that preferred items are near the beginning. std::stable_sort( diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index b27c657aeaa..d8b3f01decc 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -156,7 +156,7 @@ M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.") \ M(DistributedConnectionMissingTable, "Number of times we rejected a replica from a distributed query, because it did not contain a table needed for the query.") \ M(DistributedConnectionStaleReplica, "Number of times we rejected a replica from a distributed query, because some table needed for a query had replication lag higher than the configured threshold.") \ - M(DistributedConnectionReadOnlyReplica, "Number of times we got a read-only replica for distributed INSERT query.") \ + M(DistributedConnectionReadOnlyReplica, "Number of times INSERT into Distributed prefer different replica due to current was read-only") \ M(DistributedConnectionFailAtAll, "Total count when distributed connection fails after all retries finished.") \ \ M(HedgedRequestsChangeReplica, "Total count when timeout for changing replica expired in hedged requests.") \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index e6d79209f66..04d30c5111d 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -136,7 +136,7 @@ class IColumn; M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams, and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \ M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \ \ - M(ReplicasPreferences, distributed_insert_replicas_preferences, ReplicasPreferences::PREFER_NON_READ_ONLY, "PREFER_NON_READ_ONLY - INSERT query into Distributed will try to prefer non-read-only replicas. Note, that this is only about the order, read-only replicas will not be excluded completely", 0) \ + M(Bool, distributed_insert_skip_read_only_replicas, false, "If true, INSERT into Distributed will skip read-only replicas.", 0) \ M(Bool, distributed_foreground_insert, false, "If setting is enabled, insert query into distributed waits until data are sent to all nodes in a cluster. \n\nEnables or disables synchronous data insertion into a `Distributed` table.\n\nBy default, when inserting data into a Distributed table, the ClickHouse server sends data to cluster nodes in the background. When `distributed_foreground_insert` = 1, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).", 0) ALIAS(insert_distributed_sync) \ M(UInt64, distributed_background_insert_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) ALIAS(insert_distributed_timeout) \ M(Milliseconds, distributed_background_insert_sleep_time_ms, 100, "Sleep time for background INSERTs into Distributed, in case of any errors delay grows exponentially.", 0) ALIAS(distributed_directory_monitor_sleep_time_ms) \ diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index 3245da584a1..b7f2d453ba6 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -136,7 +136,7 @@ static std::map sett {"async_insert_busy_timeout_max_ms", 200, 200, "The minimum value of the asynchronous insert timeout in milliseconds; async_insert_busy_timeout_ms is aliased to async_insert_busy_timeout_max_ms"}, {"async_insert_busy_timeout_increase_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout increases"}, {"async_insert_busy_timeout_decrease_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases"}, - {"distributed_insert_replicas_preferences", "no_preferences", "prefer_non_read_only", "Control preferred replicas for INSERT into Distributed"}, + {"distributed_insert_skip_read_only_replicas", false, true, "If true, INSERT into Distributed will skip read-only replicas"}, {"format_template_row_format", "", "", "Template row format string can be set directly in query"}, {"format_template_resultset_format", "", "", "Template result set format string can be set in query"}, {"split_parts_ranges_into_intersecting_and_non_intersecting_final", true, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"}, diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 47dadf3c36b..c3f0715ad68 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -79,10 +79,6 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P {"global", DistributedProductMode::GLOBAL}, {"allow", DistributedProductMode::ALLOW}}) -IMPLEMENT_SETTING_ENUM(ReplicasPreferences, ErrorCodes::BAD_ARGUMENTS, - {{"no_preferences", ReplicasPreferences::NO_PREFERENCES}, - {"prefer_non_read_only", ReplicasPreferences::PREFER_NON_READ_ONLY}}) - IMPLEMENT_SETTING_ENUM(QueryCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS, {{"throw", QueryCacheNondeterministicFunctionHandling::Throw}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index d51271ef16e..0aa8216bb85 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -174,14 +174,6 @@ enum class DistributedProductMode DECLARE_SETTING_ENUM(DistributedProductMode) -enum class ReplicasPreferences -{ - NO_PREFERENCES = 0, - /// Prefer non read-only replicas (they will not be excluded completely). - PREFER_NON_READ_ONLY, -}; -DECLARE_SETTING_ENUM(ReplicasPreferences) - /// How the query cache handles queries with non-deterministic functions, e.g. now() enum class QueryCacheNondeterministicFunctionHandling { diff --git a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp index 0eed9eacac3..21c79597aea 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertBatch.cpp @@ -232,10 +232,7 @@ void DistributedAsyncInsertBatch::sendBatch(const SettingsChanges & settings_cha insert_settings.applyChanges(settings_changes); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); - auto result = parent.pool->getManyChecked(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); - if (distributed_header.insert_settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY) - sortConnectionPoolByNonReadOnlyReplicas(result); - + auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); connection = std::move(result.front().entry); compression_expected = connection->getCompression() == Protocol::Compression::Enable; @@ -293,10 +290,7 @@ void DistributedAsyncInsertBatch::sendSeparateFiles(const SettingsChanges & sett parent.storage.getContext()->getOpenTelemetrySpanLog()); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); - auto result = parent.pool->getManyChecked(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); - if (distributed_header.insert_settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY) - sortConnectionPoolByNonReadOnlyReplicas(result); - + auto result = parent.pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, parent.storage.remote_storage.getQualifiedName()); auto connection = std::move(result.front().entry); bool compression_expected = connection->getCompression() == Protocol::Compression::Enable; diff --git a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp index f5135c6c80a..1ee77611191 100644 --- a/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp +++ b/src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp @@ -412,10 +412,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path, insert_settings.applyChanges(settings_changes); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(insert_settings); - auto result = pool->getManyChecked(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); - if (distributed_header.insert_settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY) - sortConnectionPoolByNonReadOnlyReplicas(result); - + auto result = pool->getManyCheckedForInsert(timeouts, insert_settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); auto connection = std::move(result.front().entry); LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)", diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 50017803913..ddbcc6d473f 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -372,9 +372,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si /// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here /// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries /// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default) - auto results = shard_info.pool->getManyChecked(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); - if (settings.distributed_insert_replicas_preferences == ReplicasPreferences::PREFER_NON_READ_ONLY) - sortConnectionPoolByNonReadOnlyReplicas(results); + auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName()); job.connection_entry = std::move(results.front().entry); } else diff --git a/tests/queries/0_stateless/02980_dist_insert_readonly_replica.sql.j2 b/tests/queries/0_stateless/02980_dist_insert_readonly_replica.sql.j2 index 2c6d53d88fb..0d96db4c62f 100644 --- a/tests/queries/0_stateless/02980_dist_insert_readonly_replica.sql.j2 +++ b/tests/queries/0_stateless/02980_dist_insert_readonly_replica.sql.j2 @@ -29,14 +29,32 @@ system stop distributed sends dist; create table dist_batch as shard_0.data engine=Distributed('test_cluster_two_replicas_different_databases_internal_replication', '', 'data') settings background_insert_batch=1; system stop distributed sends dist_batch; +create table dist_single_no_internal_replication as shard_0.data engine=Distributed('default', 'shard_1', 'data'); +system stop distributed sends dist_single_no_internal_replication; + +create table dist_single as shard_0.data engine=Distributed('test_cluster_two_replicas_different_databases_internal_replication', 'shard_1', 'data'); +system stop distributed sends dist_single; + set prefer_localhost_replica=0; set insert_deduplicate=0; +-- replica is readonly, avoid too many retries +set insert_keeper_max_retries=3; +-- and disable the fault injection to avoid failures +set insert_keeper_fault_injection_probability=0; + +-- for internal_replicatio==false, distributed_insert_skip_read_only_replicas does not changes anything +insert into dist_single_no_internal_replication settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=0 values (0); -- { serverError TABLE_IS_READ_ONLY } +insert into dist_single_no_internal_replication settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values (0); -- { serverError TABLE_IS_READ_ONLY } + +-- for internal_replicatio==true, it does +insert into dist_single settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=0 values (0); -- { serverError TABLE_IS_READ_ONLY } +insert into dist_single settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values (0); -- { serverError ALL_CONNECTION_TRIES_FAILED } -- min_insert_block_size_rows is not enough, since replica will be selected -- before, so we need to perform INSERT multiple times to ensure that at least -- once read-only replica should be selected (if it wasn't not filtered out) {% for i in range(1, 30) %} -insert into dist settings distributed_foreground_insert=1 values ({{ i }}); +insert into dist settings distributed_foreground_insert=1, distributed_insert_skip_read_only_replicas=1 values ({{ i }}); {% endfor %} -- cannot check for background inserts, so only here system flush logs; @@ -51,12 +69,12 @@ where and Settings['distributed_foreground_insert'] = '1'; {% for i in range(1, 30) %} -insert into dist settings distributed_foreground_insert=0 values ({{ i }}); +insert into dist settings distributed_foreground_insert=0, distributed_insert_skip_read_only_replicas=1 values ({{ i }}); system flush distributed dist; {% endfor %} {% for i in range(1, 30) %} -insert into dist_batch settings distributed_foreground_insert=0 values ({{ i }}); +insert into dist_batch settings distributed_foreground_insert=0, distributed_insert_skip_read_only_replicas=1 values ({{ i }}); system flush distributed dist_batch; {% endfor %}