mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #65545 from ClickHouse/add_customizeable_config_reloader_interval
Add ability to change config reload interval
This commit is contained in:
commit
12a129d838
@ -577,8 +577,7 @@ try
|
||||
#if USE_SSL
|
||||
CertificateReloader::instance().tryLoad(*config);
|
||||
#endif
|
||||
},
|
||||
/* already_loaded = */ false); /// Reload it right now (initial loading)
|
||||
});
|
||||
|
||||
SCOPE_EXIT({
|
||||
LOG_INFO(log, "Shutting down.");
|
||||
|
@ -1540,6 +1540,8 @@ try
|
||||
global_context->setMaxDictionaryNumToWarn(new_server_settings.max_dictionary_num_to_warn);
|
||||
global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn);
|
||||
global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn);
|
||||
/// Only for system.server_settings
|
||||
global_context->setConfigReloaderInterval(new_server_settings.config_reload_interval_ms);
|
||||
|
||||
SlotCount concurrent_threads_soft_limit = UnlimitedSlots;
|
||||
if (new_server_settings.concurrent_threads_soft_limit_num > 0 && new_server_settings.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
|
||||
@ -1702,8 +1704,7 @@ try
|
||||
|
||||
/// Must be the last.
|
||||
latest_config = config;
|
||||
},
|
||||
/* already_loaded = */ false); /// Reload it right now (initial loading)
|
||||
});
|
||||
|
||||
const auto listen_hosts = getListenHosts(config());
|
||||
const auto interserver_listen_hosts = getInterserverListenHosts(config());
|
||||
|
@ -880,8 +880,7 @@ void UsersConfigAccessStorage::load(
|
||||
Settings::checkNoSettingNamesAtTopLevel(*new_config, users_config_path);
|
||||
parseFromConfig(*new_config);
|
||||
access_control.getChangesNotifier().sendNotifications();
|
||||
},
|
||||
/* already_loaded = */ false);
|
||||
});
|
||||
}
|
||||
|
||||
void UsersConfigAccessStorage::startPeriodicReloading()
|
||||
|
@ -19,8 +19,7 @@ ConfigReloader::ConfigReloader(
|
||||
const std::string & preprocessed_dir_,
|
||||
zkutil::ZooKeeperNodeCache && zk_node_cache_,
|
||||
const zkutil::EventPtr & zk_changed_event_,
|
||||
Updater && updater_,
|
||||
bool already_loaded)
|
||||
Updater && updater_)
|
||||
: config_path(config_path_)
|
||||
, extra_paths(extra_paths_)
|
||||
, preprocessed_dir(preprocessed_dir_)
|
||||
@ -28,10 +27,15 @@ ConfigReloader::ConfigReloader(
|
||||
, zk_changed_event(zk_changed_event_)
|
||||
, updater(std::move(updater_))
|
||||
{
|
||||
if (!already_loaded)
|
||||
reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true, /* initial_loading = */ true);
|
||||
}
|
||||
auto config = reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true, /* initial_loading = */ true);
|
||||
|
||||
if (config.has_value())
|
||||
reload_interval = std::chrono::milliseconds(config->configuration->getInt64("config_reload_interval_ms", DEFAULT_RELOAD_INTERVAL.count()));
|
||||
else
|
||||
reload_interval = DEFAULT_RELOAD_INTERVAL;
|
||||
|
||||
LOG_TRACE(log, "Config reload interval set to {}ms", reload_interval.count());
|
||||
}
|
||||
|
||||
void ConfigReloader::start()
|
||||
{
|
||||
@ -82,7 +86,17 @@ void ConfigReloader::run()
|
||||
if (quit)
|
||||
return;
|
||||
|
||||
reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false, /* initial_loading = */ false);
|
||||
auto config = reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false, /* initial_loading = */ false);
|
||||
if (config.has_value())
|
||||
{
|
||||
auto new_reload_interval = std::chrono::milliseconds(config->configuration->getInt64("config_reload_interval_ms", DEFAULT_RELOAD_INTERVAL.count()));
|
||||
if (new_reload_interval != reload_interval)
|
||||
{
|
||||
reload_interval = new_reload_interval;
|
||||
LOG_TRACE(log, "Config reload interval changed to {}ms", reload_interval.count());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -92,7 +106,7 @@ void ConfigReloader::run()
|
||||
}
|
||||
}
|
||||
|
||||
void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading)
|
||||
std::optional<ConfigProcessor::LoadedConfig> ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading)
|
||||
{
|
||||
std::lock_guard lock(reload_mutex);
|
||||
|
||||
@ -120,7 +134,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
throw;
|
||||
|
||||
tryLogCurrentException(log, "ZooKeeper error when loading config from '" + config_path + "'");
|
||||
return;
|
||||
return std::nullopt;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -128,7 +142,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
throw;
|
||||
|
||||
tryLogCurrentException(log, "Error loading config from '" + config_path + "'");
|
||||
return;
|
||||
return std::nullopt;
|
||||
}
|
||||
config_processor.savePreprocessedConfig(loaded_config, preprocessed_dir);
|
||||
|
||||
@ -154,11 +168,13 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
tryLogCurrentException(log, "Error updating configuration from '" + config_path + "' config.");
|
||||
return;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Loaded config '{}', performed update on configuration", config_path);
|
||||
return loaded_config;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
struct ConfigReloader::FileWithTimestamp
|
||||
|
@ -17,8 +17,6 @@ namespace Poco { class Logger; }
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
/** Every two seconds checks configuration files for update.
|
||||
* If configuration is changed, then config will be reloaded by ConfigProcessor
|
||||
* and the reloaded config will be applied via Updater functor.
|
||||
@ -27,6 +25,8 @@ class Context;
|
||||
class ConfigReloader
|
||||
{
|
||||
public:
|
||||
static constexpr auto DEFAULT_RELOAD_INTERVAL = std::chrono::milliseconds(2000);
|
||||
|
||||
using Updater = std::function<void(ConfigurationPtr, bool)>;
|
||||
|
||||
ConfigReloader(
|
||||
@ -35,8 +35,7 @@ public:
|
||||
const std::string & preprocessed_dir,
|
||||
zkutil::ZooKeeperNodeCache && zk_node_cache,
|
||||
const zkutil::EventPtr & zk_changed_event,
|
||||
Updater && updater,
|
||||
bool already_loaded);
|
||||
Updater && updater);
|
||||
|
||||
~ConfigReloader();
|
||||
|
||||
@ -53,7 +52,7 @@ public:
|
||||
private:
|
||||
void run();
|
||||
|
||||
void reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading);
|
||||
std::optional<ConfigProcessor::LoadedConfig> reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed, bool initial_loading);
|
||||
|
||||
struct FileWithTimestamp;
|
||||
|
||||
@ -67,8 +66,6 @@ private:
|
||||
|
||||
FilesChangesTracker getNewFileList() const;
|
||||
|
||||
static constexpr auto reload_interval = std::chrono::seconds(2);
|
||||
|
||||
LoggerPtr log = getLogger("ConfigReloader");
|
||||
|
||||
std::string config_path;
|
||||
@ -85,6 +82,8 @@ private:
|
||||
std::atomic<bool> quit{false};
|
||||
ThreadFromGlobalPool thread;
|
||||
|
||||
std::chrono::milliseconds reload_interval = DEFAULT_RELOAD_INTERVAL;
|
||||
|
||||
/// Locked inside reloadIfNewer.
|
||||
std::mutex reload_mutex;
|
||||
};
|
||||
|
@ -154,6 +154,7 @@ namespace DB
|
||||
M(String, merge_workload, "default", "Name of workload to be used to access resources for all merges (may be overridden by a merge tree setting)", 0) \
|
||||
M(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
|
||||
M(Double, gwp_asan_force_sample_probability, 0, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
|
||||
M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
|
||||
|
||||
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp
|
||||
|
||||
|
@ -91,6 +91,7 @@
|
||||
#include <Common/StackTrace.h>
|
||||
#include <Common/Config/ConfigHelper.h>
|
||||
#include <Common/Config/ConfigProcessor.h>
|
||||
#include <Common/Config/ConfigReloader.h>
|
||||
#include <Common/Config/AbstractConfigurationComparison.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
@ -367,6 +368,9 @@ struct ContextSharedPart : boost::noncopyable
|
||||
std::atomic_size_t max_view_num_to_warn = 10000lu;
|
||||
std::atomic_size_t max_dictionary_num_to_warn = 1000lu;
|
||||
std::atomic_size_t max_part_num_to_warn = 100000lu;
|
||||
/// Only for system.server_settings, actually value stored in reloader itself
|
||||
std::atomic_size_t config_reload_interval_ms = ConfigReloader::DEFAULT_RELOAD_INTERVAL.count();
|
||||
|
||||
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
|
||||
String google_protos_path; /// Path to a directory that contains the proto files for the well-known Protobuf types.
|
||||
mutable OnceFlag action_locks_manager_initialized;
|
||||
@ -4503,6 +4507,16 @@ void Context::checkPartitionCanBeDropped(const String & database, const String &
|
||||
checkCanBeDropped(database, table, partition_size, max_partition_size_to_drop);
|
||||
}
|
||||
|
||||
void Context::setConfigReloaderInterval(size_t value_ms)
|
||||
{
|
||||
shared->config_reload_interval_ms.store(value_ms, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
size_t Context::getConfigReloaderInterval() const
|
||||
{
|
||||
return shared->config_reload_interval_ms.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
InputFormatPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size, const std::optional<FormatSettings> & format_settings, std::optional<size_t> max_parsing_threads) const
|
||||
{
|
||||
return FormatFactory::instance().getInput(name, buf, sample, shared_from_this(), max_block_size, format_settings, max_parsing_threads);
|
||||
|
@ -1161,6 +1161,9 @@ public:
|
||||
size_t getMaxPartitionSizeToDrop() const;
|
||||
void checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size) const;
|
||||
void checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size, const size_t & max_partition_size_to_drop) const;
|
||||
/// Only for system.server_settings, actual value is stored in ConfigReloader
|
||||
void setConfigReloaderInterval(size_t value_ms);
|
||||
size_t getConfigReloaderInterval() const;
|
||||
|
||||
/// Lets you select the compression codec according to the conditions described in the configuration file.
|
||||
std::shared_ptr<ICompressionCodec> chooseCompressionCodec(size_t part_size, double part_size_ratio) const;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <IO/MMappedFileCache.h>
|
||||
#include <IO/UncompressedCache.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/Config/ConfigReloader.h>
|
||||
#include <Interpreters/ProcessList.h>
|
||||
#include <Storages/MarkCache.h>
|
||||
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
|
||||
@ -84,7 +85,8 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context
|
||||
{"mmap_cache_size", {std::to_string(context->getMMappedFileCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}},
|
||||
|
||||
{"merge_workload", {context->getMergeWorkload(), ChangeableWithoutRestart::Yes}},
|
||||
{"mutation_workload", {context->getMutationWorkload(), ChangeableWithoutRestart::Yes}}
|
||||
{"mutation_workload", {context->getMutationWorkload(), ChangeableWithoutRestart::Yes}},
|
||||
{"config_reload_interval_ms", {std::to_string(context->getConfigReloaderInterval()), ChangeableWithoutRestart::Yes}}
|
||||
};
|
||||
|
||||
if (context->areBackgroundExecutorsInitialized())
|
||||
|
@ -0,0 +1 @@
|
||||
#!/usr/bin/env python3
|
@ -0,0 +1,4 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<clickhouse>
|
||||
<config_reload_interval_ms>1000</config_reload_interval_ms>
|
||||
</clickhouse>
|
52
tests/integration/test_config_reloader_interval/test.py
Normal file
52
tests/integration/test_config_reloader_interval/test.py
Normal file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
import fnmatch
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.client import QueryRuntimeException
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/config_reloader.xml"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_reload_config(start_cluster):
|
||||
assert node.wait_for_log_line(
|
||||
f"Config reload interval set to 1000ms", look_behind_lines=2000
|
||||
)
|
||||
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT value from system.server_settings where name = 'config_reload_interval_ms'"
|
||||
)
|
||||
== "1000\n"
|
||||
)
|
||||
node.replace_in_config(
|
||||
"/etc/clickhouse-server/config.d/config_reloader.xml",
|
||||
"1000",
|
||||
"7777",
|
||||
)
|
||||
|
||||
assert node.wait_for_log_line(
|
||||
f"Config reload interval changed to 7777ms", look_behind_lines=2000
|
||||
)
|
||||
|
||||
assert (
|
||||
node.query(
|
||||
"SELECT value from system.server_settings where name = 'config_reload_interval_ms'"
|
||||
)
|
||||
== "7777\n"
|
||||
)
|
Loading…
Reference in New Issue
Block a user