Merge pull request #56367 from canhld94/ch_table_reinit_new_disk

Adding new disk to storage configuration without restart
This commit is contained in:
Sema Checherinda 2023-11-14 15:54:22 +01:00 committed by GitHub
commit b9cc1580e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 227 additions and 10 deletions

View File

@ -763,7 +763,7 @@ void LocalServer::processConfig()
{
DatabaseCatalog::instance().createBackgroundTasks();
loadMetadata(global_context);
DatabaseCatalog::instance().startupBackgroundCleanup();
DatabaseCatalog::instance().startupBackgroundTasks();
}
/// For ClickHouse local if path is not set the loader will be disabled.

View File

@ -1691,7 +1691,7 @@ try
/// Then, load remaining databases
loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context);
database_catalog.startupBackgroundCleanup();
database_catalog.startupBackgroundTasks();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);
/// Load user-defined SQL functions.

View File

@ -9,6 +9,7 @@
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
#include <algorithm>
#include <set>
@ -429,10 +430,11 @@ StoragePolicySelector::StoragePolicySelector(
}
StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const
StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks, Strings & new_disks) const
{
std::shared_ptr<StoragePolicySelector> result = std::make_shared<StoragePolicySelector>(config, config_prefix, disks);
std::set<String> disks_before_reload;
std::set<String> disks_after_reload;
/// First pass, check.
for (const auto & [name, policy] : policies)
{
@ -443,6 +445,8 @@ StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Uti
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage policy {} is missing in new configuration", backQuote(name));
policy->checkCompatibleWith(result->policies[name]);
for (const auto & disk : policy->getDisks())
disks_before_reload.insert(disk->getName());
}
/// Second pass, load.
@ -453,8 +457,18 @@ StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Uti
result->policies[name] = policy;
else
result->policies[name] = std::make_shared<StoragePolicy>(policy, config, config_prefix + "." + name, disks);
for (const auto & disk : result->policies[name]->getDisks())
disks_after_reload.insert(disk->getName());
}
std::set_difference(
disks_after_reload.begin(),
disks_after_reload.end(),
disks_before_reload.begin(),
disks_before_reload.end(),
std::back_inserter(new_disks));
return result;
}

View File

@ -122,7 +122,7 @@ public:
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
StoragePolicySelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const;
StoragePolicySelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks, Strings & new_disks) const;
/// Policy by name
StoragePolicyPtr get(const String & name) const;

View File

@ -3768,6 +3768,7 @@ void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration
{
{
std::lock_guard lock(shared->storage_policies_mutex);
Strings disks_to_reinit;
if (shared->merge_tree_disk_selector)
shared->merge_tree_disk_selector
= shared->merge_tree_disk_selector->updateFromConfig(config, "storage_configuration.disks", shared_from_this());
@ -3777,7 +3778,7 @@ void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration
try
{
shared->merge_tree_storage_policy_selector = shared->merge_tree_storage_policy_selector->updateFromConfig(
config, "storage_configuration.policies", shared->merge_tree_disk_selector);
config, "storage_configuration.policies", shared->merge_tree_disk_selector, disks_to_reinit);
}
catch (Exception & e)
{
@ -3785,6 +3786,12 @@ void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration
shared->log, "An error has occurred while reloading storage policies, storage policies were not applied: {}", e.message());
}
}
if (!disks_to_reinit.empty())
{
LOG_INFO(shared->log, "Initializing disks: ({}) for all tables", fmt::join(disks_to_reinit, ", "));
DatabaseCatalog::instance().triggerReloadDisksTask(disks_to_reinit);
}
}
{
@ -3792,6 +3799,7 @@ void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration
if (shared->storage_s3_settings)
shared->storage_s3_settings->loadFromConfig("s3", config, getSettingsRef());
}
}

View File

@ -1,4 +1,5 @@
#include <string>
#include <mutex>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Interpreters/loadMetadata.h>
@ -200,11 +201,14 @@ void DatabaseCatalog::createBackgroundTasks()
cleanup_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(cleanup_task_holder));
}
auto task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(task_holder));
auto drop_task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(drop_task_holder));
auto reload_disks_task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->reloadDisksTask(); });
reload_disks_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(reload_disks_task_holder));
}
void DatabaseCatalog::startupBackgroundCleanup()
void DatabaseCatalog::startupBackgroundTasks()
{
/// And it has to be done after all databases are loaded, otherwise cleanup_task may remove something that should not be removed
if (cleanup_task)
@ -1576,6 +1580,37 @@ bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskP
}
}
void DatabaseCatalog::reloadDisksTask()
{
std::set<String> disks;
{
std::lock_guard lock{reload_disks_mutex};
disks.swap(disks_to_reload);
}
for (auto & database : getDatabases())
{
auto it = database.second->getTablesIterator(getContext());
while (it->isValid())
{
auto table = it->table();
table->initializeDiskOnConfigChange(disks);
it->next();
}
}
std::lock_guard lock{reload_disks_mutex};
if (!disks_to_reload.empty()) /// during reload, another disks configuration change
(*reload_disks_task)->scheduleAfter(DBMS_DEFAULT_DISK_RELOAD_PERIOD_SEC * 1000);
}
void DatabaseCatalog::triggerReloadDisksTask(const Strings & new_added_disks)
{
std::lock_guard lock{reload_disks_mutex};
disks_to_reload.insert(new_added_disks.begin(), new_added_disks.end());
(*reload_disks_task)->schedule();
}
static void maybeUnlockUUID(UUID uuid)
{
if (uuid == UUIDHelpers::Nil)

View File

@ -166,7 +166,7 @@ public:
void createBackgroundTasks();
void initializeAndLoadTemporaryDatabase();
void startupBackgroundCleanup();
void startupBackgroundTasks();
void loadMarkedAsDroppedTables();
/// Get an object that protects the table from concurrently executing multiple DDL operations.
@ -286,6 +286,9 @@ public:
std::lock_guard lock(tables_marked_dropped_mutex);
return tables_marked_dropped;
}
void triggerReloadDisksTask(const Strings & new_added_disks);
private:
// The global instance of database catalog. unique_ptr is to allow
// deferred initialization. Thought I'd use std::optional, but I can't
@ -319,6 +322,8 @@ private:
void cleanupStoreDirectoryTask();
bool maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir);
void reloadDisksTask();
static constexpr size_t reschedule_time_ms = 100;
mutable std::mutex databases_mutex;
@ -380,8 +385,14 @@ private:
static constexpr time_t default_drop_error_cooldown_sec = 5;
time_t drop_error_cooldown_sec = default_drop_error_cooldown_sec;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> reload_disks_task;
std::mutex reload_disks_mutex;
std::set<String> disks_to_reload;
static constexpr time_t DBMS_DEFAULT_DISK_RELOAD_PERIOD_SEC = 5;
};
/// This class is useful when creating a table or database.
/// Usually we create IStorage/IDatabase object first and then add it to IDatabase/DatabaseCatalog.
/// But such object may start using a directory in store/ since its creation.

