hot reload storage policy for StorageDistributed

also fix integration test

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
Duc Canh Le 2023-12-28 11:09:28 +00:00
parent a62f4ee278
commit 41fc29c1ea
3 changed files with 83 additions and 19 deletions

View File

@@ -105,6 +105,7 @@
 #include <Storages/BlockNumberColumn.h>
+#include <atomic>
 #include <memory>
 #include <filesystem>
 #include <optional>
@@ -1936,9 +1937,18 @@ void registerStorageDistributed(StorageFactory & factory)
 bool StorageDistributed::initializeDiskOnConfigChange(const std::set<String> & new_added_disks)
 {
-    if (!data_volume)
+    if (!storage_policy || !data_volume)
         return true;
+
+    auto new_storage_policy = getContext()->getStoragePolicy(storage_policy->getName());
+    auto new_data_volume = new_storage_policy->getVolume(0);
+    if (new_storage_policy->getVolumes().size() > 1)
+        LOG_WARNING(log, "Storage policy for Distributed table has multiple volumes. "
+                         "Only {} volume will be used to store data. Other will be ignored.", data_volume->getName());
+
+    std::atomic_store(&storage_policy, new_storage_policy);
+    std::atomic_store(&data_volume, new_data_volume);
+
     for (auto & disk : data_volume->getDisks())
     {
         if (new_added_disks.contains(disk->getName()))

View File

@@ -4,18 +4,25 @@
             <disk0>
                 <path>/var/lib/clickhouse/disk0/</path>
             </disk0>
+            <disk1>
+                <path>/var/lib/clickhouse/disk1/</path>
+            </disk1>
         </disks>
         <policies>
            <default_policy>
                <volumes>
-                   <default_volume>
+                   <volume0>
                        <disk>disk0</disk>
-                   </default_volume>
+                   </volume0>
                </volumes>
            </default_policy>
        </policies>
    </storage_configuration>
+   <remote_servers>
+       <default>
+           <shard>
+               <replica>
+                   <host>localhost</host>
+                   <port>9000</port>
+               </replica>
+           </shard>
+       </default>
+   </remote_servers>
 </clickhouse>
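
Once a modified copy of this configuration is swapped in and SYSTEM RELOAD CONFIG is run (the integration test below does exactly that through its set_config helper), the reloaded disks and policies become visible from SQL. The snippet below is an editorial sketch, not part of the commit: it assumes a ClickHouseCluster instance object such as node0/node1 from the test below, and uses the real system.disks and system.storage_policies tables.

# Editorial sketch, not part of the commit: print the storage layout a node
# currently has loaded. `node` is assumed to be a ClickHouseCluster instance
# (e.g. node1 from the integration test below).
def show_storage_layout(node):
    # system.disks lists every disk the server knows about after the reload.
    disks = node.query("SELECT name, path FROM system.disks ORDER BY name")
    # system.storage_policies maps each policy and volume to its disks.
    policies = node.query(
        "SELECT policy_name, volume_name, disks FROM system.storage_policies ORDER BY policy_name, volume_name"
    )
    print("disks:\n" + disks)
    print("policies:\n" + policies)

Calling it before and after set_config(node1, new_disk_config) should show default_policy gaining the extra volume on disk1 that the reloaded configuration defines.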

View File

@@ -11,10 +11,10 @@ from helpers.test_tools import TSV
 cluster = ClickHouseCluster(__file__)
 node0 = cluster.add_instance(
-    "node0", with_zookeeper=True, main_configs=["configs/storage_configuration.xml"]
+    "node0", with_zookeeper=True, main_configs=["configs/config.xml"]
 )
 node1 = cluster.add_instance(
-    "node1", with_zookeeper=True, main_configs=["configs/storage_configuration.xml"]
+    "node1", with_zookeeper=True, main_configs=["configs/config.xml"]
 )
@@ -38,29 +38,37 @@ new_disk_config = """
             <disk1>
                 <path>/var/lib/clickhouse/disk1/</path>
             </disk1>
-            <disk2>
-                <path>/var/lib/clickhouse/disk2/</path>
-            </disk2>
         </disks>
         <policies>
            <default_policy>
                <volumes>
-                   <default_volume>
-                       <disk>disk2</disk>
-                   </default_volume>
+                   <volume1>
+                       <disk>disk1</disk>
+                   </volume1>
+                   <volume0>
+                       <disk>disk0</disk>
+                   </volume0>
                </volumes>
            </default_policy>
        </policies>
    </storage_configuration>
+   <remote_servers>
+       <default>
+           <shard>
+               <replica>
+                   <host>localhost</host>
+                   <port>9000</port>
+               </replica>
+           </shard>
+       </default>
+   </remote_servers>
 </clickhouse>
 """
 
 
 def set_config(node, config):
     node.replace_config(
-        "/etc/clickhouse-server/config.d/storage_configuration.xml", config
+        "/etc/clickhouse-server/config.d/config.xml", config
     )
     node.query("SYSTEM RELOAD CONFIG")
@@ -74,13 +82,52 @@ def test_hot_reload_policy(started_cluster):
     node1.query(
         "CREATE TABLE t (d Int32, s String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/t_mirror', '1') PARTITION BY d ORDER BY tuple() SETTINGS storage_policy = 'default_policy'"
     )
+    node1.query(
+        "CREATE TABLE t_d (d Int32, s String) ENGINE = Distributed('default', 'default', 't', d%20, 'default_policy')"
+    )
+
     set_config(node1, new_disk_config)
     time.sleep(1)
+
+    # After reloading new policy with new disk, merge tree tables should reinitialize the new disk (create relative path, 'detached' folder...)
+    # Otherwise FETCH PARTITION will fails
     node1.query("ALTER TABLE t FETCH PARTITION 1 FROM '/clickhouse/tables/t'")
+    node1.query("ALTER TABLE t ATTACH PARTITION 1")
+
+    # Check that fetch partition success and we get full data from node0
     result = int(node1.query("SELECT count() FROM t"))
     assert (
-        result == 4,
-        "Node should have 2 x full data (4 rows) after reloading storage configuration and fetch new partition, but get {} rows".format(
-            result
-        ),
+        result == 2
+    ), "Node should have 2 rows after reloading storage configuration and fetch new partition, but get {} rows".format(
+        result
     )
+
+    # Same test for distributed table, it should reinitialize the storage policy and data volume
+    # We check it by trying an insert and the distribution queue must be on new disk
+    node1.query(
+        "SYSTEM STOP DISTRIBUTED SENDS t_d"
+    )
+    node1.query(
+        "INSERT INTO TABLE t_d SETTINGS prefer_localhost_replica = 0 VALUES (2, 'bar') (12, 'bar')"
+    )
+    queue_path = node1.query(
+        "SELECT data_path FROM system.distribution_queue"
+    )
+    assert ("disk1" in queue_path), "Distributed table should be using new disk (disk1), but it's still creating queue in {}".format(queue_path)
+    node1.query(
+        "SYSTEM START DISTRIBUTED SENDS t_d"
+    )
+    node1.query(
+        "SYSTEM FLUSH DISTRIBUTED t_d"
+    )
+    result = int(node1.query("SELECT count() FROM t"))
+    assert (
+        result == 4
+    ), "Node should have 4 rows after inserting to distributed table, but get {} rows".format(
+        result
+    )
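
The assertion rewrite above is the "also fix integration test" part of the commit message: in Python, assert (condition, message) asserts a two-element tuple, and a non-empty tuple is always truthy, so the old check could never fail even when the row count was wrong. Below is a standalone editorial sketch of the pitfall and of the corrected form; the value of result is only illustrative.

# Editorial sketch, not part of the test: why the old assertion never failed.
result = 2

# Old form: the operand of assert is the tuple (result == 4, "..."), and any
# non-empty tuple is truthy, so this passes even though result != 4.
# Recent CPython versions emit a SyntaxWarning ("assertion is always true,
# perhaps remove parentheses?") for exactly this pattern.
assert (result == 4, "this message is never used as an assertion message")

# Fixed form, as in the diff: the condition and the message are separate
# operands of assert, so a wrong count raises AssertionError with the message.
assert result == 2, "expected 2 rows, but got {} rows".format(result)

The multi-line form used in the diff is the same fixed pattern: the parentheses around result == 2 only group the condition so it can span lines, while the message remains a separate operand after the closing parenthesis and comma.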