Merge pull request #72682 from vitlibar/add-retries-to-creating-replicated-table

Add retries to creating a replicated table
This commit is contained in:
Vitaly Baranov 2024-12-10 21:04:36 +00:00 committed by GitHub
commit e7df52d8a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 339 additions and 127 deletions

View File

@ -182,6 +182,7 @@ BackupCoordinationOnCluster::BackupCoordinationOnCluster(
, current_host(current_host_) , current_host(current_host_)
, current_host_index(findCurrentHostIndex(current_host, all_hosts)) , current_host_index(findCurrentHostIndex(current_host, all_hosts))
, plain_backup(is_plain_backup_) , plain_backup(is_plain_backup_)
, process_list_element(process_list_element_)
, log(getLogger("BackupCoordinationOnCluster")) , log(getLogger("BackupCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); }) , with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, cleaner(/* is_restore = */ false, zookeeper_path, with_retries, log) , cleaner(/* is_restore = */ false, zookeeper_path, with_retries, log)
@ -273,7 +274,8 @@ ZooKeeperRetriesInfo BackupCoordinationOnCluster::getOnClusterInitializationKeep
{ {
return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing, return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing,
static_cast<UInt64>(keeper_settings.retry_initial_backoff_ms.count()), static_cast<UInt64>(keeper_settings.retry_initial_backoff_ms.count()),
static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count())}; static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count()),
process_list_element};
} }
void BackupCoordinationOnCluster::serializeToMultipleZooKeeperNodes(const String & path, const String & value, const String & logging_name) void BackupCoordinationOnCluster::serializeToMultipleZooKeeperNodes(const String & path, const String & value, const String & logging_name)

View File

@ -107,7 +107,8 @@ private:
const String current_host; const String current_host;
const size_t current_host_index; const size_t current_host_index;
const bool plain_backup; const bool plain_backup;
LoggerPtr const log; const QueryStatusPtr process_list_element;
const LoggerPtr log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`. /// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries; const WithRetries with_retries;

View File

@ -112,10 +112,11 @@ BackupEntriesCollector::BackupEntriesCollector(
context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000)) context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
, compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true)) , compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true))
, log(getLogger("BackupEntriesCollector")) , log(getLogger("BackupEntriesCollector"))
, global_zookeeper_retries_info( , zookeeper_retries_info(
context->getSettingsRef()[Setting::backup_restore_keeper_max_retries], context->getSettingsRef()[Setting::backup_restore_keeper_max_retries],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms], context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms]) context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms],
context->getProcessListElementSafe())
, threadpool(threadpool_) , threadpool(threadpool_)
{ {
} }
@ -583,8 +584,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
try try
{ {
/// Database or table could be replicated - so may use ZooKeeper. We need to retry. /// Database or table could be replicated - so may use ZooKeeper. We need to retry.
auto zookeeper_retries_info = global_zookeeper_retries_info; ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info);
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&](){ db_tables = database->getTablesForBackup(filter_by_table_name, context); }); retries_ctl.retryLoop([&](){ db_tables = database->getTablesForBackup(filter_by_table_name, context); });
} }
catch (Exception & e) catch (Exception & e)

View File

@ -48,7 +48,7 @@ public:
std::shared_ptr<IBackupCoordination> getBackupCoordination() const { return backup_coordination; } std::shared_ptr<IBackupCoordination> getBackupCoordination() const { return backup_coordination; }
const ReadSettings & getReadSettings() const { return read_settings; } const ReadSettings & getReadSettings() const { return read_settings; }
ContextPtr getContext() const { return context; } ContextPtr getContext() const { return context; }
const ZooKeeperRetriesInfo & getZooKeeperRetriesInfo() const { return global_zookeeper_retries_info; } const ZooKeeperRetriesInfo & getZooKeeperRetriesInfo() const { return zookeeper_retries_info; }
/// Returns all access entities which can be put into a backup. /// Returns all access entities which can be put into a backup.
std::unordered_map<UUID, AccessEntityPtr> getAllAccessEntities(); std::unordered_map<UUID, AccessEntityPtr> getAllAccessEntities();
@ -129,7 +129,7 @@ private:
LoggerPtr log; LoggerPtr log;
/// Unfortunately we can use ZooKeeper for collecting information for backup /// Unfortunately we can use ZooKeeper for collecting information for backup
/// and we need to retry... /// and we need to retry...
ZooKeeperRetriesInfo global_zookeeper_retries_info; ZooKeeperRetriesInfo zookeeper_retries_info;
Strings all_hosts; Strings all_hosts;
DDLRenamingMap renaming_map; DDLRenamingMap renaming_map;

View File

@ -33,6 +33,7 @@ RestoreCoordinationOnCluster::RestoreCoordinationOnCluster(
, all_hosts_without_initiator(BackupCoordinationOnCluster::excludeInitiator(all_hosts)) , all_hosts_without_initiator(BackupCoordinationOnCluster::excludeInitiator(all_hosts))
, current_host(current_host_) , current_host(current_host_)
, current_host_index(BackupCoordinationOnCluster::findCurrentHostIndex(current_host, all_hosts)) , current_host_index(BackupCoordinationOnCluster::findCurrentHostIndex(current_host, all_hosts))
, process_list_element(process_list_element_)
, log(getLogger("RestoreCoordinationOnCluster")) , log(getLogger("RestoreCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); }) , with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, cleaner(/* is_restore = */ true, zookeeper_path, with_retries, log) , cleaner(/* is_restore = */ true, zookeeper_path, with_retries, log)
@ -122,7 +123,8 @@ ZooKeeperRetriesInfo RestoreCoordinationOnCluster::getOnClusterInitializationKee
{ {
return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing, return ZooKeeperRetriesInfo{keeper_settings.max_retries_while_initializing,
static_cast<UInt64>(keeper_settings.retry_initial_backoff_ms.count()), static_cast<UInt64>(keeper_settings.retry_initial_backoff_ms.count()),
static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count())}; static_cast<UInt64>(keeper_settings.retry_max_backoff_ms.count()),
process_list_element};
} }
bool RestoreCoordinationOnCluster::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) bool RestoreCoordinationOnCluster::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name)

View File

@ -75,7 +75,8 @@ private:
const Strings all_hosts_without_initiator; const Strings all_hosts_without_initiator;
const String current_host; const String current_host;
const size_t current_host_index; const size_t current_host_index;
LoggerPtr const log; const QueryStatusPtr process_list_element;
const LoggerPtr log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`. /// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries; const WithRetries with_retries;

View File

