Merge pull request #14678 from sundy-li/patch-2

dynamic zookeeper config when session expired
alexey-milovidov 2020-09-17 17:05:22 +03:00 committed by GitHub
commit 496df5b3e9
11 changed files with 232 additions and 1 deletion


@@ -538,6 +538,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
            if (config->has("max_partition_size_to_drop"))
                global_context->setMaxPartitionSizeToDrop(config->getUInt64("max_partition_size_to_drop"));

            if (config->has("zookeeper"))
                global_context->reloadZooKeeperIfChanged(config);

            global_context->updateStorageConfiguration(*config);
        },
        /* already_loaded = */ true);


@@ -200,6 +200,18 @@ ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std
    init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
}

bool ZooKeeper::configChanged(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) const
{
    ZooKeeperArgs args(config, config_name);

    /// Skip reload for TestKeeper: it is only used in tests and keeps its data in memory.
    if (args.implementation == implementation && implementation == "testkeeper")
        return false;

    return std::tie(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot)
        != std::tie(implementation, hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
}

static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
{
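The heart of configChanged() is the std::tie comparison: packing the freshly parsed settings and the live session's settings into tuples of references makes the inequality check element-wise. A minimal, self-contained sketch of the same idiom, using an illustrative struct rather than the real ZooKeeperArgs fields:

#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>

/// Illustrative settings bundle; stands in for the real ZooKeeperArgs/ZooKeeper members.
struct Settings
{
    std::string implementation;
    std::string hosts;
    int32_t session_timeout_ms;
};

/// Mirrors the std::tie comparison in ZooKeeper::configChanged():
/// returns true as soon as any field differs.
static bool changed(const Settings & fresh, const Settings & live)
{
    return std::tie(fresh.implementation, fresh.hosts, fresh.session_timeout_ms)
        != std::tie(live.implementation, live.hosts, live.session_timeout_ms);
}

int main()
{
    Settings live{"zookeeper", "zoo1:2181,zoo2:2181,zoo3:2181", 2000};
    Settings fresh{"zookeeper", "zoo1:2181", 2000};
    std::cout << (changed(fresh, live) ? "recreate session" : "keep session") << '\n';
}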


@@ -56,7 +56,7 @@ public:
        int32_t session_timeout_ms_ = DEFAULT_SESSION_TIMEOUT,
        int32_t operation_timeout_ms_ = DEFAULT_OPERATION_TIMEOUT,
        const std::string & chroot_ = "",
-       const std::string & implementation = "zookeeper");
+       const std::string & implementation_ = "zookeeper");

    /** Config of the form:
        <zookeeper>

@@ -87,6 +87,8 @@ public:
    /// This object remains unchanged, and the new session is returned.
    Ptr startNewSession() const;

    bool configChanged(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) const;

    /// Returns true if the session has expired.
    bool expired();
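Together with the existing expired() and startNewSession(), the new configChanged() gives the owner of a session everything it needs to decide whether to keep it, refresh it, or rebuild it with new settings. A hedged sketch of that decision, assuming zkutil::ZooKeeperPtr is an alias for zkutil::ZooKeeper::Ptr and that the header's include path matches the assumption in the comment; this is a sketch, not the actual Context logic:

#include <memory>
#include <Common/ZooKeeper/ZooKeeper.h>   // assumed include path for the header shown in this diff

/// Sketch only: pick between keeping, refreshing, or rebuilding a session.
zkutil::ZooKeeperPtr getOrRefresh(zkutil::ZooKeeperPtr current, const Poco::Util::AbstractConfiguration & config)
{
    if (!current || current->configChanged(config, "zookeeper"))
        return std::make_shared<zkutil::ZooKeeper>(config, "zookeeper");  /// missing or changed settings -> new session
    if (current->expired())
        return current->startNewSession();                                /// same settings, fresh session
    return current;                                                       /// live session with current settings
}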


@@ -1498,6 +1498,15 @@ void Context::resetZooKeeper() const
    shared->zookeeper.reset();
}

void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
{
    std::lock_guard lock(shared->zookeeper_mutex);
    if (!shared->zookeeper || shared->zookeeper->configChanged(*config, "zookeeper"))
    {
        shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper");
    }
}

bool Context::hasZooKeeper() const
{
    return getConfigRef().has("zookeeper");
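Note that the swap happens under shared->zookeeper_mutex and only replaces the shared_ptr: callers that already obtained the old session keep it alive until they drop their reference, while later callers asking the Context for a session get the new one. A generic, self-contained sketch of that swap-under-mutex pattern (illustrative types, not the Context code itself):

#include <memory>
#include <mutex>
#include <string>

/// Stand-in for a session object; illustrative only.
struct Session
{
    explicit Session(std::string hosts_) : hosts(std::move(hosts_)) {}
    std::string hosts;
};

class SessionHolder
{
public:
    /// Readers grab a shared_ptr; an old instance stays alive for them even after a swap.
    std::shared_ptr<Session> get() const
    {
        std::lock_guard lock(mutex);
        return session;
    }

    /// Recreate the shared instance only when it is absent or the settings differ.
    void reloadIfChanged(const std::string & new_hosts)
    {
        std::lock_guard lock(mutex);
        if (!session || session->hosts != new_hosts)
            session = std::make_shared<Session>(new_hosts);
    }

private:
    mutable std::mutex mutex;
    std::shared_ptr<Session> session;
};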


@@ -476,6 +476,8 @@ public:
    bool hasZooKeeper() const;
    /// Reset current zookeeper session. Do not create a new one.
    void resetZooKeeper() const;
    /// Recreate the ZooKeeper session if the "zookeeper" config section has changed.
    void reloadZooKeeperIfChanged(const ConfigurationPtr & config) const;
    /// Create a cache of uncompressed blocks of specified size. This can be done only once.
    void setUncompressedCache(size_t max_size_in_bytes);


@@ -763,6 +763,16 @@ class ClickHouseCluster:
    def add_zookeeper_startup_command(self, command):
        self.pre_zookeeper_commands.append(command)

    def stop_zookeeper_nodes(self, zk_nodes):
        for n in zk_nodes:
            logging.info("Stopping zookeeper node: %s", n)
            subprocess_check_call(self.base_zookeeper_cmd + ["stop", n])

    def start_zookeeper_nodes(self, zk_nodes):
        for n in zk_nodes:
            logging.info("Starting zookeeper node: %s", n)
            subprocess_check_call(self.base_zookeeper_cmd + ["start", n])


CLICKHOUSE_START_COMMAND = "clickhouse server --config-file=/etc/clickhouse-server/config.xml --log-file=/var/log/clickhouse-server/clickhouse-server.log --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"


@@ -0,0 +1,31 @@
<?xml version="1.0"?>
<yandex>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-server/clickhouse-server.log</log>
        <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
        <size>1000M</size>
        <count>10</count>
    </logger>

    <tcp_port>9000</tcp_port>
    <listen_host>127.0.0.1</listen_host>

    <openSSL>
        <client>
            <cacheSessions>true</cacheSessions>
            <verificationMode>none</verificationMode>
            <invalidCertificateHandler>
                <name>AcceptCertificateHandler</name>
            </invalidCertificateHandler>
        </client>
    </openSSL>

    <max_concurrent_queries>500</max_concurrent_queries>
    <mark_cache_size>5368709120</mark_cache_size>
    <path>./clickhouse/</path>
    <users_config>users.xml</users_config>

    <max_table_size_to_drop>1</max_table_size_to_drop>
    <max_partition_size_to_drop>1</max_partition_size_to_drop>
</yandex>


@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
    <profiles>
        <default>
        </default>
    </profiles>

    <users>
        <default>
            <password></password>
            <networks incl="networks" replace="replace">
                <ip>::/0</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
        </default>
    </users>

    <quotas>
        <default>
        </default>
    </quotas>
</yandex>


@@ -0,0 +1,19 @@
<yandex>
    <zookeeper>
        <node index="1">
            <host>zoo1</host>
            <port>2181</port>
        </node>
        <node index="2">
            <host>zoo2</host>
            <port>2181</port>
        </node>
        <node index="3">
            <host>zoo3</host>
            <port>2181</port>
        </node>
        <session_timeout_ms>2000</session_timeout_ms>
    </zookeeper>
</yandex>
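This is the config shape that the new configChanged() re-parses on every reload. As a rough illustration of how such <node> entries can be folded into a single comma-separated host list with Poco's configuration API, here is a simplified sketch (hostsFromConfig is a hypothetical helper, not the actual ZooKeeperArgs code):

#include <string>
#include <Poco/Util/AbstractConfiguration.h>

/// Simplified sketch: collect the <node><host>..</host><port>..</port></node>
/// entries under a config prefix into "host:port,host:port,...".
/// Not the real ZooKeeperArgs implementation.
std::string hostsFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & prefix)
{
    Poco::Util::AbstractConfiguration::Keys keys;
    config.keys(prefix, keys);

    std::string hosts;
    for (const auto & key : keys)
    {
        if (key.compare(0, 4, "node") != 0)   /// skip session_timeout_ms and other settings
            continue;
        if (!hosts.empty())
            hosts += ',';
        hosts += config.getString(prefix + "." + key + ".host") + ":"
            + config.getString(prefix + "." + key + ".port", "2181");
    }
    return hosts;
}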


@@ -0,0 +1,120 @@
import time
import pytest
import os

from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper.xml')
node = cluster.add_instance('node', with_zookeeper=True)

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
ZK_CONFIG_PATH = os.path.join(SCRIPT_DIR, 'configs/zookeeper.xml')

@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        node.query(
            '''
            CREATE TABLE test_table(date Date, id UInt32)
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard1/test/test_table', '1')
            PARTITION BY toYYYYMM(date)
            ORDER BY id
            ''')
        yield cluster
    finally:
        ## write back the original config
        config = open(ZK_CONFIG_PATH, 'w')
        config.write(
            """
            <yandex>
                <zookeeper>
                    <node index="1">
                        <host>zoo1</host>
                        <port>2181</port>
                    </node>
                    <node index="2">
                        <host>zoo2</host>
                        <port>2181</port>
                    </node>
                    <node index="3">
                        <host>zoo3</host>
                        <port>2181</port>
                    </node>
                    <session_timeout_ms>2000</session_timeout_ms>
                </zookeeper>
            </yandex>
            """)
        config.close()
        cluster.shutdown()

def test_reload_zookeeper(start_cluster):
    def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
        start = time.time()
        while time.time() - start < timeout:
            try:
                for instance in zk_nodes:
                    conn = start_cluster.get_kazoo_client(instance)
                    conn.get_children('/')
                print("All instances of ZooKeeper started")
                return
            except Exception as ex:
                print("Can't connect to ZooKeeper " + str(ex))
                time.sleep(0.5)

    node.query("INSERT INTO test_table(date, id) SELECT today(), number FROM numbers(1000)")

    ## remove zoo2, zoo3 from the config
    config = open(ZK_CONFIG_PATH, 'w')
    config.write(
        """
        <yandex>
            <zookeeper>
                <node index="1">
                    <host>zoo1</host>
                    <port>2181</port>
                </node>
                <session_timeout_ms>2000</session_timeout_ms>
            </zookeeper>
        </yandex>
        """
    )
    config.close()

    ## the config is reloaded, but queries still work through zoo1
    assert_eq_with_retry(node, "SELECT COUNT() FROM test_table", '1000', retry_count=120, sleep_time=0.5)

    ## stop all ZooKeeper nodes, the table becomes read-only
    cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
    with pytest.raises(QueryRuntimeException):
        node.query("SELECT COUNT() FROM test_table")

    ## start zoo2, zoo3; the table is still read-only because the config only points to zoo1
    cluster.start_zookeeper_nodes(["zoo2", "zoo3"])
    wait_zookeeper_node_to_start(["zoo2", "zoo3"])
    with pytest.raises(QueryRuntimeException):
        node.query("SELECT COUNT() FROM test_table")

    ## point the config at zoo2 only; the server picks up the change and recovers
    config = open(ZK_CONFIG_PATH, 'w')
    config.write(
        """
        <yandex>
            <zookeeper>
                <node index="1">
                    <host>zoo2</host>
                    <port>2181</port>
                </node>
                <session_timeout_ms>2000</session_timeout_ms>
            </zookeeper>
        </yandex>
        """
    )
    config.close()
    assert_eq_with_retry(node, "SELECT COUNT() FROM test_table", '1000', retry_count=120, sleep_time=0.5)
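Read end to end, the test covers the three states the reload has to handle: shrinking the config to zoo1 alone while ZooKeeper is healthy (queries keep succeeding after the reload), stopping every ZooKeeper node (the replicated table goes read-only and the SELECT throws), and finally pointing the config at the still-running zoo2, after which the server establishes a new session on the next config reload and the same SELECT succeeds again without restarting the server.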