Refactor and add test.

This commit is contained in:
zhangxiao871 2021-10-21 15:46:34 +08:00
parent 8480ae631a
commit e6cf9605a5
10 changed files with 229 additions and 187 deletions

View File

@ -112,9 +112,6 @@ private:
private:
GetPriorityForLoadBalancing get_priority_load_balancing;
// std::vector<size_t> hostname_differences; /// Distances from name of this host to the names of hosts of pools.
// size_t last_used = 0; /// Last used for round_robin policy.
// LoadBalancing default_load_balancing;
};
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;

View File

@ -258,16 +258,8 @@ struct ZooKeeperArgs
const String & local_hostname = getFQDNOrHostName();
for (size_t i = 0; i < hosts.size(); ++i)
{
const String & ip_or_hostname = hosts[i].substr(0, hosts[i].find_last_of(':'));
try
{
get_priority_load_balancing.hostname_differences[i] = DB::getHostNameDifference(local_hostname, Poco::Net::DNS::resolve(ip_or_hostname).name());
}
catch (...)
{
/// There may be HostNotFoundException or DNSException, these exceptions will be processed later.
LOG_ERROR(&Poco::Logger::get("ZooKeeperArgs"), "Cannot use ZooKeeper host {}, hostname differences will be set to the maximum value", hosts[i]);
}
const String & node_host = hosts[i].substr(0, hosts[i].find_last_of(':'));
get_priority_load_balancing.hostname_differences[i] = DB::getHostNameDifference(local_hostname, node_host);
}
get_priority_load_balancing.pool_size = hosts.size();
}

View File

