Merge pull request #29921 from azat/copier-fixes

copier: add ability to configure retries and delays between them
This commit is contained in:
Nikita Mikhaylov 2021-10-11 12:11:38 +03:00 committed by GitHub
commit 32d77aafe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 17 deletions

View File

@ -86,7 +86,7 @@ decltype(auto) ClusterCopier::retry(T && func, UInt64 max_tries)
if (try_number < max_tries)
{
tryLogCurrentException(log, "Will retry");
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
}
}
@ -309,7 +309,7 @@ void ClusterCopier::process(const ConnectionTimeouts & timeouts)
/// Retry table processing
bool table_is_done = false;
for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
for (UInt64 num_table_tries = 1; num_table_tries <= max_table_tries; ++num_table_tries)
{
if (tryProcessTable(timeouts, task_table))
{
@ -340,7 +340,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
const String & description,
bool unprioritized)
{
std::chrono::milliseconds current_sleep_time = default_sleep_time;
std::chrono::milliseconds current_sleep_time = retry_delay_ms;
static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
if (unprioritized)
@ -366,7 +366,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
LOG_INFO(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
if (unprioritized)
current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
current_sleep_time = std::min(max_sleep_time, current_sleep_time + retry_delay_ms);
std::this_thread::sleep_for(current_sleep_time);
num_bad_version_errors = 0;
@ -785,7 +785,7 @@ bool ClusterCopier::tryDropPartitionPiece(
if (e.code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
return false;
}
@ -798,7 +798,7 @@ bool ClusterCopier::tryDropPartitionPiece(
if (stat.numChildren != 0)
{
LOG_INFO(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
return false;
}
else
@ -1005,7 +1005,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
task_status = TaskStatus::Error;
bool was_error = false;
has_shard_to_process = true;
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
for (UInt64 try_num = 1; try_num <= max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
@ -1020,7 +1020,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
break;
/// Repeat on errors
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
if (task_status == TaskStatus::Error)
@ -1068,7 +1068,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
break;
/// Repeat on errors.
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
catch (...)
{
@ -1109,7 +1109,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
if (!table_is_done)
{
LOG_INFO(log, "Table {} is not processed yet.Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions);
LOG_INFO(log, "Table {} is not processed yet. Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions);
}
else
{
@ -1212,7 +1212,7 @@ TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTim
break;
/// Repeat on errors
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
was_active_pieces = (res == TaskStatus::Active);

View File

@ -65,6 +65,23 @@ public:
experimental_use_sample_offset = value;
}
/// Overrides the number of attempts allowed per table.
/// The value is the upper bound of the retry loop around tryProcessTable() in process().
void setMaxTableTries(UInt64 tries)
{
    this->max_table_tries = tries;
}
/// Overrides the number of attempts allowed per shard partition.
/// The value bounds the retry loop around tryProcessPartitionTask() in tryProcessTable().
void setMaxShardPartitionTries(UInt64 tries)
{
    this->max_shard_partition_tries = tries;
}
/// Overrides the stored max_shard_partition_piece_tries_for_alter limit.
/// NOTE(review): the matching command-line option describes it as the number of tries
/// for the final ALTER ATTACH to the destination table; the consuming code is not visible here.
void setMaxShardPartitionPieceTriesForAlter(UInt64 tries)
{
    this->max_shard_partition_piece_tries_for_alter = tries;
}
void setRetryDelayMs(std::chrono::milliseconds ms)
{
retry_delay_ms = ms;
}
protected:
String getWorkersPath() const
@ -123,10 +140,6 @@ protected:
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 10;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
TaskStatus tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
@ -218,6 +231,9 @@ private:
Poco::Logger * log;
std::chrono::milliseconds default_sleep_time{1000};
UInt64 max_table_tries = 3;
UInt64 max_shard_partition_tries = 3;
UInt64 max_shard_partition_piece_tries_for_alter = 10;
std::chrono::milliseconds retry_delay_ms{1000};
};
}

View File

@ -31,6 +31,10 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
move_fault_probability = std::max(std::min(config().getDouble("move-fault-probability"), 1.0), 0.0);
base_dir = (config().has("base-dir")) ? config().getString("base-dir") : fs::current_path().string();
max_table_tries = std::max<size_t>(config().getUInt("max-table-tries", 3), 1);
max_shard_partition_tries = std::max<size_t>(config().getUInt("max-shard-partition-tries", 3), 1);
max_shard_partition_piece_tries_for_alter = std::max<size_t>(config().getUInt("max-shard-partition-piece-tries-for-alter", 10), 1);
retry_delay_ms = std::chrono::milliseconds(std::max<size_t>(config().getUInt("retry-delay-ms", 1000), 100));
if (config().has("experimental-use-sample-offset"))
experimental_use_sample_offset = config().getBool("experimental-use-sample-offset");
@ -100,6 +104,15 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
options.addOption(Poco::Util::Option("status", "", "Get for status for current execution").binding("status"));
options.addOption(Poco::Util::Option("max-table-tries", "", "Number of tries for the copy table task")
.argument("max-table-tries").binding("max-table-tries"));
options.addOption(Poco::Util::Option("max-shard-partition-tries", "", "Number of tries for the copy one partition task")
.argument("max-shard-partition-tries").binding("max-shard-partition-tries"));
options.addOption(Poco::Util::Option("max-shard-partition-piece-tries-for-alter", "", "Number of tries for final ALTER ATTACH to destination table")
.argument("max-shard-partition-piece-tries-for-alter").binding("max-shard-partition-piece-tries-for-alter"));
options.addOption(Poco::Util::Option("retry-delay-ms", "", "Delay between task retries")
.argument("retry-delay-ms").binding("retry-delay-ms"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
@ -161,7 +174,10 @@ void ClusterCopierApp::mainImpl()
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);
copier->setMaxTableTries(max_table_tries);
copier->setMaxShardPartitionTries(max_shard_partition_tries);
copier->setMaxShardPartitionPieceTriesForAlter(max_shard_partition_piece_tries_for_alter);
copier->setRetryDelayMs(retry_delay_ms);
copier->setExperimentalUseSampleOffset(experimental_use_sample_offset);
auto task_file = config().getString("task-file", "");

View File

@ -83,6 +83,11 @@ private:
double move_fault_probability = 0.0;
bool is_help = false;
UInt64 max_table_tries = 3;
UInt64 max_shard_partition_tries = 3;
UInt64 max_shard_partition_piece_tries_for_alter = 10;
std::chrono::milliseconds retry_delay_ms{1000};
bool experimental_use_sample_offset{false};
std::string base_dir;