Use setting for retry period, create independent zookeeper sessions

This commit is contained in:
Antonio Andelic 2022-08-16 11:20:58 +00:00
parent 9ac4ca265a
commit b0b9f96837
5 changed files with 30 additions and 17 deletions

View File

@ -2155,6 +2155,21 @@ void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::Abstr
#endif
}
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeperConnection(const String & name) const
{
if (name.find(':') != std::string::npos || name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid auxiliary ZooKeeper name {}: ':' and '/' are not allowed", name);
const auto & config = shared->auxiliary_zookeepers_config ? *shared->auxiliary_zookeepers_config : getConfigRef();
if (!config.has("auxiliary_zookeepers." + name))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unknown auxiliary ZooKeeper name '{}'. If it's required it can be added to the section <auxiliary_zookeepers> in "
"config.xml",
name);
return std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name, getZooKeeperLog());
}
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
{
@ -2163,19 +2178,8 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
auto zookeeper = shared->auxiliary_zookeepers.find(name);
if (zookeeper == shared->auxiliary_zookeepers.end())
{
if (name.find(':') != std::string::npos || name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid auxiliary ZooKeeper name {}: ':' and '/' are not allowed", name);
const auto & config = shared->auxiliary_zookeepers_config ? *shared->auxiliary_zookeepers_config : getConfigRef();
if (!config.has("auxiliary_zookeepers." + name))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unknown auxiliary ZooKeeper name '{}'. If it's required it can be added to the section <auxiliary_zookeepers> in "
"config.xml",
name);
zookeeper = shared->auxiliary_zookeepers.emplace(name,
std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name, getZooKeeperLog())).first;
auto new_connection = getAuxiliaryZooKeeperConnection(name);
zookeeper = shared->auxiliary_zookeepers.emplace(name, std::move(new_connection)).first;
}
else if (zookeeper->second->expired())
zookeeper->second = zookeeper->second->startNewSession();

View File

@ -739,11 +739,14 @@ public:
ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
/// If no ZooKeeper configured, throws an exception.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
// create a new zookeeper connection from auxiliary_zookeepers configuration entry
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeperConnection(const String & name) const;
/// Try to connect to Keeper using get(Auxiliary)ZooKeeper. Useful for
/// internal Keeper start (check connection to some other node). Return true

View File

@ -95,6 +95,7 @@ struct Settings;
M(Seconds, replicated_fetches_http_receive_timeout, 0, "HTTP receive timeout for fetch part requests. Inherited from default profile `http_receive_timeout` if not set explicitly.", 0) \
M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
M(Seconds, initialization_retry_period, 10, "Retry period for table initialization, in seconds.", 0) \
M(Bool, detach_old_local_parts_when_cloning_replica, true, "Do not remove old local parts when repairing lost replica.", 0) \
M(Bool, detach_not_byte_identical_parts, false, "Do not remove non byte-idential parts for ReplicatedMergeTree, instead detach them (maybe useful for further analysis).", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \

View File

@ -15,6 +15,8 @@ ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicat
, log(&Poco::Logger::get(log_name))
{
task = storage.getContext()->getSchedulePool().createTask(log_name, [this] { run(); });
const auto storage_settings = storage.getSettings();
retry_period = storage_settings->initialization_retry_period.totalSeconds();
}
void ReplicatedMergeTreeAttachThread::shutdown()
@ -166,17 +168,18 @@ void ReplicatedMergeTreeAttachThread::tryReconnect()
{
try
{
const auto context = storage.getContext();
if (storage.zookeeper_name == storage.default_zookeeper_name)
zookeeper = storage.getContext()->getZooKeeper();
zookeeper = std::make_shared<zkutil::ZooKeeper>(context->getConfigRef(), "zookeeper", context->getZooKeeperLog());
else
zookeeper = storage.getContext()->getAuxiliaryZooKeeper(storage.zookeeper_name);
zookeeper = context->getAuxiliaryZooKeeperConnection(storage.zookeeper_name);
return;
}
catch (...)
{
notifyIfFirstTry();
LOG_WARNING(log, "Will try to reconnect to ZooKeeper in 10 seconds");
std::this_thread::sleep_for(std::chrono::seconds(10));
LOG_WARNING(log, "Will try to reconnect to ZooKeeper in {} seconds", retry_period);
std::this_thread::sleep_for(std::chrono::seconds(retry_period));
}
}
}

View File

@ -42,6 +42,8 @@ private:
zkutil::ZooKeeperPtr zookeeper;
UInt64 retry_period;
void run();
void tryReconnect();