@ -0,0 +1,23 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>nod1</host>
<port>9000</port>
</replica>
<replica>
<host>nod2</host>
<port>9000</port>
</replica>
<replica>
<host>nod3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,19 @@
<clickhouse>
<zookeeper>
<!--<zookeeper_load_balancing>random / in_order / nearest_hostname / first_or_random / round_robin</zookeeper_load_balancing>-->
<zookeeper_load_balancing>first_or_random</zookeeper_load_balancing>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,19 @@
<clickhouse>
<zookeeper>
<!--<zookeeper_load_balancing>random / in_order / nearest_hostname / first_or_random / round_robin</zookeeper_load_balancing>-->
<zookeeper_load_balancing>nearest_hostname</zookeeper_load_balancing>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -1,131 +0,0 @@
import time
import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_in_order.xml')
node1 = cluster.add_instance('node1', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_in_order.xml", "configs/zookeeper_log.xml"])
node2 = cluster.add_instance('node2', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_in_order.xml", "configs/zookeeper_log.xml"])
node3 = cluster.add_instance('node3', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_in_order.xml", "configs/zookeeper_log.xml"])
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def wait_zookeeper_node_to_start(started_cluster, zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = started_cluster.get_kazoo_client(instance)
conn.get_children('/')
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def test_in_order(started_cluster):
zoo1_ip = started_cluster.get_instance_ip("zoo1")
for i, node in enumerate([node1, node3]):
node.query('DROP TABLE IF EXISTS simple SYNC')
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
time.sleep(5)
assert '::ffff:' + str(zoo1_ip) + '\n' == node1.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo1_ip) + '\n' == node2.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo1_ip) + '\n' == node3.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
# def test_round_robin(started_cluster):
# new_config = """
# <clickhouse>
# <zookeeper>
# <zookeeper_load_balancing>round_robin</zookeeper_load_balancing>
# <node index="1">
# <host>zoo1</host>
# <port>2181</port>
# </node>
# <node index="2">
# <host>zoo2</host>
# <port>2181</port>
# </node>
# <node index="3">
# <host>zoo3</host>
# <port>2181</port>
# </node>
# <session_timeout_ms>3000</session_timeout_ms>
# </zookeeper>
# </clickhouse>
# """
# for i, node in enumerate([node1, node3]):
# node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
# node.query("SYSTEM RELOAD CONFIG")
#
# started_cluster.stop_zookeeper_nodes(["zoo1"])
# zoo2_ip = started_cluster.get_instance_ip("zoo2")
# for i, node in enumerate([node1, node3]):
# node.query('DROP TABLE IF EXISTS simple SYNC')
# node.query('''
# CREATE TABLE simple (date Date, id UInt32)
# ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
# '''.format(replica=node.name))
# assert '::ffff:' + str(zoo2_ip) + '\n' == node.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
# ## start zoo2, zoo3, table will be readonly too, because it only connect to zoo1
# started_cluster.start_zookeeper_nodes(["zoo1"])
# wait_zookeeper_node_to_start(started_cluster, ["zoo1"])
#
#
# def test_nearest_hostname(started_cluster):
# new_config = """
# <clickhouse>
# <zookeeper>
# <zookeeper_load_balancing>nearest_hostname</zookeeper_load_balancing>
# <node index="1">
# <host>zoo1</host>
# <port>2181</port>
# </node>
# <node index="2">
# <host>zoo2</host>
# <port>2181</port>
# </node>
# <node index="3">
# <host>zoo3</host>
# <port>2181</port>
# </node>
# <session_timeout_ms>3000</session_timeout_ms>
# </zookeeper>
# </clickhouse>
# """
# for i, node in enumerate([node1, node3]):
# node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
# node.query("SYSTEM RELOAD CONFIG")
#
# zoo1_ip = started_cluster.get_instance_ip("zoo1")
# zoo2_ip = started_cluster.get_instance_ip("zoo2")
# zoo3_ip = started_cluster.get_instance_ip("zoo3")
#
# for i, node in enumerate([node1, node3]):
# node.query('DROP TABLE IF EXISTS simple SYNC')
# node.query('''
# CREATE TABLE simple (date Date, id UInt32)
# ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
# '''.format(replica=node.name))
#
# assert '::ffff:' + str(zoo1_ip) + '\n' == node1.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
# assert '::ffff:' + str(zoo2_ip) + '\n' == node2.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
# assert '::ffff:' + str(zoo3_ip) + '\n' == node3.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')

View File

@ -0,0 +1,53 @@
import time
import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_first_or_random.xml')
node1 = cluster.add_instance('node1', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_first_or_random.xml", "configs/zookeeper_log.xml"])
node2 = cluster.add_instance('node2', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_first_or_random.xml", "configs/zookeeper_log.xml"])
node3 = cluster.add_instance('node3', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_first_or_random.xml", "configs/zookeeper_log.xml"])
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def wait_zookeeper_node_to_start(started_cluster, zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = started_cluster.get_kazoo_client(instance)
conn.get_children('/')
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def test_first_or_random(started_cluster):
wait_zookeeper_node_to_start(started_cluster, ["zoo1", "zoo2", "zoo3"])
time.sleep(2)
zoo1_ip = started_cluster.get_instance_ip("zoo1")
for i, node in enumerate([node1, node3]):
node.query('DROP TABLE IF EXISTS simple SYNC')
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
time.sleep(5)
assert '::ffff:' + str(zoo1_ip) + '\n' == node1.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo1_ip) + '\n' == node2.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo1_ip) + '\n' == node3.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')

View File

@ -0,0 +1,53 @@
import time
import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_in_order.xml')
node1 = cluster.add_instance('node1', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_in_order.xml", "configs/zookeeper_log.xml"])
node2 = cluster.add_instance('node2', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_in_order.xml", "configs/zookeeper_log.xml"])
node3 = cluster.add_instance('node3', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_in_order.xml", "configs/zookeeper_log.xml"])
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def wait_zookeeper_node_to_start(started_cluster, zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = started_cluster.get_kazoo_client(instance)
conn.get_children('/')
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def test_in_order(started_cluster):
wait_zookeeper_node_to_start(started_cluster, ["zoo1", "zoo2", "zoo3"])
time.sleep(2)
zoo1_ip = started_cluster.get_instance_ip("zoo1")
for i, node in enumerate([node1, node3]):
node.query('DROP TABLE IF EXISTS simple SYNC')
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
time.sleep(5)
assert '::ffff:' + str(zoo1_ip) + '\n' == node1.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo1_ip) + '\n' == node2.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo1_ip) + '\n' == node3.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')

View File

@ -0,0 +1,56 @@
import time
import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_nearest_hostname.xml')
node1 = cluster.add_instance('nod1', with_zookeeper=True,
main_configs=["configs/remote_servers_nearest_hostname.xml", "configs/zookeeper_config_nearest_hostname.xml", "configs/zookeeper_log.xml"])
node2 = cluster.add_instance('nod2', with_zookeeper=True,
main_configs=["configs/remote_servers_nearest_hostname.xml", "configs/zookeeper_config_nearest_hostname.xml", "configs/zookeeper_log.xml"])
node3 = cluster.add_instance('nod3', with_zookeeper=True,
main_configs=["configs/remote_servers_nearest_hostname.xml", "configs/zookeeper_config_nearest_hostname.xml", "configs/zookeeper_log.xml"])
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def wait_zookeeper_node_to_start(started_cluster, zk_nodes, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
for instance in zk_nodes:
conn = started_cluster.get_kazoo_client(instance)
conn.get_children('/')
print("All instances of ZooKeeper started")
return
except Exception as ex:
print(("Can't connect to ZooKeeper " + str(ex)))
time.sleep(0.5)
def test_nearest_hostname(started_cluster):
wait_zookeeper_node_to_start(started_cluster, ["zoo1", "zoo2", "zoo3"])
time.sleep(2)
zoo1_ip = started_cluster.get_instance_ip("zoo1")
zoo2_ip = started_cluster.get_instance_ip("zoo2")
zoo3_ip = started_cluster.get_instance_ip("zoo3")
for i, node in enumerate([node1, node3]):
node.query('DROP TABLE IF EXISTS simple SYNC')
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
time.sleep(5)
assert '::ffff:' + str(zoo1_ip) + '\n' == node1.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo2_ip) + '\n' == node2.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo3_ip) + '\n' == node3.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')

View File

@ -3,7 +3,7 @@ import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_in_order.xml')
cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_round_robin.xml')
node1 = cluster.add_instance('node1', with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_config_round_robin.xml", "configs/zookeeper_log.xml"])
@ -39,8 +39,9 @@ def wait_zookeeper_node_to_start(started_cluster, zk_nodes, timeout=60):
def test_round_robin(started_cluster):
wait_zookeeper_node_to_start(started_cluster, ["zoo1", "zoo2", "zoo3"])
started_cluster.stop_zookeeper_nodes(["zoo1"])
time.sleep(10)
zoo2_ip = started_cluster.get_instance_ip("zoo2")
for i, node in enumerate([node1, node3]):
node.query('DROP TABLE IF EXISTS simple SYNC')
@ -50,6 +51,7 @@ def test_round_robin(started_cluster):
'''.format(replica=node.name))
time.sleep(5)
print("zoo2", zoo2_ip)
assert '::ffff:' + str(zoo2_ip) + '\n' == node1.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo2_ip) + '\n' == node2.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
assert '::ffff:' + str(zoo2_ip) + '\n' == node3.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
@ -57,44 +59,3 @@ def test_round_robin(started_cluster):
## start zoo2, zoo3, table will be readonly too, because it only connect to zoo1
started_cluster.start_zookeeper_nodes(["zoo1"])
wait_zookeeper_node_to_start(started_cluster, ["zoo1"])
# def test_nearest_hostname(started_cluster):
# new_config = """
# <clickhouse>
# <zookeeper>
# <zookeeper_load_balancing>nearest_hostname</zookeeper_load_balancing>
# <node index="1">
# <host>zoo1</host>
# <port>2181</port>
# </node>
# <node index="2">
# <host>zoo2</host>
# <port>2181</port>
# </node>
# <node index="3">
# <host>zoo3</host>
# <port>2181</port>
# </node>
# <session_timeout_ms>3000</session_timeout_ms>
# </zookeeper>
# </clickhouse>
# """
# for i, node in enumerate([node1, node3]):
# node.replace_config("/etc/clickhouse-server/conf.d/zookeeper.xml", new_config)
# node.query("SYSTEM RELOAD CONFIG")
#
# zoo1_ip = started_cluster.get_instance_ip("zoo1")
# zoo2_ip = started_cluster.get_instance_ip("zoo2")
# zoo3_ip = started_cluster.get_instance_ip("zoo3")
#
# for i, node in enumerate([node1, node3]):
# node.query('DROP TABLE IF EXISTS simple SYNC')
# node.query('''
# CREATE TABLE simple (date Date, id UInt32)
# ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
# '''.format(replica=node.name))
#
# assert '::ffff:' + str(zoo1_ip) + '\n' == node1.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
# assert '::ffff:' + str(zoo2_ip) + '\n' == node2.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')
# assert '::ffff:' + str(zoo3_ip) + '\n' == node3.query('SELECT IPv6NumToString(address) FROM system.zookeeper_log ORDER BY event_time DESC LIMIT 1')