[ClickHouse-58380] Implement retries in distributed queries when the server stops mid-query - REBASED

Haydn 2024-08-19 14:24:06 -07:00
parent 540c9464c0
commit 19d2e7016c
7 changed files with 240 additions and 95 deletions

View File

@@ -1295,7 +1295,8 @@ class IColumn;
M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
M(DateTimeOverflowBehavior, date_time_overflow_behavior, "ignore", "Overflow mode for Date, Date32, DateTime, DateTime64 types. Possible values: 'ignore', 'throw', 'saturate'.", 0) \
M(Bool, validate_experimental_and_suspicious_types_inside_nested_types, true, "Validate usage of experimental and suspicious types inside nested types like Array/Map/Tuple", 0) \
M(UInt64, distributed_query_retries, 3, "Maximum number of retries for distributed queries after transient failures (network errors and socket timeouts).", 0) \
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
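Because distributed_query_retries is declared in the common settings list like any other setting, once merged it can be tuned per session (`SET distributed_query_retries = 5`) or per statement via a `SETTINGS` clause; the third macro argument above (`3`) is the default used when nothing is set.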

View File

@@ -33,6 +33,8 @@ namespace ErrorCodes
extern const int INVALID_SHARD_ID;
extern const int NO_SUCH_REPLICA;
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
}
namespace
@@ -920,4 +922,57 @@ bool Cluster::maybeCrossReplication() const
return false;
}
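/// Get a connection entry for the given shard/replica, retrying transient failures
/// (NETWORK_ERROR, SOCKET_TIMEOUT) up to max_retries times before giving up.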
ConnectionPoolWithFailover::Entry Cluster::getConnectionWithRetries(
    size_t shard_index,
    size_t replica_index,
    const Settings & settings,
    size_t max_retries,
    const QualifiedTableName & query_context)
{
    const auto & shard_info = getShardsInfo()[shard_index];
    const auto & addresses = getShardsAddresses();

    size_t retries = 0;
    /// Retry transient failures (network errors and socket timeouts) up to max_retries times.
    while (retries < max_retries)
    {
        try
        {
            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
            ConnectionPoolWithFailover::Entry connection_entry;

            if (shard_info.hasInternalReplication())
            {
                auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, query_context);
                connection_entry = std::move(results.front().entry);
            }
            else
            {
                const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(replica_index);
                connection_entry = connection_pool->get(timeouts, settings);
            }

            if (connection_entry.isNull())
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty connection for replica {}", addresses[shard_index][replica_index].readableString());

            return connection_entry;
        }
        catch (const Exception & ex)
        {
            if (ex.code() == ErrorCodes::NETWORK_ERROR || ex.code() == ErrorCodes::SOCKET_TIMEOUT)
            {
                LOG_WARNING(log, "Network error or socket timeout detected on shard {}, replica {}. Retry {}/{}", shard_index, replica_index, retries + 1, max_retries);
                ++retries;
            }
            else
            {
                throw;
            }
        }
    }

    throw Exception(ErrorCodes::NETWORK_ERROR, "Failed to get connection to shard {} replica {} after {} retries", shard_index, replica_index, max_retries);
}

}
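The method above boils down to a general pattern: bounded retries on errors classified as transient, immediate rethrow of everything else. Below is a minimal, self-contained C++ sketch of that pattern in isolation; TransientError, retryOnTransient, and connectOnce are hypothetical stand-ins (for the NETWORK_ERROR/SOCKET_TIMEOUT classification and a single pool->get() attempt), not ClickHouse APIs.

#include <cstddef>
#include <iostream>
#include <stdexcept>

/// Stand-in for the errors worth retrying (NETWORK_ERROR, SOCKET_TIMEOUT).
struct TransientError : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

/// Run `attempt` up to max_retries times. Only TransientError triggers a retry;
/// any other exception propagates to the caller unchanged.
template <typename F>
auto retryOnTransient(F && attempt, std::size_t max_retries)
{
    for (std::size_t retries = 0; retries < max_retries; ++retries)
    {
        try
        {
            return attempt();
        }
        catch (const TransientError & e)
        {
            std::cerr << "transient failure (" << e.what() << "), retry "
                      << retries + 1 << "/" << max_retries << "\n";
        }
    }
    throw TransientError("giving up after bounded retries");
}

int main()
{
    int fails_left = 2;
    /// connectOnce: stands in for a single connection attempt (pool->get()).
    auto connectOnce = [&]
    {
        if (fails_left-- > 0)
            throw TransientError("connection refused");
        return 42; /// the "connection"
    };

    std::cout << "got connection " << retryOnTransient(connectOnce, 3) << "\n";
}

Like the committed loop, the sketch retries immediately with no backoff; sleeping between attempts would be a separate design decision.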

View File

@@ -276,6 +276,9 @@ public:
const String & getName() const { return name; }
/// Get a connection to the given shard/replica, retrying transient network failures.
ConnectionPoolWithFailover::Entry getConnectionWithRetries(size_t shard_index, size_t replica_index, const Settings & settings, size_t max_retries = 3, const QualifiedTableName & query_context = QualifiedTableName());
private:
SlotToShard slot_to_shard;
@@ -285,6 +288,9 @@ public:
private:
void initMisc();
/// Logger for connection retry diagnostics.
LoggerPtr log = getLogger("Cluster");
/// For getClusterWithMultipleShards implementation.
struct SubclusterTag {};
Cluster(SubclusterTag, const Cluster & from, const std::vector<size_t> & indices);

View File

@@ -61,6 +61,8 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int ABORTED;
extern const int SOCKET_TIMEOUT;
extern const int NETWORK_ERROR;
}
static Block adoptBlock(const Block & header, const Block & block, LoggerPtr log)
@@ -125,6 +127,7 @@ DistributedSink::DistributedSink(
, allow_materialized(context->getSettingsRef().insert_allow_materialized_columns)
, insert_timeout(insert_timeout_)
, columns_to_send(columns_to_send_.begin(), columns_to_send_.end())
, max_retries(context->getSettingsRef().distributed_query_retries)
, log(getLogger("DistributedSink"))
{
const auto & settings = context->getSettingsRef();
@@ -362,91 +365,97 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
         if (rows == 0)
             return;

-        if (!job.is_local_job || !settings.prefer_localhost_replica)
-        {
-            if (!job.executor)
-            {
-                auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
-                if (shard_info.hasInternalReplication())
-                {
-                    /// Skip replica_index in case of internal replication
-                    if (shard_job.replicas_jobs.size() != 1)
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "There are several writing job for an automatically replicated shard");
-                    /// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
-                    /// NOTE: INSERT will also take into account max_replica_delay_for_distributed_queries
-                    /// (anyway fallback_to_stale_replicas_for_distributed_queries=true by default)
-                    auto results = shard_info.pool->getManyCheckedForInsert(timeouts, settings, PoolMode::GET_ONE, storage.remote_storage.getQualifiedName());
-                    auto result = results.front();
-                    if (shard_info.pool->isTryResultInvalid(result, settings.distributed_insert_skip_read_only_replicas))
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an invalid connection result");
-                    job.connection_entry = std::move(result.entry);
-                }
-                else
-                {
-                    const auto & replica = addresses.at(job.shard_index).at(job.replica_index);
-                    const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(job.replica_index);
-                    if (!connection_pool)
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Connection pool for replica {} does not exist", replica.readableString());
-                    job.connection_entry = connection_pool->get(timeouts, settings);
-                    if (job.connection_entry.isNull())
-                        throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty connection for replica{}", replica.readableString());
-                }
-                if (throttler)
-                    job.connection_entry->setThrottler(throttler);
-                job.pipeline = QueryPipeline(std::make_shared<RemoteSink>(
-                    *job.connection_entry, timeouts, query_string, settings, context->getClientInfo()));
-                job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
-                job.executor->start();
-            }
-            CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
-            Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
-            job.executor->push(adopted_shard_block);
-        }
-        else // local
-        {
-            if (!job.executor)
-            {
-                /// Forward user settings
-                job.local_context = Context::createCopy(context);
-                /// Copying of the query AST is required to avoid race,
-                /// in case of INSERT into multiple local shards.
-                ///
-                /// Since INSERT into local node uses AST,
-                /// and InterpreterInsertQuery::execute() is modifying it,
-                /// to resolve tables (in InterpreterInsertQuery::getTable())
-                auto copy_query_ast = query_ast->clone();
-                InterpreterInsertQuery interp(
-                    copy_query_ast,
-                    job.local_context,
-                    allow_materialized,
-                    /* no_squash */ false,
-                    /* no_destination */ false,
-                    /* async_isnert */ false);
-                auto block_io = interp.execute();
-                job.pipeline = std::move(block_io.pipeline);
-                job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
-                job.executor->start();
-            }
-            writeBlockConvert(*job.executor, shard_block, shard_info.getLocalNodeCount(), log);
-        }
-        job.blocks_written += 1;
-        job.rows_written += rows;
+        try
+        {
+            if (!job.is_local_job || !settings.prefer_localhost_replica)
+            {
+                QualifiedTableName query_context{storage.getRemoteDatabaseName(), storage.getRemoteTableName()};
+                if (!job.executor)
+                {
+                    job.connection_entry = cluster->getConnectionWithRetries(job.shard_index, job.replica_index, settings, max_retries, query_context);
+                    if (throttler)
+                        job.connection_entry->setThrottler(throttler);
+                    job.pipeline = QueryPipeline(std::make_shared<RemoteSink>(*job.connection_entry, ConnectionTimeouts::getTCPTimeoutsWithFailover(settings), query_string, settings, context->getClientInfo()));
+                    job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
+                    job.executor->start();
+                }
+                CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
+                Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
+                job.executor->push(adopted_shard_block);
+            }
+            else
+            {
+                if (!job.executor)
+                {
+                    job.local_context = Context::createCopy(context);
+                    auto copy_query_ast = query_ast->clone();
+                    InterpreterInsertQuery interp(copy_query_ast, job.local_context, allow_materialized, false, false, false);
+                    auto block_io = interp.execute();
+                    job.pipeline = std::move(block_io.pipeline);
+                    job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
+                    job.executor->start();
+                }
+                writeBlockConvert(*job.executor, shard_block, shard_info.getLocalNodeCount(), log);
+            }
+            job.blocks_written += 1;
+            job.rows_written += rows;
+        }
+        catch (const Exception & ex)
+        {
+            if (ex.code() == ErrorCodes::NETWORK_ERROR || ex.code() == ErrorCodes::SOCKET_TIMEOUT)
+            {
+                LOG_WARNING(log, "Network error or socket timeout detected. Attempting to reconnect and resend the query.");
+                if (reconnectAndResend(job, shard_block))
+                {
+                    LOG_INFO(log, "Reconnection and resend successful.");
+                    return;
+                }
+                else
+                {
+                    LOG_ERROR(log, "Reconnection and resend failed. Throwing exception.");
+                    throw;
+                }
+            }
+            else
+            {
+                throw;
+            }
+        }
     };
 }
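/// Try to re-establish a connection for this job and resend the current block after
/// a transient network failure. Returns true on success, false if the resend failed
/// (in which case the caller rethrows the original exception).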
bool DistributedSink::reconnectAndResend(JobReplica & job, const Block & shard_block)
{
    const auto & settings = context->getSettingsRef();
    try
    {
        job.connection_entry = cluster->getConnectionWithRetries(job.shard_index, job.replica_index, settings, max_retries, QualifiedTableName{storage.getRemoteDatabaseName(), storage.getRemoteTableName()});
        if (throttler)
            job.connection_entry->setThrottler(throttler);

        job.pipeline = QueryPipeline(std::make_shared<RemoteSink>(*job.connection_entry, ConnectionTimeouts::getTCPTimeoutsWithFailover(settings), query_string, settings, context->getClientInfo()));
        job.executor = std::make_unique<PushingPipelineExecutor>(job.pipeline);
        job.executor->start();

        CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
        Block adopted_shard_block = adoptBlock(job.executor->getHeader(), shard_block, log);
        job.executor->push(adopted_shard_block);

        job.blocks_written += 1;
        job.rows_written += shard_block.rows();
        return true;
    }
    catch (const Exception & ex)
    {
        LOG_ERROR(log, "Reconnection attempt failed with error: {}", ex.message());
        return false;
    }
}
void DistributedSink::writeSync(const Block & block)
{
@@ -517,33 +526,39 @@ void DistributedSink::writeSync(const Block & block)
         per_shard_jobs[current_selector[i]].shard_current_block_permutation.push_back(i);
     }

-    try
-    {
-        /// Run jobs in parallel for each block and wait them
-        finished_jobs_count = 0;
-        for (size_t shard_index : collections::range(start, end))
-            for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
-                pool->scheduleOrThrowOnError(runWritingJob(job, block_to_send, num_shards));
-    }
-    catch (...)
-    {
-        pool->wait();
-        throw;
-    }
-    try
-    {
-        waitForJobs();
-    }
-    catch (Exception & exception)
-    {
-        exception.addMessage(getCurrentStateDescription());
-        span.addAttribute(exception);
-        throw;
-    }
-    inserted_blocks += 1;
-    inserted_rows += block.rows();
+    size_t attempt = 0;
+    while (attempt < max_retries)
+    {
+        try
+        {
+            /// Run jobs in parallel for each block and wait for them
+            finished_jobs_count = 0;
+            for (size_t shard_index : collections::range(start, end))
+                for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
+                    pool->scheduleOrThrowOnError(runWritingJob(job, block_to_send, num_shards));
+            waitForJobs();
+            inserted_blocks += 1;
+            inserted_rows += block.rows();
+            break; /// Success: exit the retry loop.
+        }
+        catch (Exception & exception)
+        {
+            /// Let any already-scheduled jobs finish before deciding whether to retry.
+            pool->wait();
+            ++attempt;
+            if ((exception.code() == ErrorCodes::NETWORK_ERROR || exception.code() == ErrorCodes::SOCKET_TIMEOUT)
+                && attempt < max_retries)
+            {
+                LOG_WARNING(log, "Retrying due to network error. Attempt: {}/{}", attempt, max_retries);
+            }
+            else
+            {
+                /// Out of retries, or a non-retriable error: attach state and rethrow.
+                exception.addMessage(getCurrentStateDescription());
+                span.addAttribute(exception);
+                throw;
+            }
+        }
+    }
}

View File

@@ -5,6 +5,7 @@
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Common/PODArray.h>
#include <Common/Throttler.h>
#include <Common/ThreadPool.h>
@@ -113,6 +114,8 @@ private:
std::optional<ThreadPool> pool;
ThrottlerPtr throttler;
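/// Maximum number of retries for distributed queries (initialized from the distributed_query_retries setting).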
size_t max_retries;
std::mutex execution_mutex;
struct JobReplica
@@ -153,6 +156,7 @@ private:
std::atomic<unsigned> finished_jobs_count{0};
LoggerPtr log;
bool reconnectAndResend(JobReplica & job, const Block & shard_block);
};
}

View File

@@ -0,0 +1,5 @@
2 b
3 c
1 a
2 b
3 c

View File

@@ -0,0 +1,60 @@
CREATE DATABASE IF NOT EXISTS test_03204;
USE test_03204;
DROP TABLE IF EXISTS t1_shard;
DROP TABLE IF EXISTS t2_shard;
DROP TABLE IF EXISTS t1_distr;
DROP TABLE IF EXISTS t2_distr;
-- Create the shard tables
CREATE TABLE t1_shard (id Int32, value String) ENGINE = MergeTree PARTITION BY id ORDER BY id;
CREATE TABLE t2_shard (id Int32, value String) ENGINE = MergeTree PARTITION BY id ORDER BY id;
-- Create the distributed tables
CREATE TABLE t1_distr AS t1_shard ENGINE = Distributed(test_cluster_two_shards_localhost, test_03204, t1_shard, id);
CREATE TABLE t2_distr AS t2_shard ENGINE = Distributed(test_cluster_two_shards_localhost, test_03204, t2_shard, id);
-- Insert some data into the shard tables
INSERT INTO t1_shard VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO t2_shard VALUES (1, 'a'), (2, 'b'), (3, 'c');
-- Set the distributed product mode to allow global subqueries
SET distributed_product_mode = 'global';
-- Simulate replica failure by detaching a partition
ALTER TABLE t1_shard DETACH PARTITION 1;
-- Execute a distributed query that should reflect missing data
SELECT DISTINCT d0.id, d0.value
FROM t1_distr d0
WHERE d0.id IN
(
SELECT d1.id
FROM t1_distr AS d1
INNER JOIN t2_distr AS d2 ON d1.id = d2.id
WHERE d1.id > 0
ORDER BY d1.id
)
ORDER BY d0.id;
-- Reattach the partition to restore the data
ALTER TABLE t1_shard ATTACH PARTITION 1;
-- Execute the query again to verify restoration
SELECT DISTINCT d0.id, d0.value
FROM t1_distr d0
JOIN (
SELECT d1.id, d1.value
FROM t1_distr AS d1
INNER JOIN t2_distr AS d2 ON d1.id = d2.id
WHERE d1.id > 0
ORDER BY d1.id
) s0 USING (id, value)
ORDER BY d0.id;
-- Cleanup
DROP TABLE t1_shard;
DROP TABLE t2_shard;
DROP TABLE t1_distr;
DROP TABLE t2_distr;
DROP DATABASE test_03204;