mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
watch for certificate file updates in configreloader
This commit is contained in:
parent
2492f1bb78
commit
062b1c464c
@ -1,28 +1,28 @@
|
||||
#include "Keeper.h"
|
||||
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <Common/getMultipleKeysFromConfig.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Interpreters/DNSCacheUpdater.h>
|
||||
#include <Coordination/Defines.h>
|
||||
#include <Common/Config/ConfigReloader.h>
|
||||
#include <filesystem>
|
||||
#include <IO/UseSSL.h>
|
||||
#include <pwd.h>
|
||||
#include <Coordination/Defines.h>
|
||||
#include <Core/ServerUUID.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <IO/UseSSL.h>
|
||||
#include <Interpreters/DNSCacheUpdater.h>
|
||||
#include <Server/waitServersToFinish.h>
|
||||
#include <base/safeExit.h>
|
||||
#include <base/scope_guard.h>
|
||||
#include <sys/stat.h>
|
||||
#include <Poco/Environment.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Net/TCPServer.h>
|
||||
#include <Poco/Net/TCPServerParams.h>
|
||||
#include <Poco/Util/HelpFormatter.h>
|
||||
#include <Common/ClickHouseRevision.h>
|
||||
#include <Common/Config/ConfigReloader.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Common/ErrorHandlers.h>
|
||||
#include <Common/assertProcessUserMatchesDataOwner.h>
|
||||
#include <Common/getMultipleKeysFromConfig.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/makeSocketAddress.h>
|
||||
#include <Server/waitServersToFinish.h>
|
||||
#include <base/scope_guard.h>
|
||||
#include <base/safeExit.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Net/TCPServerParams.h>
|
||||
#include <Poco/Net/TCPServer.h>
|
||||
#include <Poco/Util/HelpFormatter.h>
|
||||
#include <Poco/Environment.h>
|
||||
#include <sys/stat.h>
|
||||
#include <pwd.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
@ -30,13 +30,13 @@
|
||||
#include <Coordination/KeeperAsynchronousMetrics.h>
|
||||
|
||||
#include <Server/HTTP/HTTPServer.h>
|
||||
#include <Server/TCPServer.h>
|
||||
#include <Server/HTTPHandlerFactory.h>
|
||||
#include <Server/TCPServer.h>
|
||||
|
||||
#include "Core/Defines.h"
|
||||
#include "config.h"
|
||||
#include "config_version.h"
|
||||
#include "config_tools.h"
|
||||
#include "config_version.h"
|
||||
|
||||
|
||||
#if USE_SSL
|
||||
@ -44,8 +44,8 @@
|
||||
# include <Poco/Net/SecureServerSocket.h>
|
||||
#endif
|
||||
|
||||
#include <Server/ProtocolServerAdapter.h>
|
||||
#include <Server/KeeperTCPHandlerFactory.h>
|
||||
#include <Server/ProtocolServerAdapter.h>
|
||||
|
||||
#include <Disks/registerDisks.h>
|
||||
|
||||
@ -70,9 +70,9 @@ int mainEntryClickHouseKeeper(int argc, char ** argv)
|
||||
|
||||
// Weak symbols don't work correctly on Darwin
|
||||
// so we have a stub implementation to avoid linker errors
|
||||
void collectCrashLog(
|
||||
Int32, UInt64, const String &, const StackTrace &)
|
||||
{}
|
||||
void collectCrashLog(Int32, UInt64, const String &, const StackTrace &)
|
||||
{
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
@ -87,7 +87,8 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
Poco::Net::SocketAddress Keeper::socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure) const
|
||||
Poco::Net::SocketAddress
|
||||
Keeper::socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure) const
|
||||
{
|
||||
auto address = makeSocketAddress(host, port, &logger());
|
||||
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config().getBool("listen_reuse_port", false));
|
||||
@ -113,7 +114,9 @@ void Keeper::createServer(const std::string & listen_host, const char * port_nam
|
||||
|
||||
if (listen_try)
|
||||
{
|
||||
LOG_WARNING(&logger(), "{}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
|
||||
LOG_WARNING(
|
||||
&logger(),
|
||||
"{}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
|
||||
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
|
||||
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
|
||||
" Example for disabled IPv4: <listen_host>::</listen_host>",
|
||||
@ -137,12 +140,13 @@ int Keeper::run()
|
||||
if (config().hasOption("help"))
|
||||
{
|
||||
Poco::Util::HelpFormatter help_formatter(Keeper::options());
|
||||
auto header_str = fmt::format("{0} [OPTION] [-- [ARG]...]\n"
|
||||
auto header_str = fmt::format(
|
||||
"{0} [OPTION] [-- [ARG]...]\n"
|
||||
#if ENABLE_CLICKHOUSE_KEEPER_CLIENT
|
||||
"{0} client [OPTION]\n"
|
||||
"{0} client [OPTION]\n"
|
||||
#endif
|
||||
"positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010",
|
||||
commandName());
|
||||
"positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010",
|
||||
commandName());
|
||||
help_formatter.setHeader(header_str);
|
||||
help_formatter.format(std::cout);
|
||||
return 0;
|
||||
@ -161,7 +165,9 @@ void Keeper::initialize(Poco::Util::Application & self)
|
||||
BaseDaemon::initialize(self);
|
||||
logger().information("starting up");
|
||||
|
||||
LOG_INFO(&logger(), "OS Name = {}, OS Version = {}, OS Architecture = {}",
|
||||
LOG_INFO(
|
||||
&logger(),
|
||||
"OS Name = {}, OS Version = {}, OS Architecture = {}",
|
||||
Poco::Environment::osName(),
|
||||
Poco::Environment::osVersion(),
|
||||
Poco::Environment::osArchitecture());
|
||||
@ -186,82 +192,64 @@ void Keeper::handleCustomArguments(const std::string & arg, [[maybe_unused]] con
|
||||
|
||||
void Keeper::defineOptions(Poco::Util::OptionSet & options)
|
||||
{
|
||||
options.addOption(Poco::Util::Option("help", "h", "show help and exit").required(false).repeatable(false).binding("help"));
|
||||
options.addOption(Poco::Util::Option("version", "V", "show version and exit").required(false).repeatable(false).binding("version"));
|
||||
options.addOption(
|
||||
Poco::Util::Option("help", "h", "show help and exit")
|
||||
Poco::Util::Option(
|
||||
"force-recovery", "force-recovery", "Force recovery mode allowing Keeper to overwrite cluster configuration without quorum")
|
||||
.required(false)
|
||||
.repeatable(false)
|
||||
.binding("help"));
|
||||
options.addOption(
|
||||
Poco::Util::Option("version", "V", "show version and exit")
|
||||
.required(false)
|
||||
.repeatable(false)
|
||||
.binding("version"));
|
||||
options.addOption(
|
||||
Poco::Util::Option("force-recovery", "force-recovery", "Force recovery mode allowing Keeper to overwrite cluster configuration without quorum")
|
||||
.required(false)
|
||||
.repeatable(false)
|
||||
.noArgument()
|
||||
.callback(Poco::Util::OptionCallback<Keeper>(this, &Keeper::handleCustomArguments)));
|
||||
.noArgument()
|
||||
.callback(Poco::Util::OptionCallback<Keeper>(this, &Keeper::handleCustomArguments)));
|
||||
BaseDaemon::defineOptions(options);
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
struct KeeperHTTPContext : public IHTTPContext
|
||||
{
|
||||
explicit KeeperHTTPContext(ContextPtr context_)
|
||||
: context(std::move(context_))
|
||||
{}
|
||||
|
||||
uint64_t getMaxHstsAge() const override
|
||||
struct KeeperHTTPContext : public IHTTPContext
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.hsts_max_age", 0);
|
||||
}
|
||||
explicit KeeperHTTPContext(ContextPtr context_) : context(std::move(context_)) { }
|
||||
|
||||
uint64_t getMaxUriSize() const override
|
||||
uint64_t getMaxHstsAge() const override { return context->getConfigRef().getUInt64("keeper_server.hsts_max_age", 0); }
|
||||
|
||||
uint64_t getMaxUriSize() const override { return context->getConfigRef().getUInt64("keeper_server.http_max_uri_size", 1048576); }
|
||||
|
||||
uint64_t getMaxFields() const override { return context->getConfigRef().getUInt64("keeper_server.http_max_fields", 1000000); }
|
||||
|
||||
uint64_t getMaxFieldNameSize() const override
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_field_name_size", 128 * 1024);
|
||||
}
|
||||
|
||||
uint64_t getMaxFieldValueSize() const override
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_field_value_size", 128 * 1024);
|
||||
}
|
||||
|
||||
uint64_t getMaxChunkSize() const override
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_chunk_size", 100_GiB);
|
||||
}
|
||||
|
||||
Poco::Timespan getReceiveTimeout() const override
|
||||
{
|
||||
return {context->getConfigRef().getInt64("keeper_server.http_receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0};
|
||||
}
|
||||
|
||||
Poco::Timespan getSendTimeout() const override
|
||||
{
|
||||
return {context->getConfigRef().getInt64("keeper_server.http_send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0};
|
||||
}
|
||||
|
||||
ContextPtr context;
|
||||
};
|
||||
|
||||
HTTPContextPtr httpContext()
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_uri_size", 1048576);
|
||||
return std::make_shared<KeeperHTTPContext>(Context::getGlobalContextInstance());
|
||||
}
|
||||
|
||||
uint64_t getMaxFields() const override
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_fields", 1000000);
|
||||
}
|
||||
|
||||
uint64_t getMaxFieldNameSize() const override
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_field_name_size", 128 * 1024);
|
||||
}
|
||||
|
||||
uint64_t getMaxFieldValueSize() const override
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_field_value_size", 128 * 1024);
|
||||
}
|
||||
|
||||
uint64_t getMaxChunkSize() const override
|
||||
{
|
||||
return context->getConfigRef().getUInt64("keeper_server.http_max_chunk_size", 100_GiB);
|
||||
}
|
||||
|
||||
Poco::Timespan getReceiveTimeout() const override
|
||||
{
|
||||
return {context->getConfigRef().getInt64("keeper_server.http_receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0};
|
||||
}
|
||||
|
||||
Poco::Timespan getSendTimeout() const override
|
||||
{
|
||||
return {context->getConfigRef().getInt64("keeper_server.http_send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0};
|
||||
}
|
||||
|
||||
ContextPtr context;
|
||||
};
|
||||
|
||||
HTTPContextPtr httpContext()
|
||||
{
|
||||
return std::make_shared<KeeperHTTPContext>(Context::getGlobalContextInstance());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int Keeper::main(const std::vector<std::string> & /*args*/)
|
||||
@ -298,7 +286,7 @@ try
|
||||
std::filesystem::create_directories(path);
|
||||
|
||||
/// Check that the process user id matches the owner of the data.
|
||||
assertProcessUserMatchesDataOwner(path, [&](const std::string & message){ LOG_WARNING(log, fmt::runtime(message)); });
|
||||
assertProcessUserMatchesDataOwner(path, [&](const std::string & message) { LOG_WARNING(log, fmt::runtime(message)); });
|
||||
|
||||
DB::ServerUUID::load(path + "/uuid", log);
|
||||
|
||||
@ -307,8 +295,7 @@ try
|
||||
GlobalThreadPool::initialize(
|
||||
config().getUInt("max_thread_pool_size", 100),
|
||||
config().getUInt("max_thread_pool_free_size", 1000),
|
||||
config().getUInt("thread_pool_queue_size", 10000)
|
||||
);
|
||||
config().getUInt("thread_pool_queue_size", 10000));
|
||||
|
||||
static ServerErrorHandler error_handler;
|
||||
Poco::ErrorHandler::set(&error_handler);
|
||||
@ -350,8 +337,7 @@ try
|
||||
for (const auto & server : *servers)
|
||||
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
|
||||
return metrics;
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
|
||||
|
||||
@ -367,10 +353,7 @@ try
|
||||
global_context->initializeKeeperDispatcher(/* start_async = */ true);
|
||||
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
|
||||
|
||||
auto config_getter = [&] () -> const Poco::Util::AbstractConfiguration &
|
||||
{
|
||||
return global_context->getConfigRef();
|
||||
};
|
||||
auto config_getter = [&]() -> const Poco::Util::AbstractConfiguration & { return global_context->getConfigRef(); };
|
||||
|
||||
auto tcp_receive_timeout = config().getInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC);
|
||||
auto tcp_send_timeout = config().getInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC);
|
||||
@ -379,43 +362,55 @@ try
|
||||
{
|
||||
/// TCP Keeper
|
||||
const char * port_name = "keeper_server.tcp_port";
|
||||
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});
|
||||
socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});
|
||||
servers->emplace_back(
|
||||
listen_host,
|
||||
port_name,
|
||||
"Keeper (tcp): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new KeeperTCPHandlerFactory(
|
||||
config_getter, global_context->getKeeperDispatcher(),
|
||||
tcp_receive_timeout, tcp_send_timeout, false), server_pool, socket));
|
||||
});
|
||||
createServer(
|
||||
listen_host,
|
||||
port_name,
|
||||
listen_try,
|
||||
[&](UInt16 port)
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});
|
||||
socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});
|
||||
servers->emplace_back(
|
||||
listen_host,
|
||||
port_name,
|
||||
"Keeper (tcp): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new KeeperTCPHandlerFactory(
|
||||
config_getter, global_context->getKeeperDispatcher(), tcp_receive_timeout, tcp_send_timeout, false),
|
||||
server_pool,
|
||||
socket));
|
||||
});
|
||||
|
||||
const char * secure_port_name = "keeper_server.tcp_port_secure";
|
||||
createServer(listen_host, secure_port_name, listen_try, [&](UInt16 port)
|
||||
{
|
||||
createServer(
|
||||
listen_host,
|
||||
secure_port_name,
|
||||
listen_try,
|
||||
[&](UInt16 port)
|
||||
{
|
||||
#if USE_SSL
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});
|
||||
socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});
|
||||
servers->emplace_back(
|
||||
listen_host,
|
||||
secure_port_name,
|
||||
"Keeper with secure protocol (tcp_secure): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new KeeperTCPHandlerFactory(
|
||||
config_getter, global_context->getKeeperDispatcher(),
|
||||
tcp_receive_timeout, tcp_send_timeout, true), server_pool, socket));
|
||||
Poco::Net::SecureServerSocket socket;
|
||||
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
|
||||
socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});
|
||||
socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});
|
||||
servers->emplace_back(
|
||||
listen_host,
|
||||
secure_port_name,
|
||||
"Keeper with secure protocol (tcp_secure): " + address.toString(),
|
||||
std::make_unique<TCPServer>(
|
||||
new KeeperTCPHandlerFactory(
|
||||
config_getter, global_context->getKeeperDispatcher(), tcp_receive_timeout, tcp_send_timeout, true),
|
||||
server_pool,
|
||||
socket));
|
||||
#else
|
||||
UNUSED(port);
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
|
||||
UNUSED(port);
|
||||
throw Exception(
|
||||
ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
|
||||
#endif
|
||||
});
|
||||
});
|
||||
|
||||
const auto & config = config_getter();
|
||||
auto http_context = httpContext();
|
||||
@ -426,19 +421,27 @@ try
|
||||
|
||||
/// Prometheus (if defined and not setup yet with http_port)
|
||||
port_name = "prometheus.port";
|
||||
createServer(listen_host, port_name, listen_try, [&, my_http_context = std::move(http_context)](UInt16 port) mutable
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
|
||||
socket.setSendTimeout(my_http_context->getSendTimeout());
|
||||
servers->emplace_back(
|
||||
listen_host,
|
||||
port_name,
|
||||
"Prometheus: http://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::move(my_http_context), createPrometheusMainHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
|
||||
});
|
||||
createServer(
|
||||
listen_host,
|
||||
port_name,
|
||||
listen_try,
|
||||
[&, my_http_context = std::move(http_context)](UInt16 port) mutable
|
||||
{
|
||||
Poco::Net::ServerSocket socket;
|
||||
auto address = socketBindListen(socket, listen_host, port);
|
||||
socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
|
||||
socket.setSendTimeout(my_http_context->getSendTimeout());
|
||||
servers->emplace_back(
|
||||
listen_host,
|
||||
port_name,
|
||||
"Prometheus: http://" + address.toString(),
|
||||
std::make_unique<HTTPServer>(
|
||||
std::move(my_http_context),
|
||||
createPrometheusMainHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"),
|
||||
server_pool,
|
||||
socket,
|
||||
http_params));
|
||||
});
|
||||
}
|
||||
|
||||
for (auto & server : *servers)
|
||||
@ -454,7 +457,7 @@ try
|
||||
/// ConfigReloader have to strict parameters which are redundant in our case
|
||||
auto main_config_reloader = std::make_unique<ConfigReloader>(
|
||||
config_path,
|
||||
include_from_path,
|
||||
std::vector{{include_from_path}},
|
||||
config().getString("path", ""),
|
||||
std::move(unused_cache),
|
||||
unused_event,
|
||||
@ -463,7 +466,7 @@ try
|
||||
if (config->has("keeper_server"))
|
||||
global_context->updateKeeperConfiguration(*config);
|
||||
},
|
||||
/* already_loaded = */ false); /// Reload it right now (initial loading)
|
||||
/* already_loaded = */ false); /// Reload it right now (initial loading)
|
||||
|
||||
SCOPE_EXIT({
|
||||
LOG_INFO(log, "Shutting down.");
|
||||
@ -488,7 +491,10 @@ try
|
||||
current_connections = waitServersToFinish(*servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
|
||||
|
||||
if (current_connections)
|
||||
LOG_INFO(log, "Closed connections to Keeper. But {} remain. Probably some users cannot finish their connections after context shutdown.", current_connections);
|
||||
LOG_INFO(
|
||||
log,
|
||||
"Closed connections to Keeper. But {} remain. Probably some users cannot finish their connections after context shutdown.",
|
||||
current_connections);
|
||||
else
|
||||
LOG_INFO(log, "Closed connections to Keeper.");
|
||||
|
||||
@ -526,11 +532,10 @@ catch (...)
|
||||
|
||||
void Keeper::logRevision() const
|
||||
{
|
||||
Poco::Logger::root().information("Starting ClickHouse Keeper " + std::string{VERSION_STRING}
|
||||
+ "(revision : " + std::to_string(ClickHouseRevision::getVersionRevision())
|
||||
+ ", git hash: " + (git_hash.empty() ? "<unknown>" : git_hash)
|
||||
+ ", build id: " + (build_id.empty() ? "<unknown>" : build_id) + ")"
|
||||
+ ", PID " + std::to_string(getpid()));
|
||||
Poco::Logger::root().information(
|
||||
"Starting ClickHouse Keeper " + std::string{VERSION_STRING} + "(revision : "
|
||||
+ std::to_string(ClickHouseRevision::getVersionRevision()) + ", git hash: " + (git_hash.empty() ? "<unknown>" : git_hash)
|
||||
+ ", build id: " + (build_id.empty() ? "<unknown>" : build_id) + ")" + ", PID " + std::to_string(getpid()));
|
||||
}
|
||||
|
||||
|
||||
|
@ -1100,9 +1100,16 @@ try
|
||||
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
|
||||
}
|
||||
|
||||
const std::string cert_path = config().getString("openSSL.server.certificateFile", "");
|
||||
const std::string key_path = config().getString("openSSL.server.privateKeyFile", "");
|
||||
|
||||
std::vector<std::string> extra_paths = {include_from_path};
|
||||
if (!cert_path.empty()) extra_paths.emplace_back(cert_path);
|
||||
if (!key_path.empty()) extra_paths.emplace_back(key_path);
|
||||
|
||||
auto main_config_reloader = std::make_unique<ConfigReloader>(
|
||||
config_path,
|
||||
include_from_path,
|
||||
extra_paths,
|
||||
config().getString("path", ""),
|
||||
std::move(main_config_zk_node_cache),
|
||||
main_config_zk_changed_event,
|
||||
|
@ -807,7 +807,7 @@ void UsersConfigAccessStorage::load(
|
||||
config_reloader.reset();
|
||||
config_reloader = std::make_unique<ConfigReloader>(
|
||||
users_config_path,
|
||||
include_from_path,
|
||||
std::vector{{include_from_path}},
|
||||
preprocessed_dir,
|
||||
zkutil::ZooKeeperNodeCache(get_zookeeper_function),
|
||||
std::make_shared<Poco::Event>(),
|
||||
|
@ -14,14 +14,15 @@ namespace DB
|
||||
{
|
||||
|
||||
ConfigReloader::ConfigReloader(
|
||||
const std::string & path_,
|
||||
const std::string & include_from_path_,
|
||||
std::string_view config_path_,
|
||||
const std::vector<std::string>& extra_paths_,
|
||||
const std::string & preprocessed_dir_,
|
||||
zkutil::ZooKeeperNodeCache && zk_node_cache_,
|
||||
const zkutil::EventPtr & zk_changed_event_,
|
||||
Updater && updater_,
|
||||
bool already_loaded)
|
||||
: path(path_), include_from_path(include_from_path_)
|
||||
: config_path(config_path_)
|
||||
, extra_paths(extra_paths_)
|
||||
, preprocessed_dir(preprocessed_dir_)
|
||||
, zk_node_cache(std::move(zk_node_cache_))
|
||||
, zk_changed_event(zk_changed_event_)
|
||||
@ -98,10 +99,10 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
FilesChangesTracker new_files = getNewFileList();
|
||||
if (force || need_reload_from_zk || new_files.isDifferOrNewerThan(files))
|
||||
{
|
||||
ConfigProcessor config_processor(path);
|
||||
ConfigProcessor config_processor(config_path);
|
||||
ConfigProcessor::LoadedConfig loaded_config;
|
||||
|
||||
LOG_DEBUG(log, "Loading config '{}'", path);
|
||||
LOG_DEBUG(log, "Loading config '{}'", config_path);
|
||||
|
||||
try
|
||||
{
|
||||
@ -118,7 +119,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
|
||||
tryLogCurrentException(log, "ZooKeeper error when loading config from '" + path + "'");
|
||||
tryLogCurrentException(log, "ZooKeeper error when loading config from '" + config_path + "'");
|
||||
return;
|
||||
}
|
||||
catch (...)
|
||||
@ -126,7 +127,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
|
||||
tryLogCurrentException(log, "Error loading config from '" + path + "'");
|
||||
tryLogCurrentException(log, "Error loading config from '" + config_path + "'");
|
||||
return;
|
||||
}
|
||||
config_processor.savePreprocessedConfig(loaded_config, preprocessed_dir);
|
||||
@ -142,7 +143,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
need_reload_from_zk = false;
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Loaded config '{}', performing update on configuration", path);
|
||||
LOG_DEBUG(log, "Loaded config '{}', performing update on configuration", config_path);
|
||||
|
||||
try
|
||||
{
|
||||
@ -152,11 +153,11 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
|
||||
{
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
tryLogCurrentException(log, "Error updating configuration from '" + path + "' config.");
|
||||
tryLogCurrentException(log, "Error updating configuration from '" + config_path + "' config.");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Loaded config '{}', performed update on configuration", path);
|
||||
LOG_DEBUG(log, "Loaded config '{}', performed update on configuration", config_path);
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,10 +197,11 @@ ConfigReloader::FilesChangesTracker ConfigReloader::getNewFileList() const
|
||||
{
|
||||
FilesChangesTracker file_list;
|
||||
|
||||
file_list.addIfExists(path);
|
||||
file_list.addIfExists(include_from_path);
|
||||
file_list.addIfExists(config_path);
|
||||
for (const std::string& path : extra_paths)
|
||||
file_list.addIfExists(path);
|
||||
|
||||
for (const auto & merge_path : ConfigProcessor::getConfigMergeFiles(path))
|
||||
for (const auto & merge_path : ConfigProcessor::getConfigMergeFiles(config_path))
|
||||
file_list.addIfExists(merge_path);
|
||||
|
||||
return file_list;
|
||||
|
@ -4,7 +4,8 @@
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ZooKeeper/Common.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
|
||||
#include <time.h>
|
||||
#include <ctime>
|
||||
#include <span>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
@ -22,23 +23,21 @@ class Context;
|
||||
/** Every two seconds checks configuration files for update.
|
||||
* If configuration is changed, then config will be reloaded by ConfigProcessor
|
||||
* and the reloaded config will be applied via Updater functor.
|
||||
* It doesn't take into account changes of --config-file, <users_config> and <include_from> parameters.
|
||||
* It doesn't take into account changes of --config-file and <users_config>.
|
||||
*/
|
||||
class ConfigReloader
|
||||
{
|
||||
public:
|
||||
using Updater = std::function<void(ConfigurationPtr, bool)>;
|
||||
|
||||
/** include_from_path is usually /etc/metrika.xml (i.e. value of <include_from> tag)
|
||||
*/
|
||||
ConfigReloader(
|
||||
const std::string & path,
|
||||
const std::string & include_from_path,
|
||||
const std::string & preprocessed_dir,
|
||||
zkutil::ZooKeeperNodeCache && zk_node_cache,
|
||||
const zkutil::EventPtr & zk_changed_event,
|
||||
Updater && updater,
|
||||
bool already_loaded);
|
||||
std::string_view path_,
|
||||
const std::vector<std::string>& extra_paths_,
|
||||
const std::string & preprocessed_dir,
|
||||
zkutil::ZooKeeperNodeCache && zk_node_cache,
|
||||
const zkutil::EventPtr & zk_changed_event,
|
||||
Updater && updater,
|
||||
bool already_loaded);
|
||||
|
||||
~ConfigReloader();
|
||||
|
||||
@ -73,8 +72,9 @@ private:
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("ConfigReloader");
|
||||
|
||||
std::string path;
|
||||
std::string include_from_path;
|
||||
std::string config_path;
|
||||
std::vector<std::string> extra_paths;
|
||||
|
||||
std::string preprocessed_dir;
|
||||
FilesChangesTracker files;
|
||||
zkutil::ZooKeeperNodeCache zk_node_cache;
|
||||
|
Loading…
Reference in New Issue
Block a user