From 19d2e7016c6a8147e79fe6b6a483938e89e71d0a Mon Sep 17 00:00:00 2001
From: Haydn
Date: Mon, 19 Aug 2024 14:24:06 -0700
Subject: [PATCH] [ClickHouse-58380] Implement retries in distributed queries
 when a server stops during a query - REBASED

---
 src/Core/Settings.h                                        |   2 +-
 src/Interpreters/Cluster.cpp                               |  55 +++++
 src/Interpreters/Cluster.h                                 |   6 +
 src/Storages/Distributed/DistributedSink.cpp               | 203 ++++++++++--------
 src/Storages/Distributed/DistributedSink.h                 |   4 +
 .../0_stateless/03204_replica_outage_management.reference  |   5 +
 .../0_stateless/03204_replica_outage_management.sql        |  60 ++++++
 7 files changed, 240 insertions(+), 95 deletions(-)
 create mode 100644 tests/queries/0_stateless/03204_replica_outage_management.reference
 create mode 100644 tests/queries/0_stateless/03204_replica_outage_management.sql

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index d8837d26e54..e2b2afa3d69 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -1295,7 +1295,7 @@ class IColumn;
     M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
     M(DateTimeOverflowBehavior, date_time_overflow_behavior, "ignore", "Overflow mode for Date, Date32, DateTime, DateTime64 types. Possible values: 'ignore', 'throw', 'saturate'.", 0) \
     M(Bool, validate_experimental_and_suspicious_types_inside_nested_types, true, "Validate usage of experimental and suspicious types inside nested types like Array/Map/Tuple", 0) \
-
+    M(UInt64, distributed_query_retries, 3, "Maximum number of retries for distributed queries.", 0) \

 // End of FORMAT_FACTORY_SETTINGS
 // Please add settings non-related to formats into the COMMON_SETTINGS above.

diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp
index 7b7bedc850d..7784ea03fc1 100644
--- a/src/Interpreters/Cluster.cpp
+++ b/src/Interpreters/Cluster.cpp
@@ -33,6 +33,8 @@ namespace ErrorCodes
     extern const int INVALID_SHARD_ID;
     extern const int NO_SUCH_REPLICA;
     extern const int BAD_ARGUMENTS;
+    extern const int NETWORK_ERROR;
+    extern const int SOCKET_TIMEOUT;
 }

 namespace
@@ -920,4 +922,57 @@ bool Cluster::maybeCrossReplication() const
     return false;
 }

+ConnectionPoolWithFailover::Entry Cluster::getConnectionWithRetries(
+    size_t shard_index,
+    size_t replica_index,
+    const Settings & settings,
+    size_t max_retries,
+    const QualifiedTableName & query_context)
+{
+    const auto & shard_info = getShardsInfo()[shard_index];
+    const auto & addresses = getShardsAddresses();
+    size_t retries = 0;
+
+    /// Keep trying until the maximum number of attempts is reached.
+    while (retries < max_retries)
+    {
+        try
+        {
+            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
+            ConnectionPoolWithFailover::Entry connection_entry;
+
+            if (shard_info.hasInternalReplication())
+            {
+                auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, query_context);
+                connection_entry = std::move(results.front().entry);
+            }
+            else
+            {
+                const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(replica_index);
+                connection_entry = connection_pool->get(timeouts, settings);
+            }
+
+            if (connection_entry.isNull())
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty connection for replica {}", addresses[shard_index][replica_index].readableString());
+
+            return connection_entry;
+        }
+        catch (const Exception & ex)
+        {
+            if (ex.code() == ErrorCodes::NETWORK_ERROR || ex.code() == ErrorCodes::SOCKET_TIMEOUT)
+            {
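+                /// Transient failure: log it, count the attempt, and retry.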
+                LOG_WARNING(log, "Network error or socket timeout detected on shard {}, replica {}. Retry {}/{}", shard_index, replica_index, retries + 1, max_retries);
+                ++retries;
+            }
+            else
+            {
+                throw;
+            }
+        }
+    }
+
+    throw Exception(ErrorCodes::NETWORK_ERROR, "Failed to get connection to shard {} replica {} after {} retries", shard_index, replica_index, max_retries);
+}
+
 }

diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h
index 82d77941b76..1107569a212 100644
--- a/src/Interpreters/Cluster.h
+++ b/src/Interpreters/Cluster.h
@@ -276,6 +276,9 @@ public:
     const String & getName() const { return name; }

+    /// Get a connection to the given shard/replica, retrying on transient network errors.
+    ConnectionPoolWithFailover::Entry getConnectionWithRetries(size_t shard_index, size_t replica_index, const Settings & settings, size_t max_retries = 3, const QualifiedTableName & query_context = QualifiedTableName());
+
 private:
     SlotToShard slot_to_shard;

@@ -285,6 +288,9 @@ public:
 private:
     void initMisc();

+    /// Logger used by getConnectionWithRetries().
+    LoggerPtr log = getLogger("Cluster");
+
     /// For getClusterWithMultipleShards implementation.
     struct SubclusterTag {};
     Cluster(SubclusterTag, const Cluster & from, const std::vector<size_t> & indices);

diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp
index e3e73e42096..ec7353ee7b4 100644
--- a/src/Storages/Distributed/DistributedSink.cpp
+++ b/src/Storages/Distributed/DistributedSink.cpp
@@ -61,6 +61,8 @@ namespace ErrorCodes
     extern const int TIMEOUT_EXCEEDED;
     extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
     extern const int ABORTED;
+    extern const int SOCKET_TIMEOUT;
+    extern const int NETWORK_ERROR;
 }

 static Block adoptBlock(const Block & header, const Block & block, LoggerPtr log)
@@ -125,6 +127,7 @@ DistributedSink::DistributedSink(
     , allow_materialized(context->getSettingsRef().insert_allow_materialized_columns)
     , insert_timeout(insert_timeout_)
     , columns_to_send(columns_to_send_.begin(), columns_to_send_.end())
+    , max_retries(context->getSettingsRef().distributed_query_retries)
     , log(getLogger("DistributedSink"))
 {
     const auto & settings = context->getSettingsRef();
@@ -362,91 +365,97 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, size_t num_shards)
         if (rows == 0)
             return;

-        if (!job.is_local_job || !settings.prefer_localhost_replica)
+        try
         {
-            if (!job.executor)
+            if (!job.is_local_job || !settings.prefer_localhost_replica)
             {
-                auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
-                if (shard_info.hasInternalReplication())
+                QualifiedTableName query_context{storage.getRemoteDatabaseName(), storage.getRemoteTableName()};
+
+                if (!job.executor)
                 {
-                    /// Skip replica_index in case of internal replication
-                    if (shard_job.replicas_jobs.size() != 1)
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "There are several writing job for an automatically replicated shard");
+                    job.connection_entry = cluster->getConnectionWithRetries(job.shard_index, job.replica_index, settings, max_retries, query_context);
+                    if (throttler)
+                        job.connection_entry->setThrottler(throttler);
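+
+                    /// Build a fresh remote pipeline over the retried connection entry.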
-                    /// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
-                    /// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
-                    /// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
-                    auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
-                    auto result = results.front();
-                    if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
+                    job.pipeline = QueryPipeline(std::make_shared<RemoteSink>(*job.connection_entry, ConnectionTimeouts::getTCPTimeoutsWithFailover(settings), query_string, settings, context->getClientInfo()));
+                    job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
+                    job.executor->start();
+                }
+
+                CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
+                Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
+                job.executor->push(adopted_shard_block);
+            }
+            else
+            {
+                if (!job.executor)
+                {
+                    job.local_context = Context::createCopy(context);
+                    auto copy_query_ast = query_ast->clone();

-                    job.connection_entry = std::move(result.entry);
+                    InterpreterInsertQuery interp(copy_query_ast, job.local_context, allow_materialized, /* no_squash */ false, /* no_destination */ false, /* async_insert */ false);
+                    auto block_io = interp.execute();
+                    job.pipeline = std::move(block_io.pipeline);
+                    job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
+                    job.executor->start();
+                }
+                writeBlockConvert(*job.executor, shard_block, shard_info.getLocalNodeCount(), log);
+            }
+            job.blocks_written += 1;
+            job.rows_written += rows;
+        }
+        catch (const Exception & ex)
+        {
+            if (ex.code() == ErrorCodes::NETWORK_ERROR || ex.code() == ErrorCodes::SOCKET_TIMEOUT)
+            {
+                LOG_WARNING(log, "Network error or socket timeout detected. Attempting to reconnect and resend the query.");
+                if (reconnectAndResend(job, shard_block))
+                {
+                    LOG_INFO(log, "Reconnection and resend successful.");
+                    return;
                 }
                 else
                 {
-                    const auto & replica = addresses.at(job.shard_index).at(job.replica_index);
-
-                    const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(job.replica_index);
-                    if (!connection_pool)
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Connection pool for replica {} does not exist", replica.readableString());
-
-                    job.connection_entry = connection_pool->get(timeouts, settings);
-                    if (job.connection_entry.isNull())
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty connection for replica {}", replica.readableString());
+                    LOG_ERROR(log, "Reconnection and resend failed; rethrowing.");
+                    throw;
                 }
-
-                if (throttler)
-                    job.connection_entry->setThrottler(throttler);
-
-                job.pipeline = QueryPipeline(std::make_shared<RemoteSink>(
-                    *job.connection_entry, timeouts, query_string, settings, context->getClientInfo()));
-                job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
-                job.executor->start();
             }
-
-            CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
-
-            Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
-            job.executor->push(adopted_shard_block);
-        }
-        else // local
-        {
-            if (!job.executor)
+            else
+            {
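+                /// Not a network-related failure: propagate it unchanged.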
-                /// Forward user settings
-                job.local_context = Context::createCopy(context);
-
-                /// Copying of the query AST is required to avoid race,
-                /// in case of INSERT into multiple local shards.
-                ///
-                /// Since INSERT into local node uses AST,
-                /// and InterpreterInsertQuery::execute() is modifying it,
-                /// to resolve tables (in InterpreterInsertQuery::getTable())
-                auto copy_query_ast = query_ast->clone();
-
-                InterpreterInsertQuery interp(
-                    copy_query_ast,
-                    job.local_context,
-                    allow_materialized,
-                    /* no_squash */ false,
-                    /* no_destination */ false,
-                    /* async_isnert */ false);
-                auto block_io = interp.execute();
-
-                job.pipeline = std::move(block_io.pipeline);
-                job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
-                job.executor->start();
+                throw;
             }
-
-            writeBlockConvert(*job.executor, shard_block, shard_info.getLocalNodeCount(), log);
-        }
-
-        job.blocks_written += 1;
-        job.rows_written += rows;
+        }
     };
 }

+bool DistributedSink::reconnectAndResend(JobReplica & job, const Block & shard_block)
+{
+    const auto & settings = context->getSettingsRef();
+
+    try
+    {
+        job.connection_entry = cluster->getConnectionWithRetries(job.shard_index, job.replica_index, settings, max_retries);
+
+        if (throttler)
+            job.connection_entry->setThrottler(throttler);
+
+        job.pipeline = QueryPipeline(std::make_shared<RemoteSink>(*job.connection_entry, ConnectionTimeouts::getTCPTimeoutsWithFailover(settings), query_string, settings, context->getClientInfo()));
+        job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
+        job.executor->start();
+
+        CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
+        Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
+        job.executor->push(adopted_shard_block);
+
+        job.blocks_written += 1;
+        job.rows_written += shard_block.rows();
+        return true;
+    }
+    catch (const Exception & ex)
+    {
+        LOG_ERROR(log, "Reconnection attempt failed with error: {}", ex.message());
+        return false;
+    }
+}

 void DistributedSink::writeSync(const Block & block)
 {
@@ -517,33 +526,39 @@ void DistributedSink::writeSync(const Block & block)
         per_shard_jobs[current_selector[i]].shard_current_block_permutation.push_back(i);
     }

-    try
+    size_t attempt = 0;
+    while (attempt < max_retries)
     {
-        /// Run jobs in parallel for each block and wait them
-        finished_jobs_count = 0;
-        for (size_t shard_index : collections::range(start, end))
-            for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
-                pool->scheduleOrThrowOnError(runWritingJob(job, block_to_send, num_shards));
-    }
-    catch (...)
-    {
-        pool->wait();
-        throw;
-    }
+        try
+        {
+            /// Run jobs in parallel for each block and wait for them.
+            finished_jobs_count = 0;
+            for (size_t shard_index : collections::range(start, end))
+                for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
+                    pool->scheduleOrThrowOnError(runWritingJob(job, block_to_send, num_shards));
+            waitForJobs();

-    try
-    {
-        waitForJobs();
+            inserted_blocks += 1;
+            inserted_rows += block.rows();
+            break; /// If successful, exit the retry loop.
+        }
+        catch (const Exception & exception)
+        {
+            /// Let any jobs from this attempt finish before retrying or rethrowing.
+            pool->wait();
+            if ((exception.code() == ErrorCodes::NETWORK_ERROR || exception.code() == ErrorCodes::SOCKET_TIMEOUT)
+                && attempt + 1 < max_retries)
+            {
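+                /// Transient failure with attempts remaining: retry the whole batch of jobs.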
+                LOG_WARNING(log, "Retrying due to network error. Attempt: {}", attempt + 1);
+                ++attempt;
+            }
+            else
+            {
+                throw;
+            }
+        }
+    }
-    catch (Exception & exception)
-    {
-        exception.addMessage(getCurrentStateDescription());
-        span.addAttribute(exception);
-        throw;
-    }
-
-    inserted_blocks += 1;
-    inserted_rows += block.rows();
 }

diff --git a/src/Storages/Distributed/DistributedSink.h b/src/Storages/Distributed/DistributedSink.h
index 33e580e00fa..f5b81c9c844 100644
--- a/src/Storages/Distributed/DistributedSink.h
+++ b/src/Storages/Distributed/DistributedSink.h
@@ -5,6 +5,7 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Interpreters/Cluster.h>
 #include <...>
 #include <...>
 #include <...>
@@ -113,6 +114,8 @@ private:
     std::optional<ThreadPool> pool;
     ThrottlerPtr throttler;

+    size_t max_retries;
+
     std::mutex execution_mutex;

     struct JobReplica
@@ -153,6 +156,7 @@ private:
     std::atomic<unsigned> finished_jobs_count{0};

     LoggerPtr log;
+    bool reconnectAndResend(JobReplica & job, const Block & shard_block);
 };

 }

diff --git a/tests/queries/0_stateless/03204_replica_outage_management.reference b/tests/queries/0_stateless/03204_replica_outage_management.reference
new file mode 100644
index 00000000000..d65bf8b6e83
--- /dev/null
+++ b/tests/queries/0_stateless/03204_replica_outage_management.reference
@@ -0,0 +1,5 @@
+2 b
+3 c
+1 a
+2 b
+3 c
\ No newline at end of file

diff --git a/tests/queries/0_stateless/03204_replica_outage_management.sql b/tests/queries/0_stateless/03204_replica_outage_management.sql
new file mode 100644
index 00000000000..faa0e28ca32
--- /dev/null
+++ b/tests/queries/0_stateless/03204_replica_outage_management.sql
@@ -0,0 +1,60 @@
+CREATE DATABASE IF NOT EXISTS test_03204;
+USE test_03204;
+
+DROP TABLE IF EXISTS t1_shard;
+DROP TABLE IF EXISTS t2_shard;
+DROP TABLE IF EXISTS t1_distr;
+DROP TABLE IF EXISTS t2_distr;
+
+-- Create the shard tables
+CREATE TABLE t1_shard (id Int32, value String) ENGINE = MergeTree PARTITION BY id ORDER BY id;
+CREATE TABLE t2_shard (id Int32, value String) ENGINE = MergeTree PARTITION BY id ORDER BY id;
+
+-- Create the distributed tables
+CREATE TABLE t1_distr AS t1_shard ENGINE = Distributed(test_cluster_two_shards_localhost, test_03204, t1_shard, id);
+CREATE TABLE t2_distr AS t2_shard ENGINE = Distributed(test_cluster_two_shards_localhost, test_03204, t2_shard, id);
+
+-- Insert some data into the shard tables
+INSERT INTO t1_shard VALUES (1, 'a'), (2, 'b'), (3, 'c');
+INSERT INTO t2_shard VALUES (1, 'a'), (2, 'b'), (3, 'c');
+
+-- Set the distributed product mode to allow global subqueries
+SET distributed_product_mode = 'global';
+
+-- Simulate a replica outage by detaching a partition
+ALTER TABLE t1_shard DETACH PARTITION 1;
+
+-- Execute a distributed query; the result should reflect the missing data
+SELECT DISTINCT d0.id, d0.value
+FROM t1_distr d0
+WHERE d0.id IN
+(
+    SELECT d1.id
+    FROM t1_distr AS d1
+    INNER JOIN t2_distr AS d2 ON d1.id = d2.id
+    WHERE d1.id > 0
+    ORDER BY d1.id
+)
+ORDER BY d0.id;
+
+-- Reattach the partition to restore the data
+ALTER TABLE t1_shard ATTACH PARTITION 1;
+
+-- Execute the query again to verify the data is restored
+SELECT DISTINCT d0.id, d0.value
+FROM t1_distr d0
+JOIN
+(
+    SELECT d1.id, d1.value
+    FROM t1_distr AS d1
+    INNER JOIN t2_distr AS d2 ON d1.id = d2.id
+    WHERE d1.id > 0
+    ORDER BY d1.id
+) s0 USING (id, value)
+ORDER BY d0.id;
+
+-- Cleanup
+DROP TABLE t1_shard;
+DROP TABLE t2_shard;
+DROP TABLE t1_distr;
+DROP TABLE t2_distr;
+DROP DATABASE test_03204;
\ No newline at end of file
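
Usage sketch (illustrative, not part of the diff): distributed_query_retries is
registered as an ordinary query-level setting with a default of 3, so it can be
raised per session before inserting through a Distributed table. The table and
values below are examples only, reusing the test schema above:

    SET distributed_query_retries = 5;
    -- With this patch, a synchronous INSERT through the Distributed engine is
    -- retried up to 5 times when it fails with NETWORK_ERROR or SOCKET_TIMEOUT.
    INSERT INTO test_03204.t1_distr VALUES (4, 'd');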