Merge pull request #58285 from canhld94/fix_a_test

Hot reload storage policy for distributed tables when adding a new disk

Commit 02a3049876
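For context on the mechanism the diff below relies on: a server config reload works out which disks are newly added and gives each table engine a chance to react through its initializeDiskOnConfigChange hook, so a Distributed table can re-resolve its storage policy without a restart. The following is a minimal, self-contained sketch of that reload-hook pattern; Reloadable, ConfigReloader, and PrintingStorage are hypothetical stand-ins rather than ClickHouse classes (only the hook name mirrors the diff).

// Minimal sketch of a "notify storages about newly added disks" reload hook.
// Names (Reloadable, ConfigReloader, PrintingStorage) are illustrative, not ClickHouse's API.
#include <iostream>
#include <memory>
#include <set>
#include <string>
#include <vector>

struct Reloadable
{
    // Called after a config reload with the names of disks that did not exist before.
    virtual bool initializeDiskOnConfigChange(const std::set<std::string> & new_added_disks) = 0;
    virtual ~Reloadable() = default;
};

class ConfigReloader
{
public:
    void registerStorage(std::shared_ptr<Reloadable> storage) { storages.push_back(std::move(storage)); }

    // Compare the previous and current disk sets and notify every storage about the difference.
    void reload(const std::set<std::string> & new_disks)
    {
        std::set<std::string> added;
        for (const auto & disk : new_disks)
            if (!known_disks.contains(disk))
                added.insert(disk);

        known_disks = new_disks;

        for (const auto & storage : storages)
            storage->initializeDiskOnConfigChange(added);
    }

private:
    std::set<std::string> known_disks;
    std::vector<std::shared_ptr<Reloadable>> storages;
};

struct PrintingStorage : Reloadable
{
    bool initializeDiskOnConfigChange(const std::set<std::string> & new_added_disks) override
    {
        for (const auto & name : new_added_disks)
            std::cout << "initializing newly added disk: " << name << '\n';
        return true;
    }
};

int main()
{
    ConfigReloader reloader;
    reloader.registerStorage(std::make_shared<PrintingStorage>());
    reloader.reload({"disk0"});            // initial config
    reloader.reload({"disk0", "disk1"});   // hot reload adds disk1
}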
@@ -1986,9 +1986,18 @@ void registerStorageDistributed(StorageFactory & factory)
 
 bool StorageDistributed::initializeDiskOnConfigChange(const std::set<String> & new_added_disks)
 {
-    if (!data_volume)
+    if (!storage_policy || !data_volume)
         return true;
 
+    auto new_storage_policy = getContext()->getStoragePolicy(storage_policy->getName());
+    auto new_data_volume = new_storage_policy->getVolume(0);
+    if (new_storage_policy->getVolumes().size() > 1)
+        LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. "
+                         "Only {} volume will be used to store data. Other will be ignored.", data_volume->getName());
+
+    std::atomic_store(&storage_policy, new_storage_policy);
+    std::atomic_store(&data_volume, new_data_volume);
+
     for (auto & disk : data_volume->getDisks())
     {
         if (new_added_disks.contains(disk->getName()))
@@ -4,18 +4,25 @@
             <disk0>
                 <path>/var/lib/clickhouse/disk0/</path>
             </disk0>
-            <disk1>
-                <path>/var/lib/clickhouse/disk1/</path>
-            </disk1>
         </disks>
         <policies>
             <default_policy>
                 <volumes>
-                    <default_volume>
+                    <volume0>
                         <disk>disk0</disk>
-                    </default_volume>
+                    </volume0>
                 </volumes>
             </default_policy>
         </policies>
     </storage_configuration>
+    <remote_servers>
+        <default>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </default>
+    </remote_servers>
 </clickhouse>
@@ -10,11 +10,8 @@ from helpers.cluster import ClickHouseCluster
 from helpers.test_tools import TSV
 
 cluster = ClickHouseCluster(__file__)
-node0 = cluster.add_instance(
-    "node0", with_zookeeper=True, main_configs=["configs/storage_configuration.xml"]
-)
-node1 = cluster.add_instance(
-    "node1", with_zookeeper=True, main_configs=["configs/storage_configuration.xml"]
+node = cluster.add_instance(
+    "node", main_configs=["configs/config.d/storage_configuration.xml"], stay_alive=True
 )
 
 
@@ -28,6 +25,37 @@ def started_cluster():
         cluster.shutdown()
 
 
+old_disk_config = """
+<clickhouse>
+    <storage_configuration>
+        <disks>
+            <disk0>
+                <path>/var/lib/clickhouse/disk0/</path>
+            </disk0>
+        </disks>
+        <policies>
+            <default_policy>
+                <volumes>
+                    <volume0>
+                        <disk>disk0</disk>
+                    </volume0>
+                </volumes>
+            </default_policy>
+        </policies>
+    </storage_configuration>
+    <remote_servers>
+        <default>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </default>
+    </remote_servers>
+</clickhouse>
+"""
+
 new_disk_config = """
 <clickhouse>
     <storage_configuration>
@@ -38,49 +66,120 @@ new_disk_config = """
             <disk1>
                 <path>/var/lib/clickhouse/disk1/</path>
             </disk1>
+            <disk2>
+                <path>/var/lib/clickhouse/disk2/</path>
+            </disk2>
         </disks>
         <policies>
             <default_policy>
                 <volumes>
-                    <default_volume>
+                        <disk>disk2</disk>
+                    <volume1>
                         <disk>disk1</disk>
+                    </volume1>
+                    <volume0>
                         <disk>disk0</disk>
-                    </default_volume>
+                    </volume0>
                 </volumes>
             </default_policy>
         </policies>
     </storage_configuration>
+    <remote_servers>
+        <default>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </default>
+    </remote_servers>
 </clickhouse>
 """
 
 
 def set_config(node, config):
-    node.replace_config(
-        "/etc/clickhouse-server/config.d/storage_configuration.xml", config
-    )
+    node.replace_config("/etc/clickhouse-server/config.d/config.xml", config)
     node.query("SYSTEM RELOAD CONFIG")
     # to give ClickHouse time to refresh disks
     time.sleep(1)
 
 
 def test_hot_reload_policy(started_cluster):
-    node0.query(
-        "CREATE TABLE t (d Int32, s String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/t', '0') PARTITION BY d ORDER BY tuple() SETTINGS storage_policy = 'default_policy'"
+    node.query(
+        "CREATE TABLE t (d Int32, s String) ENGINE = MergeTree() PARTITION BY d ORDER BY tuple() SETTINGS storage_policy = 'default_policy'"
     )
-    node0.query("INSERT INTO TABLE t VALUES (1, 'foo') (1, 'bar')")
+    node.query("SYSTEM STOP MERGES t")
+    node.query("INSERT INTO TABLE t VALUES (1, 'foo')")
 
-    node1.query(
-        "CREATE TABLE t (d Int32, s String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/t_mirror', '1') PARTITION BY d ORDER BY tuple() SETTINGS storage_policy = 'default_policy'"
+    set_config(node, new_disk_config)
+
+    # After reloading new policy with new disk, merge tree tables should reinitialize the new disk (create relative path, 'detached' folder...)
+    # and as default policy is `least_used`, at least one insertion should come to the new disk
+    node.query("INSERT INTO TABLE t VALUES (1, 'foo')")
+    node.query("INSERT INTO TABLE t VALUES (1, 'bar')")
+
+    num_disks = int(
+        node.query(
+            "SELECT uniqExact(disk_name) FROM system.parts WHERE database = 'default' AND table = 't'"
+        )
     )
-    set_config(node1, new_disk_config)
-    time.sleep(1)
-    node1.query("ALTER TABLE t FETCH PARTITION 1 FROM '/clickhouse/tables/t'")
-    result = int(node1.query("SELECT count() FROM t"))
 
     assert (
-        result == 4,
-        "Node should have 2 x full data (4 rows) after reloading storage configuration and fetch new partition, but get {} rows".format(
-            result
-        ),
+        num_disks == 2
+    ), "Node should write data to 2 disks after reloading disks, but got {}".format(
+        num_disks
     )
+
+    # If `detached` is not created this query will throw exception
+    node.query("ALTER TABLE t DETACH PARTITION 1")
+
+    node.query("DROP TABLE t")
+
+
+def test_hot_reload_policy_distributed_table(started_cluster):
+    # Same test for distributed table, it should reinitialize the storage policy and data volume
+    # We check it by trying an insert and the distribution queue must be on new disk
+
+    # Restart node first
+    set_config(node, old_disk_config)
+    node.restart_clickhouse()
+
+    node.query(
+        "CREATE TABLE t (d Int32, s String) ENGINE = MergeTree PARTITION BY d ORDER BY tuple()"
+    )
+    node.query(
+        "CREATE TABLE t_d (d Int32, s String) ENGINE = Distributed('default', 'default', 't', d%20, 'default_policy')"
+    )
+
+    node.query("SYSTEM STOP DISTRIBUTED SENDS t_d")
+    node.query(
+        "INSERT INTO TABLE t_d SETTINGS prefer_localhost_replica = 0 VALUES (2, 'bar') (12, 'bar')"
+    )
+    # t_d should create queue on disk0
+    queue_path = node.query("SELECT data_path FROM system.distribution_queue")
+
+    assert (
+        "disk0" in queue_path
+    ), "Distributed table should create distributed queue on disk0 (disk1), but the queue path is {}".format(
+        queue_path
+    )
+
+    node.query("SYSTEM START DISTRIBUTED SENDS t_d")
+
+    node.query("SYSTEM FLUSH DISTRIBUTED t_d")
+
+    set_config(node, new_disk_config)
+
+    node.query("SYSTEM STOP DISTRIBUTED SENDS t_d")
+    node.query(
+        "INSERT INTO TABLE t_d SETTINGS prefer_localhost_replica = 0 VALUES (2, 'bar') (12, 'bar')"
+    )
+
+    # t_d should create queue on disk1
+    queue_path = node.query("SELECT data_path FROM system.distribution_queue")
+
+    assert (
+        "disk1" in queue_path
+    ), "Distributed table should be using new disk (disk1), but the queue paths are {}".format(
+        queue_path
+    )
+
+    node.query("DROP TABLE t")
+    node.query("DROP TABLE t_d")