@ -20,6 +20,7 @@
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Databases/DDLDependencyVisitor.h> #include <Databases/DDLDependencyVisitor.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <base/insertAtEnd.h> #include <base/insertAtEnd.h>
@ -39,6 +40,9 @@ namespace DB
{ {
namespace Setting namespace Setting
{ {
extern const SettingsUInt64 backup_restore_keeper_retry_initial_backoff_ms;
extern const SettingsUInt64 backup_restore_keeper_retry_max_backoff_ms;
extern const SettingsUInt64 backup_restore_keeper_max_retries;
extern const SettingsSeconds lock_acquire_timeout; extern const SettingsSeconds lock_acquire_timeout;
} }
@ -103,6 +107,11 @@ RestorerFromBackup::RestorerFromBackup(
, after_task_callback(after_task_callback_) , after_task_callback(after_task_callback_)
, create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000)) , create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
, log(getLogger("RestorerFromBackup")) , log(getLogger("RestorerFromBackup"))
, zookeeper_retries_info(
context->getSettingsRef()[Setting::backup_restore_keeper_max_retries],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_initial_backoff_ms],
context->getSettingsRef()[Setting::backup_restore_keeper_retry_max_backoff_ms],
context->getProcessListElementSafe())
, tables_dependencies("RestorerFromBackup") , tables_dependencies("RestorerFromBackup")
, thread_pool(thread_pool_) , thread_pool(thread_pool_)
{ {
@ -977,6 +986,11 @@ void RestorerFromBackup::createTable(const QualifiedTableName & table_name)
query_context->setSetting("database_replicated_allow_explicit_uuid", 3); query_context->setSetting("database_replicated_allow_explicit_uuid", 3);
query_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3); query_context->setSetting("database_replicated_allow_replicated_engine_arguments", 3);
/// Creating of replicated tables may need retries.
query_context->setSetting("keeper_max_retries", zookeeper_retries_info.max_retries);
query_context->setSetting("keeper_initial_backoff_ms", zookeeper_retries_info.initial_backoff_ms);
query_context->setSetting("keeper_max_backoff_ms", zookeeper_retries_info.max_backoff_ms);
/// Execute CREATE TABLE query (we call IDatabase::createTableRestoredFromBackup() to allow the database to do some /// Execute CREATE TABLE query (we call IDatabase::createTableRestoredFromBackup() to allow the database to do some
/// database-specific things). /// database-specific things).
database->createTableRestoredFromBackup( database->createTableRestoredFromBackup(

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Backups/RestoreSettings.h> #include <Backups/RestoreSettings.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Databases/DDLRenamingVisitor.h> #include <Databases/DDLRenamingVisitor.h>
#include <Databases/TablesDependencyGraph.h> #include <Databases/TablesDependencyGraph.h>
#include <Parsers/ASTBackupQuery.h> #include <Parsers/ASTBackupQuery.h>
@ -85,6 +86,7 @@ private:
std::chrono::milliseconds create_table_timeout; std::chrono::milliseconds create_table_timeout;
LoggerPtr log; LoggerPtr log;
const ZooKeeperRetriesInfo zookeeper_retries_info;
Mode mode = Mode::RESTORE; Mode mode = Mode::RESTORE;
Strings all_hosts; Strings all_hosts;
DDLRenamingMap renaming_map; DDLRenamingMap renaming_map;
@ -172,7 +174,6 @@ private:
TablesDependencyGraph tables_dependencies TSA_GUARDED_BY(mutex); TablesDependencyGraph tables_dependencies TSA_GUARDED_BY(mutex);
std::vector<DataRestoreTask> data_restore_tasks TSA_GUARDED_BY(mutex); std::vector<DataRestoreTask> data_restore_tasks TSA_GUARDED_BY(mutex);
std::unique_ptr<AccessRestorerFromBackup> access_restorer TSA_GUARDED_BY(mutex); std::unique_ptr<AccessRestorerFromBackup> access_restorer TSA_GUARDED_BY(mutex);
bool access_restored TSA_GUARDED_BY(mutex) = false;
std::vector<std::future<void>> futures TSA_GUARDED_BY(mutex); std::vector<std::future<void>> futures TSA_GUARDED_BY(mutex);
std::atomic<bool> exception_caught = false; std::atomic<bool> exception_caught = false;

View File

@ -20,9 +20,10 @@ WithRetries::RetriesControlHolder::RetriesControlHolder(const WithRetries * pare
: (kind == kErrorHandling) ? parent->settings.max_retries_while_handling_error : (kind == kErrorHandling) ? parent->settings.max_retries_while_handling_error
: parent->settings.max_retries, : parent->settings.max_retries,
parent->settings.retry_initial_backoff_ms.count(), parent->settings.retry_initial_backoff_ms.count(),
parent->settings.retry_max_backoff_ms.count()) parent->settings.retry_max_backoff_ms.count(),
(kind == kErrorHandling) ? nullptr : parent->process_list_element)
/// We don't use process_list_element while handling an error because the error handling can't be cancellable. /// We don't use process_list_element while handling an error because the error handling can't be cancellable.
, retries_ctl(name, parent->log, info, (kind == kErrorHandling) ? nullptr : parent->process_list_element) , retries_ctl(name, parent->log, info)
, faulty_zookeeper(parent->getFaultyZooKeeper()) , faulty_zookeeper(parent->getFaultyZooKeeper())
{} {}

View File

@ -16,21 +16,25 @@ namespace ErrorCodes
struct ZooKeeperRetriesInfo struct ZooKeeperRetriesInfo
{ {
ZooKeeperRetriesInfo() = default; ZooKeeperRetriesInfo() = default;
ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_, QueryStatusPtr query_status_)
: max_retries(max_retries_), initial_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_)), max_backoff_ms(max_backoff_ms_) : max_retries(max_retries_), initial_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_)), max_backoff_ms(max_backoff_ms_)
, query_status(query_status_)
{ {
} }
UInt64 max_retries = 0; /// "max_retries = 0" means only one attempt. UInt64 max_retries = 0; /// "max_retries = 0" means only one attempt.
UInt64 initial_backoff_ms = 100; UInt64 initial_backoff_ms = 0;
UInt64 max_backoff_ms = 5000; UInt64 max_backoff_ms = 0;
QueryStatusPtr query_status; /// can be nullptr
}; };
class ZooKeeperRetriesControl class ZooKeeperRetriesControl
{ {
public: public:
ZooKeeperRetriesControl(std::string name_, LoggerPtr logger_, ZooKeeperRetriesInfo retries_info_, QueryStatusPtr elem) ZooKeeperRetriesControl(std::string name_, LoggerPtr logger_, ZooKeeperRetriesInfo retries_info_)
: name(std::move(name_)), logger(logger_), retries_info(retries_info_), process_list_element(elem) : name(std::move(name_)), logger(logger_), retries_info(retries_info_)
{ {
} }
@ -39,7 +43,6 @@ public:
, logger(other.logger) , logger(other.logger)
, retries_info(other.retries_info) , retries_info(other.retries_info)
, total_failures(other.total_failures) , total_failures(other.total_failures)
, process_list_element(other.process_list_element)
, current_backoff_ms(other.current_backoff_ms) , current_backoff_ms(other.current_backoff_ms)
{ {
} }
@ -222,8 +225,8 @@ private:
} }
/// Check if the query was cancelled. /// Check if the query was cancelled.
if (process_list_element) if (retries_info.query_status)
process_list_element->checkTimeLimit(); retries_info.query_status->checkTimeLimit();
/// retries /// retries
logLastError("will retry due to error"); logLastError("will retry due to error");
@ -231,8 +234,8 @@ private:
current_backoff_ms = std::min(current_backoff_ms * 2, retries_info.max_backoff_ms); current_backoff_ms = std::min(current_backoff_ms * 2, retries_info.max_backoff_ms);
/// Check if the query was cancelled again after sleeping. /// Check if the query was cancelled again after sleeping.
if (process_list_element) if (retries_info.query_status)
process_list_element->checkTimeLimit(); retries_info.query_status->checkTimeLimit();
return true; return true;
} }
@ -288,7 +291,6 @@ private:
std::function<void()> action_after_last_failed_retry = []() {}; std::function<void()> action_after_last_failed_retry = []() {};
bool iteration_succeeded = true; bool iteration_succeeded = true;
bool stop_retries = false; bool stop_retries = false;
QueryStatusPtr process_list_element;
UInt64 current_iteration = 0; UInt64 current_iteration = 0;
UInt64 current_backoff_ms = 0; UInt64 current_backoff_ms = 0;