View File

@ -717,6 +717,9 @@ public:
return getStorageSnapshot(metadata_snapshot, query_context);
}
/// Re initialize disks in case the underlying storage policy changed
virtual bool initializeDiskOnConfigChange(const std::set<String> & /*new_added_disks*/) { return true; }
/// A helper to implement read()
static void readFromPipe(
QueryPlan & query_plan,

View File

@ -8230,4 +8230,24 @@ CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
storage.currently_emerging_big_parts.erase(emerging_part_name);
}
bool MergeTreeData::initializeDiskOnConfigChange(const std::set<String> & new_added_disks)
{
auto storage_policy = getStoragePolicy();
const auto format_version_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
for (const auto & name : new_added_disks)
{
auto disk = storage_policy->tryGetDiskByName(name);
if (disk)
{
disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
auto buf = disk->writeFile(format_version_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, getContext()->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
buf->finalize();
if (getContext()->getSettingsRef().fsync_metadata)
buf->sync();
}
}
return true;
}
}

View File

@ -1088,6 +1088,8 @@ public:
const SelectQueryInfo & query_info,
const ActionDAGNodes & added_filter_nodes) const;
bool initializeDiskOnConfigChange(const std::set<String> & /*new_added_disks*/) override;
protected:
friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;

View File

@ -1875,4 +1875,19 @@ void registerStorageDistributed(StorageFactory & factory)
});
}
bool StorageDistributed::initializeDiskOnConfigChange(const std::set<String> & new_added_disks)
{
if (!data_volume)
return true;
for (auto & disk : data_volume->getDisks())
{
if (new_added_disks.contains(disk->getName()))
{
initializeDirectoryQueuesForDisk(disk);
}
}
return true;
}
}

View File

@ -159,6 +159,8 @@ public:
/// Used by ClusterCopier
size_t getShardCount() const;
bool initializeDiskOnConfigChange(const std::set<String> & new_added_disks) override;
private:
void renameOnDisk(const String & new_path_to_table_data);

View File

@ -0,0 +1,21 @@
<clickhouse>
<storage_configuration>
<disks>
<disk0>
<path>/var/lib/clickhouse/disk0/</path>
</disk0>
<disk1>
<path>/var/lib/clickhouse/disk1/</path>
</disk1>
</disks>
<policies>
<default_policy>
<volumes>
<default_volume>
<disk>disk0</disk>
</default_volume>
</volumes>
</default_policy>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,86 @@
import os
import sys
import time
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node0 = cluster.add_instance(
"node0", with_zookeeper=True, main_configs=["configs/storage_configuration.xml"]
)
node1 = cluster.add_instance(
"node1", with_zookeeper=True, main_configs=["configs/storage_configuration.xml"]
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
new_disk_config = """
<clickhouse>
<storage_configuration>
<disks>
<disk0>
<path>/var/lib/clickhouse/disk0/</path>
</disk0>
<disk1>
<path>/var/lib/clickhouse/disk1/</path>
</disk1>
<disk2>
<path>/var/lib/clickhouse/disk2/</path>
</disk2>
</disks>
<policies>
<default_policy>
<volumes>
<default_volume>
<disk>disk2</disk>
<disk>disk1</disk>
<disk>disk0</disk>
</default_volume>
</volumes>
</default_policy>
</policies>
</storage_configuration>
</clickhouse>
"""
def set_config(node, config):
node.replace_config(
"/etc/clickhouse-server/config.d/storage_configuration.xml", config
)
node.query("SYSTEM RELOAD CONFIG")
def test_hot_reload_policy(started_cluster):
node0.query(
"CREATE TABLE t (d Int32, s String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/t', '0') PARTITION BY d ORDER BY tuple() SETTINGS storage_policy = 'default_policy'"
)
node0.query("INSERT INTO TABLE t VALUES (1, 'foo') (1, 'bar')")
node1.query(
"CREATE TABLE t (d Int32, s String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/t_mirror', '1') PARTITION BY d ORDER BY tuple() SETTINGS storage_policy = 'default_policy'"
)
set_config(node1, new_disk_config)
time.sleep(1)
node1.query("ALTER TABLE t FETCH PARTITION 1 FROM '/clickhouse/tables/t'")
result = int(node1.query("SELECT count() FROM t"))
assert (
result == 4,
"Node should have 2 x full data (4 rows) after reloading storage configuration and fetch new partition, but get {} rows".format(
result
),
)