#include "Server.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "MetricsTransmitter.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" #include #if defined(OS_LINUX) # include # include # include # include # include #endif #if USE_SSL # include # include #endif #if USE_GRPC # include #endif #if USE_NURAFT # include # include #endif #if USE_AZURE_BLOB_STORAGE # include # include #endif #include /// A minimal file used when the server is run without installation INCBIN(resource_embedded_xml, SOURCE_DIR "/programs/server/embedded.xml"); namespace DB { namespace Setting { extern const SettingsSeconds http_receive_timeout; extern const SettingsSeconds http_send_timeout; extern const SettingsSeconds receive_timeout; extern const SettingsSeconds send_timeout; } namespace MergeTreeSetting { extern const MergeTreeSettingsBool allow_remote_fs_zero_copy_replication; } namespace ServerSetting { extern const ServerSettingsUInt32 asynchronous_heavy_metrics_update_period_s; extern const ServerSettingsUInt32 asynchronous_metrics_update_period_s; extern const ServerSettingsBool asynchronous_metrics_enable_heavy_metrics; extern const ServerSettingsBool async_insert_queue_flush_on_shutdown; extern const ServerSettingsUInt64 async_insert_threads; extern const ServerSettingsBool async_load_databases; extern const ServerSettingsBool async_load_system_database; extern const ServerSettingsUInt64 background_buffer_flush_schedule_pool_size; extern const ServerSettingsUInt64 background_common_pool_size; extern const ServerSettingsUInt64 background_distributed_schedule_pool_size; extern const ServerSettingsUInt64 background_fetches_pool_size; extern const ServerSettingsFloat background_merges_mutations_concurrency_ratio; extern const ServerSettingsString background_merges_mutations_scheduling_policy; extern const ServerSettingsUInt64 background_message_broker_schedule_pool_size; extern const ServerSettingsUInt64 background_move_pool_size; extern const ServerSettingsUInt64 background_pool_size; extern const ServerSettingsUInt64 background_schedule_pool_size; extern const ServerSettingsUInt64 backups_io_thread_pool_queue_size; extern const ServerSettingsDouble cache_size_to_ram_max_ratio; extern const ServerSettingsDouble cannot_allocate_thread_fault_injection_probability; extern const ServerSettingsUInt64 cgroups_memory_usage_observer_wait_time; extern const ServerSettingsUInt64 compiled_expression_cache_elements_size; extern const ServerSettingsUInt64 compiled_expression_cache_size; extern const ServerSettingsUInt64 concurrent_threads_soft_limit_num; extern const ServerSettingsUInt64 concurrent_threads_soft_limit_ratio_to_cores; extern const ServerSettingsUInt64 config_reload_interval_ms; extern const ServerSettingsUInt64 database_catalog_drop_table_concurrency; extern const ServerSettingsString default_database; extern const ServerSettingsBool disable_internal_dns_cache; extern const ServerSettingsUInt64 disk_connections_soft_limit; extern const ServerSettingsUInt64 disk_connections_store_limit; extern const ServerSettingsUInt64 disk_connections_warn_limit; extern const ServerSettingsBool dns_allow_resolve_names_to_ipv4; extern const ServerSettingsBool dns_allow_resolve_names_to_ipv6; extern const ServerSettingsUInt64 dns_cache_max_entries; extern const ServerSettingsInt32 dns_cache_update_period; extern const ServerSettingsUInt32 dns_max_consecutive_failures; extern const ServerSettingsBool enable_azure_sdk_logging; extern const ServerSettingsBool format_alter_operations_with_parentheses; extern const ServerSettingsUInt64 global_profiler_cpu_time_period_ns; extern const ServerSettingsUInt64 global_profiler_real_time_period_ns; extern const ServerSettingsUInt64 http_connections_soft_limit; extern const ServerSettingsUInt64 http_connections_store_limit; extern const ServerSettingsUInt64 http_connections_warn_limit; extern const ServerSettingsString index_mark_cache_policy; extern const ServerSettingsUInt64 index_mark_cache_size; extern const ServerSettingsDouble index_mark_cache_size_ratio; extern const ServerSettingsString index_uncompressed_cache_policy; extern const ServerSettingsUInt64 index_uncompressed_cache_size; extern const ServerSettingsDouble index_uncompressed_cache_size_ratio; extern const ServerSettingsUInt64 io_thread_pool_queue_size; extern const ServerSettingsSeconds keep_alive_timeout; extern const ServerSettingsString mark_cache_policy; extern const ServerSettingsUInt64 mark_cache_size; extern const ServerSettingsDouble mark_cache_size_ratio; extern const ServerSettingsUInt64 max_active_parts_loading_thread_pool_size; extern const ServerSettingsUInt64 max_backups_io_thread_pool_free_size; extern const ServerSettingsUInt64 max_backups_io_thread_pool_size; extern const ServerSettingsUInt64 max_concurrent_insert_queries; extern const ServerSettingsUInt64 max_concurrent_queries; extern const ServerSettingsUInt64 max_concurrent_select_queries; extern const ServerSettingsInt32 max_connections; extern const ServerSettingsUInt64 max_database_num_to_warn; extern const ServerSettingsUInt32 max_database_replicated_create_table_thread_pool_size; extern const ServerSettingsUInt64 max_dictionary_num_to_warn; extern const ServerSettingsUInt64 max_io_thread_pool_free_size; extern const ServerSettingsUInt64 max_io_thread_pool_size; extern const ServerSettingsUInt64 max_keep_alive_requests; extern const ServerSettingsUInt64 max_outdated_parts_loading_thread_pool_size; extern const ServerSettingsUInt64 max_partition_size_to_drop; extern const ServerSettingsUInt64 max_part_num_to_warn; extern const ServerSettingsUInt64 max_parts_cleaning_thread_pool_size; extern const ServerSettingsUInt64 max_server_memory_usage; extern const ServerSettingsDouble max_server_memory_usage_to_ram_ratio; extern const ServerSettingsUInt64 max_table_num_to_warn; extern const ServerSettingsUInt64 max_table_size_to_drop; extern const ServerSettingsUInt64 max_temporary_data_on_disk_size; extern const ServerSettingsUInt64 max_thread_pool_free_size; extern const ServerSettingsUInt64 max_thread_pool_size; extern const ServerSettingsUInt64 max_unexpected_parts_loading_thread_pool_size; extern const ServerSettingsUInt64 max_view_num_to_warn; extern const ServerSettingsUInt64 max_waiting_queries; extern const ServerSettingsUInt64 memory_worker_period_ms; extern const ServerSettingsUInt64 merges_mutations_memory_usage_soft_limit; extern const ServerSettingsDouble merges_mutations_memory_usage_to_ram_ratio; extern const ServerSettingsString merge_workload; extern const ServerSettingsUInt64 mmap_cache_size; extern const ServerSettingsString mutation_workload; extern const ServerSettingsUInt64 page_cache_chunk_size; extern const ServerSettingsUInt64 page_cache_mmap_size; extern const ServerSettingsUInt64 page_cache_size; extern const ServerSettingsBool page_cache_use_madv_free; extern const ServerSettingsBool page_cache_use_transparent_huge_pages; extern const ServerSettingsBool prepare_system_log_tables_on_startup; extern const ServerSettingsBool show_addresses_in_stack_traces; extern const ServerSettingsBool shutdown_wait_backups_and_restores; extern const ServerSettingsUInt64 shutdown_wait_unfinished; extern const ServerSettingsBool shutdown_wait_unfinished_queries; extern const ServerSettingsUInt64 storage_connections_soft_limit; extern const ServerSettingsUInt64 storage_connections_store_limit; extern const ServerSettingsUInt64 storage_connections_warn_limit; extern const ServerSettingsUInt64 tables_loader_background_pool_size; extern const ServerSettingsUInt64 tables_loader_foreground_pool_size; extern const ServerSettingsString temporary_data_in_cache; extern const ServerSettingsUInt64 thread_pool_queue_size; extern const ServerSettingsString tmp_policy; extern const ServerSettingsUInt64 total_memory_profiler_sample_max_allocation_size; extern const ServerSettingsUInt64 total_memory_profiler_sample_min_allocation_size; extern const ServerSettingsUInt64 total_memory_profiler_step; extern const ServerSettingsDouble total_memory_tracker_sample_probability; extern const ServerSettingsString uncompressed_cache_policy; extern const ServerSettingsUInt64 uncompressed_cache_size; extern const ServerSettingsDouble uncompressed_cache_size_ratio; extern const ServerSettingsBool use_legacy_mongodb_integration; } } namespace CurrentMetrics { extern const Metric Revision; extern const Metric VersionInteger; extern const Metric MemoryTracking; extern const Metric MergesMutationsMemoryTracking; extern const Metric MaxDDLEntryID; extern const Metric MaxPushedDDLEntryID; } namespace ProfileEvents { extern const Event MainConfigLoads; extern const Event ServerStartupMilliseconds; extern const Event InterfaceNativeSendBytes; extern const Event InterfaceNativeReceiveBytes; extern const Event InterfaceHTTPSendBytes; extern const Event InterfaceHTTPReceiveBytes; extern const Event InterfacePrometheusSendBytes; extern const Event InterfacePrometheusReceiveBytes; extern const Event InterfaceInterserverSendBytes; extern const Event InterfaceInterserverReceiveBytes; extern const Event InterfaceMySQLSendBytes; extern const Event InterfaceMySQLReceiveBytes; extern const Event InterfacePostgreSQLSendBytes; extern const Event InterfacePostgreSQLReceiveBytes; } namespace fs = std::filesystem; int mainEntryClickHouseServer(int argc, char ** argv) { DB::Server app; /// Do not fork separate process from watchdog if we attached to terminal. /// Otherwise it breaks gdb usage. /// Can be overridden by environment variable (cannot use server config at this moment). if (argc > 0) { const char * env_watchdog = getenv("CLICKHOUSE_WATCHDOG_ENABLE"); // NOLINT(concurrency-mt-unsafe) if (env_watchdog) { if (0 == strcmp(env_watchdog, "1")) app.shouldSetupWatchdog(argv[0]); /// Other values disable watchdog explicitly. } else if (!isatty(STDIN_FILENO) && !isatty(STDOUT_FILENO) && !isatty(STDERR_FILENO)) app.shouldSetupWatchdog(argv[0]); } try { return app.run(argc, argv); } catch (...) { std::cerr << DB::getCurrentExceptionMessage(true) << "\n"; auto code = DB::getCurrentExceptionCode(); return static_cast(code) ? code : 1; } } namespace DB { namespace ErrorCodes { extern const int NO_ELEMENTS_IN_CONFIG; extern const int SUPPORT_IS_DISABLED; extern const int ARGUMENT_OUT_OF_BOUND; extern const int EXCESSIVE_ELEMENT_IN_CONFIG; extern const int INVALID_CONFIG_PARAMETER; extern const int NETWORK_ERROR; extern const int CORRUPTED_DATA; } static std::string getCanonicalPath(std::string && path) { Poco::trimInPlace(path); if (path.empty()) throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "path configuration parameter is empty"); if (path.back() != '/') path += '/'; return std::move(path); } Poco::Net::SocketAddress Server::socketBindListen( const Poco::Util::AbstractConfiguration & config, Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure) const { auto address = makeSocketAddress(host, port, &logger()); socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config.getBool("listen_reuse_port", false)); /// If caller requests any available port from the OS, discover it after binding. if (port == 0) { address = socket.address(); LOG_DEBUG(&logger(), "Requested any available port (port == 0), actual port is {:d}", address.port()); } socket.listen(/* backlog = */ config.getUInt("listen_backlog", 4096)); return address; } Strings getListenHosts(const Poco::Util::AbstractConfiguration & config) { auto listen_hosts = DB::getMultipleValuesFromConfig(config, "", "listen_host"); if (listen_hosts.empty()) { listen_hosts.emplace_back("::1"); listen_hosts.emplace_back("127.0.0.1"); } return listen_hosts; } Strings getInterserverListenHosts(const Poco::Util::AbstractConfiguration & config) { auto interserver_listen_hosts = DB::getMultipleValuesFromConfig(config, "", "interserver_listen_host"); if (!interserver_listen_hosts.empty()) return interserver_listen_hosts; /// Use more general restriction in case of emptiness return getListenHosts(config); } bool getListenTry(const Poco::Util::AbstractConfiguration & config) { bool listen_try = config.getBool("listen_try", false); if (!listen_try) { Poco::Util::AbstractConfiguration::Keys protocols; config.keys("protocols", protocols); listen_try = DB::getMultipleValuesFromConfig(config, "", "listen_host").empty() && std::none_of(protocols.begin(), protocols.end(), [&](const auto & protocol) { return config.has("protocols." + protocol + ".host") && config.has("protocols." + protocol + ".port"); }); } return listen_try; } void Server::createServer( Poco::Util::AbstractConfiguration & config, const std::string & listen_host, const char * port_name, bool listen_try, bool start_server, std::vector & servers, CreateServerFunc && func) const { /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file. if (config.getString(port_name, "").empty()) return; /// If we already have an active server for this listen_host/port_name, don't create it again for (const auto & server : servers) { if (!server.isStopping() && server.getListenHost() == listen_host && server.getPortName() == port_name) return; } auto port = config.getInt(port_name); try { servers.push_back(func(port)); if (start_server) { servers.back().start(); LOG_INFO(&logger(), "Listening for {}", servers.back().getDescription()); } global_context->registerServerPort(port_name, port); } catch (const Poco::Exception &) { if (listen_try) { LOG_WARNING(&logger(), "Listen [{}]:{} failed: {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, " "then consider to " "specify not disabled IPv4 or IPv6 address to listen in element of configuration " "file. Example for disabled IPv6: 0.0.0.0 ." " Example for disabled IPv4: ::", listen_host, port, getCurrentExceptionMessage(false)); } else { throw Exception(ErrorCodes::NETWORK_ERROR, "Listen [{}]:{} failed: {}", listen_host, port, getCurrentExceptionMessage(false)); } } } #if defined(OS_LINUX) namespace { void setOOMScore(int value, LoggerRawPtr log) { try { std::string value_string = std::to_string(value); DB::WriteBufferFromFile buf("/proc/self/oom_score_adj"); buf.write(value_string.c_str(), value_string.size()); buf.next(); buf.close(); } catch (const Poco::Exception & e) { LOG_WARNING(log, "Failed to adjust OOM score: '{}'.", e.displayText()); return; } LOG_INFO(log, "Set OOM score adjustment to {}", value); } } #endif void Server::uninitialize() { logger().information("shutting down"); BaseDaemon::uninitialize(); } int Server::run() { if (config().hasOption("help")) { Poco::Util::HelpFormatter help_formatter(Server::options()); auto header_str = fmt::format("{} [OPTION] [-- [ARG]...]\n" "positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010", commandName()); help_formatter.setHeader(header_str); help_formatter.format(std::cout); return 0; } if (config().hasOption("version")) { std::cout << VERSION_NAME << " server version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl; return 0; } return Application::run(); // NOLINT } void Server::initialize(Poco::Util::Application & self) { ConfigProcessor::registerEmbeddedConfig("config.xml", std::string_view(reinterpret_cast(gresource_embedded_xmlData), gresource_embedded_xmlSize)); BaseDaemon::initialize(self); logger().information("starting up"); LOG_INFO(&logger(), "OS name: {}, version: {}, architecture: {}", Poco::Environment::osName(), Poco::Environment::osVersion(), Poco::Environment::osArchitecture()); } std::string Server::getDefaultCorePath() const { return getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH)) + "cores"; } void Server::defineOptions(Poco::Util::OptionSet & options) { options.addOption( Poco::Util::Option("help", "h", "show help and exit") .required(false) .repeatable(false) .binding("help")); options.addOption( Poco::Util::Option("version", "V", "show version and exit") .required(false) .repeatable(false) .binding("version")); BaseDaemon::defineOptions(options); } void checkForUsersNotInMainConfig( const Poco::Util::AbstractConfiguration & config, const std::string & config_path, const std::string & users_config_path, LoggerPtr log) { if (config.getBool("skip_check_for_incorrect_settings", false)) return; if (config.has("users") || config.has("profiles") || config.has("quotas")) { /// We cannot throw exception here, because we have support for obsolete 'conf.d' directory /// (that does not correspond to config.d or users.d) but substitute configuration to both of them. LOG_ERROR(log, "The , and elements should be located in users config file: {} not in main config {}." " Also note that you should place configuration changes to the appropriate *.d directory like 'users.d'.", users_config_path, config_path); } } namespace { /// Unused in other builds #if defined(OS_LINUX) String readLine(const String & path) { ReadBufferFromFile in(path); String contents; readStringUntilNewlineInto(contents, in); return contents; } int readNumber(const String & path) { ReadBufferFromFile in(path); int result; readText(result, in); return result; } #endif void sanityChecks(Server & server) { std::string data_path = getCanonicalPath(server.config().getString("path", DBMS_DEFAULT_PATH)); std::string logs_path = server.config().getString("logger.log", ""); if (server.logger().is(Poco::Message::PRIO_TEST)) server.context()->addWarningMessage("Server logging level is set to 'test' and performance is degraded. This cannot be used in production."); #if defined(OS_LINUX) try { const std::unordered_set fast_clock_sources = { // ARM clock "arch_sys_counter", // KVM guest clock "kvm-clock", // X86 clock "tsc", }; const char * filename = "/sys/devices/system/clocksource/clocksource0/current_clocksource"; if (!fast_clock_sources.contains(readLine(filename))) server.context()->addWarningMessage("Linux is not using a fast clock source. Performance can be degraded. Check " + String(filename)); } catch (...) // NOLINT(bugprone-empty-catch) { } try { const char * filename = "/proc/sys/vm/overcommit_memory"; if (readNumber(filename) == 2) server.context()->addWarningMessage("Linux memory overcommit is disabled. Check " + String(filename)); } catch (...) // NOLINT(bugprone-empty-catch) { } try { const char * filename = "/sys/kernel/mm/transparent_hugepage/enabled"; if (readLine(filename).find("[always]") != std::string::npos) server.context()->addWarningMessage("Linux transparent hugepages are set to \"always\". Check " + String(filename)); } catch (...) // NOLINT(bugprone-empty-catch) { } try { const char * filename = "/proc/sys/kernel/pid_max"; if (readNumber(filename) < 30000) server.context()->addWarningMessage("Linux max PID is too low. Check " + String(filename)); } catch (...) // NOLINT(bugprone-empty-catch) { } try { const char * filename = "/proc/sys/kernel/threads-max"; if (readNumber(filename) < 30000) server.context()->addWarningMessage("Linux threads max count is too low. Check " + String(filename)); } catch (...) // NOLINT(bugprone-empty-catch) { } try { const char * filename = "/proc/sys/kernel/task_delayacct"; if (readNumber(filename) == 0) server.context()->addWarningMessage("Delay accounting is not enabled, OSIOWaitMicroseconds will not be gathered. You can enable it using `echo 1 > " + String(filename) + "` or by using sysctl."); } catch (...) // NOLINT(bugprone-empty-catch) { } std::string dev_id = getBlockDeviceId(data_path); if (getBlockDeviceType(dev_id) == BlockDeviceType::ROT && getBlockDeviceReadAheadBytes(dev_id) == 0) server.context()->addWarningMessage("Rotational disk with disabled readahead is in use. Performance can be degraded. Used for data: " + String(data_path)); #endif try { if (getAvailableMemoryAmount() < (2l << 30)) server.context()->addWarningMessage("Available memory at server startup is too low (2GiB)."); } catch (...) // NOLINT(bugprone-empty-catch) { } try { if (!enoughSpaceInDirectory(data_path, 1ull << 30)) server.context()->addWarningMessage("Available disk space for data at server startup is too low (1GiB): " + String(data_path)); } catch (...) // NOLINT(bugprone-empty-catch) { } try { if (!logs_path.empty()) { auto logs_parent = fs::path(logs_path).parent_path(); if (!enoughSpaceInDirectory(logs_parent, 1ull << 30)) server.context()->addWarningMessage("Available disk space for logs at server startup is too low (1GiB): " + String(logs_parent)); } } catch (...) // NOLINT(bugprone-empty-catch) { } if (server.context()->getMergeTreeSettings()[MergeTreeSetting::allow_remote_fs_zero_copy_replication]) { server.context()->addWarningMessage("The setting 'allow_remote_fs_zero_copy_replication' is enabled for MergeTree tables." " But the feature of 'zero-copy replication' is under development and is not ready for production." " The usage of this feature can lead to data corruption and loss. The setting should be disabled in production."); } } } void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, ContextMutablePtr context, Poco::Logger * log) { try { Poco::Util::AbstractConfiguration::Keys keys; config.keys("startup_scripts", keys); SetResultDetailsFunc callback; for (const auto & key : keys) { std::string full_prefix = "startup_scripts." + key; if (config.has(full_prefix + ".condition")) { auto condition = config.getString(full_prefix + ".condition"); auto condition_read_buffer = ReadBufferFromString(condition); auto condition_write_buffer = WriteBufferFromOwnString(); LOG_DEBUG(log, "Checking startup query condition `{}`", condition); auto startup_context = Context::createCopy(context); startup_context->makeQueryContext(); executeQuery(condition_read_buffer, condition_write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {}); auto result = condition_write_buffer.str(); if (result != "1\n" && result != "true\n") { if (result != "0\n" && result != "false\n") context->addWarningMessage(fmt::format("The condition query returned `{}`, which can't be interpreted as a boolean (`0`, `false`, `1`, `true`). Will skip this query.", result)); continue; } LOG_DEBUG(log, "Condition is true, will execute the query next"); } auto query = config.getString(full_prefix + ".query"); auto read_buffer = ReadBufferFromString(query); auto write_buffer = WriteBufferFromOwnString(); LOG_DEBUG(log, "Executing query `{}`", query); auto startup_context = Context::createCopy(context); startup_context->makeQueryContext(); executeQuery(read_buffer, write_buffer, true, startup_context, callback, QueryFlags{ .internal = true }, std::nullopt, {}); } } catch (...) { tryLogCurrentException(log, "Failed to parse startup scripts file"); } } static void initializeAzureSDKLogger( [[ maybe_unused ]] const ServerSettings & server_settings, [[ maybe_unused ]] int server_logs_level) { #if USE_AZURE_BLOB_STORAGE if (!server_settings[ServerSetting::enable_azure_sdk_logging]) return; using AzureLogsLevel = Azure::Core::Diagnostics::Logger::Level; static const std::unordered_map> azure_to_server_mapping = { {AzureLogsLevel::Error, {Poco::Message::PRIO_DEBUG, LogsLevel::debug}}, {AzureLogsLevel::Warning, {Poco::Message::PRIO_DEBUG, LogsLevel::debug}}, {AzureLogsLevel::Informational, {Poco::Message::PRIO_TRACE, LogsLevel::trace}}, {AzureLogsLevel::Verbose, {Poco::Message::PRIO_TEST, LogsLevel::test}}, }; static const std::map server_to_azure_mapping = { {Poco::Message::PRIO_DEBUG, AzureLogsLevel::Warning}, {Poco::Message::PRIO_TRACE, AzureLogsLevel::Informational}, {Poco::Message::PRIO_TEST, AzureLogsLevel::Verbose}, }; static const LoggerPtr azure_sdk_logger = getLogger("AzureSDK"); auto it = server_to_azure_mapping.lower_bound(static_cast(server_logs_level)); chassert(it != server_to_azure_mapping.end()); Azure::Core::Diagnostics::Logger::SetLevel(it->second); Azure::Core::Diagnostics::Logger::SetListener([](AzureLogsLevel level, const std::string & message) { auto [poco_level, db_level] = azure_to_server_mapping.at(level); LOG_IMPL(azure_sdk_logger, db_level, poco_level, fmt::runtime(message)); }); #endif } #if defined(SANITIZER) static std::vector getSanitizerNames() { std::vector names; #if defined(ADDRESS_SANITIZER) names.push_back("address"); #endif #if defined(THREAD_SANITIZER) names.push_back("thread"); #endif #if defined(MEMORY_SANITIZER) names.push_back("memory"); #endif #if defined(UNDEFINED_BEHAVIOR_SANITIZER) names.push_back("undefined behavior"); #endif return names; } #endif int Server::main(const std::vector & /*args*/) try { #if USE_JEMALLOC setJemallocBackgroundThreads(true); #endif Stopwatch startup_watch; Poco::Logger * log = &logger(); UseSSL use_ssl; MainThreadStatus::getInstance(); ServerSettings server_settings; server_settings.loadSettingsFromConfig(config()); ASTAlterCommand::setFormatAlterCommandsWithParentheses(server_settings[ServerSetting::format_alter_operations_with_parentheses]); StackTrace::setShowAddresses(server_settings[ServerSetting::show_addresses_in_stack_traces]); #if USE_HDFS /// This will point libhdfs3 to the right location for its config. /// Note: this has to be done once at server initialization, because 'setenv' is not thread-safe. String libhdfs3_conf = config().getString("hdfs.libhdfs3_conf", ""); if (!libhdfs3_conf.empty()) { if (std::filesystem::path{libhdfs3_conf}.is_relative() && !std::filesystem::exists(libhdfs3_conf)) { const String config_path = config().getString("config-file", "config.xml"); const auto config_dir = std::filesystem::path{config_path}.remove_filename(); if (std::filesystem::exists(config_dir / libhdfs3_conf)) libhdfs3_conf = std::filesystem::absolute(config_dir / libhdfs3_conf); } setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), true /* overwrite */); // NOLINT } #endif /// When building openssl into clickhouse, clickhouse owns the configuration /// Therefore, the clickhouse openssl configuration should be kept separate from /// the OS. Default to the one in the standard config directory, unless overridden /// by a key in the config. /// Note: this has to be done once at server initialization, because 'setenv' is not thread-safe. if (config().has("opensslconf")) { std::string opensslconf_path = config().getString("opensslconf"); setenv("OPENSSL_CONF", opensslconf_path.c_str(), true); /// NOLINT } else { const String config_path = config().getString("config-file", "config.xml"); const auto config_dir = std::filesystem::path{config_path}.replace_filename("openssl.conf"); setenv("OPENSSL_CONF", config_dir.c_str(), true); /// NOLINT } if (auto total_numa_memory = getNumaNodesTotalMemory(); total_numa_memory.has_value()) { LOG_INFO( log, "ClickHouse is bound to a subset of NUMA nodes. Total memory of all available nodes: {}", ReadableSize(*total_numa_memory)); } registerInterpreters(); registerFunctions(); registerAggregateFunctions(); registerTableFunctions(server_settings[ServerSetting::use_legacy_mongodb_integration]); registerDatabases(); registerStorages(server_settings[ServerSetting::use_legacy_mongodb_integration]); registerDictionaries(server_settings[ServerSetting::use_legacy_mongodb_integration]); registerDisks(/* global_skip_access_check= */ false); registerFormats(); registerRemoteFileMetadatas(); registerSchedulerNodes(); CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision()); CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger()); /** Context contains all that query execution is dependent: * settings, available functions, data types, aggregate functions, databases, ... */ auto shared_context = Context::createShared(); global_context = Context::createGlobal(shared_context.get()); global_context->makeGlobalContext(); global_context->setApplicationType(Context::ApplicationType::SERVER); #if !defined(NDEBUG) || !defined(__OPTIMIZE__) global_context->addWarningMessage("Server was built in debug mode. It will work slowly."); #endif if (ThreadFuzzer::instance().isEffective()) global_context->addWarningMessage("ThreadFuzzer is enabled. Application will run slowly and unstable."); #if defined(SANITIZER) auto sanitizers = getSanitizerNames(); String log_message; if (sanitizers.empty()) log_message = "sanitizer"; else if (sanitizers.size() == 1) log_message = fmt::format("{} sanitizer", sanitizers.front()); else log_message = fmt::format("sanitizers ({})", fmt::join(sanitizers, ", ")); global_context->addWarningMessage(fmt::format("Server was built with {}. It will work slowly.", log_message)); #endif #if defined(SANITIZE_COVERAGE) || WITH_COVERAGE global_context->addWarningMessage("Server was built with code coverage. It will work slowly."); #endif const size_t physical_server_memory = getMemoryAmount(); LOG_INFO( log, "Available RAM: {}; logical cores: {}; used cores: {}.", formatReadableSizeWithBinarySuffix(physical_server_memory), std::thread::hardware_concurrency(), getNumberOfCPUCoresToUse() // on ARM processors it can show only enabled at current moment cores ); #if defined(__x86_64__) String cpu_info; #define COLLECT_FLAG(X) \ if (CPU::have##X()) \ { \ if (!cpu_info.empty()) \ cpu_info += ", "; \ cpu_info += #X; \ } CPU_ID_ENUMERATE(COLLECT_FLAG) #undef COLLECT_FLAG LOG_INFO(log, "Available CPU instruction sets: {}", cpu_info); #endif bool has_trace_collector = false; /// Disable it if we collect test coverage information, because it will work extremely slow. #if !WITH_COVERAGE /// Profilers cannot work reliably with any other libunwind or without PHDR cache. has_trace_collector = hasPHDRCache() && config().has("trace_log"); #endif /// Describe multiple reasons when query profiler cannot work. #if WITH_COVERAGE LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they work extremely slow with test coverage."); #endif #if defined(SANITIZER) LOG_INFO(log, "Query Profiler is disabled because it cannot work under sanitizers" " when two different stack unwinding methods will interfere with each other."); #endif if (!hasPHDRCache()) LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they require PHDR cache to be created" " (otherwise the function 'dl_iterate_phdr' is not lock free and not async-signal safe)."); // Initialize global thread pool. Do it before we fetch configs from zookeeper // nodes (`from_zk`), because ZooKeeper interface uses the pool. We will // ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well. GlobalThreadPool::initialize( server_settings[ServerSetting::max_thread_pool_size], server_settings[ServerSetting::max_thread_pool_free_size], server_settings[ServerSetting::thread_pool_queue_size], has_trace_collector ? server_settings[ServerSetting::global_profiler_real_time_period_ns] : 0, has_trace_collector ? server_settings[ServerSetting::global_profiler_cpu_time_period_ns] : 0); if (has_trace_collector) { global_context->createTraceCollector(); /// Set up server-wide memory profiler (for total memory tracker). if (server_settings[ServerSetting::total_memory_profiler_step]) total_memory_tracker.setProfilerStep(server_settings[ServerSetting::total_memory_profiler_step]); if (server_settings[ServerSetting::total_memory_tracker_sample_probability] > 0.0) total_memory_tracker.setSampleProbability(server_settings[ServerSetting::total_memory_tracker_sample_probability]); if (server_settings[ServerSetting::total_memory_profiler_sample_min_allocation_size]) total_memory_tracker.setSampleMinAllocationSize(server_settings[ServerSetting::total_memory_profiler_sample_min_allocation_size]); if (server_settings[ServerSetting::total_memory_profiler_sample_max_allocation_size]) total_memory_tracker.setSampleMaxAllocationSize(server_settings[ServerSetting::total_memory_profiler_sample_max_allocation_size]); } Poco::ThreadPool server_pool( /* minCapacity */3, /* maxCapacity */server_settings[ServerSetting::max_connections], /* idleTime */60, /* stackSize */POCO_THREAD_STACK_SIZE, server_settings[ServerSetting::global_profiler_real_time_period_ns], server_settings[ServerSetting::global_profiler_cpu_time_period_ns]); std::mutex servers_lock; std::vector servers; std::vector servers_to_start_before_tables; /// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed). SCOPE_EXIT({ Stopwatch watch; LOG_INFO(log, "Waiting for background threads"); GlobalThreadPool::instance().shutdown(); LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds()); }); MemoryWorker memory_worker(global_context->getServerSettings()[ServerSetting::memory_worker_period_ms]); /// This object will periodically calculate some metrics. ServerAsynchronousMetrics async_metrics( global_context, server_settings[ServerSetting::asynchronous_metrics_update_period_s], server_settings[ServerSetting::asynchronous_metrics_enable_heavy_metrics], server_settings[ServerSetting::asynchronous_heavy_metrics_update_period_s], [&]() -> std::vector { std::vector metrics; std::lock_guard lock(servers_lock); metrics.reserve(servers_to_start_before_tables.size() + servers.size()); for (const auto & server : servers_to_start_before_tables) metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()}); for (const auto & server : servers) metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()}); return metrics; }, /*update_jemalloc_epoch_=*/memory_worker.getSource() != MemoryWorker::MemoryUsageSource::Jemalloc, /*update_rss_=*/memory_worker.getSource() == MemoryWorker::MemoryUsageSource::None); /// NOTE: global context should be destroyed *before* GlobalThreadPool::shutdown() /// Otherwise GlobalThreadPool::shutdown() will hang, since Context holds some threads. SCOPE_EXIT({ async_metrics.stop(); /** Ask to cancel background jobs all table engines, * and also query_log. * It is important to do early, not in destructor of Context, because * table engines could use Context on destroy. */ LOG_INFO(log, "Shutting down storages."); global_context->shutdown(); LOG_DEBUG(log, "Shut down storages."); if (!servers_to_start_before_tables.empty()) { LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish."); size_t current_connections = 0; { std::lock_guard lock(servers_lock); for (auto & server : servers_to_start_before_tables) { server.stop(); current_connections += server.currentConnections(); } } if (current_connections) LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections); else LOG_INFO(log, "Closed all listening sockets."); if (current_connections > 0) current_connections = waitServersToFinish(servers_to_start_before_tables, servers_lock, server_settings[ServerSetting::shutdown_wait_unfinished]); if (current_connections) LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections); else LOG_INFO(log, "Closed connections to servers for tables."); } global_context->shutdownKeeperDispatcher(); /// Wait server pool to avoid use-after-free of destroyed context in the handlers server_pool.joinAll(); /** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available. * At this moment, no one could own shared part of Context. */ global_context->resetSharedContext(); global_context.reset(); shared_context.reset(); LOG_DEBUG(log, "Destroyed global context."); }); #if USE_AZURE_BLOB_STORAGE /// It makes sense to deinitialize libxml after joining of all threads /// in global pool because libxml uses thread-local memory allocations via /// 'pthread_key_create' and 'pthread_setspecific' which should be deallocated /// at 'pthread_exit'. Deinitialization of libxml leads to call of 'pthread_key_delete' /// and if it is done before joining of threads, allocated memory will not be freed /// and there may be memory leaks in threads that used libxml. GlobalThreadPool::instance().addOnDestroyCallback([] { Azure::Storage::_internal::XmlGlobalDeinitialize(); }); #endif getIOThreadPool().initialize( server_settings[ServerSetting::max_io_thread_pool_size], server_settings[ServerSetting::max_io_thread_pool_free_size], server_settings[ServerSetting::io_thread_pool_queue_size]); getBackupsIOThreadPool().initialize( server_settings[ServerSetting::max_backups_io_thread_pool_size], server_settings[ServerSetting::max_backups_io_thread_pool_free_size], server_settings[ServerSetting::backups_io_thread_pool_queue_size]); getActivePartsLoadingThreadPool().initialize( server_settings[ServerSetting::max_active_parts_loading_thread_pool_size], 0, // We don't need any threads once all the parts will be loaded server_settings[ServerSetting::max_active_parts_loading_thread_pool_size]); getOutdatedPartsLoadingThreadPool().initialize( server_settings[ServerSetting::max_outdated_parts_loading_thread_pool_size], 0, // We don't need any threads once all the parts will be loaded server_settings[ServerSetting::max_outdated_parts_loading_thread_pool_size]); /// It could grow if we need to synchronously wait until all the data parts will be loaded. getOutdatedPartsLoadingThreadPool().setMaxTurboThreads( server_settings[ServerSetting::max_active_parts_loading_thread_pool_size] ); getUnexpectedPartsLoadingThreadPool().initialize( server_settings[ServerSetting::max_unexpected_parts_loading_thread_pool_size], 0, // We don't need any threads once all the parts will be loaded server_settings[ServerSetting::max_unexpected_parts_loading_thread_pool_size]); /// It could grow if we need to synchronously wait until all the data parts will be loaded. getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads( server_settings[ServerSetting::max_active_parts_loading_thread_pool_size] ); getPartsCleaningThreadPool().initialize( server_settings[ServerSetting::max_parts_cleaning_thread_pool_size], 0, // We don't need any threads one all the parts will be deleted server_settings[ServerSetting::max_parts_cleaning_thread_pool_size]); auto max_database_replicated_create_table_thread_pool_size = server_settings[ServerSetting::max_database_replicated_create_table_thread_pool_size] ? server_settings[ServerSetting::max_database_replicated_create_table_thread_pool_size] : getNumberOfCPUCoresToUse(); getDatabaseReplicatedCreateTablesThreadPool().initialize( max_database_replicated_create_table_thread_pool_size, 0, // We don't need any threads once all the tables will be created max_database_replicated_create_table_thread_pool_size); getDatabaseCatalogDropTablesThreadPool().initialize( server_settings[ServerSetting::database_catalog_drop_table_concurrency], 0, // We don't need any threads if there are no DROP queries. server_settings[ServerSetting::database_catalog_drop_table_concurrency]); /// Initialize global local cache for remote filesystem. if (config().has("local_cache_for_remote_fs")) { bool enable = config().getBool("local_cache_for_remote_fs.enable", false); if (enable) { String root_dir = config().getString("local_cache_for_remote_fs.root_dir"); UInt64 limit_size = config().getUInt64("local_cache_for_remote_fs.limit_size"); UInt64 bytes_read_before_flush = config().getUInt64("local_cache_for_remote_fs.bytes_read_before_flush", DBMS_DEFAULT_BUFFER_SIZE); ExternalDataSourceCache::instance().initOnce(global_context, root_dir, limit_size, bytes_read_before_flush); } } std::string path_str = getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH)); fs::path path = path_str; /// Check that the process user id matches the owner of the data. assertProcessUserMatchesDataOwner(path_str, [&](const std::string & message){ global_context->addWarningMessage(message); }); global_context->setPath(path_str); StatusFile status{path / "status", StatusFile::write_full_info}; ServerUUID::load(path / "uuid", log); PlacementInfo::PlacementInfo::instance().initialize(config()); zkutil::validateZooKeeperConfig(config()); bool has_zookeeper = zkutil::hasZooKeeperConfig(config()); zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); }); zkutil::EventPtr main_config_zk_changed_event = std::make_shared(); if (loaded_config.has_zk_includes) { auto old_configuration = loaded_config.configuration; ConfigProcessor config_processor(config_path); loaded_config = config_processor.loadConfigWithZooKeeperIncludes( main_config_zk_node_cache, main_config_zk_changed_event, /* fallback_to_preprocessed = */ true); config_processor.savePreprocessedConfig(loaded_config, path_str); config().removeConfiguration(old_configuration.get()); config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false); global_context->setConfig(loaded_config.configuration); } Settings::checkNoSettingNamesAtTopLevel(config(), config_path); /// We need to reload server settings because config could be updated via zookeeper. server_settings.loadSettingsFromConfig(config()); #if defined(OS_LINUX) std::string executable_path = getExecutablePath(); if (!executable_path.empty()) { /// Integrity check based on checksum of the executable code. /// Note: it is not intended to protect from malicious party, /// because the reference checksum can be easily modified as well. /// And we don't involve asymmetric encryption with PKI yet. /// It's only intended to protect from faulty hardware. /// Note: it is only based on machine code. /// But there are other sections of the binary (e.g. exception handling tables) /// that are interpreted (not executed) but can alter the behaviour of the program as well. /// Please keep the below log messages in-sync with the ones in daemon/BaseDaemon.cpp if (stored_binary_hash.empty()) { LOG_WARNING(log, "Integrity check of the executable skipped because the reference checksum could not be read."); } else { String calculated_binary_hash = getHashOfLoadedBinaryHex(); if (calculated_binary_hash == stored_binary_hash) { LOG_INFO(log, "Integrity check of the executable successfully passed (checksum: {})", calculated_binary_hash); } else { /// If program is run under debugger, ptrace will fail. if (ptrace(PTRACE_TRACEME, 0, nullptr, nullptr) == -1) { /// Program is run under debugger. Modification of it's binary image is ok for breakpoints. global_context->addWarningMessage(fmt::format( "Server is run under debugger and its binary image is modified (most likely with breakpoints).", calculated_binary_hash)); } else { throw Exception( ErrorCodes::CORRUPTED_DATA, "Calculated checksum of the executable ({0}) does not correspond" " to the reference checksum stored in the executable ({1})." " This may indicate one of the following:" " - the executable {2} was changed just after startup;" " - the executable {2} was corrupted on disk due to faulty hardware;" " - the loaded executable was corrupted in memory due to faulty hardware;" " - the file {2} was intentionally modified;" " - a logical error in the code.", calculated_binary_hash, stored_binary_hash, executable_path); } } } } else executable_path = "/usr/bin/clickhouse"; /// It is used for information messages. /// After full config loaded { if (config().getBool("remap_executable", false)) { LOG_DEBUG(log, "Will remap executable in memory."); size_t size = remapExecutable(); LOG_DEBUG(log, "The code ({}) in memory has been successfully remapped.", ReadableSize(size)); } if (config().getBool("mlock_executable", false)) { if (hasLinuxCapability(CAP_IPC_LOCK)) { try { /// Get the memory area with (current) code segment. /// It's better to lock only the code segment instead of calling "mlockall", /// because otherwise debug info will be also locked in memory, and it can be huge. auto [addr, len] = getMappedArea(reinterpret_cast(mainEntryClickHouseServer)); LOG_TRACE(log, "Will do mlock to prevent executable memory from being paged out. It may take a few seconds."); if (0 != mlock(addr, len)) LOG_WARNING(log, "Failed mlock: {}", errnoToString()); else LOG_TRACE(log, "The memory map of clickhouse executable has been mlock'ed, total {}", ReadableSize(len)); } catch (...) { LOG_WARNING(log, "Cannot mlock: {}", getCurrentExceptionMessage(false)); } } else { LOG_INFO(log, "It looks like the process has no CAP_IPC_LOCK capability, binary mlock will be disabled." " It could happen due to incorrect ClickHouse package installation." " You could resolve the problem manually with 'sudo setcap cap_ipc_lock=+ep {}'." " Note that it will not work on 'nosuid' mounted filesystems.", executable_path); } } } FailPointInjection::enableFromGlobalConfig(config()); #endif memory_worker.start(); #if defined(OS_LINUX) int default_oom_score = 0; #if !defined(NDEBUG) /// In debug version on Linux, increase oom score so that clickhouse is killed /// first, instead of some service. Use a carefully chosen random score of 555: /// the maximum is 1000, and chromium uses 300 for its tab processes. Ignore /// whatever errors that occur, because it's just a debugging aid and we don't /// care if it breaks. default_oom_score = 555; #endif int oom_score = config().getInt("oom_score", default_oom_score); if (oom_score) setOOMScore(oom_score, log); #endif global_context->setRemoteHostFilter(config()); global_context->setHTTPHeaderFilter(config()); /// Try to increase limit on number of open files. { rlimit rlim; if (getrlimit(RLIMIT_NOFILE, &rlim)) throw Poco::Exception("Cannot getrlimit"); if (rlim.rlim_cur == rlim.rlim_max) { LOG_DEBUG(log, "rlimit on number of file descriptors is {}", rlim.rlim_cur); } else { rlim_t old = rlim.rlim_cur; rlim.rlim_cur = config().getUInt("max_open_files", static_cast(rlim.rlim_max)); int rc = setrlimit(RLIMIT_NOFILE, &rlim); if (rc != 0) LOG_WARNING(log, "Cannot set max number of file descriptors to {}. Try to specify max_open_files according to your system limits. error: {}", rlim.rlim_cur, errnoToString()); else LOG_DEBUG(log, "Set max number of file descriptors to {} (was {}).", rlim.rlim_cur, old); } } /// Try to increase limit on number of threads. { rlimit rlim; if (getrlimit(RLIMIT_NPROC, &rlim)) throw Poco::Exception("Cannot getrlimit"); if (rlim.rlim_cur == rlim.rlim_max) { LOG_DEBUG(log, "rlimit on number of threads is {}", rlim.rlim_cur); } else { rlim_t old = rlim.rlim_cur; rlim.rlim_cur = rlim.rlim_max; int rc = setrlimit(RLIMIT_NPROC, &rlim); if (rc != 0) { LOG_WARNING(log, "Cannot set max number of threads to {}. error: {}", rlim.rlim_cur, errnoToString()); rlim.rlim_cur = old; } else { LOG_DEBUG(log, "Set max number of threads to {} (was {}).", rlim.rlim_cur, old); } } if (rlim.rlim_cur < 30000) { global_context->addWarningMessage("Maximum number of threads is lower than 30000. There could be problems with handling a lot of simultaneous queries."); } } static ServerErrorHandler error_handler; Poco::ErrorHandler::set(&error_handler); /// Initialize DateLUT early, to not interfere with running time of first query. LOG_DEBUG(log, "Initializing DateLUT."); DateLUT::serverTimezoneInstance(); LOG_TRACE(log, "Initialized DateLUT with time zone '{}'.", DateLUT::serverTimezoneInstance().getTimeZone()); /// Storage with temporary data for processing of heavy queries. if (!server_settings[ServerSetting::tmp_policy].value.empty()) { global_context->setTemporaryStoragePolicy(server_settings[ServerSetting::tmp_policy], server_settings[ServerSetting::max_temporary_data_on_disk_size]); } else if (!server_settings[ServerSetting::temporary_data_in_cache].value.empty()) { global_context->setTemporaryStorageInCache(server_settings[ServerSetting::temporary_data_in_cache], server_settings[ServerSetting::max_temporary_data_on_disk_size]); } else { std::string temporary_path = config().getString("tmp_path", path / "tmp/"); global_context->setTemporaryStoragePath(temporary_path, server_settings[ServerSetting::max_temporary_data_on_disk_size]); } /** Directory with 'flags': files indicating temporary settings for the server set by system administrator. * Flags may be cleared automatically after being applied by the server. * Examples: do repair of local data; clone all replicated tables from replica. */ { auto flags_path = path / "flags/"; fs::create_directories(flags_path); global_context->setFlagsPath(flags_path); } /** Directory with user provided files that are usable by 'file' table function. */ { std::string user_files_path = config().getString("user_files_path", path / "user_files/"); global_context->setUserFilesPath(user_files_path); fs::create_directories(user_files_path); } { std::string dictionaries_lib_path = config().getString("dictionaries_lib_path", path / "dictionaries_lib/"); global_context->setDictionariesLibPath(dictionaries_lib_path); fs::create_directories(dictionaries_lib_path); } { std::string user_scripts_path = config().getString("user_scripts_path", path / "user_scripts/"); global_context->setUserScriptsPath(user_scripts_path); fs::create_directories(user_scripts_path); } /// top_level_domains_lists { const std::string & top_level_domains_path = config().getString("top_level_domains_path", path / "top_level_domains/"); TLDListsHolder::getInstance().parseConfig(fs::path(top_level_domains_path) / "", config()); } { fs::create_directories(path / "data"); fs::create_directories(path / "metadata"); /// Directory with metadata of tables, which was marked as dropped by Atomic database fs::create_directories(path / "metadata_dropped"); } if (config().has("interserver_http_port") && config().has("interserver_https_port")) throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "Both http and https interserver ports are specified"); static const auto interserver_tags = { std::make_tuple("interserver_http_host", "interserver_http_port", "http"), std::make_tuple("interserver_https_host", "interserver_https_port", "https") }; for (auto [host_tag, port_tag, scheme] : interserver_tags) { if (config().has(port_tag)) { String this_host = config().getString(host_tag, ""); if (this_host.empty()) { this_host = getFQDNOrHostName(); LOG_DEBUG(log, "Configuration parameter '{}' doesn't exist or exists and empty. Will use '{}' as replica host.", host_tag, this_host); } String port_str = config().getString(port_tag); int port = parse(port_str); if (port < 0 || port > 0xFFFF) throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Out of range '{}': {}", String(port_tag), port); global_context->setInterserverIOAddress(this_host, port); global_context->setInterserverScheme(scheme); } } LOG_DEBUG(log, "Initializing interserver credentials."); global_context->updateInterserverCredentials(config()); if (config().has("macros")) global_context->setMacros(std::make_unique(config(), "macros", log)); /// Set up caches. const size_t max_cache_size = static_cast(physical_server_memory * server_settings[ServerSetting::cache_size_to_ram_max_ratio]); String uncompressed_cache_policy = server_settings[ServerSetting::uncompressed_cache_policy]; size_t uncompressed_cache_size = server_settings[ServerSetting::uncompressed_cache_size]; double uncompressed_cache_size_ratio = server_settings[ServerSetting::uncompressed_cache_size_ratio]; if (uncompressed_cache_size > max_cache_size) { uncompressed_cache_size = max_cache_size; LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size)); } global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size, uncompressed_cache_size_ratio); String mark_cache_policy = server_settings[ServerSetting::mark_cache_policy]; size_t mark_cache_size = server_settings[ServerSetting::mark_cache_size]; double mark_cache_size_ratio = server_settings[ServerSetting::mark_cache_size_ratio]; if (mark_cache_size > max_cache_size) { mark_cache_size = max_cache_size; LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size)); } global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio); size_t page_cache_size = server_settings[ServerSetting::page_cache_size]; if (page_cache_size != 0) global_context->setPageCache( server_settings[ServerSetting::page_cache_chunk_size], server_settings[ServerSetting::page_cache_mmap_size], page_cache_size, server_settings[ServerSetting::page_cache_use_madv_free], server_settings[ServerSetting::page_cache_use_transparent_huge_pages]); String index_uncompressed_cache_policy = server_settings[ServerSetting::index_uncompressed_cache_policy]; size_t index_uncompressed_cache_size = server_settings[ServerSetting::index_uncompressed_cache_size]; double index_uncompressed_cache_size_ratio = server_settings[ServerSetting::index_uncompressed_cache_size_ratio]; if (index_uncompressed_cache_size > max_cache_size) { index_uncompressed_cache_size = max_cache_size; LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(index_uncompressed_cache_size)); } global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio); String index_mark_cache_policy = server_settings[ServerSetting::index_mark_cache_policy]; size_t index_mark_cache_size = server_settings[ServerSetting::index_mark_cache_size]; double index_mark_cache_size_ratio = server_settings[ServerSetting::index_mark_cache_size_ratio]; if (index_mark_cache_size > max_cache_size) { index_mark_cache_size = max_cache_size; LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(index_mark_cache_size)); } global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio); size_t mmap_cache_size = server_settings[ServerSetting::mmap_cache_size]; if (mmap_cache_size > max_cache_size) { mmap_cache_size = max_cache_size; LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mmap_cache_size)); } global_context->setMMappedFileCache(mmap_cache_size); size_t query_cache_max_size_in_bytes = config().getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE); size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES); size_t query_cache_query_cache_max_entry_size_in_bytes = config().getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES); size_t query_cache_max_entry_size_in_rows = config().getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS); if (query_cache_max_size_in_bytes > max_cache_size) { query_cache_max_size_in_bytes = max_cache_size; LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(query_cache_max_size_in_bytes)); } global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows); #if USE_EMBEDDED_COMPILER size_t compiled_expression_cache_max_size_in_bytes = server_settings[ServerSetting::compiled_expression_cache_size]; size_t compiled_expression_cache_max_elements = server_settings[ServerSetting::compiled_expression_cache_elements_size]; CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements); #endif NamedCollectionFactory::instance().loadIfNot(); FileCacheFactory::instance().loadDefaultCaches(config()); /// Initialize main config reloader. std::string include_from_path = config().getString("include_from", "/etc/metrika.xml"); if (config().has("query_masking_rules")) { SensitiveDataMasker::setInstance(std::make_unique(config(), "query_masking_rules")); } std::optional cgroups_memory_usage_observer; try { auto wait_time = server_settings[ServerSetting::cgroups_memory_usage_observer_wait_time]; if (wait_time != 0) cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time)); } catch (Exception &) { tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization"); } std::string cert_path = config().getString("openSSL.server.certificateFile", ""); std::string key_path = config().getString("openSSL.server.privateKeyFile", ""); std::vector extra_paths = {include_from_path}; if (!cert_path.empty()) extra_paths.emplace_back(cert_path); if (!key_path.empty()) extra_paths.emplace_back(key_path); Poco::Util::AbstractConfiguration::Keys protocols; config().keys("protocols", protocols); for (const auto & protocol : protocols) { cert_path = config().getString("protocols." + protocol + ".certificateFile", ""); key_path = config().getString("protocols." + protocol + ".privateKeyFile", ""); if (!cert_path.empty()) extra_paths.emplace_back(cert_path); if (!key_path.empty()) extra_paths.emplace_back(key_path); } auto main_config_reloader = std::make_unique( config_path, extra_paths, config().getString("path", DBMS_DEFAULT_PATH), std::move(main_config_zk_node_cache), main_config_zk_changed_event, [&, config_file = config().getString("config-file", "config.xml")](ConfigurationPtr config, bool initial_loading) { if (!initial_loading) { /// Add back "config-file" key which is absent in the reloaded config. config->setString("config-file", config_file); /// Apply config updates in global context. global_context->setConfig(config); } Settings::checkNoSettingNamesAtTopLevel(*config, config_path); ServerSettings new_server_settings; new_server_settings.loadSettingsFromConfig(*config); size_t max_server_memory_usage = new_server_settings[ServerSetting::max_server_memory_usage]; double max_server_memory_usage_to_ram_ratio = new_server_settings[ServerSetting::max_server_memory_usage_to_ram_ratio]; size_t current_physical_server_memory = getMemoryAmount(); /// With cgroups, the amount of memory available to the server can be changed dynamically. size_t default_max_server_memory_usage = static_cast(current_physical_server_memory * max_server_memory_usage_to_ram_ratio); if (max_server_memory_usage == 0) { max_server_memory_usage = default_max_server_memory_usage; LOG_INFO(log, "Setting max_server_memory_usage was set to {}" " ({} available * {:.2f} max_server_memory_usage_to_ram_ratio)", formatReadableSizeWithBinarySuffix(max_server_memory_usage), formatReadableSizeWithBinarySuffix(current_physical_server_memory), max_server_memory_usage_to_ram_ratio); } else if (max_server_memory_usage > default_max_server_memory_usage) { max_server_memory_usage = default_max_server_memory_usage; LOG_INFO(log, "Setting max_server_memory_usage was lowered to {}" " because the system has low amount of memory. The amount was" " calculated as {} available" " * {:.2f} max_server_memory_usage_to_ram_ratio", formatReadableSizeWithBinarySuffix(max_server_memory_usage), formatReadableSizeWithBinarySuffix(current_physical_server_memory), max_server_memory_usage_to_ram_ratio); } total_memory_tracker.setHardLimit(max_server_memory_usage); total_memory_tracker.setDescription("(total)"); total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); size_t merges_mutations_memory_usage_soft_limit = new_server_settings[ServerSetting::merges_mutations_memory_usage_soft_limit]; size_t default_merges_mutations_server_memory_usage = static_cast(current_physical_server_memory * new_server_settings[ServerSetting::merges_mutations_memory_usage_to_ram_ratio]); if (merges_mutations_memory_usage_soft_limit == 0) { merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage; LOG_INFO(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}" " ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)", formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit), formatReadableSizeWithBinarySuffix(current_physical_server_memory), new_server_settings[ServerSetting::merges_mutations_memory_usage_to_ram_ratio]); } else if (merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage) { merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage; LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}" " ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)", formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit), formatReadableSizeWithBinarySuffix(current_physical_server_memory), new_server_settings[ServerSetting::merges_mutations_memory_usage_to_ram_ratio]); } LOG_INFO(log, "Merges and mutations memory limit is set to {}", formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit)); background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit); background_memory_tracker.setDescription("(background)"); background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking); auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker(); total_memory_tracker.setOvercommitTracker(global_overcommit_tracker); // FIXME logging-related things need synchronization -- see the 'Logger * log' saved // in a lot of places. For now, disable updating log configuration without server restart. //setTextLog(global_context->getTextLog()); updateLevels(*config, logger()); global_context->setClustersConfig(config, has_zookeeper); global_context->setMacros(std::make_unique(*config, "macros", log)); global_context->setExternalAuthenticatorsConfig(*config); global_context->setDashboardsConfig(config); if (global_context->isServerCompletelyStarted()) { /// It does not make sense to reload anything before server has started. /// Moreover, it may break initialization order. global_context->loadOrReloadDictionaries(*config); global_context->loadOrReloadUserDefinedExecutableFunctions(*config); } global_context->setRemoteHostFilter(*config); global_context->setHTTPHeaderFilter(*config); global_context->setMaxTableSizeToDrop(new_server_settings[ServerSetting::max_table_size_to_drop]); global_context->setMaxPartitionSizeToDrop(new_server_settings[ServerSetting::max_partition_size_to_drop]); global_context->setMaxTableNumToWarn(new_server_settings[ServerSetting::max_table_num_to_warn]); global_context->setMaxViewNumToWarn(new_server_settings[ServerSetting::max_view_num_to_warn]); global_context->setMaxDictionaryNumToWarn(new_server_settings[ServerSetting::max_dictionary_num_to_warn]); global_context->setMaxDatabaseNumToWarn(new_server_settings[ServerSetting::max_database_num_to_warn]); global_context->setMaxPartNumToWarn(new_server_settings[ServerSetting::max_part_num_to_warn]); /// Only for system.server_settings global_context->setConfigReloaderInterval(new_server_settings[ServerSetting::config_reload_interval_ms]); SlotCount concurrent_threads_soft_limit = UnlimitedSlots; if (new_server_settings[ServerSetting::concurrent_threads_soft_limit_num] > 0 && new_server_settings[ServerSetting::concurrent_threads_soft_limit_num] < concurrent_threads_soft_limit) concurrent_threads_soft_limit = new_server_settings[ServerSetting::concurrent_threads_soft_limit_num]; if (new_server_settings[ServerSetting::concurrent_threads_soft_limit_ratio_to_cores] > 0) { auto value = new_server_settings[ServerSetting::concurrent_threads_soft_limit_ratio_to_cores] * getNumberOfCPUCoresToUse(); if (value > 0 && value < concurrent_threads_soft_limit) concurrent_threads_soft_limit = value; } ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit); LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit); global_context->getProcessList().setMaxSize(new_server_settings[ServerSetting::max_concurrent_queries]); global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings[ServerSetting::max_concurrent_insert_queries]); global_context->getProcessList().setMaxSelectQueriesAmount(new_server_settings[ServerSetting::max_concurrent_select_queries]); global_context->getProcessList().setMaxWaitingQueriesAmount(new_server_settings[ServerSetting::max_waiting_queries]); if (config->has("keeper_server")) global_context->updateKeeperConfiguration(*config); /// Reload the number of threads for global pools. /// Note: If you specified it in the top level config (not it config of default profile) /// then ClickHouse will use it exactly. /// This is done for backward compatibility. if (global_context->areBackgroundExecutorsInitialized()) { auto new_pool_size = new_server_settings[ServerSetting::background_pool_size]; auto new_ratio = new_server_settings[ServerSetting::background_merges_mutations_concurrency_ratio]; global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, static_cast(new_pool_size * new_ratio)); global_context->getMergeMutateExecutor()->updateSchedulingPolicy(new_server_settings[ServerSetting::background_merges_mutations_scheduling_policy].toString()); } if (global_context->areBackgroundExecutorsInitialized()) { auto new_pool_size = new_server_settings[ServerSetting::background_move_pool_size]; global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); } if (global_context->areBackgroundExecutorsInitialized()) { auto new_pool_size = new_server_settings[ServerSetting::background_fetches_pool_size]; global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); } if (global_context->areBackgroundExecutorsInitialized()) { auto new_pool_size = new_server_settings[ServerSetting::background_common_pool_size]; global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); } global_context->getBufferFlushSchedulePool().increaseThreadsCount(new_server_settings[ServerSetting::background_buffer_flush_schedule_pool_size]); global_context->getSchedulePool().increaseThreadsCount(new_server_settings[ServerSetting::background_schedule_pool_size]); global_context->getMessageBrokerSchedulePool().increaseThreadsCount(new_server_settings[ServerSetting::background_message_broker_schedule_pool_size]); global_context->getDistributedSchedulePool().increaseThreadsCount(new_server_settings[ServerSetting::background_distributed_schedule_pool_size]); global_context->getAsyncLoader().setMaxThreads(TablesLoaderForegroundPoolId, new_server_settings[ServerSetting::tables_loader_foreground_pool_size]); global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundLoadPoolId, new_server_settings[ServerSetting::tables_loader_background_pool_size]); global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundStartupPoolId, new_server_settings[ServerSetting::tables_loader_background_pool_size]); getIOThreadPool().reloadConfiguration( new_server_settings[ServerSetting::max_io_thread_pool_size], new_server_settings[ServerSetting::max_io_thread_pool_free_size], new_server_settings[ServerSetting::io_thread_pool_queue_size]); getBackupsIOThreadPool().reloadConfiguration( new_server_settings[ServerSetting::max_backups_io_thread_pool_size], new_server_settings[ServerSetting::max_backups_io_thread_pool_free_size], new_server_settings[ServerSetting::backups_io_thread_pool_queue_size]); getActivePartsLoadingThreadPool().reloadConfiguration( new_server_settings[ServerSetting::max_active_parts_loading_thread_pool_size], 0, // We don't need any threads once all the parts will be loaded new_server_settings[ServerSetting::max_active_parts_loading_thread_pool_size]); getOutdatedPartsLoadingThreadPool().reloadConfiguration( new_server_settings[ServerSetting::max_outdated_parts_loading_thread_pool_size], 0, // We don't need any threads once all the parts will be loaded new_server_settings[ServerSetting::max_outdated_parts_loading_thread_pool_size]); /// It could grow if we need to synchronously wait until all the data parts will be loaded. getOutdatedPartsLoadingThreadPool().setMaxTurboThreads( new_server_settings[ServerSetting::max_active_parts_loading_thread_pool_size] ); getPartsCleaningThreadPool().reloadConfiguration( new_server_settings[ServerSetting::max_parts_cleaning_thread_pool_size], 0, // We don't need any threads one all the parts will be deleted new_server_settings[ServerSetting::max_parts_cleaning_thread_pool_size]); global_context->setMergeWorkload(new_server_settings[ServerSetting::merge_workload]); global_context->setMutationWorkload(new_server_settings[ServerSetting::mutation_workload]); if (config->has("resources")) { global_context->getResourceManager()->updateConfiguration(*config); } if (!initial_loading) { /// We do not load ZooKeeper configuration on the first config loading /// because TestKeeper server is not started yet. if (zkutil::hasZooKeeperConfig(*config)) global_context->reloadZooKeeperIfChanged(config); global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config); global_context->reloadQueryMaskingRulesIfChanged(config); if (global_context->isServerCompletelyStarted()) { std::lock_guard lock(servers_lock); updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables); } } global_context->updateStorageConfiguration(*config); global_context->updateInterserverCredentials(*config); global_context->updateUncompressedCacheConfiguration(*config); global_context->updateMarkCacheConfiguration(*config); global_context->updateIndexUncompressedCacheConfiguration(*config); global_context->updateIndexMarkCacheConfiguration(*config); global_context->updateMMappedFileCacheConfiguration(*config); global_context->updateQueryCacheConfiguration(*config); CompressionCodecEncrypted::Configuration::instance().tryLoad(*config, "encryption_codecs"); #if USE_SSL CertificateReloader::instance().tryReloadAll(*config); #endif NamedCollectionFactory::instance().reloadFromConfig(*config); FileCacheFactory::instance().updateSettingsFromConfig(*config); HTTPConnectionPools::instance().setLimits( HTTPConnectionPools::Limits{ new_server_settings[ServerSetting::disk_connections_soft_limit], new_server_settings[ServerSetting::disk_connections_warn_limit], new_server_settings[ServerSetting::disk_connections_store_limit], }, HTTPConnectionPools::Limits{ new_server_settings[ServerSetting::storage_connections_soft_limit], new_server_settings[ServerSetting::storage_connections_warn_limit], new_server_settings[ServerSetting::storage_connections_store_limit], }, HTTPConnectionPools::Limits{ new_server_settings[ServerSetting::http_connections_soft_limit], new_server_settings[ServerSetting::http_connections_warn_limit], new_server_settings[ServerSetting::http_connections_store_limit], }); DNSResolver::instance().setFilterSettings(new_server_settings[ServerSetting::dns_allow_resolve_names_to_ipv4], new_server_settings[ServerSetting::dns_allow_resolve_names_to_ipv6]); if (global_context->isServerCompletelyStarted()) CannotAllocateThreadFaultInjector::setFaultProbability(new_server_settings[ServerSetting::cannot_allocate_thread_fault_injection_probability]); ProfileEvents::increment(ProfileEvents::MainConfigLoads); /// Must be the last. latest_config = config; }); const auto listen_hosts = getListenHosts(config()); const auto interserver_listen_hosts = getInterserverListenHosts(config()); const auto listen_try = getListenTry(config()); if (config().has("keeper_server.server_id")) { #if USE_NURAFT //// If we don't have configured connection probably someone trying to use clickhouse-server instead //// of clickhouse-keeper, so start synchronously. bool can_initialize_keeper_async = false; if (has_zookeeper) /// We have configured connection to some zookeeper cluster { /// If we cannot connect to some other node from our cluster then we have to wait our Keeper start /// synchronously. can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster(); } /// Initialize keeper RAFT. global_context->initializeKeeperDispatcher(can_initialize_keeper_async); FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher()); auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration & { return global_context->getConfigRef(); }; for (const auto & listen_host : listen_hosts) { /// TCP Keeper const char * port_name = "keeper_server.tcp_port"; createServer( config(), listen_host, port_name, listen_try, /* start_server: */ false, servers_to_start_before_tables, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config(), socket, listen_host, port); socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0)); socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0)); return ProtocolServerAdapter( listen_host, port_name, "Keeper (tcp): " + address.toString(), std::make_unique( new KeeperTCPHandlerFactory( config_getter, global_context->getKeeperDispatcher(), global_context->getSettingsRef()[Setting::receive_timeout].totalSeconds(), global_context->getSettingsRef()[Setting::send_timeout].totalSeconds(), false), server_pool, socket)); }); const char * secure_port_name = "keeper_server.tcp_port_secure"; createServer( config(), listen_host, secure_port_name, listen_try, /* start_server: */ false, servers_to_start_before_tables, [&](UInt16 port) -> ProtocolServerAdapter { #if USE_SSL Poco::Net::SecureServerSocket socket; auto address = socketBindListen(config(), socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0)); socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0)); return ProtocolServerAdapter( listen_host, secure_port_name, "Keeper with secure protocol (tcp_secure): " + address.toString(), std::make_unique( new KeeperTCPHandlerFactory( config_getter, global_context->getKeeperDispatcher(), global_context->getSettingsRef()[Setting::receive_timeout].totalSeconds(), global_context->getSettingsRef()[Setting::send_timeout].totalSeconds(), true), server_pool, socket)); #else UNUSED(port); throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); #endif }); /// HTTP control endpoints port_name = "keeper_server.http_control.port"; createServer(config(), listen_host, port_name, listen_try, /* start_server: */ false, servers_to_start_before_tables, [&](UInt16 port) -> ProtocolServerAdapter { auto http_context = httpContext(); Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0); Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; http_params->setTimeout(http_context->getReceiveTimeout()); http_params->setKeepAliveTimeout(keep_alive_timeout); Poco::Net::ServerSocket socket; auto address = socketBindListen(config(), socket, listen_host, port); socket.setReceiveTimeout(http_context->getReceiveTimeout()); socket.setSendTimeout(http_context->getSendTimeout()); return ProtocolServerAdapter( listen_host, port_name, "HTTP Control: http://" + address.toString(), std::make_unique( std::move(http_context), createKeeperHTTPControlMainHandlerFactory( config_getter(), global_context->getKeeperDispatcher(), "KeeperHTTPControlHandler-factory"), server_pool, socket, http_params)); }); } #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination."); #endif } { std::lock_guard lock(servers_lock); /// We should start interserver communications before (and more important shutdown after) tables. /// Because server can wait for a long-running queries (for example in tcp_handler) after interserver handler was already shut down. /// In this case we will have replicated tables which are unable to send any parts to other replicas, but still can /// communicate with zookeeper, execute merges, etc. createInterserverServers( config(), interserver_listen_hosts, listen_try, server_pool, async_metrics, servers_to_start_before_tables, /* start_servers= */ false); for (auto & server : servers_to_start_before_tables) { server.start(); LOG_INFO(log, "Listening for {}", server.getDescription()); } } /// Initialize access storages. auto & access_control = global_context->getAccessControl(); try { access_control.setupFromMainConfig(config(), config_path, [&] { return global_context->getZooKeeper(); }); } catch (...) { tryLogCurrentException(log, "Caught exception while setting up access control."); throw; } if (cgroups_memory_usage_observer) { cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); }); cgroups_memory_usage_observer->startThread(); } /// Reload config in SYSTEM RELOAD CONFIG query. global_context->setConfigReloadCallback([&]() { main_config_reloader->reload(); access_control.reload(AccessControl::ReloadMode::USERS_CONFIG_ONLY); }); global_context->setStopServersCallback([&](const ServerType & server_type) { std::lock_guard lock(servers_lock); stopServers(servers, server_type); }); global_context->setStartServersCallback([&](const ServerType & server_type) { std::lock_guard lock(servers_lock); createServers( config(), listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true, server_type); }); /// Limit on total number of concurrently executed queries. global_context->getProcessList().setMaxSize(server_settings[ServerSetting::max_concurrent_queries]); /// Load global settings from default_profile and system_profile. global_context->setDefaultProfiles(config()); /// Initialize background executors after we load default_profile config. /// This is needed to load proper values of background_pool_size etc. global_context->initializeBackgroundExecutorsIfNeeded(); if (server_settings[ServerSetting::async_insert_threads]) { global_context->setAsynchronousInsertQueue(std::make_shared( global_context, server_settings[ServerSetting::async_insert_threads], server_settings[ServerSetting::async_insert_queue_flush_on_shutdown])); } /// Set path for format schema files fs::path format_schema_path(config().getString("format_schema_path", path / "format_schemas/")); global_context->setFormatSchemaPath(format_schema_path); fs::create_directories(format_schema_path); /// Set the path for google proto files if (config().has("google_protos_path")) global_context->setGoogleProtosPath(fs::weakly_canonical(config().getString("google_protos_path"))); /// Set path for filesystem caches fs::path filesystem_caches_path(config().getString("filesystem_caches_path", "")); if (!filesystem_caches_path.empty()) global_context->setFilesystemCachesPath(filesystem_caches_path); /// NOTE: Do sanity checks after we loaded all possible substitutions (for the configuration) from ZK /// Additionally, making the check after the default profile is initialized. /// It is important to initialize MergeTreeSettings after Settings, to support compatibility for MergeTreeSettings. sanityChecks(*this); /// Check sanity of MergeTreeSettings on server startup { size_t background_pool_tasks = global_context->getMergeMutateExecutor()->getMaxTasksCount(); global_context->getMergeTreeSettings().sanityCheck(background_pool_tasks); global_context->getReplicatedMergeTreeSettings().sanityCheck(background_pool_tasks); } /// try set up encryption. There are some errors in config, error will be printed and server wouldn't start. CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs"); /// DNSCacheUpdater uses BackgroundSchedulePool which lives in shared context /// and thus this object must be created after the SCOPE_EXIT object where shared /// context is destroyed. /// In addition this object has to be created before the loading of the tables. std::unique_ptr dns_cache_updater; if (server_settings[ServerSetting::disable_internal_dns_cache]) { /// Disable DNS caching at all DNSResolver::instance().setDisableCacheFlag(); LOG_DEBUG(log, "DNS caching disabled"); } else { DNSResolver::instance().setCacheMaxEntries(server_settings[ServerSetting::dns_cache_max_entries]); /// Initialize a watcher periodically updating DNS cache dns_cache_updater = std::make_unique( global_context, server_settings[ServerSetting::dns_cache_update_period], server_settings[ServerSetting::dns_max_consecutive_failures]); } if (dns_cache_updater) dns_cache_updater->start(); /// Set current database name before loading tables and databases because /// system logs may copy global context. std::string default_database = server_settings[ServerSetting::default_database].toString(); global_context->setCurrentDatabaseNameInGlobalContext(default_database); LOG_INFO(log, "Loading metadata from {}", path_str); LoadTaskPtrs load_system_metadata_tasks; LoadTaskPtrs load_metadata_tasks; // Make sure that if exception is thrown during startup async, new async loading jobs are not going to be called. // This is important for the case when exception is thrown from loading of metadata with `async_load_databases = false` // to avoid simultaneously running table startups and destructing databases. SCOPE_EXIT_SAFE( LOG_INFO(log, "Stopping AsyncLoader."); // Waits for all currently running jobs to finish and do not run any other pending jobs. global_context->getAsyncLoader().stop(); ); try { /// Don't run background queries until we loaded tables. /// (In particular things would break if a background drop query happens before the /// loadMarkedAsDroppedTables() call below - it'll see dropped table metadata and try to /// drop the table a second time and throw an exception.) global_context->getRefreshSet().setRefreshesStopped(true); auto & database_catalog = DatabaseCatalog::instance(); /// We load temporary database first, because projections need it. database_catalog.initializeAndLoadTemporaryDatabase(); load_system_metadata_tasks = loadMetadataSystem(global_context, server_settings[ServerSetting::async_load_system_database]); maybeConvertSystemDatabase(global_context, load_system_metadata_tasks); /// Startup scripts can depend on the system log tables. if (config().has("startup_scripts") && !server_settings[ServerSetting::prepare_system_log_tables_on_startup].changed) global_context->setServerSetting("prepare_system_log_tables_on_startup", true); /// After attaching system databases we can initialize system log. global_context->initializeSystemLogs(); global_context->setSystemZooKeeperLogAfterInitializationIfNeeded(); /// Build loggers before tables startup to make log messages from tables /// attach available in system.text_log buildLoggers(config(), logger()); initializeAzureSDKLogger(server_settings, logger().getLevel()); /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper); attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA)); attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)); /// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread, /// that may execute DROP before loadMarkedAsDroppedTables() in background, /// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap. database_catalog.loadMarkedAsDroppedTables(); database_catalog.createBackgroundTasks(); /// Then, load remaining databases (some of them maybe be loaded asynchronously) load_metadata_tasks = loadMetadata(global_context, default_database, server_settings[ServerSetting::async_load_databases]); /// If we need to convert database engines, disable async tables loading convertDatabasesEnginesIfNeed(load_metadata_tasks, global_context); database_catalog.startupBackgroundTasks(); /// After loading validate that default database exists database_catalog.assertDatabaseExists(default_database); /// Load user-defined SQL functions. global_context->getUserDefinedSQLObjectsStorage().loadObjects(); /// Load WORKLOADs and RESOURCEs. global_context->getWorkloadEntityStorage().loadEntities(); global_context->getRefreshSet().setRefreshesStopped(false); } catch (...) { tryLogCurrentException(log, "Caught exception while loading metadata"); throw; } bool found_stop_flag = false; if (has_zookeeper && global_context->getMacros()->getMacroMap().contains("replica")) { try { auto zookeeper = global_context->getZooKeeper(); String stop_flag_path = "/clickhouse/stop_replicated_ddl_queries/{replica}"; stop_flag_path = global_context->getMacros()->expand(stop_flag_path); found_stop_flag = zookeeper->exists(stop_flag_path); } catch (const Coordination::Exception & e) { if (e.code != Coordination::Error::ZCONNECTIONLOSS) throw; tryLogCurrentException(log); } } if (found_stop_flag) LOG_INFO(log, "Found a stop flag for replicated DDL queries. They will be disabled"); else DatabaseCatalog::instance().startReplicatedDDLQueries(); LOG_DEBUG(log, "Loaded metadata."); if (has_trace_collector) global_context->initializeTraceCollector(); #if defined(OS_LINUX) auto tasks_stats_provider = TasksStatsCounters::findBestAvailableProvider(); if (tasks_stats_provider == TasksStatsCounters::MetricsProvider::None) { LOG_INFO(log, "It looks like this system does not have procfs mounted at /proc location," " neither clickhouse-server process has CAP_NET_ADMIN capability." " 'taskstats' performance statistics will be disabled." " It could happen due to incorrect ClickHouse package installation." " You can try to resolve the problem manually with 'sudo setcap cap_net_admin=+ep {}'." " Note that it will not work on 'nosuid' mounted filesystems." " It also doesn't work if you run clickhouse-server inside network namespace as it happens in some containers.", executable_path); } else { LOG_INFO(log, "Tasks stats provider: {}", TasksStatsCounters::metricsProviderString(tasks_stats_provider)); } if (!hasLinuxCapability(CAP_SYS_NICE)) { LOG_INFO(log, "It looks like the process has no CAP_SYS_NICE capability, the setting 'os_thread_priority' will have no effect." " It could happen due to incorrect ClickHouse package installation." " You could resolve the problem manually with 'sudo setcap cap_sys_nice=+ep {}'." " Note that it will not work on 'nosuid' mounted filesystems.", executable_path); } #else LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled."); #endif { attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics); { std::lock_guard lock(servers_lock); createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers); if (servers.empty()) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No servers started (add valid listen_host and 'tcp_port' or 'http_port' " "to configuration file.)"); } if (servers.empty()) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No servers started (add valid listen_host and 'tcp_port' or 'http_port' " "to configuration file.)"); #if USE_SSL CertificateReloader::instance().tryLoad(config()); CertificateReloader::instance().tryLoadClient(config()); #endif /// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread. async_metrics.start(); global_context->setAsynchronousMetrics(&async_metrics); main_config_reloader->start(); access_control.startPeriodicReloading(); /// try to load dictionaries immediately, throw on error and die try { global_context->loadOrReloadDictionaries(config()); if (!config().getBool("dictionaries_lazy_load", true) && config().getBool("wait_dictionaries_load_at_startup", true)) global_context->waitForDictionariesLoad(); } catch (...) { tryLogCurrentException(log, "Caught exception while loading dictionaries."); throw; } /// try to load embedded dictionaries immediately, throw on error and die try { global_context->tryCreateEmbeddedDictionaries(config()); } catch (...) { tryLogCurrentException(log, "Caught exception while loading embedded dictionaries."); throw; } /// try to load user defined executable functions, throw on error and die try { global_context->loadOrReloadUserDefinedExecutableFunctions(config()); } catch (...) { tryLogCurrentException(log, "Caught exception while loading user defined executable functions."); throw; } if (has_zookeeper && config().has("distributed_ddl")) { /// DDL worker should be started after all tables were loaded String ddl_queue_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/"); String ddl_replicas_path = config().getString("distributed_ddl.replicas_path", "/clickhouse/task_queue/replicas/"); int pool_size = config().getInt("distributed_ddl.pool_size", 1); if (pool_size < 1) throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "distributed_ddl.pool_size should be greater then 0"); global_context->setDDLWorker( std::make_unique( pool_size, ddl_queue_path, ddl_replicas_path, global_context, &config(), "distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID), joinTasks(load_system_metadata_tasks, load_metadata_tasks)); } /// Do not keep tasks in server, they should be kept inside databases. Used here to make dependent tasks only. load_system_metadata_tasks.clear(); load_system_metadata_tasks.shrink_to_fit(); load_metadata_tasks.clear(); load_metadata_tasks.shrink_to_fit(); if (config().has("startup_scripts")) loadStartupScripts(config(), global_context, log); { std::lock_guard lock(servers_lock); for (auto & server : servers) { server.start(); LOG_INFO(log, "Listening for {}", server.getDescription()); } global_context->setServerCompletelyStarted(); LOG_INFO(log, "Ready for connections."); } startup_watch.stop(); ProfileEvents::increment(ProfileEvents::ServerStartupMilliseconds, startup_watch.elapsedMilliseconds()); CannotAllocateThreadFaultInjector::setFaultProbability(server_settings[ServerSetting::cannot_allocate_thread_fault_injection_probability]); #if USE_GWP_ASAN GWPAsan::initFinished(); #endif try { global_context->startClusterDiscovery(); } catch (...) { tryLogCurrentException(log, "Caught exception while starting cluster discovery"); } #if defined(OS_LINUX) /// Tell the service manager that service startup is finished. /// NOTE: the parent clickhouse-watchdog process must do systemdNotify("MAINPID={}\n", child_pid); before /// the child process notifies 'READY=1'. systemdNotify("READY=1\n"); #endif SCOPE_EXIT_SAFE({ LOG_DEBUG(log, "Received termination signal."); /// Stop reloading of the main config. This must be done before everything else because it /// can try to access/modify already deleted objects. /// E.g. it can recreate new servers or it may pass a changed config to some destroyed parts of ContextSharedPart. main_config_reloader.reset(); access_control.stopPeriodicReloading(); is_cancelled = true; LOG_DEBUG(log, "Waiting for current connections to close."); size_t current_connections = 0; { std::lock_guard lock(servers_lock); for (auto & server : servers) { server.stop(); current_connections += server.currentConnections(); } } if (current_connections) LOG_WARNING(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections); else LOG_INFO(log, "Closed all listening sockets."); /// Wait for unfinished backups and restores. /// This must be done after closing listening sockets (no more backups/restores) but before ProcessList::killAllQueries /// (because killAllQueries() will cancel all running backups/restores). if (server_settings[ServerSetting::shutdown_wait_backups_and_restores]) global_context->waitAllBackupsAndRestores(); /// Killing remaining queries. if (!server_settings[ServerSetting::shutdown_wait_unfinished_queries]) global_context->getProcessList().killAllQueries(); if (current_connections) current_connections = waitServersToFinish(servers, servers_lock, server_settings[ServerSetting::shutdown_wait_unfinished]); if (current_connections) LOG_WARNING(log, "Closed connections. But {} remain." " Tip: To increase wait time add to config: 60", current_connections); else LOG_INFO(log, "Closed connections."); dns_cache_updater.reset(); if (current_connections) { /// There is no better way to force connections to close in Poco. /// Otherwise connection handlers will continue to live /// (they are effectively dangling objects, but they use global thread pool /// and global thread pool destructor will wait for threads, preventing server shutdown). /// Dump coverage here, because std::atexit callback would not be called. dumpCoverageReportIfPossible(); LOG_WARNING(log, "Will shutdown forcefully."); safeExit(0); } }); std::vector> metrics_transmitters; for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite")) { metrics_transmitters.emplace_back(std::make_unique( global_context->getConfigRef(), graphite_key, async_metrics)); } waitForTerminationRequest(); } return Application::EXIT_OK; } catch (...) { /// Poco does not provide stacktrace. tryLogCurrentException("Application"); auto code = getCurrentExceptionCode(); return static_cast(code) ? code : -1; } std::unique_ptr Server::buildProtocolStackFromConfig( const Poco::Util::AbstractConfiguration & config, const std::string & protocol, Poco::Net::HTTPServerParams::Ptr http_params, AsynchronousMetrics & async_metrics, bool & is_secure) { auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr { if (type == "tcp") return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes)); if (type == "tls") #if USE_SSL return TCPServerConnectionFactory::Ptr(new TLSHandlerFactory(*this, conf_name)); #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); #endif if (type == "proxy1") return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(*this, conf_name)); if (type == "mysql") return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes)); if (type == "postgres") return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes)); if (type == "http") return TCPServerConnectionFactory::Ptr( new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes) ); if (type == "prometheus") return TCPServerConnectionFactory::Ptr( new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes) ); if (type == "interserver") return TCPServerConnectionFactory::Ptr( new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"), ProfileEvents::InterfaceInterserverReceiveBytes, ProfileEvents::InterfaceInterserverSendBytes) ); throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type); }; std::string conf_name = "protocols." + protocol; std::string prefix = conf_name + "."; std::unordered_set pset {conf_name}; auto stack = std::make_unique(*this, conf_name); while (true) { // if there is no "type" - it's a reference to another protocol and this is just an endpoint if (config.has(prefix + "type")) { std::string type = config.getString(prefix + "type"); if (type == "tls") { if (is_secure) throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' contains more than one TLS layer", protocol); is_secure = true; } stack->append(create_factory(type, conf_name)); } if (!config.has(prefix + "impl")) break; conf_name = "protocols." + config.getString(prefix + "impl"); prefix = conf_name + "."; if (!pset.insert(conf_name).second) throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name); } return stack; } HTTPContextPtr Server::httpContext() const { return std::make_shared(context()); } void Server::createServers( Poco::Util::AbstractConfiguration & config, const Strings & listen_hosts, bool listen_try, Poco::ThreadPool & server_pool, AsynchronousMetrics & async_metrics, std::vector & servers, bool start_servers, const ServerType & server_type) { const Settings & settings = global_context->getSettingsRef(); Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; http_params->setTimeout(settings[Setting::http_receive_timeout]); http_params->setKeepAliveTimeout(global_context->getServerSettings()[ServerSetting::keep_alive_timeout]); http_params->setMaxKeepAliveRequests(static_cast(global_context->getServerSettings()[ServerSetting::max_keep_alive_requests])); Poco::Util::AbstractConfiguration::Keys protocols; config.keys("protocols", protocols); for (const auto & protocol : protocols) { if (!server_type.shouldStart(ServerType::Type::CUSTOM, protocol)) continue; std::string prefix = "protocols." + protocol + "."; std::string port_name = prefix + "port"; std::string description {" protocol"}; if (config.has(prefix + "description")) description = config.getString(prefix + "description"); if (!config.has(prefix + "port")) continue; std::vector hosts; if (config.has(prefix + "host")) hosts.push_back(config.getString(prefix + "host")); else hosts = listen_hosts; for (const auto & host : hosts) { bool is_secure = false; auto stack = buildProtocolStackFromConfig(config, protocol, http_params, async_metrics, is_secure); if (stack->empty()) throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' stack empty", protocol); createServer(config, host, port_name.c_str(), listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, host, port, is_secure); socket.setReceiveTimeout(settings[Setting::receive_timeout]); socket.setSendTimeout(settings[Setting::send_timeout]); return ProtocolServerAdapter( host, port_name.c_str(), description + ": " + address.toString(), std::make_unique( stack.release(), server_pool, socket, new Poco::Net::TCPServerParams)); }); } } for (const auto & listen_host : listen_hosts) { const char * port_name; if (server_type.shouldStart(ServerType::Type::HTTP)) { /// HTTP port_name = "http_port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port); socket.setReceiveTimeout(settings[Setting::http_receive_timeout]); socket.setSendTimeout(settings[Setting::http_send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "http://" + address.toString(), std::make_unique( httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes)); }); } if (server_type.shouldStart(ServerType::Type::HTTPS)) { /// HTTPS port_name = "https_port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { #if USE_SSL Poco::Net::SecureServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings[Setting::http_receive_timeout]); socket.setSendTimeout(settings[Setting::http_send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "https://" + address.toString(), std::make_unique( httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes)); #else UNUSED(port); throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "HTTPS protocol is disabled because Poco library was built without NetSSL support."); #endif }); } if (server_type.shouldStart(ServerType::Type::TCP)) { /// TCP port_name = "tcp_port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port); socket.setReceiveTimeout(settings[Setting::receive_timeout]); socket.setSendTimeout(settings[Setting::send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "native protocol (tcp): " + address.toString(), std::make_unique( new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes), server_pool, socket, new Poco::Net::TCPServerParams)); }); } if (server_type.shouldStart(ServerType::Type::TCP_WITH_PROXY)) { /// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt port_name = "tcp_with_proxy_port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port); socket.setReceiveTimeout(settings[Setting::receive_timeout]); socket.setSendTimeout(settings[Setting::send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "native protocol (tcp) with PROXY: " + address.toString(), std::make_unique( new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes), server_pool, socket, new Poco::Net::TCPServerParams)); }); } if (server_type.shouldStart(ServerType::Type::TCP_SECURE)) { /// TCP with SSL port_name = "tcp_port_secure"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { #if USE_SSL Poco::Net::SecureServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings[Setting::receive_timeout]); socket.setSendTimeout(settings[Setting::send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "secure native protocol (tcp_secure): " + address.toString(), std::make_unique( new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes), server_pool, socket, new Poco::Net::TCPServerParams)); #else UNUSED(port); throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); #endif }); } if (server_type.shouldStart(ServerType::Type::MYSQL)) { port_name = "mysql_port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(Poco::Timespan()); socket.setSendTimeout(settings[Setting::send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "MySQL compatibility protocol: " + address.toString(), std::make_unique(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams)); }); } if (server_type.shouldStart(ServerType::Type::POSTGRESQL)) { port_name = "postgresql_port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(Poco::Timespan()); socket.setSendTimeout(settings[Setting::send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "PostgreSQL compatibility protocol: " + address.toString(), std::make_unique(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams)); }); } #if USE_GRPC if (server_type.shouldStart(ServerType::Type::GRPC)) { port_name = "grpc_port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::SocketAddress server_address(listen_host, port); return ProtocolServerAdapter( listen_host, port_name, "gRPC protocol: " + server_address.toString(), std::make_unique(*this, makeSocketAddress(listen_host, port, &logger()))); }); } #endif if (server_type.shouldStart(ServerType::Type::PROMETHEUS)) { /// Prometheus (if defined and not setup yet with http_port) port_name = "prometheus.port"; createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, listen_host, port); socket.setReceiveTimeout(settings[Setting::http_receive_timeout]); socket.setSendTimeout(settings[Setting::http_send_timeout]); return ProtocolServerAdapter( listen_host, port_name, "Prometheus: http://" + address.toString(), std::make_unique( httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes)); }); } } } void Server::createInterserverServers( Poco::Util::AbstractConfiguration & config, const Strings & interserver_listen_hosts, bool listen_try, Poco::ThreadPool & server_pool, AsynchronousMetrics & async_metrics, std::vector & servers, bool start_servers, const ServerType & server_type) { const Settings & settings = global_context->getSettingsRef(); Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; http_params->setTimeout(settings[Setting::http_receive_timeout]); http_params->setKeepAliveTimeout(global_context->getServerSettings()[ServerSetting::keep_alive_timeout]); /// Now iterate over interserver_listen_hosts for (const auto & interserver_listen_host : interserver_listen_hosts) { const char * port_name; if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTP)) { /// Interserver IO HTTP port_name = "interserver_http_port"; createServer(config, interserver_listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { Poco::Net::ServerSocket socket; auto address = socketBindListen(config, socket, interserver_listen_host, port); socket.setReceiveTimeout(settings[Setting::http_receive_timeout]); socket.setSendTimeout(settings[Setting::http_send_timeout]); return ProtocolServerAdapter( interserver_listen_host, port_name, "replica communication (interserver): http://" + address.toString(), std::make_unique( httpContext(), createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceInterserverReceiveBytes, ProfileEvents::InterfaceInterserverSendBytes)); }); } if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTPS)) { port_name = "interserver_https_port"; createServer(config, interserver_listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter { #if USE_SSL Poco::Net::SecureServerSocket socket; auto address = socketBindListen(config, socket, interserver_listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings[Setting::http_receive_timeout]); socket.setSendTimeout(settings[Setting::http_send_timeout]); return ProtocolServerAdapter( interserver_listen_host, port_name, "secure replica communication (interserver): https://" + address.toString(), std::make_unique( httpContext(), createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceInterserverReceiveBytes, ProfileEvents::InterfaceInterserverSendBytes)); #else UNUSED(port); throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); #endif }); } } } void Server::stopServers( std::vector & servers, const ServerType & server_type) const { LoggerRawPtr log = &logger(); /// Remove servers once all their connections are closed auto check_server = [&log](const char prefix[], auto & server) { if (!server.isStopping()) return false; size_t current_connections = server.currentConnections(); LOG_DEBUG(log, "Server {}{}: {} ({} connections)", server.getDescription(), prefix, !current_connections ? "finished" : "waiting", current_connections); return !current_connections; }; std::erase_if(servers, std::bind_front(check_server, " (from one of previous remove)")); for (auto & server : servers) { if (!server.isStopping()) { const std::string server_port_name = server.getPortName(); if (server_type.shouldStop(server_port_name)) server.stop(); } } std::erase_if(servers, std::bind_front(check_server, "")); } void Server::updateServers( Poco::Util::AbstractConfiguration & config, Poco::ThreadPool & server_pool, AsynchronousMetrics & async_metrics, std::vector & servers, std::vector & servers_to_start_before_tables) { LoggerRawPtr log = &logger(); const auto listen_hosts = getListenHosts(config); const auto interserver_listen_hosts = getInterserverListenHosts(config); const auto listen_try = getListenTry(config); /// Remove servers once all their connections are closed auto check_server = [&log](const char prefix[], auto & server) { if (!server.isStopping()) return false; size_t current_connections = server.currentConnections(); LOG_DEBUG(log, "Server {}{}: {} ({} connections)", server.getDescription(), prefix, !current_connections ? "finished" : "waiting", current_connections); return !current_connections; }; std::erase_if(servers, std::bind_front(check_server, " (from one of previous reload)")); Poco::Util::AbstractConfiguration & previous_config = latest_config ? *latest_config : this->config(); std::vector all_servers; all_servers.reserve(servers.size() + servers_to_start_before_tables.size()); for (auto & server : servers) all_servers.push_back(&server); for (auto & server : servers_to_start_before_tables) all_servers.push_back(&server); for (auto * server : all_servers) { if (server->supportsRuntimeReconfiguration() && !server->isStopping()) { std::string port_name = server->getPortName(); bool has_host = false; bool is_http = false; if (port_name.starts_with("protocols.")) { std::string protocol = port_name.substr(0, port_name.find_last_of('.')); has_host = config.has(protocol + ".host"); std::string conf_name = protocol; std::string prefix = protocol + "."; std::unordered_set pset {conf_name}; while (true) { if (config.has(prefix + "type")) { std::string type = config.getString(prefix + "type"); if (type == "http") { is_http = true; break; } } if (!config.has(prefix + "impl")) break; conf_name = "protocols." + config.getString(prefix + "impl"); prefix = conf_name + "."; if (!pset.insert(conf_name).second) throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name); } } else { /// NOTE: better to compare using getPortName() over using /// dynamic_cast<> since HTTPServer is also used for prometheus and /// internal replication communications. is_http = server->getPortName() == "http_port" || server->getPortName() == "https_port"; } if (!has_host) has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server->getListenHost()) != listen_hosts.end(); bool has_port = !config.getString(port_name, "").empty(); bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers"); if (force_restart) LOG_TRACE(log, " had been changed, will reload {}", server->getDescription()); if (!has_host || !has_port || config.getInt(server->getPortName()) != server->portNumber() || force_restart) { server->stop(); LOG_INFO(log, "Stopped listening for {}", server->getDescription()); } } } createServers(config, listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true); createInterserverServers(config, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers_to_start_before_tables, /* start_servers= */ true); std::erase_if(servers, std::bind_front(check_server, "")); std::erase_if(servers_to_start_before_tables, std::bind_front(check_server, "")); } }