View File

@ -408,7 +408,7 @@ void DatabaseOrdinary::restoreMetadataAfterConvertingToReplicated(StoragePtr tab
} }
else else
{ {
rmt->restoreMetadataInZooKeeper(); rmt->restoreMetadataInZooKeeper(/* zookeeper_retries_info = */ {});
LOG_INFO LOG_INFO
( (
log, log,

View File

@ -199,7 +199,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper); active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
} }
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr) String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &)
{ {
auto zookeeper = getAndSetZooKeeper(); auto zookeeper = getAndSetZooKeeper();
return enqueueQueryImpl(zookeeper, entry, database); return enqueueQueryImpl(zookeeper, entry, database);

View File

@ -24,7 +24,7 @@ class DatabaseReplicatedDDLWorker : public DDLWorker
public: public:
DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_); DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_);
String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr) override; String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &) override;
String tryEnqueueAndExecuteEntry(DDLLogEntry & entry, ContextPtr query_context); String tryEnqueueAndExecuteEntry(DDLLogEntry & entry, ContextPtr query_context);

View File

@ -1054,12 +1054,12 @@ void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperP
} }
String DDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr process_list_element) String DDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info)
{ {
String node_path; String node_path;
if (retries_info.max_retries > 0) if (retries_info.max_retries > 0)
{ {
ZooKeeperRetriesControl retries_ctl{"DDLWorker::enqueueQuery", log, retries_info, process_list_element}; ZooKeeperRetriesControl retries_ctl{"DDLWorker::enqueueQuery", log, retries_info};
retries_ctl.retryLoop([&]{ retries_ctl.retryLoop([&]{
node_path = enqueueQueryAttempt(entry); node_path = enqueueQueryAttempt(entry);
}); });

View File

@ -68,7 +68,7 @@ public:
virtual ~DDLWorker(); virtual ~DDLWorker();
/// Pushes query into DDL queue, returns path to created node /// Pushes query into DDL queue, returns path to created node
virtual String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr process_list_element); virtual String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info);
/// Host ID (name:port) for logging purposes /// Host ID (name:port) for logging purposes
/// Note that in each task hosts are identified individually by name:port from initiator server cluster config /// Note that in each task hosts are identified individually by name:port from initiator server cluster config

View File

@ -133,8 +133,7 @@ ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path
String status_data; String status_data;
bool finished_exists = false; bool finished_exists = false;
auto retries_ctl = ZooKeeperRetriesControl( auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo());
"executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop([&]() { finished_exists = context->getZooKeeper()->tryGet(status_path, status_data); }); retries_ctl.retryLoop([&]() { finished_exists = context->getZooKeeper()->tryGet(status_path, status_data); });
if (finished_exists) if (finished_exists)
status.tryDeserializeText(status_data); status.tryDeserializeText(status_data);
@ -142,13 +141,14 @@ ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path
return status; return status;
} }
ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo() ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo() const
{ {
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef(); const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo( return ZooKeeperRetriesInfo(
config_ref.getInt("distributed_ddl_keeper_max_retries", 5), config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100), config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000)); config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000),
context->getProcessListElement());
} }
std::pair<String, UInt16> DistributedQueryStatusSource::parseHostAndPort(const String & host_id) std::pair<String, UInt16> DistributedQueryStatusSource::parseHostAndPort(const String & host_id)
@ -194,8 +194,7 @@ Chunk DistributedQueryStatusSource::generate()
Strings tmp_active_hosts; Strings tmp_active_hosts;
{ {
auto retries_ctl = ZooKeeperRetriesControl( auto retries_ctl = ZooKeeperRetriesControl("executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo());
"executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop( retries_ctl.retryLoop(
[&]() [&]()
{ {

View File

@ -38,7 +38,7 @@ protected:
Strings getNewAndUpdate(const Strings & current_finished_hosts); Strings getNewAndUpdate(const Strings & current_finished_hosts);
ExecutionStatus getExecutionStatus(const fs::path & status_path); ExecutionStatus getExecutionStatus(const fs::path & status_path);
static ZooKeeperRetriesInfo getRetriesInfo(); ZooKeeperRetriesInfo getRetriesInfo() const;
static std::pair<String, UInt16> parseHostAndPort(const String & host_id); static std::pair<String, UInt16> parseHostAndPort(const String & host_id);
String node_path; String node_path;

View File

@ -98,6 +98,9 @@ namespace DB
{ {
namespace Setting namespace Setting
{ {
extern const SettingsUInt64 keeper_max_retries;
extern const SettingsUInt64 keeper_retry_initial_backoff_ms;
extern const SettingsUInt64 keeper_retry_max_backoff_ms;
extern const SettingsSeconds lock_acquire_timeout; extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsSeconds receive_timeout; extern const SettingsSeconds receive_timeout;
extern const SettingsMaxThreads max_threads; extern const SettingsMaxThreads max_threads;
@ -878,7 +881,13 @@ void InterpreterSystemQuery::restoreReplica()
if (table_replicated_ptr == nullptr) if (table_replicated_ptr == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs()); throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
table_replicated_ptr->restoreMetadataInZooKeeper(); const auto & settings = getContext()->getSettingsRef();
table_replicated_ptr->restoreMetadataInZooKeeper(
ZooKeeperRetriesInfo{settings[Setting::keeper_max_retries],
settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
getContext()->getProcessListElementSafe()});
} }
StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context) StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context)

View File

@ -189,7 +189,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.setSettingsIfRequired(context); entry.setSettingsIfRequired(context);
entry.tracing_context = OpenTelemetry::CurrentContext(); entry.tracing_context = OpenTelemetry::CurrentContext();
entry.initial_query_id = context->getClientInfo().initial_query_id; entry.initial_query_id = context->getClientInfo().initial_query_id;
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info, context->getProcessListElement()); String node_path = ddl_worker.enqueueQuery(entry, params.retries_info);
return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context); return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
} }

View File

@ -166,23 +166,24 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// Just in case it was not removed earlier due to connection loss /// Just in case it was not removed earlier due to connection loss
zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); zookeeper->tryRemove(replica_path + "/flags/force_restore_data");
storage.checkTableStructure(replica_path, metadata_snapshot); /// Here `zookeeper_retries_info = {}` because the attach thread has its own retries (see ReplicatedMergeTreeAttachThread::run()).
storage.checkTableStructure(replica_path, metadata_snapshot, /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {});
storage.checkParts(skip_sanity_checks); storage.checkParts(skip_sanity_checks);
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart), /// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
/// don't allow to reinitialize them, delete each of them immediately. /// don't allow to reinitialize them, delete each of them immediately.
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"}); storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
storage.createNewZooKeeperNodes(); storage.createNewZooKeeperNodes(/* zookeeper_retries_info = */ {});
storage.syncPinnedPartUUIDs(); storage.syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {});
std::lock_guard lock(storage.table_shared_id_mutex); std::lock_guard lock(storage.table_shared_id_mutex);
storage.createTableSharedID(); storage.createTableSharedID(/* zookeeper_retries_info = */ {});
}; };
void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS
{ {
storage.startupImpl(/* from_attach_thread */ true); storage.startupImpl(/* from_attach_thread */ true, /* zookeeper_retries_info = */ {});
storage.initialization_done = true; storage.initialization_done = true;
LOG_INFO(log, "Table is initialized"); LOG_INFO(log, "Table is initialized");
} }

