2021-05-12 10:39:07 +00:00
# include "Keeper.h"
2021-05-12 13:04:34 +00:00
# include <Common/ClickHouseRevision.h>
2021-10-03 09:54:23 +00:00
# include <Common/getMultipleKeysFromConfig.h>
2021-05-12 10:39:07 +00:00
# include <Common/DNSResolver.h>
# include <Interpreters/DNSCacheUpdater.h>
2021-10-03 09:54:23 +00:00
# include <Coordination/Defines.h>
2021-10-19 14:29:49 +00:00
# include <Common/Config/ConfigReloader.h>
2021-10-03 09:54:23 +00:00
# include <filesystem>
# include <IO/UseSSL.h>
# include <Core/ServerUUID.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
# include <Common/ErrorHandlers.h>
2021-10-02 07:13:14 +00:00
# include <base/scope_guard.h>
2022-03-02 09:59:10 +00:00
# include <base/safeExit.h>
2021-10-03 09:54:23 +00:00
# include <Poco/Net/NetException.h>
# include <Poco/Net/TCPServerParams.h>
# include <Poco/Net/TCPServer.h>
2021-05-12 10:39:07 +00:00
# include <Poco/Util/HelpFormatter.h>
# include <Poco/Version.h>
# include <Poco/Environment.h>
2021-10-03 09:54:23 +00:00
# include <sys/stat.h>
# include <pwd.h>
2022-11-09 12:37:42 +00:00
2021-10-27 12:26:42 +00:00
# include <Coordination/FourLetterCommand.h>
2022-11-09 12:37:42 +00:00
# include <Coordination/KeeperAsynchronousMetrics.h>
2021-05-12 10:39:07 +00:00
2022-11-09 08:02:04 +00:00
# include <Server/HTTP/HTTPServer.h>
2022-11-09 12:37:42 +00:00
# include <Server/TCPServer.h>
# include <Server/HTTPHandlerFactory.h>
2022-11-09 08:02:04 +00:00
# include "Core/Defines.h"
2022-09-28 13:29:29 +00:00
# include "config.h"
2022-09-28 08:55:05 +00:00
# include "config_version.h"
2021-05-12 10:39:07 +00:00
# if USE_SSL
# include <Poco / Net / Context.h>
# include <Poco / Net / SecureServerSocket.h>
# endif
2021-10-03 09:54:23 +00:00
# include <Server/ProtocolServerAdapter.h>
2021-05-22 23:26:40 +00:00
# include <Server/KeeperTCPHandlerFactory.h>
2021-05-12 10:39:07 +00:00
2021-05-17 09:38:44 +00:00
2021-05-12 10:39:07 +00:00
int mainEntryClickHouseKeeper ( int argc , char * * argv )
{
DB : : Keeper app ;
try
{
return app . run ( argc , argv ) ;
}
catch ( . . . )
{
std : : cerr < < DB : : getCurrentExceptionMessage ( true ) < < " \n " ;
auto code = DB : : getCurrentExceptionCode ( ) ;
return code ? code : 1 ;
}
}
2022-11-10 09:49:00 +00:00
#ifdef KEEPER_STANDALONE_BUILD
/// Empty definition of collectCrashLog, compiled only when KEEPER_STANDALONE_BUILD
/// is defined. All four arguments are ignored.
/// NOTE(review): presumably present so the standalone binary links without the
/// real crash-log implementation - confirm.
void collectCrashLog(Int32, UInt64, const String &, const StackTrace &)
{
}
#endif
2021-05-12 10:39:07 +00:00
namespace DB
{
/// Error codes used by the code in this file. They are only declared here
/// (extern); their definitions are provided elsewhere.
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG ;
extern const int SUPPORT_IS_DISABLED ;
extern const int NETWORK_ERROR ;
2021-05-12 14:05:44 +00:00
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA ;
extern const int FAILED_TO_GETPWUID ;
2022-04-15 08:52:34 +00:00
extern const int LOGICAL_ERROR ;
2021-05-12 10:39:07 +00:00
}
namespace
{
2022-10-07 10:46:45 +00:00
size_t waitServersToFinish ( std : : vector < DB : : ProtocolServerAdapter > & servers , size_t seconds_to_wait )
2021-05-12 10:39:07 +00:00
{
2022-10-07 10:46:45 +00:00
const size_t sleep_max_ms = 1000 * seconds_to_wait ;
const size_t sleep_one_ms = 100 ;
size_t sleep_current_ms = 0 ;
size_t current_connections = 0 ;
2021-05-12 10:39:07 +00:00
for ( ; ; )
{
current_connections = 0 ;
for ( auto & server : servers )
{
server . stop ( ) ;
current_connections + = server . currentConnections ( ) ;
}
if ( ! current_connections )
break ;
sleep_current_ms + = sleep_one_ms ;
if ( sleep_current_ms < sleep_max_ms )
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( sleep_one_ms ) ) ;
else
break ;
}
return current_connections ;
}
/// Builds a socket address from `host` and `port`.
/// A Poco DNSException is always rethrown; when its code indicates an unsupported
/// address family, a hint about <listen_host> is logged first.
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
    try
    {
        return Poco::Net::SocketAddress(host, port);
    }
    catch (const Poco::Net::DNSException & e)
    {
        const auto code = e.code();

        bool address_family_problem = (code == EAI_FAMILY);
#if defined(EAI_ADDRFAMILY)
        address_family_problem = address_family_problem || (code == EAI_ADDRFAMILY);
#endif

        if (address_family_problem)
        {
            LOG_ERROR(log, " Cannot resolve listen_host ({}), error {}: {}. "
                " If it is an IPv6 address and your host has disabled IPv6, then consider to "
                " specify IPv4 address to listen in <listen_host> element of configuration "
                " file. Example: <listen_host>0.0.0.0</listen_host> ",
                host, code, e.message());
        }

        throw;
    }
}
2021-05-12 14:05:44 +00:00
std : : string getUserName ( uid_t user_id )
{
/// Try to convert user id into user name.
auto buffer_size = sysconf ( _SC_GETPW_R_SIZE_MAX ) ;
if ( buffer_size < = 0 )
buffer_size = 1024 ;
std : : string buffer ;
buffer . reserve ( buffer_size ) ;
struct passwd passwd_entry ;
struct passwd * result = nullptr ;
const auto error = getpwuid_r ( user_id , & passwd_entry , buffer . data ( ) , buffer_size , & result ) ;
if ( error )
throwFromErrno ( " Failed to find user name for " + toString ( user_id ) , ErrorCodes : : FAILED_TO_GETPWUID , error ) ;
else if ( result )
return result - > pw_name ;
return toString ( user_id ) ;
}
2021-05-12 10:39:07 +00:00
}
/// Resolves `host`:`port`, binds `socket` to it and starts listening.
/// Address reuse is always enabled; port reuse and the backlog come from the
/// `listen_reuse_port` and `listen_backlog` configuration keys.
/// Returns the address the socket was bound to. `secure` is not used here.
Poco::Net::SocketAddress Keeper::socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure) const
{
    auto address = makeSocketAddress(host, port, &logger());

    const bool reuse_port = config().getBool(" listen_reuse_port ", false);
    socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ reuse_port);

    const auto backlog = config().getUInt(" listen_backlog ", 64);
    socket.listen(/* backlog = */ backlog);

    return address;
}
void Keeper : : createServer ( const std : : string & listen_host , const char * port_name , bool listen_try , CreateServerFunc & & func ) const
{
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
if ( ! config ( ) . has ( port_name ) )
return ;
auto port = config ( ) . getInt ( port_name ) ;
try
{
func ( port ) ;
}
catch ( const Poco : : Exception & )
{
std : : string message = " Listen [ " + listen_host + " ]: " + std : : to_string ( port ) + " failed: " + getCurrentExceptionMessage ( false ) ;
if ( listen_try )
{
LOG_WARNING ( & logger ( ) , " {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
" specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
" file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> . "
" Example for disabled IPv4: <listen_host>::</listen_host> " ,
message ) ;
}
else
{
throw Exception { message , ErrorCodes : : NETWORK_ERROR } ;
}
}
}
void Keeper : : uninitialize ( )
{
logger ( ) . information ( " shutting down " ) ;
BaseDaemon : : uninitialize ( ) ;
}
int Keeper : : run ( )
{
if ( config ( ) . hasOption ( " help " ) )
{
Poco : : Util : : HelpFormatter help_formatter ( Keeper : : options ( ) ) ;
auto header_str = fmt : : format ( " {} [OPTION] [-- [ARG]...] \n "
" positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010 " ,
commandName ( ) ) ;
help_formatter . setHeader ( header_str ) ;
help_formatter . format ( std : : cout ) ;
return 0 ;
}
if ( config ( ) . hasOption ( " version " ) )
{
2021-05-12 13:04:34 +00:00
std : : cout < < DBMS_NAME < < " keeper version " < < VERSION_STRING < < VERSION_OFFICIAL < < " . " < < std : : endl ;
2021-05-12 10:39:07 +00:00
return 0 ;
}
return Application : : run ( ) ; // NOLINT
}
void Keeper : : initialize ( Poco : : Util : : Application & self )
{
BaseDaemon : : initialize ( self ) ;
logger ( ) . information ( " starting up " ) ;
LOG_INFO ( & logger ( ) , " OS Name = {}, OS Version = {}, OS Architecture = {} " ,
Poco : : Environment : : osName ( ) ,
Poco : : Environment : : osVersion ( ) ,
Poco : : Environment : : osArchitecture ( ) ) ;
}
2021-05-12 13:04:34 +00:00
std : : string Keeper : : getDefaultConfigFileName ( ) const
2021-05-12 10:39:07 +00:00
{
2021-05-12 13:04:34 +00:00
return " keeper_config.xml " ;
2021-05-12 10:39:07 +00:00
}
2022-04-15 08:52:34 +00:00
void Keeper : : handleCustomArguments ( const std : : string & arg , [[maybe_unused]] const std : : string & value ) // NOLINT
2022-04-08 07:18:18 +00:00
{
if ( arg = = " force-recovery " )
{
assert ( value . empty ( ) ) ;
2022-04-14 12:07:33 +00:00
config ( ) . setBool ( " keeper_server.force_recovery " , true ) ;
2022-04-08 07:18:18 +00:00
return ;
}
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Invalid argument {} provided " , arg ) ;
}
2021-05-12 10:39:07 +00:00
void Keeper : : defineOptions ( Poco : : Util : : OptionSet & options )
{
options . addOption (
Poco : : Util : : Option ( " help " , " h " , " show help and exit " )
. required ( false )
. repeatable ( false )
. binding ( " help " ) ) ;
options . addOption (
Poco : : Util : : Option ( " version " , " V " , " show version and exit " )
. required ( false )
. repeatable ( false )
. binding ( " version " ) ) ;
2022-04-08 07:18:18 +00:00
options . addOption (
2022-04-14 12:07:33 +00:00
Poco : : Util : : Option ( " force-recovery " , " force-recovery " , " Force recovery mode allowing Keeper to overwrite cluster configuration without quorum " )
2022-04-08 07:18:18 +00:00
. required ( false )
. repeatable ( false )
. noArgument ( )
. callback ( Poco : : Util : : OptionCallback < Keeper > ( this , & Keeper : : handleCustomArguments ) ) ) ;
2021-05-12 10:39:07 +00:00
BaseDaemon : : defineOptions ( options ) ;
}
2022-11-09 08:02:04 +00:00
struct Keeper : : KeeperHTTPContext : public IHTTPContext
{
2022-11-09 14:51:41 +00:00
explicit KeeperHTTPContext ( const TinyContext & context_ )
: context ( context_ )
{ }
2022-11-09 08:02:04 +00:00
uint64_t getMaxHstsAge ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.hsts_max_age " , 0 ) ;
2022-11-09 08:02:04 +00:00
}
uint64_t getMaxUriSize ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.http_max_uri_size " , 1048576 ) ;
2022-11-09 08:02:04 +00:00
}
uint64_t getMaxFields ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.http_max_fields " , 1000000 ) ;
2022-11-09 08:02:04 +00:00
}
uint64_t getMaxFieldNameSize ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.http_max_field_name_size " , 1048576 ) ;
2022-11-09 08:02:04 +00:00
}
uint64_t getMaxFieldValueSize ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.http_max_field_value_size " , 1048576 ) ;
2022-11-09 08:02:04 +00:00
}
uint64_t getMaxChunkSize ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.http_max_chunk_size " , 100 _GiB ) ;
2022-11-09 08:02:04 +00:00
}
Poco : : Timespan getReceiveTimeout ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.http_receive_timeout " , DEFAULT_HTTP_READ_BUFFER_TIMEOUT ) ;
2022-11-09 08:02:04 +00:00
}
Poco : : Timespan getSendTimeout ( ) const override
{
2022-11-09 14:51:41 +00:00
return context . getConfigRef ( ) . getUInt64 ( " keeper_server.http_send_timeout " , DEFAULT_HTTP_READ_BUFFER_TIMEOUT ) ;
2022-11-09 08:02:04 +00:00
}
2022-11-09 14:51:41 +00:00
const TinyContext & context ;
2022-11-09 08:02:04 +00:00
} ;
/// Creates a new KeeperHTTPContext bound to this application's tiny_context.
HTTPContextPtr Keeper::httpContext()
{
    HTTPContextPtr http_context = std::make_shared<KeeperHTTPContext>(tiny_context);
    return http_context;
}
2021-05-12 10:39:07 +00:00
int Keeper : : main ( const std : : vector < std : : string > & /*args*/ )
2022-11-18 12:22:55 +00:00
try
2021-05-12 10:39:07 +00:00
{
Poco : : Logger * log = & logger ( ) ;
UseSSL use_ssl ;
MainThreadStatus : : getInstance ( ) ;
# if !defined(NDEBUG) || !defined(__OPTIMIZE__)
2021-05-12 13:04:34 +00:00
LOG_WARNING ( log , " Keeper was built in debug mode. It will work slowly. " ) ;
2021-05-12 10:39:07 +00:00
# endif
# if defined(SANITIZER)
2021-05-12 13:04:34 +00:00
LOG_WARNING ( log , " Keeper was built with sanitizer. It will work slowly. " ) ;
2021-05-12 10:39:07 +00:00
# endif
2021-05-12 13:04:34 +00:00
if ( ! config ( ) . has ( " keeper_server " ) )
throw Exception ( ErrorCodes : : NO_ELEMENTS_IN_CONFIG , " Keeper configuration (<keeper_server> section) not found in config " ) ;
2021-05-12 14:05:44 +00:00
std : : string path ;
if ( config ( ) . has ( " keeper_server.storage_path " ) )
path = config ( ) . getString ( " keeper_server.storage_path " ) ;
else if ( config ( ) . has ( " keeper_server.log_storage_path " ) )
2021-08-25 16:15:56 +00:00
path = std : : filesystem : : path ( config ( ) . getString ( " keeper_server.log_storage_path " ) ) . parent_path ( ) ;
2021-05-12 14:05:44 +00:00
else if ( config ( ) . has ( " keeper_server.snapshot_storage_path " ) )
2021-08-25 16:15:56 +00:00
path = std : : filesystem : : path ( config ( ) . getString ( " keeper_server.snapshot_storage_path " ) ) . parent_path ( ) ;
2021-05-12 14:05:44 +00:00
else
2021-05-18 14:08:56 +00:00
path = std : : filesystem : : path { KEEPER_DEFAULT_PATH } ;
2021-05-12 14:05:44 +00:00
/// Check that the process user id matches the owner of the data.
const auto effective_user_id = geteuid ( ) ;
struct stat statbuf ;
if ( stat ( path . c_str ( ) , & statbuf ) = = 0 & & effective_user_id ! = statbuf . st_uid )
{
const auto effective_user = getUserName ( effective_user_id ) ;
const auto data_owner = getUserName ( statbuf . st_uid ) ;
std : : string message = " Effective user of the process ( " + effective_user +
" ) does not match the owner of the data ( " + data_owner + " ). " ;
if ( effective_user_id = = 0 )
{
message + = " Run under 'sudo -u " + data_owner + " '. " ;
throw Exception ( message , ErrorCodes : : MISMATCHING_USERS_FOR_PROCESS_AND_DATA ) ;
}
else
{
Use fmt::runtime() for LOG_* for non constexpr
Here is oneliner:
$ gg 'LOG_\(DEBUG\|TRACE\|INFO\|TEST\|WARNING\|ERROR\|FATAL\)([^,]*, [a-zA-Z]' -- :*.cpp :*.h | cut -d: -f1 | sort -u | xargs -r sed -E -i 's#(LOG_[A-Z]*)\(([^,]*), ([A-Za-z][^,)]*)#\1(\2, fmt::runtime(\3)#'
Note, that I tried to do this with coccinelle (tool for semantic
patchin), but it cannot parse C++:
$ cat fmt.cocci
@@
expression log;
expression var;
@@
-LOG_DEBUG(log, var)
+LOG_DEBUG(log, fmt::runtime(var))
I've also tried to use some macros/templates magic to do this implicitly
in logger_useful.h, but I failed to do so, and apparently it is not
possible for now.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
v2: manual fixes
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-01 09:10:27 +00:00
LOG_WARNING ( log , fmt : : runtime ( message ) ) ;
2021-05-12 14:05:44 +00:00
}
}
2021-08-17 13:24:14 +00:00
DB : : ServerUUID : : load ( path + " /uuid " , log ) ;
2021-08-16 18:30:53 +00:00
2021-10-19 14:29:49 +00:00
std : : string include_from_path = config ( ) . getString ( " include_from " , " /etc/metrika.xml " ) ;
2021-11-12 13:24:47 +00:00
GlobalThreadPool : : initialize (
config ( ) . getUInt ( " max_thread_pool_size " , 100 ) ,
config ( ) . getUInt ( " max_thread_pool_free_size " , 1000 ) ,
config ( ) . getUInt ( " thread_pool_queue_size " , 10000 )
) ;
2021-05-12 10:39:07 +00:00
static ServerErrorHandler error_handler ;
Poco : : ErrorHandler : : set ( & error_handler ) ;
/// Initialize DateLUT early, to not interfere with running time of first query.
LOG_DEBUG ( log , " Initializing DateLUT. " ) ;
DateLUT : : instance ( ) ;
LOG_TRACE ( log , " Initialized DateLUT with time zone '{}'. " , DateLUT : : instance ( ) . getTimeZone ( ) ) ;
2021-05-12 13:04:34 +00:00
/// Don't want to use DNS cache
DNSResolver : : instance ( ) . setDisableCacheFlag ( ) ;
2021-05-12 10:39:07 +00:00
2021-05-12 13:04:34 +00:00
Poco : : ThreadPool server_pool ( 3 , config ( ) . getUInt ( " max_connections " , 1024 ) ) ;
2022-11-09 12:37:42 +00:00
std : : mutex servers_lock ;
auto servers = std : : make_shared < std : : vector < ProtocolServerAdapter > > ( ) ;
/// This object will periodically calculate some metrics.
KeeperAsynchronousMetrics async_metrics (
tiny_context ,
config ( ) . getUInt ( " asynchronous_metrics_update_period_s " , 1 ) ,
[ & ] ( ) - > std : : vector < ProtocolServerMetrics >
{
std : : vector < ProtocolServerMetrics > metrics ;
std : : lock_guard lock ( servers_lock ) ;
metrics . reserve ( servers - > size ( ) ) ;
for ( const auto & server : * servers )
metrics . emplace_back ( ProtocolServerMetrics { server . getPortName ( ) , server . currentThreads ( ) } ) ;
return metrics ;
}
) ;
2021-05-12 10:39:07 +00:00
std : : vector < std : : string > listen_hosts = DB : : getMultipleValuesFromConfig ( config ( ) , " " , " listen_host " ) ;
bool listen_try = config ( ) . getBool ( " listen_try " , false ) ;
if ( listen_hosts . empty ( ) )
{
listen_hosts . emplace_back ( " ::1 " ) ;
listen_hosts . emplace_back ( " 127.0.0.1 " ) ;
listen_try = true ;
}
2021-10-18 09:13:24 +00:00
/// Initialize keeper RAFT. Do nothing if no keeper_server in config.
2022-04-19 12:03:00 +00:00
tiny_context . initializeKeeperDispatcher ( /* start_async = */ true ) ;
2022-03-03 20:27:46 +00:00
FourLetterCommandFactory : : registerCommands ( * tiny_context . getKeeperDispatcher ( ) ) ;
auto config_getter = [ this ] ( ) - > const Poco : : Util : : AbstractConfiguration &
{
return tiny_context . getConfigRef ( ) ;
} ;
2021-10-27 12:26:42 +00:00
2021-05-12 13:04:34 +00:00
for ( const auto & listen_host : listen_hosts )
{
/// TCP Keeper
const char * port_name = " keeper_server.tcp_port " ;
createServer ( listen_host , port_name , listen_try , [ & ] ( UInt16 port )
{
Poco : : Net : : ServerSocket socket ;
auto address = socketBindListen ( socket , listen_host , port ) ;
2022-01-25 13:40:12 +00:00
socket . setReceiveTimeout ( config ( ) . getUInt64 ( " keeper_server.socket_receive_timeout_sec " , DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC ) ) ;
socket . setSendTimeout ( config ( ) . getUInt64 ( " keeper_server.socket_send_timeout_sec " , DBMS_DEFAULT_SEND_TIMEOUT_SEC ) ) ;
2021-05-12 13:04:34 +00:00
servers - > emplace_back (
2021-10-22 07:15:34 +00:00
listen_host ,
2021-05-12 13:04:34 +00:00
port_name ,
2021-10-22 07:15:34 +00:00
" Keeper (tcp): " + address . toString ( ) ,
std : : make_unique < TCPServer > (
2022-03-03 20:27:46 +00:00
new KeeperTCPHandlerFactory (
config_getter , tiny_context . getKeeperDispatcher ( ) ,
config ( ) . getUInt64 ( " keeper_server.socket_receive_timeout_sec " , DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC ) ,
config ( ) . getUInt64 ( " keeper_server.socket_send_timeout_sec " , DBMS_DEFAULT_SEND_TIMEOUT_SEC ) , false ) , server_pool , socket ) ) ;
2021-05-12 13:04:34 +00:00
} ) ;
const char * secure_port_name = " keeper_server.tcp_port_secure " ;
createServer ( listen_host , secure_port_name , listen_try , [ & ] ( UInt16 port )
2021-05-12 10:39:07 +00:00
{
# if USE_SSL
2021-05-12 13:04:34 +00:00
Poco : : Net : : SecureServerSocket socket ;
auto address = socketBindListen ( socket , listen_host , port , /* secure = */ true ) ;
2022-01-25 13:40:12 +00:00
socket . setReceiveTimeout ( config ( ) . getUInt64 ( " keeper_server.socket_receive_timeout_sec " , DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC ) ) ;
socket . setSendTimeout ( config ( ) . getUInt64 ( " keeper_server.socket_send_timeout_sec " , DBMS_DEFAULT_SEND_TIMEOUT_SEC ) ) ;
2021-05-12 13:04:34 +00:00
servers - > emplace_back (
2021-10-22 07:15:34 +00:00
listen_host ,
2021-05-12 13:04:34 +00:00
secure_port_name ,
2021-10-22 07:15:34 +00:00
" Keeper with secure protocol (tcp_secure): " + address . toString ( ) ,
std : : make_unique < TCPServer > (
2022-03-03 20:27:46 +00:00
new KeeperTCPHandlerFactory (
config_getter , tiny_context . getKeeperDispatcher ( ) ,
config ( ) . getUInt64 ( " keeper_server.socket_receive_timeout_sec " , DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC ) ,
config ( ) . getUInt64 ( " keeper_server.socket_send_timeout_sec " , DBMS_DEFAULT_SEND_TIMEOUT_SEC ) , true ) , server_pool , socket ) ) ;
2021-05-12 10:39:07 +00:00
# else
2021-05-12 13:04:34 +00:00
UNUSED ( port ) ;
throw Exception { " SSL support for TCP protocol is disabled because Poco library was built without NetSSL support. " ,
ErrorCodes : : SUPPORT_IS_DISABLED } ;
2021-05-12 10:39:07 +00:00
# endif
2021-05-12 13:04:34 +00:00
} ) ;
2022-11-09 08:02:04 +00:00
const auto & config = config_getter ( ) ;
Poco : : Timespan keep_alive_timeout ( config . getUInt ( " keep_alive_timeout " , 10 ) , 0 ) ;
Poco : : Net : : HTTPServerParams : : Ptr http_params = new Poco : : Net : : HTTPServerParams ;
http_params - > setTimeout ( DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC ) ;
http_params - > setKeepAliveTimeout ( keep_alive_timeout ) ;
/// Prometheus (if defined and not setup yet with http_port)
port_name = " prometheus.port " ;
2022-11-09 12:37:42 +00:00
createServer ( listen_host , port_name , listen_try , [ & ] ( UInt16 port )
2022-11-09 08:02:04 +00:00
{
Poco : : Net : : ServerSocket socket ;
auto address = socketBindListen ( socket , listen_host , port ) ;
// TODO(antonio2368): use config
socket . setReceiveTimeout ( DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC ) ;
socket . setSendTimeout ( DBMS_DEFAULT_SEND_TIMEOUT_SEC ) ;
2022-11-09 12:37:42 +00:00
servers - > emplace_back (
2022-11-09 08:02:04 +00:00
listen_host ,
port_name ,
" Prometheus: http:// " + address . toString ( ) ,
std : : make_unique < HTTPServer > (
2022-11-09 12:37:42 +00:00
httpContext ( ) , createPrometheusMainHandlerFactory ( * this , config_getter ( ) , async_metrics , " PrometheusHandler-factory " ) , server_pool , socket , http_params ) ) ;
2022-11-09 08:02:04 +00:00
} ) ;
2021-05-12 13:04:34 +00:00
}
2021-05-12 10:39:07 +00:00
for ( auto & server : * servers )
2021-10-22 07:15:34 +00:00
{
2021-05-12 10:39:07 +00:00
server . start ( ) ;
2021-10-22 07:15:34 +00:00
LOG_INFO ( log , " Listening for {} " , server . getDescription ( ) ) ;
}
2021-05-12 10:39:07 +00:00
2022-11-09 12:37:42 +00:00
async_metrics . start ( ) ;
2021-10-19 14:29:49 +00:00
zkutil : : EventPtr unused_event = std : : make_shared < Poco : : Event > ( ) ;
zkutil : : ZooKeeperNodeCache unused_cache ( [ ] { return nullptr ; } ) ;
/// ConfigReloader have to strict parameters which are redundant in our case
auto main_config_reloader = std : : make_unique < ConfigReloader > (
config_path ,
include_from_path ,
config ( ) . getString ( " path " , " " ) ,
std : : move ( unused_cache ) ,
unused_event ,
[ & ] ( ConfigurationPtr config , bool /* initial_loading */ )
{
if ( config - > has ( " keeper_server " ) )
2022-03-03 20:27:46 +00:00
tiny_context . updateKeeperConfiguration ( * config ) ;
2021-10-19 14:29:49 +00:00
} ,
/* already_loaded = */ false ) ; /// Reload it right now (initial loading)
2021-05-12 10:39:07 +00:00
SCOPE_EXIT ( {
LOG_INFO ( log , " Shutting down. " ) ;
2021-10-19 14:29:49 +00:00
main_config_reloader . reset ( ) ;
2021-05-12 10:39:07 +00:00
2022-11-09 12:37:42 +00:00
async_metrics . stop ( ) ;
2021-05-18 14:08:56 +00:00
LOG_DEBUG ( log , " Waiting for current connections to Keeper to finish. " ) ;
2022-10-07 10:46:45 +00:00
size_t current_connections = 0 ;
2021-05-12 10:39:07 +00:00
for ( auto & server : * servers )
{
server . stop ( ) ;
current_connections + = server . currentConnections ( ) ;
}
if ( current_connections )
LOG_INFO ( log , " Closed all listening sockets. Waiting for {} outstanding connections. " , current_connections ) ;
else
LOG_INFO ( log , " Closed all listening sockets. " ) ;
if ( current_connections > 0 )
current_connections = waitServersToFinish ( * servers , config ( ) . getInt ( " shutdown_wait_unfinished " , 5 ) ) ;
if ( current_connections )
2021-05-18 14:08:56 +00:00
LOG_INFO ( log , " Closed connections to Keeper. But {} remain. Probably some users cannot finish their connections after context shutdown. " , current_connections ) ;
2021-05-12 10:39:07 +00:00
else
2021-05-18 14:08:56 +00:00
LOG_INFO ( log , " Closed connections to Keeper. " ) ;
2021-05-12 10:39:07 +00:00
2022-03-03 20:27:46 +00:00
tiny_context . shutdownKeeperDispatcher ( ) ;
2021-05-12 10:39:07 +00:00
/// Wait server pool to avoid use-after-free of destroyed context in the handlers
server_pool . joinAll ( ) ;
LOG_DEBUG ( log , " Destroyed global context. " ) ;
if ( current_connections )
{
LOG_INFO ( log , " Will shutdown forcefully. " ) ;
2022-03-02 09:59:10 +00:00
safeExit ( 0 ) ;
2021-05-12 10:39:07 +00:00
}
} ) ;
buildLoggers ( config ( ) , logger ( ) ) ;
2021-10-19 14:29:49 +00:00
main_config_reloader - > start ( ) ;
2021-05-12 10:39:07 +00:00
LOG_INFO ( log , " Ready for connections. " ) ;
waitForTerminationRequest ( ) ;
return Application : : EXIT_OK ;
}
2022-11-18 12:22:55 +00:00
catch ( . . . )
{
/// Poco does not provide stacktrace.
tryLogCurrentException ( " Application " ) ;
throw ;
}
2021-05-12 10:39:07 +00:00
2021-05-12 13:04:34 +00:00
void Keeper : : logRevision ( ) const
{
Poco : : Logger : : root ( ) . information ( " Starting ClickHouse Keeper " + std : : string { VERSION_STRING }
2022-09-26 15:04:56 +00:00
+ " (revision : " + std : : to_string ( ClickHouseRevision : : getVersionRevision ( ) )
+ " , git hash: " + ( git_hash . empty ( ) ? " <unknown> " : git_hash )
+ " , build id: " + ( build_id . empty ( ) ? " <unknown> " : build_id ) + " ) "
2021-05-12 13:04:34 +00:00
+ " , PID " + std : : to_string ( getpid ( ) ) ) ;
}
2021-05-12 10:39:07 +00:00
}