diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index a8810ada1b0..11fcae7c53f 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2155,6 +2155,21 @@ void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::Abstr #endif } +zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeperConnection(const String & name) const +{ + if (name.find(':') != std::string::npos || name.find('/') != std::string::npos) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid auxiliary ZooKeeper name {}: ':' and '/' are not allowed", name); + + const auto & config = shared->auxiliary_zookeepers_config ? *shared->auxiliary_zookeepers_config : getConfigRef(); + if (!config.has("auxiliary_zookeepers." + name)) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Unknown auxiliary ZooKeeper name '{}'. If it's required it can be added to the section in " + "config.xml", + name); + + return std::make_shared(config, "auxiliary_zookeepers." + name, getZooKeeperLog()); +} zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const { @@ -2163,19 +2178,8 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const auto zookeeper = shared->auxiliary_zookeepers.find(name); if (zookeeper == shared->auxiliary_zookeepers.end()) { - if (name.find(':') != std::string::npos || name.find('/') != std::string::npos) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid auxiliary ZooKeeper name {}: ':' and '/' are not allowed", name); - - const auto & config = shared->auxiliary_zookeepers_config ? *shared->auxiliary_zookeepers_config : getConfigRef(); - if (!config.has("auxiliary_zookeepers." + name)) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Unknown auxiliary ZooKeeper name '{}'. If it's required it can be added to the section in " - "config.xml", - name); - - zookeeper = shared->auxiliary_zookeepers.emplace(name, - std::make_shared(config, "auxiliary_zookeepers." + name, getZooKeeperLog())).first; + auto new_connection = getAuxiliaryZooKeeperConnection(name); + zookeeper = shared->auxiliary_zookeepers.emplace(name, std::move(new_connection)).first; } else if (zookeeper->second->expired()) zookeeper->second = zookeeper->second->startNewSession(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index ea03b8e6586..c8189fccf90 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -739,11 +739,14 @@ public: ReplicatedFetchList & getReplicatedFetchList(); const ReplicatedFetchList & getReplicatedFetchList() const; + /// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call. /// If no ZooKeeper configured, throws an exception. std::shared_ptr getZooKeeper() const; /// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry. std::shared_ptr getAuxiliaryZooKeeper(const String & name) const; + // create a new zookeeper connection from auxiliary_zookeepers configuration entry + std::shared_ptr getAuxiliaryZooKeeperConnection(const String & name) const; /// Try to connect to Keeper using get(Auxiliary)ZooKeeper. Useful for /// internal Keeper start (check connection to some other node). Return true diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 89081fe924f..6dd02cc75d6 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -95,6 +95,7 @@ struct Settings; M(Seconds, replicated_fetches_http_receive_timeout, 0, "HTTP receive timeout for fetch part requests. Inherited from default profile `http_receive_timeout` if not set explicitly.", 0) \ M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \ M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \ + M(Seconds, initialization_retry_period, 10, "Retry period for table initialization, in seconds.", 0) \ M(Bool, detach_old_local_parts_when_cloning_replica, true, "Do not remove old local parts when repairing lost replica.", 0) \ M(Bool, detach_not_byte_identical_parts, false, "Do not remove non byte-idential parts for ReplicatedMergeTree, instead detach them (maybe useful for further analysis).", 0) \ M(UInt64, max_replicated_fetches_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \ diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index 0b57087bbdb..1c49a3f0909 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -15,6 +15,8 @@ ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicat , log(&Poco::Logger::get(log_name)) { task = storage.getContext()->getSchedulePool().createTask(log_name, [this] { run(); }); + const auto storage_settings = storage.getSettings(); + retry_period = storage_settings->initialization_retry_period.totalSeconds(); } void ReplicatedMergeTreeAttachThread::shutdown() @@ -166,17 +168,18 @@ void ReplicatedMergeTreeAttachThread::tryReconnect() { try { + const auto context = storage.getContext(); if (storage.zookeeper_name == storage.default_zookeeper_name) - zookeeper = storage.getContext()->getZooKeeper(); + zookeeper = std::make_shared(context->getConfigRef(), "zookeeper", context->getZooKeeperLog()); else - zookeeper = storage.getContext()->getAuxiliaryZooKeeper(storage.zookeeper_name); + zookeeper = context->getAuxiliaryZooKeeperConnection(storage.zookeeper_name); return; } catch (...) { notifyIfFirstTry(); - LOG_WARNING(log, "Will try to reconnect to ZooKeeper in 10 seconds"); - std::this_thread::sleep_for(std::chrono::seconds(10)); + LOG_WARNING(log, "Will try to reconnect to ZooKeeper in {} seconds", retry_period); + std::this_thread::sleep_for(std::chrono::seconds(retry_period)); } } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h index e85e011d396..0a93c0ba7cd 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h @@ -42,6 +42,8 @@ private: zkutil::ZooKeeperPtr zookeeper; + UInt64 retry_period; + void run(); void tryReconnect();