diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index 60e1590d3bd..e7aeea8cbad 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -86,7 +86,7 @@ decltype(auto) ClusterCopier::retry(T && func, UInt64 max_tries) if (try_number < max_tries) { tryLogCurrentException(log, "Will retry"); - std::this_thread::sleep_for(default_sleep_time); + std::this_thread::sleep_for(retry_delay_ms); } } } @@ -309,7 +309,7 @@ void ClusterCopier::process(const ConnectionTimeouts & timeouts) /// Retry table processing bool table_is_done = false; - for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries) + for (UInt64 num_table_tries = 1; num_table_tries <= max_table_tries; ++num_table_tries) { if (tryProcessTable(timeouts, task_table)) { @@ -340,7 +340,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee const String & description, bool unprioritized) { - std::chrono::milliseconds current_sleep_time = default_sleep_time; + std::chrono::milliseconds current_sleep_time = retry_delay_ms; static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec if (unprioritized) @@ -366,7 +366,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee LOG_INFO(log, "Too many workers ({}, maximum {}). 
Postpone processing {}", stat.numChildren, task_cluster->max_workers, description); if (unprioritized) - current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time); + current_sleep_time = std::min(max_sleep_time, current_sleep_time + retry_delay_ms); std::this_thread::sleep_for(current_sleep_time); num_bad_version_errors = 0; @@ -785,7 +785,7 @@ bool ClusterCopier::tryDropPartitionPiece( if (e.code == Coordination::Error::ZNODEEXISTS) { LOG_INFO(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number)); - std::this_thread::sleep_for(default_sleep_time); + std::this_thread::sleep_for(retry_delay_ms); return false; } @@ -798,7 +798,7 @@ bool ClusterCopier::tryDropPartitionPiece( if (stat.numChildren != 0) { LOG_INFO(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren); - std::this_thread::sleep_for(default_sleep_time); + std::this_thread::sleep_for(retry_delay_ms); return false; } else @@ -1005,7 +1005,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab task_status = TaskStatus::Error; bool was_error = false; has_shard_to_process = true; - for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num) + for (UInt64 try_num = 1; try_num <= max_shard_partition_tries; ++try_num) { task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task); @@ -1020,7 +1020,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab break; /// Repeat on errors - std::this_thread::sleep_for(default_sleep_time); + std::this_thread::sleep_for(retry_delay_ms); } if (task_status == TaskStatus::Error) @@ -1068,7 +1068,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab break; /// Repeat on errors. - std::this_thread::sleep_for(default_sleep_time); + std::this_thread::sleep_for(retry_delay_ms); } catch (...) 
{ @@ -1109,7 +1109,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab if (!table_is_done) { - LOG_INFO(log, "Table {} is not processed yet.Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions); + LOG_INFO(log, "Table {} is not processed yet. Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions); } else { @@ -1212,7 +1212,7 @@ TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTim break; /// Repeat on errors - std::this_thread::sleep_for(default_sleep_time); + std::this_thread::sleep_for(retry_delay_ms); } was_active_pieces = (res == TaskStatus::Active); diff --git a/programs/copier/ClusterCopier.h b/programs/copier/ClusterCopier.h index 387b089724a..b354fc59eee 100644 --- a/programs/copier/ClusterCopier.h +++ b/programs/copier/ClusterCopier.h @@ -65,6 +65,23 @@ public: experimental_use_sample_offset = value; } + void setMaxTableTries(UInt64 tries) + { + max_table_tries = tries; + } + void setMaxShardPartitionTries(UInt64 tries) + { + max_shard_partition_tries = tries; + } + void setMaxShardPartitionPieceTriesForAlter(UInt64 tries) + { + max_shard_partition_piece_tries_for_alter = tries; + } + void setRetryDelayMs(std::chrono::milliseconds ms) + { + retry_delay_ms = ms; + } + protected: String getWorkersPath() const @@ -123,10 +140,6 @@ protected: bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number, const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock); - static constexpr UInt64 max_table_tries = 3; - static constexpr UInt64 max_shard_partition_tries = 3; - static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 10; - bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table); TaskStatus tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table); @@ -218,6 +231,9 @@ private: Poco::Logger * log; - 
std::chrono::milliseconds default_sleep_time{1000}; + UInt64 max_table_tries = 3; + UInt64 max_shard_partition_tries = 3; + UInt64 max_shard_partition_piece_tries_for_alter = 10; + std::chrono::milliseconds retry_delay_ms{1000}; }; } diff --git a/programs/copier/ClusterCopierApp.cpp b/programs/copier/ClusterCopierApp.cpp index b8714d0851d..8d7e4abce51 100644 --- a/programs/copier/ClusterCopierApp.cpp +++ b/programs/copier/ClusterCopierApp.cpp @@ -31,6 +31,10 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self) move_fault_probability = std::max(std::min(config().getDouble("move-fault-probability"), 1.0), 0.0); base_dir = (config().has("base-dir")) ? config().getString("base-dir") : fs::current_path().string(); + max_table_tries = std::max<size_t>(config().getUInt("max-table-tries", 3), 1); + max_shard_partition_tries = std::max<size_t>(config().getUInt("max-shard-partition-tries", 3), 1); + max_shard_partition_piece_tries_for_alter = std::max<size_t>(config().getUInt("max-shard-partition-piece-tries-for-alter", 10), 1); + retry_delay_ms = std::chrono::milliseconds(std::max<size_t>(config().getUInt("retry-delay-ms", 1000), 100)); if (config().has("experimental-use-sample-offset")) experimental_use_sample_offset = config().getBool("experimental-use-sample-offset"); @@ -100,6 +104,15 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options) .argument("experimental-use-sample-offset").binding("experimental-use-sample-offset")); options.addOption(Poco::Util::Option("status", "", "Get for status for current execution").binding("status")); + options.addOption(Poco::Util::Option("max-table-tries", "", "Number of tries for the copy table task") + .argument("max-table-tries").binding("max-table-tries")); + options.addOption(Poco::Util::Option("max-shard-partition-tries", "", "Number of tries for the copy one partition task") + .argument("max-shard-partition-tries").binding("max-shard-partition-tries")); + 
options.addOption(Poco::Util::Option("max-shard-partition-piece-tries-for-alter", "", "Number of tries for final ALTER ATTACH to destination table") + .argument("max-shard-partition-piece-tries-for-alter").binding("max-shard-partition-piece-tries-for-alter")); + options.addOption(Poco::Util::Option("retry-delay-ms", "", "Delay between task retries") + .argument("retry-delay-ms").binding("retry-delay-ms")); + using Me = std::decay_t<decltype(*this)>; options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help") .callback(Poco::Util::OptionCallback(this, &Me::handleHelp))); @@ -161,7 +174,10 @@ void ClusterCopierApp::mainImpl() copier->setSafeMode(is_safe_mode); copier->setCopyFaultProbability(copy_fault_probability); copier->setMoveFaultProbability(move_fault_probability); - + copier->setMaxTableTries(max_table_tries); + copier->setMaxShardPartitionTries(max_shard_partition_tries); + copier->setMaxShardPartitionPieceTriesForAlter(max_shard_partition_piece_tries_for_alter); + copier->setRetryDelayMs(retry_delay_ms); copier->setExperimentalUseSampleOffset(experimental_use_sample_offset); auto task_file = config().getString("task-file", ""); diff --git a/programs/copier/ClusterCopierApp.h b/programs/copier/ClusterCopierApp.h index cce07e338c0..d447cd96149 100644 --- a/programs/copier/ClusterCopierApp.h +++ b/programs/copier/ClusterCopierApp.h @@ -83,6 +83,11 @@ private: double move_fault_probability = 0.0; bool is_help = false; + UInt64 max_table_tries = 3; + UInt64 max_shard_partition_tries = 3; + UInt64 max_shard_partition_piece_tries_for_alter = 10; + std::chrono::milliseconds retry_delay_ms{1000}; + bool experimental_use_sample_offset{false}; std::string base_dir;