View File

@ -201,8 +201,8 @@ size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const
log, log,
{settings[Setting::insert_keeper_max_retries], {settings[Setting::insert_keeper_max_retries],
settings[Setting::insert_keeper_retry_initial_backoff_ms], settings[Setting::insert_keeper_retry_initial_backoff_ms],
settings[Setting::insert_keeper_retry_max_backoff_ms]}, settings[Setting::insert_keeper_retry_max_backoff_ms],
context->getProcessListElement()); context->getProcessListElement()});
quorum_retries_ctl.retryLoop( quorum_retries_ctl.retryLoop(
[&]() [&]()
{ {
@ -725,8 +725,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
log, log,
{settings[Setting::insert_keeper_max_retries], {settings[Setting::insert_keeper_max_retries],
settings[Setting::insert_keeper_retry_initial_backoff_ms], settings[Setting::insert_keeper_retry_initial_backoff_ms],
settings[Setting::insert_keeper_retry_max_backoff_ms]}, settings[Setting::insert_keeper_retry_max_backoff_ms],
context->getProcessListElement()); context->getProcessListElement()});
auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages
{ {

View File

@ -13,6 +13,7 @@
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Common/Macros.h> #include <Common/Macros.h>
#include <Common/OptimizedRegularExpression.h> #include <Common/OptimizedRegularExpression.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
@ -37,6 +38,9 @@ namespace Setting
extern const SettingsBool allow_suspicious_ttl_expressions; extern const SettingsBool allow_suspicious_ttl_expressions;
extern const SettingsBool create_table_empty_primary_key_by_default; extern const SettingsBool create_table_empty_primary_key_by_default;
extern const SettingsUInt64 database_replicated_allow_replicated_engine_arguments; extern const SettingsUInt64 database_replicated_allow_replicated_engine_arguments;
extern const SettingsUInt64 keeper_max_retries;
extern const SettingsUInt64 keeper_retry_initial_backoff_ms;
extern const SettingsUInt64 keeper_retry_max_backoff_ms;
} }
namespace MergeTreeSetting namespace MergeTreeSetting
@ -831,6 +835,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (auto txn = args.getLocalContext()->getZooKeeperMetadataTransaction()) if (auto txn = args.getLocalContext()->getZooKeeperMetadataTransaction())
need_check_table_structure = txn->isInitialQuery(); need_check_table_structure = txn->isInitialQuery();
ZooKeeperRetriesInfo create_query_zk_retries_info{
local_settings[Setting::keeper_max_retries],
local_settings[Setting::keeper_retry_initial_backoff_ms],
local_settings[Setting::keeper_retry_max_backoff_ms],
args.getLocalContext()->getProcessListElementSafe()};
return std::make_shared<StorageReplicatedMergeTree>( return std::make_shared<StorageReplicatedMergeTree>(
zookeeper_info, zookeeper_info,
args.mode, args.mode,
@ -841,8 +851,10 @@ static StoragePtr create(const StorageFactory::Arguments & args)
date_column_name, date_column_name,
merging_params, merging_params,
std::move(storage_settings), std::move(storage_settings),
need_check_table_structure); need_check_table_structure,
create_query_zk_retries_info);
} }
return std::make_shared<StorageMergeTree>( return std::make_shared<StorageMergeTree>(
args.table_id, args.table_id,
args.relative_data_path, args.relative_data_path,

View File

@ -189,8 +189,8 @@ public:
ZooKeeperRetriesInfo{ ZooKeeperRetriesInfo{
settings[Setting::insert_keeper_max_retries], settings[Setting::insert_keeper_max_retries],
settings[Setting::insert_keeper_retry_initial_backoff_ms], settings[Setting::insert_keeper_retry_initial_backoff_ms],
settings[Setting::insert_keeper_retry_max_backoff_ms]}, settings[Setting::insert_keeper_retry_max_backoff_ms],
context->getProcessListElement()}; context->getProcessListElement()}};
zk_retry.retryLoop([&]() zk_retry.retryLoop([&]()
{ {
@ -425,8 +425,10 @@ StorageKeeperMap::StorageKeeperMap(
getName(), getName(),
getLogger(getName()), getLogger(getName()),
ZooKeeperRetriesInfo{ ZooKeeperRetriesInfo{
settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, settings[Setting::keeper_max_retries],
context_->getProcessListElement()}; settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
context_->getProcessListElement()}};
zk_retry.retryLoop( zk_retry.retryLoop(
[&] [&]
@ -670,8 +672,10 @@ Pipe StorageKeeperMap::read(
getName(), getName(),
getLogger(getName()), getLogger(getName()),
ZooKeeperRetriesInfo{ ZooKeeperRetriesInfo{
settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, settings[Setting::keeper_max_retries],
context_->getProcessListElement()}; settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
context_->getProcessListElement()}};
std::vector<std::string> children; std::vector<std::string> children;
zk_retry.retryLoop([&] zk_retry.retryLoop([&]
@ -699,8 +703,10 @@ void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
getName(), getName(),
getLogger(getName()), getLogger(getName()),
ZooKeeperRetriesInfo{ ZooKeeperRetriesInfo{
settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, settings[Setting::keeper_max_retries],
local_context->getProcessListElement()}; settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
local_context->getProcessListElement()}};
zk_retry.retryLoop([&] zk_retry.retryLoop([&]
{ {
@ -1136,8 +1142,10 @@ StorageKeeperMap::TableStatus StorageKeeperMap::getTableStatus(const ContextPtr
getName(), getName(),
getLogger(getName()), getLogger(getName()),
ZooKeeperRetriesInfo{ ZooKeeperRetriesInfo{
settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, settings[Setting::keeper_max_retries],
local_context->getProcessListElement()}; settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
local_context->getProcessListElement()}};
zk_retry.retryLoop([&] zk_retry.retryLoop([&]
{ {
@ -1248,8 +1256,10 @@ Chunk StorageKeeperMap::getBySerializedKeys(
getName(), getName(),
getLogger(getName()), getLogger(getName()),
ZooKeeperRetriesInfo{ ZooKeeperRetriesInfo{
settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, settings[Setting::keeper_max_retries],
local_context->getProcessListElement()}; settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
local_context->getProcessListElement()}};
zkutil::ZooKeeper::MultiTryGetResponse values; zkutil::ZooKeeper::MultiTryGetResponse values;
zk_retry.retryLoop([&]{ zk_retry.retryLoop([&]{
@ -1394,8 +1404,10 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
getName(), getName(),
getLogger(getName()), getLogger(getName()),
ZooKeeperRetriesInfo{ ZooKeeperRetriesInfo{
settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}, settings[Setting::keeper_max_retries],
local_context->getProcessListElement()}; settings[Setting::keeper_retry_initial_backoff_ms],
settings[Setting::keeper_retry_max_backoff_ms],
local_context->getProcessListElement()}};
Coordination::Error status; Coordination::Error status;
zk_retry.retryLoop([&] zk_retry.retryLoop([&]

View File

@ -374,7 +374,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & date_column_name, const String & date_column_name,
const MergingParams & merging_params_, const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_, std::unique_ptr<MergeTreeSettings> settings_,
bool need_check_structure) bool need_check_structure,
const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_)
: MergeTreeData(table_id_, : MergeTreeData(table_id_,
metadata_, metadata_,
context_, context_,
@ -388,6 +389,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, zookeeper_path(zookeeper_info.path) , zookeeper_path(zookeeper_info.path)
, replica_name(zookeeper_info.replica_name) , replica_name(zookeeper_info.replica_name)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name) , replica_path(fs::path(zookeeper_path) / "replicas" / replica_name)
, create_query_zookeeper_retries_info(create_query_zookeeper_retries_info_)
, reader(*this) , reader(*this)
, writer(*this) , writer(*this)
, merger_mutator(*this) , merger_mutator(*this)
@ -577,7 +579,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
try try
{ {
bool is_first_replica = createTableIfNotExists(metadata_snapshot); bool is_first_replica = createTableIfNotExists(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo());
try try
{ {
@ -586,24 +588,22 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// We have to check granularity on other replicas. If it's fixed we /// We have to check granularity on other replicas. If it's fixed we
/// must create our new replica with fixed granularity and store this /// must create our new replica with fixed granularity and store this
/// information in /replica/metadata. /// information in /replica/metadata.
other_replicas_fixed_granularity = checkFixedGranularityInZookeeper(); other_replicas_fixed_granularity = checkFixedGranularityInZookeeper(getCreateQueryZooKeeperRetriesInfo());
/// Allow structure mismatch for secondary queries from Replicated database. /// Allow structure mismatch for secondary queries from Replicated database.
/// It may happen if the table was altered just after creation. /// It may happen if the table was altered just after creation.
/// Metadata will be updated in cloneMetadataIfNeeded(...), metadata_version will be 0 for a while. /// Metadata will be updated in cloneMetadataIfNeeded(...), metadata_version will be 0 for a while.
bool same_structure = checkTableStructure(zookeeper_path, metadata_snapshot, need_check_structure); int32_t metadata_version;
bool same_structure = checkTableStructure(zookeeper_path, metadata_snapshot, &metadata_version, need_check_structure, getCreateQueryZooKeeperRetriesInfo());
if (same_structure) if (same_structure)
{ {
Coordination::Stat metadata_stat;
current_zookeeper->get(fs::path(zookeeper_path) / "metadata", &metadata_stat);
/** We change metadata_snapshot so that `createReplica` method will create `metadata_version` node in ZooKeeper /** We change metadata_snapshot so that `createReplica` method will create `metadata_version` node in ZooKeeper
* with version of table '/metadata' node in Zookeeper. * with version of table '/metadata' node in Zookeeper.
* *
* Otherwise `metadata_version` for not first replica will be initialized with 0 by default. * Otherwise `metadata_version` for not first replica will be initialized with 0 by default.
*/ */
setInMemoryMetadata(metadata_snapshot->withMetadataVersion(metadata_stat.version)); setInMemoryMetadata(metadata_snapshot->withMetadataVersion(metadata_version));
metadata_snapshot = getInMemoryMetadataPtr(); metadata_snapshot = getInMemoryMetadataPtr();
} }
} }
@ -615,15 +615,13 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
} }
if (!is_first_replica) if (!is_first_replica)
createReplica(metadata_snapshot); createReplica(metadata_snapshot, getCreateQueryZooKeeperRetriesInfo());
createNewZooKeeperNodes(); createNewZooKeeperNodes(getCreateQueryZooKeeperRetriesInfo());
syncPinnedPartUUIDs(); syncPinnedPartUUIDs(getCreateQueryZooKeeperRetriesInfo());
if (!has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper) if (!has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper)
createTableSharedID(); createTableSharedID(getCreateQueryZooKeeperRetriesInfo());
} }
catch (...) catch (...)
{ {
@ -636,12 +634,29 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
} }
bool StorageReplicatedMergeTree::checkFixedGranularityInZookeeper() bool StorageReplicatedMergeTree::checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info) const
{ {
bool fixed_granularity = false;
auto check_fixed_granularity = [&]
{
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
String metadata_str = zookeeper->get(zookeeper_path + "/metadata"); String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
return metadata_from_zk.index_granularity_bytes == 0; fixed_granularity = (metadata_from_zk.index_granularity_bytes == 0);
};
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkFixedGranularityInZookeeper", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { check_fixed_granularity(); });
}
else
{
check_fixed_granularity();
}
return fixed_granularity;
} }
@ -816,7 +831,20 @@ std::vector<String> getAncestors(const String & path)
} }
void StorageReplicatedMergeTree::createNewZooKeeperNodes() void StorageReplicatedMergeTree::createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info) const
{
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createNewZooKeeperNodes", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { createNewZooKeeperNodesAttempt(); });
}
else
{
createNewZooKeeperNodesAttempt();
}
}
void StorageReplicatedMergeTree::createNewZooKeeperNodesAttempt() const
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
@ -881,14 +909,32 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
} }
} }
bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const
{
bool table_created = false;
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableIfNotExists", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { table_created = createTableIfNotExistsAttempt(metadata_snapshot, zookeeper_retries_info.query_status); });
}
else
{
table_created = createTableIfNotExistsAttempt(metadata_snapshot, zookeeper_retries_info.query_status);
}
return table_created;
}
bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot) bool StorageReplicatedMergeTree::createTableIfNotExistsAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
zookeeper->createAncestors(zookeeper_path); zookeeper->createAncestors(zookeeper_path);
for (size_t i = 0; i < 1000; ++i) for (size_t i = 0; i < 1000; ++i)
{ {
/// Check if the query was cancelled.
if (process_list_element)
process_list_element->checkTimeLimit();
/// Invariant: "replicas" does not exist if there is no table or if there are leftovers from incompletely dropped table. /// Invariant: "replicas" does not exist if there is no table or if there are leftovers from incompletely dropped table.
if (zookeeper->exists(zookeeper_path + "/replicas")) if (zookeeper->exists(zookeeper_path + "/replicas"))
{ {
@ -1027,7 +1073,20 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
"of wrong zookeeper_path or because of logical error"); "of wrong zookeeper_path or because of logical error");
} }
void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot) void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const
{
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createReplica", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { createReplicaAttempt(metadata_snapshot, zookeeper_retries_info.query_status); });
}
else
{
createReplicaAttempt(metadata_snapshot, zookeeper_retries_info.query_status);
}
}
void StorageReplicatedMergeTree::createReplicaAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
@ -1111,6 +1170,10 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
do do
{ {
/// Check if the query was cancelled.
if (process_list_element)
process_list_element->checkTimeLimit();
Coordination::Stat replicas_stat; Coordination::Stat replicas_stat;
String replicas_value; String replicas_value;
@ -1177,6 +1240,18 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
} while (code == Coordination::Error::ZBADVERSION); } while (code == Coordination::Error::ZBADVERSION);
} }
ZooKeeperRetriesInfo StorageReplicatedMergeTree::getCreateQueryZooKeeperRetriesInfo() const
{
std::lock_guard lock{create_query_zookeeper_retries_info_mutex};
return create_query_zookeeper_retries_info;
}
void StorageReplicatedMergeTree::clearCreateQueryZooKeeperRetriesInfo()
{
std::lock_guard lock{create_query_zookeeper_retries_info_mutex};
create_query_zookeeper_retries_info = {};
}
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfTableShutDown() const zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfTableShutDown() const
{ {
@ -1538,7 +1613,26 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
/** Verify that list of columns and table storage_settings_ptr match those specified in ZK (/metadata). /** Verify that list of columns and table storage_settings_ptr match those specified in ZK (/metadata).
* If not, throw an exception. * If not, throw an exception.
*/ */
bool StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, bool strict_check) bool StorageReplicatedMergeTree::checkTableStructure(
const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check,
const ZooKeeperRetriesInfo & zookeeper_retries_info) const
{
bool same_structure = false;
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::checkTableStructure", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { same_structure = checkTableStructureAttempt(zookeeper_prefix, metadata_snapshot, metadata_version, strict_check); });
}
else
{
same_structure = checkTableStructureAttempt(zookeeper_prefix, metadata_snapshot, metadata_version, strict_check);
}
return same_structure;
}
bool StorageReplicatedMergeTree::checkTableStructureAttempt(
const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check) const
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
@ -1549,6 +1643,9 @@ bool StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_pr
auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
old_metadata.checkEquals(metadata_from_zk, metadata_snapshot->getColumns(), getContext()); old_metadata.checkEquals(metadata_from_zk, metadata_snapshot->getColumns(), getContext());
if (metadata_version)
*metadata_version = metadata_stat.version;
Coordination::Stat columns_stat; Coordination::Stat columns_stat;
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat)); auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat));
@ -1866,21 +1963,35 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
} }
void StorageReplicatedMergeTree::syncPinnedPartUUIDs() void StorageReplicatedMergeTree::syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info)
{ {
auto zookeeper = getZooKeeper(); String new_pinned_part_uuids_str;
Coordination::Stat new_stat;
Coordination::Stat stat; auto read_pinned_part_uuids = [&]
String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &stat); {
auto zookeeper = getZooKeeper();
new_pinned_part_uuids_str = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &new_stat);
};
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::syncPinnedPartUUIDs", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { read_pinned_part_uuids(); });
}
else
{
read_pinned_part_uuids();
}
std::lock_guard lock(pinned_part_uuids_mutex); std::lock_guard lock(pinned_part_uuids_mutex);
/// Unsure whether or not this can be called concurrently. /// Unsure whether or not this can be called concurrently.
if (pinned_part_uuids->stat.version < stat.version) if (pinned_part_uuids->stat.version < new_stat.version)
{ {
auto new_pinned_part_uuids = std::make_shared<PinnedPartUUIDs>(); auto new_pinned_part_uuids = std::make_shared<PinnedPartUUIDs>();
new_pinned_part_uuids->fromString(s); new_pinned_part_uuids->fromString(new_pinned_part_uuids_str);
new_pinned_part_uuids->stat = stat; new_pinned_part_uuids->stat = new_stat;
pinned_part_uuids = new_pinned_part_uuids; pinned_part_uuids = new_pinned_part_uuids;
} }
@ -2234,7 +2345,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
case LogEntry::ALTER_METADATA: case LogEntry::ALTER_METADATA:
return executeMetadataAlter(entry); return executeMetadataAlter(entry);
case LogEntry::SYNC_PINNED_PART_UUIDS: case LogEntry::SYNC_PINNED_PART_UUIDS:
syncPinnedPartUUIDs(); syncPinnedPartUUIDs(/* zookeeper_retries_info = */ {});
return true; return true;
case LogEntry::CLONE_PART_FROM_SHARD: case LogEntry::CLONE_PART_FROM_SHARD:
executeClonePartFromShard(entry); executeClonePartFromShard(entry);
@ -4412,17 +4523,29 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
} }
void StorageReplicatedMergeTree::startBeingLeader() void StorageReplicatedMergeTree::startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info)
{ {
auto zookeeper = getZooKeeper();
if (!(*getSettings())[MergeTreeSetting::replicated_can_become_leader]) if (!(*getSettings())[MergeTreeSetting::replicated_can_become_leader])
{ {
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0"); LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");
return; return;
} }
auto start_being_leader = [&]
{
auto zookeeper = getZooKeeper();
zkutil::checkNoOldLeaders(log.load(), *zookeeper, fs::path(zookeeper_path) / "leader_election"); zkutil::checkNoOldLeaders(log.load(), *zookeeper, fs::path(zookeeper_path) / "leader_election");
};
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::startBeingLeader", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { start_being_leader(); });
}
else
{
start_being_leader();
}
LOG_INFO(log, "Became leader"); LOG_INFO(log, "Became leader");
is_leader = true; is_leader = true;
@ -5297,10 +5420,10 @@ void StorageReplicatedMergeTree::startup()
return; return;
} }
startupImpl(/* from_attach_thread */ false); startupImpl(/* from_attach_thread */ false, getCreateQueryZooKeeperRetriesInfo());
} }
void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread) void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info)
{ {
/// Do not start replication if ZooKeeper is not configured or there is no metadata in zookeeper /// Do not start replication if ZooKeeper is not configured or there is no metadata in zookeeper
if (!has_metadata_in_zookeeper.has_value() || !*has_metadata_in_zookeeper) if (!has_metadata_in_zookeeper.has_value() || !*has_metadata_in_zookeeper)
@ -5329,7 +5452,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
getContext()->getInterserverIOHandler().addEndpoint( getContext()->getInterserverIOHandler().addEndpoint(
data_parts_exchange_ptr->getId(getEndpointName()), data_parts_exchange_ptr); data_parts_exchange_ptr->getId(getEndpointName()), data_parts_exchange_ptr);
startBeingLeader(); startBeingLeader(zookeeper_retries_info);
if (from_attach_thread) if (from_attach_thread)
{ {
@ -5367,6 +5490,9 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
startBackgroundMovesIfNeeded(); startBackgroundMovesIfNeeded();
part_moves_between_shards_orchestrator.start(); part_moves_between_shards_orchestrator.start();
/// After finishing startup() create_query_zk_retries_info won't be used anymore.
clearCreateQueryZooKeeperRetriesInfo();
} }
catch (...) catch (...)
{ {
@ -6595,7 +6721,7 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(
return true; return true;
} }
void StorageReplicatedMergeTree::restoreMetadataInZooKeeper() void StorageReplicatedMergeTree::restoreMetadataInZooKeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info)
{ {
LOG_INFO(log, "Restoring replica metadata"); LOG_INFO(log, "Restoring replica metadata");
@ -6638,14 +6764,14 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
LOG_INFO(log, "Moved all parts to detached/"); LOG_INFO(log, "Moved all parts to detached/");
const bool is_first_replica = createTableIfNotExists(metadata_snapshot); const bool is_first_replica = createTableIfNotExists(metadata_snapshot, zookeeper_retries_info);
LOG_INFO(log, "Created initial ZK nodes, replica is first: {}", is_first_replica); LOG_INFO(log, "Created initial ZK nodes, replica is first: {}", is_first_replica);
if (!is_first_replica) if (!is_first_replica)
createReplica(metadata_snapshot); createReplica(metadata_snapshot, zookeeper_retries_info);
createNewZooKeeperNodes(); createNewZooKeeperNodes(zookeeper_retries_info);
LOG_INFO(log, "Created ZK nodes for table"); LOG_INFO(log, "Created ZK nodes for table");
@ -6657,7 +6783,7 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
LOG_INFO(log, "Attached all partitions, starting table"); LOG_INFO(log, "Attached all partitions, starting table");
startupImpl(/* from_attach_thread */ false); startupImpl(/* from_attach_thread */ false, zookeeper_retries_info);
} }
void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name) void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name)
@ -7946,8 +8072,8 @@ void StorageReplicatedMergeTree::forcefullyRemoveBrokenOutdatedPartFromZooKeeper
String part_path = replica_path + "/parts/" + part_name; String part_path = replica_path + "/parts/" + part_name;
const auto & settings = getContext()->getSettingsRef(); const auto & settings = getContext()->getSettingsRef();
ZooKeeperRetriesInfo retries_info{ ZooKeeperRetriesInfo retries_info{
settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms]}; settings[Setting::keeper_max_retries], settings[Setting::keeper_retry_initial_backoff_ms], settings[Setting::keeper_retry_max_backoff_ms], nullptr};
ZooKeeperRetriesControl retries_ctl("outdatedPartExists", log.load(), retries_info, nullptr); ZooKeeperRetriesControl retries_ctl("outdatedPartExists", log.load(), retries_info);
retries_ctl.retryLoop([&]() { exists = getZooKeeper()->exists(part_path); }); retries_ctl.retryLoop([&]() { exists = getZooKeeper()->exists(part_path); });
if (!exists) if (!exists)
@ -8836,7 +8962,7 @@ void StorageReplicatedMergeTree::movePartitionToShard(
{ {
/// Optimistic check that for compatible destination table structure. /// Optimistic check that for compatible destination table structure.
checkTableStructure(to, getInMemoryMetadataPtr()); checkTableStructure(to, getInMemoryMetadataPtr(), /* metadata_version = */ nullptr, /* strict_check = */ true, /* zookeeper_retries_info = */ {});
} }
PinnedPartUUIDs src_pins; PinnedPartUUIDs src_pins;
@ -9439,7 +9565,7 @@ String StorageReplicatedMergeTree::getTableSharedID() const
{ {
/// Can happen if table was partially initialized before drop by DatabaseCatalog /// Can happen if table was partially initialized before drop by DatabaseCatalog
if (table_shared_id == UUIDHelpers::Nil) if (table_shared_id == UUIDHelpers::Nil)
createTableSharedID(); createTableSharedID(/* zookeeper_retries_info = */ {});
} }
else else
{ {
@ -9454,7 +9580,20 @@ std::map<std::string, MutationCommands> StorageReplicatedMergeTree::getUnfinishe
return queue.getUnfinishedMutations(); return queue.getUnfinishedMutations();
} }
void StorageReplicatedMergeTree::createTableSharedID() const void StorageReplicatedMergeTree::createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info) const
{
if (zookeeper_retries_info.max_retries > 0)
{
ZooKeeperRetriesControl retries_ctl{"StorageReplicatedMergeTree::createTableSharedID", log.load(), zookeeper_retries_info};
retries_ctl.retryLoop([&] { createTableSharedIDAttempt(); });
}
else
{
createTableSharedIDAttempt();
}
}
void StorageReplicatedMergeTree::createTableSharedIDAttempt() const
{ {
LOG_DEBUG(log, "Creating shared ID for table {}", getStorageID().getNameForLogs()); LOG_DEBUG(log, "Creating shared ID for table {}", getStorageID().getNameForLogs());
// can be set by the call to getTableSharedID // can be set by the call to getTableSharedID
@ -10735,7 +10874,7 @@ void StorageReplicatedMergeTree::backupData(
bool exists = false; bool exists = false;
Strings mutation_ids; Strings mutation_ids;
{ {
ZooKeeperRetriesControl retries_ctl("getMutations", log.load(), zookeeper_retries_info, nullptr); ZooKeeperRetriesControl retries_ctl("getMutations", log.load(), zookeeper_retries_info);
retries_ctl.retryLoop([&]() retries_ctl.retryLoop([&]()
{ {
if (!zookeeper || zookeeper->expired()) if (!zookeeper || zookeeper->expired())
@ -10754,7 +10893,7 @@ void StorageReplicatedMergeTree::backupData(
bool mutation_id_exists = false; bool mutation_id_exists = false;
String mutation; String mutation;
ZooKeeperRetriesControl retries_ctl("getMutation", log.load(), zookeeper_retries_info, nullptr); ZooKeeperRetriesControl retries_ctl("getMutation", log.load(), zookeeper_retries_info);
retries_ctl.retryLoop([&]() retries_ctl.retryLoop([&]()
{ {
if (!zookeeper || zookeeper->expired()) if (!zookeeper || zookeeper->expired())

View File

@ -33,6 +33,7 @@
#include <Interpreters/PartLog.h> #include <Interpreters/PartLog.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/Throttler.h> #include <Common/Throttler.h>
#include <Common/EventNotifier.h> #include <Common/EventNotifier.h>
#include <base/defines.h> #include <base/defines.h>
@ -108,7 +109,8 @@ public:
const String & date_column_name, const String & date_column_name,
const MergingParams & merging_params_, const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_, std::unique_ptr<MergeTreeSettings> settings_,
bool need_check_structure); bool need_check_structure,
const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_);
void startup() override; void startup() override;
@ -314,7 +316,7 @@ public:
/// Restores table metadata if ZooKeeper lost it. /// Restores table metadata if ZooKeeper lost it.
/// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/ /// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/
/// folder and attached. Parts in all other states are just moved to detached/ folder. /// folder and attached. Parts in all other states are just moved to detached/ folder.
void restoreMetadataInZooKeeper(); void restoreMetadataInZooKeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info);
/// Get throttler for replicated fetches /// Get throttler for replicated fetches
ThrottlerPtr getFetchesThrottler() const ThrottlerPtr getFetchesThrottler() const
@ -426,6 +428,9 @@ private:
const String replica_name; // shorthand for zookeeper_info.replica_name const String replica_name; // shorthand for zookeeper_info.replica_name
const String replica_path; const String replica_path;
ZooKeeperRetriesInfo create_query_zookeeper_retries_info TSA_GUARDED_BY(create_query_zookeeper_retries_info_mutex);
mutable std::mutex create_query_zookeeper_retries_info_mutex;
/** /replicas/me/is_active. /** /replicas/me/is_active.
*/ */
zkutil::EphemeralNodeHolderPtr replica_is_active_node; zkutil::EphemeralNodeHolderPtr replica_is_active_node;
@ -574,18 +579,27 @@ private:
/** Creates the minimum set of nodes in ZooKeeper and create first replica. /** Creates the minimum set of nodes in ZooKeeper and create first replica.
* Returns true if was created, false if exists. * Returns true if was created, false if exists.
*/ */
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot); bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
bool createTableIfNotExistsAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const;
/** /**
* Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas. * Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
*/ */
void createReplica(const StorageMetadataPtr & metadata_snapshot); void createReplica(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
void createReplicaAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const;
/** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running. /** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running.
*/ */
void createNewZooKeeperNodes(); void createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
void createNewZooKeeperNodesAttempt() const;
bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, bool strict_check = true); /// Returns the ZooKeeper retries info specified for the CREATE TABLE query which is creating and starting this table right now.
ZooKeeperRetriesInfo getCreateQueryZooKeeperRetriesInfo() const;
void clearCreateQueryZooKeeperRetriesInfo();
bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check,
const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
bool checkTableStructureAttempt(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check) const;
/// A part of ALTER: apply metadata changes only (data parts are altered separately). /// A part of ALTER: apply metadata changes only (data parts are altered separately).
/// Must be called under IStorage::lockForAlter() lock. /// Must be called under IStorage::lockForAlter() lock.
@ -604,7 +618,7 @@ private:
/// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor /// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor
/// to be used for deduplication. /// to be used for deduplication.
void syncPinnedPartUUIDs(); void syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info);
/** Check that the part's checksum is the same as the checksum of the same part on some other replica. /** Check that the part's checksum is the same as the checksum of the same part on some other replica.
* If no one has such a part, nothing checks. * If no one has such a part, nothing checks.
@ -707,7 +721,7 @@ private:
/// Start being leader (if not disabled by setting). /// Start being leader (if not disabled by setting).
/// Since multi-leaders are allowed, it just sets is_leader flag. /// Since multi-leaders are allowed, it just sets is_leader flag.
void startBeingLeader(); void startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info);
void stopBeingLeader(); void stopBeingLeader();
/** Selects the parts to merge and writes to the log. /** Selects the parts to merge and writes to the log.
@ -927,7 +941,7 @@ private:
/// Check granularity of already existing replicated table in zookeeper if it exists /// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed /// return true if it's fixed
bool checkFixedGranularityInZookeeper(); bool checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
/// Wait for timeout seconds mutation is finished on replicas /// Wait for timeout seconds mutation is finished on replicas
void waitMutationToFinishOnReplicas( void waitMutationToFinishOnReplicas(
@ -965,7 +979,8 @@ private:
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override; void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
// Create table id if needed // Create table id if needed
void createTableSharedID() const; void createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
void createTableSharedIDAttempt() const;
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica); bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica);
void watchZeroCopyLock(const String & part_name, const DiskPtr & disk); void watchZeroCopyLock(const String & part_name, const DiskPtr & disk);
@ -981,7 +996,7 @@ private:
/// Or if node actually disappeared. /// Or if node actually disappeared.
bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override; bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
void startupImpl(bool from_attach_thread); void startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info);
std::vector<String> getZookeeperZeroCopyLockPaths() const; std::vector<String> getZookeeperZeroCopyLockPaths() const;
static void dropZookeeperZeroCopyLockPaths(zkutil::ZooKeeperPtr zookeeper, static void dropZookeeperZeroCopyLockPaths(zkutil::ZooKeeperPtr zookeeper,

View File

@ -518,7 +518,8 @@ Chunk SystemZooKeeperSource::generate()
ZooKeeperRetriesInfo retries_seetings( ZooKeeperRetriesInfo retries_seetings(
settings[Setting::insert_keeper_max_retries], settings[Setting::insert_keeper_max_retries],
settings[Setting::insert_keeper_retry_initial_backoff_ms], settings[Setting::insert_keeper_retry_initial_backoff_ms],
settings[Setting::insert_keeper_retry_max_backoff_ms]); settings[Setting::insert_keeper_retry_max_backoff_ms],
query_status);
/// Handles reconnects when needed /// Handles reconnects when needed
auto get_zookeeper = [&] () auto get_zookeeper = [&] ()
@ -586,7 +587,7 @@ Chunk SystemZooKeeperSource::generate()
} }
zkutil::ZooKeeper::MultiTryGetChildrenResponse list_responses; zkutil::ZooKeeper::MultiTryGetChildrenResponse list_responses;
ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop( ZooKeeperRetriesControl("", nullptr, retries_seetings).retryLoop(
[&]() { list_responses = get_zookeeper()->tryGetChildren(paths_to_list); }); [&]() { list_responses = get_zookeeper()->tryGetChildren(paths_to_list); });
struct GetTask struct GetTask
@ -632,7 +633,7 @@ Chunk SystemZooKeeperSource::generate()
} }
zkutil::ZooKeeper::MultiTryGetResponse get_responses; zkutil::ZooKeeper::MultiTryGetResponse get_responses;
ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop( ZooKeeperRetriesControl("", nullptr, retries_seetings).retryLoop(
[&]() { get_responses = get_zookeeper()->tryGet(paths_to_get); }); [&]() { get_responses = get_zookeeper()->tryGet(paths_to_get); });
/// Add children count to query total rows. We can not get total rows in advance, /// Add children count to query total rows. We can not get total rows in advance,