Merge pull request #17070 from fastio/master

Support multiple ZooKeeper clusters
This commit is contained in:
alexey-milovidov 2020-11-27 10:38:01 +03:00 committed by GitHub
commit dfae1efbbd
10 changed files with 221 additions and 19 deletions

View File

@ -53,6 +53,42 @@ Example of setting the addresses of the ZooKeeper cluster:
</zookeeper>
```
ClickHouse also supports storing replica metadata in an auxiliary ZooKeeper cluster by providing the ZooKeeper cluster name and path as engine arguments.
In other words, it supports storing the metadata of different tables in different ZooKeeper clusters.
Example of setting the addresses of the auxiliary ZooKeeper cluster:
``` xml
<auxiliary_zookeepers>
<zookeeper2>
<node index="1">
<host>example_2_1</host>
<port>2181</port>
</node>
<node index="2">
<host>example_2_2</host>
<port>2181</port>
</node>
<node index="3">
<host>example_2_3</host>
<port>2181</port>
</node>
</zookeeper2>
<zookeeper3>
<node index="1">
<host>example_3_1</host>
<port>2181</port>
</node>
</zookeeper3>
</auxiliary_zookeepers>
```
To store table metadata in an auxiliary ZooKeeper cluster instead of the default ZooKeeper cluster, create the table with the
ReplicatedMergeTree engine as follows:
``` sql
CREATE TABLE table_name ( ... ) ENGINE = ReplicatedMergeTree('zookeeper_name_configured_in_auxiliary_zookeepers:path', 'replica_name') ...
```
You can specify any existing ZooKeeper cluster and the system will use a directory on it for its own data (the directory is specified when creating a replicatable table).
If ZooKeeper isn't set in the config file, you can't create replicated tables, and any existing replicated tables will be read-only.

View File

@ -1572,6 +1572,10 @@ bool Context::hasZooKeeper() const
return getConfigRef().has("zookeeper");
}
bool Context::hasAuxiliaryZooKeeper(const String & name) const
{
return getConfigRef().has("auxiliary_zookeepers." + name);
}
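
The getter this predicate pairs with, `Context::getAuxiliaryZooKeeper`, is called from `StorageReplicatedMergeTree` below but its body is not part of this hunk. A minimal sketch of the likely lookup-or-create pattern, keyed by cluster name; the member names (`auxiliary_zookeepers`, `auxiliary_zookeepers_mutex`) and the exact error text are assumptions for illustration, not the real code:

``` cpp
/// Sketch only: return a cached session for the named auxiliary cluster,
/// creating it from the <auxiliary_zookeepers><name>...</name> config subtree
/// on first use. Member names here are illustrative assumptions.
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
{
    std::lock_guard lock(shared->auxiliary_zookeepers_mutex);

    auto it = shared->auxiliary_zookeepers.find(name);
    if (it == shared->auxiliary_zookeepers.end())
    {
        if (!getConfigRef().has("auxiliary_zookeepers." + name))
            throw Exception(
                "Unknown auxiliary ZooKeeper name '" + name + "'",
                ErrorCodes::BAD_ARGUMENTS);

        it = shared->auxiliary_zookeepers
                 .emplace(name, std::make_shared<zkutil::ZooKeeper>(
                                    getConfigRef(), "auxiliary_zookeepers." + name))
                 .first;
    }
    else if (it->second->expired())
        it->second = it->second->startNewSession();  /// transparently reconnect

    return it->second;
}
```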
void Context::setInterserverIOAddress(const String & host, UInt16 port)
{

View File

@ -499,6 +499,8 @@ public:
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);
/// Has ready or expired ZooKeeper
bool hasZooKeeper() const;
/// Has ready or expired auxiliary ZooKeeper
bool hasAuxiliaryZooKeeper(const String & name) const;
/// Reset current zookeeper session. Do not create a new one.
void resetZooKeeper() const;
// Reload Zookeeper

View File

@ -83,7 +83,7 @@ void ReplicatedMergeTreeRestartingThread::run()
{
try
{
storage.setZooKeeper(storage.global_context.getZooKeeper());
storage.setZooKeeper();
}
catch (const Coordination::Exception &)
{

View File

@ -139,10 +139,17 @@ static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
void StorageReplicatedMergeTree::setZooKeeper()
{
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = zookeeper;
if (zookeeper_name == default_zookeeper_name)
{
current_zookeeper = global_context.getZooKeeper();
}
else
{
current_zookeeper = global_context.getAuxiliaryZooKeeper(zookeeper_name);
}
}
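
`setZooKeeper` now decides between the default and an auxiliary session itself instead of accepting one from the caller (see the restarting-thread hunk above). The lock matters because the restarting thread can swap the session while query threads read it. A standalone sketch of that guarded swap/read pattern, using stand-in types rather than the real `zkutil` ones:

``` cpp
#include <memory>
#include <mutex>
#include <utility>

struct Session {};                      /// stand-in for zkutil::ZooKeeper
using SessionPtr = std::shared_ptr<Session>;

class SessionHolder
{
public:
    /// Called on (re)connect, e.g. by a restarting thread.
    void set(SessionPtr s)
    {
        std::lock_guard lock(mutex);
        current = std::move(s);
    }

    /// Readers take their own shared_ptr copy under the lock, so a
    /// concurrent swap in set() cannot destroy the session under them.
    SessionPtr tryGet() const
    {
        std::lock_guard lock(mutex);
        return current;                 /// may be null before the first connect
    }

private:
    mutable std::mutex mutex;
    SessionPtr current;
};
```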
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
@ -159,7 +166,6 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
return res;
}
static std::string normalizeZooKeeperPath(std::string zookeeper_path)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
@ -171,6 +177,33 @@ static std::string normalizeZooKeeperPath(std::string zookeeper_path)
return zookeeper_path;
}
static String extractZooKeeperName(const String & path)
{
if (path.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto pos = path.find(':');
if (pos != String::npos)
{
auto zookeeper_name = path.substr(0, pos);
if (zookeeper_name.empty())
throw Exception("Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return zookeeper_name;
}
static constexpr auto default_zookeeper_name = "default";
return default_zookeeper_name;
}
static String extractZooKeeperPath(const String & path)
{
if (path.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto pos = path.find(':');
if (pos != String::npos)
{
return normalizeZooKeeperPath(path.substr(pos + 1, String::npos));
}
return normalizeZooKeeperPath(path);
}
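
Together, these two helpers split a single engine argument into a (cluster name, path) pair: `'zookeeper2:/clickhouse/tables/t'` yields the name `zookeeper2` and the path `/clickhouse/tables/t`, while a plain `'/clickhouse/tables/t'` yields the name `default`. The same split is reused for `ALTER TABLE ... FETCH PARTITION` in the hunk further down. A self-contained demo of the rule (a simplified re-implementation that drops the empty-path and empty-name checks and the trailing-slash normalization the real helpers perform):

``` cpp
#include <iostream>
#include <string>
#include <utility>

/// Simplified stand-in for extractZooKeeperName/extractZooKeeperPath above.
static std::pair<std::string, std::string> splitEngineArg(const std::string & path)
{
    auto pos = path.find(':');
    if (pos == std::string::npos)
        return {"default", path};                       /// no prefix: default cluster
    return {path.substr(0, pos), path.substr(pos + 1)}; /// '<name>:<path>'
}

int main()
{
    for (const std::string arg : {"zookeeper2:/clickhouse/tables/t", "/clickhouse/tables/t"})
    {
        auto [name, zk_path] = splitEngineArg(arg);
        std::cout << name << " -> " << zk_path << '\n';
    }
}
// Prints:
// zookeeper2 -> /clickhouse/tables/t
// default -> /clickhouse/tables/t
```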
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
@ -195,9 +228,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
attach,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_path(normalizeZooKeeperPath(zookeeper_path_))
, zookeeper_name(extractZooKeeperName(zookeeper_path_))
, zookeeper_path(extractZooKeeperPath(zookeeper_path_))
, replica_name(replica_name_)
, replica_path(zookeeper_path + "/replicas/" + replica_name)
, replica_path(zookeeper_path + "/replicas/" + replica_name_)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getSettingsRef().background_pool_size)
@ -227,7 +261,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
mutations_finalizing_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); });
if (global_context.hasZooKeeper())
if (global_context.hasZooKeeper() || global_context.hasAuxiliaryZooKeeper(zookeeper_name))
{
/// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
/// be reached. In such cases Poco::Exception is thrown after a connection
@ -244,7 +278,14 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// to be manually deleted before retrying the CreateQuery.
try
{
current_zookeeper = global_context.getZooKeeper();
if (zookeeper_name == default_zookeeper_name)
{
current_zookeeper = global_context.getZooKeeper();
}
else
{
current_zookeeper = global_context.getAuxiliaryZooKeeper(zookeeper_name);
}
}
catch (...)
{
@ -4880,22 +4921,15 @@ void StorageReplicatedMergeTree::fetchPartition(
const String & from_,
const Context & query_context)
{
String from = from_;
String auxiliary_zookeeper_name = extractZooKeeperName(from_);
String from = extractZooKeeperPath(from_);
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
String partition_id = getPartitionIDFromQuery(partition, query_context);
zkutil::ZooKeeperPtr zookeeper;
if (from[0] != '/')
if (auxiliary_zookeeper_name != default_zookeeper_name)
{
auto delimiter = from.find(':');
if (delimiter == String::npos)
throw Exception("Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto auxiliary_zookeeper_name = from.substr(0, delimiter);
from = from.substr(delimiter + 1, String::npos);
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
zookeeper = global_context.getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
LOG_INFO(log, "Will fetch partition {} from shard {} (auxiliary zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);

View File

@ -237,13 +237,15 @@ private:
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
void setZooKeeper(zkutil::ZooKeeperPtr zookeeper);
void setZooKeeper();
/// If true, the table is offline and can not be written to it.
std::atomic_bool is_readonly {false};
/// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
bool has_metadata_in_zookeeper = true;
static constexpr auto default_zookeeper_name = "default";
String zookeeper_name;
String zookeeper_path;
String replica_name;
String replica_path;

View File

@ -0,0 +1,16 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,28 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
</zookeeper>
<auxiliary_zookeepers>
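    <!-- Each child element name (here "zookeeper2") is the auxiliary cluster name
         that engine arguments reference, e.g. ReplicatedMergeTree('zookeeper2:/path', ...). -->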
<zookeeper2>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
</zookeeper2>
</auxiliary_zookeepers>
</yandex>

View File

@ -0,0 +1,80 @@
import time
import helpers.client as client
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/zookeeper_config.xml", "configs/remote_servers.xml"], with_zookeeper=True)
node2 = cluster.add_instance("node2", main_configs=["configs/zookeeper_config.xml", "configs/remote_servers.xml"], with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def drop_table(nodes, table_name):
for node in nodes:
node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name))
# Create table with default zookeeper.
def test_create_replicated_merge_tree_with_default_zookeeper(started_cluster):
drop_table([node1, node2], "test_default_zookeeper")
for node in [node1, node2]:
node.query(
'''
CREATE TABLE test_default_zookeeper(a Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_default_zookeeper', '{replica}')
ORDER BY a;
'''.format(replica=node.name))
# Insert data into node1, and query it from node2.
node1.query("INSERT INTO test_default_zookeeper VALUES (1)")
time.sleep(5)
expected = "1\n"
assert TSV(node1.query("SELECT a FROM test_default_zookeeper")) == TSV(expected)
assert TSV(node2.query("SELECT a FROM test_default_zookeeper")) == TSV(expected)
# Create table with auxiliary zookeeper.
def test_create_replicated_merge_tree_with_auxiliary_zookeeper(started_cluster):
drop_table([node1, node2], "test_auxiliary_zookeeper")
for node in [node1, node2]:
node.query(
'''
CREATE TABLE test_auxiliary_zookeeper(a Int32)
ENGINE = ReplicatedMergeTree('zookeeper2:/clickhouse/tables/test/test_auxiliary_zookeeper', '{replica}')
ORDER BY a;
'''.format(replica=node.name))
# Insert data into node1, and query it from node2.
node1.query("INSERT INTO test_auxiliary_zookeeper VALUES (1)")
time.sleep(5)
expected = "1\n"
assert TSV(node1.query("SELECT a FROM test_auxiliary_zookeeper")) == TSV(expected)
assert TSV(node2.query("SELECT a FROM test_auxiliary_zookeeper")) == TSV(expected)
# Create table with a non-existent auxiliary zookeeper.
def test_create_replicated_merge_tree_with_not_exists_auxiliary_zookeeper(started_cluster):
drop_table([node1], "test_auxiliary_zookeeper")
with pytest.raises(QueryRuntimeException):
node1.query(
'''
CREATE TABLE test_auxiliary_zookeeper(a Int32)
ENGINE = ReplicatedMergeTree('zookeeper_not_exits:/clickhouse/tables/test/test_auxiliary_zookeeper', '{replica}')
ORDER BY a;
'''.format(replica=node1.name))