mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #21264 from ClickHouse/fix_zookeeper_update
Fix several bugs with ZooKeeper client
This commit is contained in:
commit
b8fba768e5
@ -18,7 +18,8 @@ RUN apt-get update \
|
||||
curl \
|
||||
tar \
|
||||
krb5-user \
|
||||
iproute2
|
||||
iproute2 \
|
||||
lsof
|
||||
RUN rm -rf \
|
||||
/var/lib/apt/lists/* \
|
||||
/var/cache/debconf \
|
||||
|
@ -391,6 +391,9 @@ public:
|
||||
virtual void multi(
|
||||
const Requests & requests,
|
||||
MultiCallback callback) = 0;
|
||||
|
||||
/// Expire session and finish all pending requests
|
||||
virtual void finalize() = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
|
||||
*
|
||||
* NOTE: You can add various failure modes for better testing.
|
||||
*/
|
||||
class TestKeeper : public IKeeper
|
||||
class TestKeeper final : public IKeeper
|
||||
{
|
||||
public:
|
||||
TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_);
|
||||
@ -83,6 +83,7 @@ public:
|
||||
const Requests & requests,
|
||||
MultiCallback callback) override;
|
||||
|
||||
void finalize() override;
|
||||
|
||||
struct Node
|
||||
{
|
||||
@ -130,7 +131,6 @@ private:
|
||||
|
||||
void pushRequest(RequestInfo && request);
|
||||
|
||||
void finalize();
|
||||
|
||||
ThreadFromGlobalPool processing_thread;
|
||||
|
||||
|
@ -44,7 +44,7 @@ static void check(Coordination::Error code, const std::string & path)
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeper::init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
|
||||
void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_,
|
||||
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_)
|
||||
{
|
||||
log = &Poco::Logger::get("ZooKeeper");
|
||||
@ -60,13 +60,16 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
|
||||
if (hosts.empty())
|
||||
throw KeeperException("No hosts passed to ZooKeeper constructor.", Coordination::Error::ZBADARGUMENTS);
|
||||
|
||||
std::vector<std::string> hosts_strings;
|
||||
splitInto<','>(hosts_strings, hosts);
|
||||
Coordination::ZooKeeper::Nodes nodes;
|
||||
nodes.reserve(hosts_strings.size());
|
||||
nodes.reserve(hosts.size());
|
||||
|
||||
Strings shuffled_hosts = hosts;
|
||||
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
|
||||
pcg64 generator(randomSeed());
|
||||
std::shuffle(shuffled_hosts.begin(), shuffled_hosts.end(), generator);
|
||||
|
||||
bool dns_error = false;
|
||||
for (auto & host_string : hosts_strings)
|
||||
for (auto & host_string : shuffled_hosts)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -109,9 +112,9 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
|
||||
Poco::Timespan(0, operation_timeout_ms_ * 1000));
|
||||
|
||||
if (chroot.empty())
|
||||
LOG_TRACE(log, "Initialized, hosts: {}", hosts);
|
||||
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(hosts, ","));
|
||||
else
|
||||
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", hosts, chroot);
|
||||
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(hosts, ","), chroot);
|
||||
}
|
||||
else if (implementation == "testkeeper")
|
||||
{
|
||||
@ -128,7 +131,16 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
|
||||
throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::Error::ZNONODE);
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
|
||||
ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & identity_, int32_t session_timeout_ms_,
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_)
|
||||
{
|
||||
Strings hosts_strings;
|
||||
splitInto<','>(hosts_strings, hosts_string);
|
||||
|
||||
init(implementation_, hosts_strings, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_);
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const Strings & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_)
|
||||
{
|
||||
init(implementation_, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_);
|
||||
@ -141,8 +153,6 @@ struct ZooKeeperArgs
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_name, keys);
|
||||
|
||||
std::vector<std::string> hosts_strings;
|
||||
|
||||
session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
|
||||
operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
|
||||
implementation = "zookeeper";
|
||||
@ -150,7 +160,7 @@ struct ZooKeeperArgs
|
||||
{
|
||||
if (startsWith(key, "node"))
|
||||
{
|
||||
hosts_strings.push_back(
|
||||
hosts.push_back(
|
||||
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "") +
|
||||
config.getString(config_name + "." + key + ".host") + ":"
|
||||
+ config.getString(config_name + "." + key + ".port", "2181")
|
||||
@ -180,17 +190,6 @@ struct ZooKeeperArgs
|
||||
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
|
||||
}
|
||||
|
||||
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
|
||||
pcg64 generator(randomSeed());
|
||||
std::shuffle(hosts_strings.begin(), hosts_strings.end(), generator);
|
||||
|
||||
for (auto & host : hosts_strings)
|
||||
{
|
||||
if (!hosts.empty())
|
||||
hosts += ',';
|
||||
hosts += host;
|
||||
}
|
||||
|
||||
if (!chroot.empty())
|
||||
{
|
||||
if (chroot.front() != '/')
|
||||
@ -200,7 +199,7 @@ struct ZooKeeperArgs
|
||||
}
|
||||
}
|
||||
|
||||
std::string hosts;
|
||||
Strings hosts;
|
||||
std::string identity;
|
||||
int session_timeout_ms;
|
||||
int operation_timeout_ms;
|
||||
@ -922,6 +921,10 @@ Coordination::Error ZooKeeper::tryMultiNoThrow(const Coordination::Requests & re
|
||||
}
|
||||
}
|
||||
|
||||
void ZooKeeper::finalize()
|
||||
{
|
||||
impl->finalize();
|
||||
}
|
||||
|
||||
size_t KeeperMultiException::getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
|
||||
{
|
||||
@ -1000,4 +1003,5 @@ Coordination::RequestPtr makeCheckRequest(const std::string & path, int version)
|
||||
request->version = version;
|
||||
return request;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -50,7 +50,14 @@ class ZooKeeper
|
||||
public:
|
||||
using Ptr = std::shared_ptr<ZooKeeper>;
|
||||
|
||||
ZooKeeper(const std::string & hosts_, const std::string & identity_ = "",
|
||||
/// hosts_string -- comma separated [secure://]host:port list
|
||||
ZooKeeper(const std::string & hosts_string, const std::string & identity_ = "",
|
||||
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
|
||||
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
|
||||
const std::string & chroot_ = "",
|
||||
const std::string & implementation_ = "zookeeper");
|
||||
|
||||
ZooKeeper(const Strings & hosts_, const std::string & identity_ = "",
|
||||
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
|
||||
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
|
||||
const std::string & chroot_ = "",
|
||||
@ -247,10 +254,12 @@ public:
|
||||
/// Like the previous one but don't throw any exceptions on future.get()
|
||||
FutureMulti tryAsyncMulti(const Coordination::Requests & ops);
|
||||
|
||||
void finalize();
|
||||
|
||||
private:
|
||||
friend class EphemeralNodeHolder;
|
||||
|
||||
void init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
|
||||
void init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_,
|
||||
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
|
||||
|
||||
/// The following methods don't throw exceptions but return error codes.
|
||||
@ -266,7 +275,7 @@ private:
|
||||
|
||||
std::unique_ptr<Coordination::IKeeper> impl;
|
||||
|
||||
std::string hosts;
|
||||
Strings hosts;
|
||||
std::string identity;
|
||||
int32_t session_timeout_ms;
|
||||
int32_t operation_timeout_ms;
|
||||
|
@ -88,7 +88,7 @@ using namespace DB;
|
||||
|
||||
/** Usage scenario: look at the documentation for IKeeper class.
|
||||
*/
|
||||
class ZooKeeper : public IKeeper
|
||||
class ZooKeeper final : public IKeeper
|
||||
{
|
||||
public:
|
||||
struct Node
|
||||
@ -167,6 +167,20 @@ public:
|
||||
const Requests & requests,
|
||||
MultiCallback callback) override;
|
||||
|
||||
/// Without forcefully invalidating (finalizing) ZooKeeper session before
|
||||
/// establishing a new one, there was a possibility that server is using
|
||||
/// two ZooKeeper sessions simultaneously in different parts of code.
|
||||
/// This is strong antipattern and we always prevented it.
|
||||
|
||||
/// ZooKeeper is linearizeable for writes, but not linearizeable for
|
||||
/// reads, it only maintains "sequential consistency": in every session
|
||||
/// you observe all events in order but possibly with some delay. If you
|
||||
/// perform write in one session, then notify different part of code and
|
||||
/// it will do read in another session, that read may not see the
|
||||
/// already performed write.
|
||||
|
||||
void finalize() override { finalize(false, false); }
|
||||
|
||||
private:
|
||||
String root_path;
|
||||
ACLs default_acls;
|
||||
|
@ -1661,7 +1661,12 @@ void Context::resetZooKeeper() const
|
||||
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk)
|
||||
{
|
||||
if (!zk || zk->configChanged(*config, config_name))
|
||||
{
|
||||
if (zk)
|
||||
zk->finalize();
|
||||
|
||||
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name);
|
||||
}
|
||||
}
|
||||
|
||||
void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
|
||||
|
@ -144,6 +144,12 @@ static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
|
||||
|
||||
void StorageReplicatedMergeTree::setZooKeeper()
|
||||
{
|
||||
/// Every ReplicatedMergeTree table is using only one ZooKeeper session.
|
||||
/// But if several ReplicatedMergeTree tables are using different
|
||||
/// ZooKeeper sessions, some queries like ATTACH PARTITION FROM may have
|
||||
/// strange effects. So we always use only one session for all tables.
|
||||
/// (excluding auxiliary zookeepers)
|
||||
|
||||
std::lock_guard lock(current_zookeeper_mutex);
|
||||
if (zookeeper_name == default_zookeeper_name)
|
||||
{
|
||||
|
@ -74,6 +74,9 @@ def test_reload_zookeeper(start_cluster):
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query("SELECT COUNT() FROM test_table", settings={"select_sequential_consistency" : 1})
|
||||
|
||||
def get_active_zk_connections():
|
||||
return str(node.exec_in_container(['bash', '-c', 'lsof -a -i4 -i6 -itcp -w | grep 2181 | grep ESTABLISHED | wc -l'], privileged=True, user='root')).strip()
|
||||
|
||||
## set config to zoo2, server will be normal
|
||||
new_config = """
|
||||
<yandex>
|
||||
@ -89,5 +92,10 @@ def test_reload_zookeeper(start_cluster):
|
||||
node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
|
||||
node.query("SYSTEM RELOAD CONFIG")
|
||||
|
||||
active_zk_connections = get_active_zk_connections()
|
||||
assert active_zk_connections == '1', "Total connections to ZooKeeper not equal to 1, {}".format(active_zk_connections)
|
||||
|
||||
assert_eq_with_retry(node, "SELECT COUNT() FROM test_table", '1000', retry_count=120, sleep_time=0.5)
|
||||
|
||||
active_zk_connections = get_active_zk_connections()
|
||||
assert active_zk_connections == '1', "Total connections to ZooKeeper not equal to 1, {}".format(active_zk_connections)
|
||||
|
Loading…
Reference in New Issue
Block a user