diff --git a/base/loggers/CMakeLists.txt b/base/loggers/CMakeLists.txt index 22be002e069..81eecd2e2e7 100644 --- a/base/loggers/CMakeLists.txt +++ b/base/loggers/CMakeLists.txt @@ -1,5 +1,13 @@ include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake") add_headers_and_sources(loggers .) + +# Standart version depends on DBMS and works with text log add_library(loggers ${loggers_sources} ${loggers_headers}) +target_compile_definitions (loggers PRIVATE -DWITH_TEXT_LOG) target_link_libraries(loggers PRIVATE dbms clickhouse_common_io) target_include_directories(loggers PUBLIC ..) + +# Lightweight version doesn't work with textlog and also doesn't depend on DBMS +add_library(loggers_no_text_log ${loggers_sources} ${loggers_headers}) +target_link_libraries(loggers_no_text_log PRIVATE clickhouse_common_io) +target_include_directories(loggers PUBLIC ..) diff --git a/base/loggers/Loggers.cpp b/base/loggers/Loggers.cpp index 2f2eadea28f..7c627ad2272 100644 --- a/base/loggers/Loggers.cpp +++ b/base/loggers/Loggers.cpp @@ -9,7 +9,11 @@ #include #include #include -#include + +#ifdef WITH_TEXT_LOG + #include +#endif + #include namespace fs = std::filesystem; @@ -30,17 +34,21 @@ static std::string createDirectory(const std::string & file) return path; }; +#ifdef WITH_TEXT_LOG void Loggers::setTextLog(std::shared_ptr log, int max_priority) { text_log = log; text_log_max_priority = max_priority; } +#endif void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Logger & logger /*_root*/, const std::string & cmd_name) { +#ifdef WITH_TEXT_LOG if (split) if (auto log = text_log.lock()) split->addTextLog(log, text_log_max_priority); +#endif auto current_logger = config.getString("logger", ""); if (config_logger == current_logger) //-V1051 diff --git a/base/loggers/Loggers.h b/base/loggers/Loggers.h index a859c32fa89..22b2b5e2c69 100644 --- a/base/loggers/Loggers.h +++ b/base/loggers/Loggers.h @@ -7,10 +7,12 @@ #include #include "OwnSplitChannel.h" +#ifdef WITH_TEXT_LOG namespace DB { class TextLog; } +#endif namespace Poco::Util { @@ -27,7 +29,9 @@ public: /// Close log files. On next log write files will be reopened. void closeLogs(Poco::Logger & logger); +#ifdef WITH_TEXT_LOG void setTextLog(std::shared_ptr log, int max_priority); +#endif private: Poco::AutoPtr log_file; @@ -37,8 +41,10 @@ private: /// Previous value of logger element in config. It is used to reinitialize loggers whenever the value changed. std::string config_logger; +#ifdef WITH_TEXT_LOG std::weak_ptr text_log; int text_log_max_priority = -1; +#endif Poco::AutoPtr split; }; diff --git a/base/loggers/OwnSplitChannel.cpp b/base/loggers/OwnSplitChannel.cpp index 2267b8f425d..b255d89f124 100644 --- a/base/loggers/OwnSplitChannel.cpp +++ b/base/loggers/OwnSplitChannel.cpp @@ -20,10 +20,13 @@ namespace DB { void OwnSplitChannel::log(const Poco::Message & msg) { + +#ifdef WITH_TEXT_LOG auto logs_queue = CurrentThread::getInternalTextLogsQueue(); if (channels.empty() && (logs_queue == nullptr || msg.getPriority() > logs_queue->max_priority)) return; +#endif if (auto * masker = SensitiveDataMasker::getInstance()) { @@ -86,6 +89,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg) channel.first->log(msg); // ordinary child } +#ifdef WITH_TEXT_LOG auto logs_queue = CurrentThread::getInternalTextLogsQueue(); /// Log to "TCP queue" if message is not too noisy @@ -137,6 +141,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg) if (text_log_locked) text_log_locked->add(elem); } +#endif } @@ -145,12 +150,14 @@ void OwnSplitChannel::addChannel(Poco::AutoPtr channel, const std channels.emplace(name, ExtendedChannelPtrPair(std::move(channel), dynamic_cast(channel.get()))); } +#ifdef WITH_TEXT_LOG void OwnSplitChannel::addTextLog(std::shared_ptr log, int max_priority) { std::lock_guard lock(text_log_mutex); text_log = log; text_log_max_priority.store(max_priority, std::memory_order_relaxed); } +#endif void OwnSplitChannel::setLevel(const std::string & name, int level) { diff --git a/base/loggers/OwnSplitChannel.h b/base/loggers/OwnSplitChannel.h index 364a6346ede..72027f66afd 100644 --- a/base/loggers/OwnSplitChannel.h +++ b/base/loggers/OwnSplitChannel.h @@ -7,10 +7,12 @@ #include #include "ExtendedLogChannel.h" +#ifdef WITH_TEXT_LOG namespace DB { class TextLog; } +#endif namespace DB { @@ -25,7 +27,9 @@ public: /// Adds a child channel void addChannel(Poco::AutoPtr channel, const std::string & name); +#ifdef WITH_TEXT_LOG void addTextLog(std::shared_ptr log, int max_priority); +#endif void setLevel(const std::string & name, int level); @@ -40,8 +44,10 @@ private: std::mutex text_log_mutex; +#ifdef WITH_TEXT_LOG std::weak_ptr text_log; std::atomic text_log_max_priority = -1; +#endif }; } diff --git a/programs/keeper/CMakeLists.txt b/programs/keeper/CMakeLists.txt index 5a50a7074d3..3ab9c905179 100644 --- a/programs/keeper/CMakeLists.txt +++ b/programs/keeper/CMakeLists.txt @@ -28,3 +28,93 @@ clickhouse_embed_binaries( RESOURCES keeper_config.xml keeper_embedded.xml ) add_dependencies(clickhouse-keeper-lib clickhouse_keeper_configs) + +set(CLICKHOUSE_KEEPER_STANDALONE_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/ACLMap.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/Changelog.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/CoordinationSettings.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/FourLetterCommand.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/InMemoryLogStore.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperConnectionStats.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperDispatcher.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperLogStore.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperServer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperSnapshotManager.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateMachine.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateManager.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStorage.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/pathUtils.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/SessionExpiryQueue.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/SummingStateMachine.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/WriteBufferFromNuraftBuffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/ZooKeeperDataReader.cpp + + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/SettingsFields.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/BaseSettings.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/Field.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/SettingsEnums.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/ServerUUID.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/UUID.cpp + + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/KeeperTCPHandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/TCPServer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/ProtocolServerAdapter.cpp + + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CachedCompressedReadBuffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CheckingCompressedReadBuffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBufferBase.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBuffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBufferFromFile.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedWriteBuffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDelta.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDoubleDelta.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecEncrypted.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecGorilla.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecLZ4.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecMultiple.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecNone.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecT64.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecZSTD.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionFactory.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/getCompressionCodecForFile.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/ICompressionCodec.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/LZ4_decompress_faster.cpp + + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/IKeeper.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/TestKeeper.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperCommon.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperConstants.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeper.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperImpl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperIO.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperLock.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperNodeCache.cpp + + ${CMAKE_CURRENT_SOURCE_DIR}/../../base/daemon/BaseDaemon.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../base/daemon/SentryWriter.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../../base/daemon/GraphiteWriter.cpp + + Keeper.cpp + TinyContext.cpp + clickhouse-keeper.cpp + +) + +add_executable(clickhouse-keeper-standalone ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES}) + +target_compile_definitions (clickhouse-keeper-standalone PRIVATE -DENABLE_SENTRY=0 -DENABLE_GRPC=0) + +target_include_directories(clickhouse-keeper-standalone PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src" ) + +target_include_directories(clickhouse-keeper-standalone PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/../../src/Core/include") # uses some includes from core + +target_include_directories(clickhouse-keeper-standalone PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/../../src") # uses some includes from common + +target_link_libraries(clickhouse-keeper-standalone PUBLIC + ch_contrib::abseil_swiss_tables ch_contrib::nuraft ch_contrib::lz4 ch_contrib::zstd ch_contrib::cityhash + common ch_contrib::double_conversion ch_contrib::dragonbox_to_chars pcg_random ch_contrib::pdqsort + ch_contrib::miniselect clickhouse_common_config_no_zookeeper_log loggers_no_text_log clickhouse_common_io clickhouse_parsers + ${LINK_RESOURCE_LIB}) + +add_dependencies(clickhouse-keeper-standalone clickhouse_keeper_configs) +set_target_properties(clickhouse-keeper-standalone PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..) diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index fd317f88912..1d9bbef58a5 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -270,16 +270,9 @@ int Keeper::main(const std::vector & /*args*/) LOG_WARNING(log, "Keeper was built with sanitizer. It will work slowly."); #endif - auto shared_context = Context::createShared(); - global_context = Context::createGlobal(shared_context.get()); - - global_context->makeGlobalContext(); - global_context->setApplicationType(Context::ApplicationType::KEEPER); - if (!config().has("keeper_server")) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Keeper configuration ( section) not found in config"); - std::string path; if (config().has("keeper_server.storage_path")) @@ -348,8 +341,13 @@ int Keeper::main(const std::vector & /*args*/) auto servers = std::make_shared>(); /// Initialize keeper RAFT. Do nothing if no keeper_server in config. - global_context->initializeKeeperDispatcher(/* start_async = */false); - FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher()); + tiny_context.initializeKeeperDispatcher(/* start_async = */false); + FourLetterCommandFactory::registerCommands(*tiny_context.getKeeperDispatcher()); + + auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration & + { + return tiny_context.getConfigRef(); + }; for (const auto & listen_host : listen_hosts) { @@ -366,7 +364,10 @@ int Keeper::main(const std::vector & /*args*/) port_name, "Keeper (tcp): " + address.toString(), std::make_unique( - new KeeperTCPHandlerFactory(*this, false), server_pool, socket)); + new KeeperTCPHandlerFactory( + config_getter, tiny_context.getKeeperDispatcher(), + config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), + config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), false), server_pool, socket)); }); const char * secure_port_name = "keeper_server.tcp_port_secure"; @@ -382,7 +383,10 @@ int Keeper::main(const std::vector & /*args*/) secure_port_name, "Keeper with secure protocol (tcp_secure): " + address.toString(), std::make_unique( - new KeeperTCPHandlerFactory(*this, true), server_pool, socket)); + new KeeperTCPHandlerFactory( + config_getter, tiny_context.getKeeperDispatcher(), + config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), + config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), true), server_pool, socket)); #else UNUSED(port); throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", @@ -409,18 +413,14 @@ int Keeper::main(const std::vector & /*args*/) [&](ConfigurationPtr config, bool /* initial_loading */) { if (config->has("keeper_server")) - global_context->updateKeeperConfiguration(*config); + tiny_context.updateKeeperConfiguration(*config); }, /* already_loaded = */ false); /// Reload it right now (initial loading) SCOPE_EXIT({ LOG_INFO(log, "Shutting down."); - /// Stop reloading of the main config. This must be done before `global_context->shutdown()` because - /// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart. main_config_reloader.reset(); - global_context->shutdown(); - LOG_DEBUG(log, "Waiting for current connections to Keeper to finish."); int current_connections = 0; for (auto & server : *servers) @@ -442,17 +442,11 @@ int Keeper::main(const std::vector & /*args*/) else LOG_INFO(log, "Closed connections to Keeper."); - global_context->shutdownKeeperDispatcher(); + tiny_context.shutdownKeeperDispatcher(); /// Wait server pool to avoid use-after-free of destroyed context in the handlers server_pool.joinAll(); - /** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available. - * At this moment, no one could own shared part of Context. - */ - global_context.reset(); - shared_context.reset(); - LOG_DEBUG(log, "Destroyed global context."); if (current_connections) diff --git a/programs/keeper/Keeper.h b/programs/keeper/Keeper.h index f5b97dacf7d..5b8fbadd0a2 100644 --- a/programs/keeper/Keeper.h +++ b/programs/keeper/Keeper.h @@ -2,6 +2,7 @@ #include #include +#include "TinyContext.h" namespace Poco { @@ -17,27 +18,22 @@ namespace DB /// standalone clickhouse-keeper server (replacement for ZooKeeper). Uses the same /// config as clickhouse-server. Serves requests on TCP ports with or without /// SSL using ZooKeeper protocol. -class Keeper : public BaseDaemon, public IServer +class Keeper : public BaseDaemon { public: using ServerApplication::run; - Poco::Util::LayeredConfiguration & config() const override + Poco::Util::LayeredConfiguration & config() const { return BaseDaemon::config(); } - Poco::Logger & logger() const override + Poco::Logger & logger() const { return BaseDaemon::logger(); } - ContextMutablePtr context() const override - { - return global_context; - } - - bool isCancelled() const override + bool isCancelled() const { return BaseDaemon::isCancelled(); } @@ -58,7 +54,7 @@ protected: std::string getDefaultConfigFileName() const override; private: - ContextMutablePtr global_context; + TinyContext tiny_context; Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const; diff --git a/programs/keeper/TinyContext.cpp b/programs/keeper/TinyContext.cpp new file mode 100644 index 00000000000..386fb1e0c1d --- /dev/null +++ b/programs/keeper/TinyContext.cpp @@ -0,0 +1,71 @@ +#include "TinyContext.h" + +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +void TinyContext::setConfig(const ConfigurationPtr & config_) +{ + std::lock_guard lock(keeper_dispatcher_mutex); + config = config_; +} + +const Poco::Util::AbstractConfiguration & TinyContext::getConfigRef() const +{ + std::lock_guard lock(keeper_dispatcher_mutex); + return config ? *config : Poco::Util::Application::instance().config(); +} + + +void TinyContext::initializeKeeperDispatcher([[maybe_unused]] bool start_async) const +{ + const auto & config_ref = getConfigRef(); + + std::lock_guard lock(keeper_dispatcher_mutex); + + if (keeper_dispatcher) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times"); + + if (config_ref.has("keeper_server")) + { + keeper_dispatcher = std::make_shared(); + keeper_dispatcher->initialize(config_ref, true, start_async); + } +} + +std::shared_ptr TinyContext::getKeeperDispatcher() const +{ + std::lock_guard lock(keeper_dispatcher_mutex); + if (!keeper_dispatcher) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Keeper must be initialized before requests"); + + return keeper_dispatcher; +} + +void TinyContext::shutdownKeeperDispatcher() const +{ + std::lock_guard lock(keeper_dispatcher_mutex); + if (keeper_dispatcher) + { + keeper_dispatcher->shutdown(); + keeper_dispatcher.reset(); + } +} + +void TinyContext::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::AbstractConfiguration & config_) +{ + std::lock_guard lock(keeper_dispatcher_mutex); + if (!keeper_dispatcher) + return; + + keeper_dispatcher->updateConfiguration(config_); +} + +} diff --git a/programs/keeper/TinyContext.h b/programs/keeper/TinyContext.h new file mode 100644 index 00000000000..a53a6d0377d --- /dev/null +++ b/programs/keeper/TinyContext.h @@ -0,0 +1,32 @@ +#pragma once +#include +#include + +#include + +namespace DB +{ + +class KeeperDispatcher; + +class TinyContext: public std::enable_shared_from_this +{ +public: + std::shared_ptr getKeeperDispatcher() const; + void initializeKeeperDispatcher(bool start_async) const; + void shutdownKeeperDispatcher() const; + void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config); + + using ConfigurationPtr = Poco::AutoPtr; + + void setConfig(const ConfigurationPtr & config); + const Poco::Util::AbstractConfiguration & getConfigRef() const; + +private: + mutable std::mutex keeper_dispatcher_mutex; + mutable std::shared_ptr keeper_dispatcher; + + ConfigurationPtr config; +}; + +} diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 50c53488187..239713452c4 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -995,6 +995,11 @@ if (ThreadFuzzer::instance().isEffective()) global_context->initializeKeeperDispatcher(can_initialize_keeper_async); FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher()); + auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration & + { + return global_context->getConfigRef(); + }; + for (const auto & listen_host : listen_hosts) { /// TCP Keeper @@ -1013,7 +1018,11 @@ if (ThreadFuzzer::instance().isEffective()) port_name, "Keeper (tcp): " + address.toString(), std::make_unique( - new KeeperTCPHandlerFactory(*this, false), server_pool, socket)); + new KeeperTCPHandlerFactory( + config_getter, global_context->getKeeperDispatcher(), + global_context->getSettingsRef().receive_timeout, + global_context->getSettingsRef().send_timeout, + false), server_pool, socket)); }); const char * secure_port_name = "keeper_server.tcp_port_secure"; @@ -1032,7 +1041,10 @@ if (ThreadFuzzer::instance().isEffective()) secure_port_name, "Keeper with secure protocol (tcp_secure): " + address.toString(), std::make_unique( - new KeeperTCPHandlerFactory(*this, true), server_pool, socket)); + new KeeperTCPHandlerFactory( + config_getter, global_context->getKeeperDispatcher(), + global_context->getSettingsRef().receive_timeout, + global_context->getSettingsRef().send_timeout, true), server_pool, socket)); #else UNUSED(port); throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 07964c29577..a9d2dacc50e 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -202,25 +202,30 @@ struct SocketInterruptablePollWrapper #endif }; -KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_) +KeeperTCPHandler::KeeperTCPHandler( + const Poco::Util::AbstractConfiguration & config_ref, + std::shared_ptr keeper_dispatcher_, + Poco::Timespan receive_timeout_, + Poco::Timespan send_timeout_, + const Poco::Net::StreamSocket & socket_) : Poco::Net::TCPServerConnection(socket_) - , server(server_) , log(&Poco::Logger::get("KeeperTCPHandler")) - , global_context(Context::createCopy(server.context())) - , keeper_dispatcher(global_context->getKeeperDispatcher()) + , keeper_dispatcher(keeper_dispatcher_) , operation_timeout( 0, - global_context->getConfigRef().getUInt( + config_ref.getUInt( "keeper_server.coordination_settings.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) , min_session_timeout( 0, - global_context->getConfigRef().getUInt( + config_ref.getUInt( "keeper_server.coordination_settings.min_session_timeout_ms", Coordination::DEFAULT_MIN_SESSION_TIMEOUT_MS) * 1000) , max_session_timeout( 0, - global_context->getConfigRef().getUInt( + config_ref.getUInt( "keeper_server.coordination_settings.session_timeout_ms", Coordination::DEFAULT_MAX_SESSION_TIMEOUT_MS) * 1000) , poll_wrapper(std::make_unique(socket_)) + , send_timeout(send_timeout_) + , receive_timeout(receive_timeout_) , responses(std::make_unique(std::numeric_limits::max())) , last_op(std::make_unique(EMPTY_LAST_OP)) { @@ -289,11 +294,9 @@ void KeeperTCPHandler::runImpl() { setThreadName("KeeperHandler"); ThreadStatus thread_status; - auto global_receive_timeout = global_context->getSettingsRef().receive_timeout; - auto global_send_timeout = global_context->getSettingsRef().send_timeout; - socket().setReceiveTimeout(global_receive_timeout); - socket().setSendTimeout(global_send_timeout); + socket().setReceiveTimeout(receive_timeout); + socket().setSendTimeout(send_timeout); socket().setNoDelay(true); in = std::make_shared(socket()); diff --git a/src/Server/KeeperTCPHandler.h b/src/Server/KeeperTCPHandler.h index 7953dfd2cbe..3f388abeabb 100644 --- a/src/Server/KeeperTCPHandler.h +++ b/src/Server/KeeperTCPHandler.h @@ -48,7 +48,12 @@ private: static std::unordered_set connections; public: - KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_); + KeeperTCPHandler( + const Poco::Util::AbstractConfiguration & config_ref, + std::shared_ptr keeper_dispatcher_, + Poco::Timespan receive_timeout_, + Poco::Timespan send_timeout_, + const Poco::Net::StreamSocket & socket_); void run() override; KeeperConnectionStats getConnectionStats() const; @@ -58,9 +63,7 @@ public: ~KeeperTCPHandler() override; private: - IServer & server; Poco::Logger * log; - ContextPtr global_context; std::shared_ptr keeper_dispatcher; Poco::Timespan operation_timeout; Poco::Timespan min_session_timeout; @@ -69,6 +72,8 @@ private: int64_t session_id{-1}; Stopwatch session_stopwatch; SocketInterruptablePollWrapperPtr poll_wrapper; + Poco::Timespan send_timeout; + Poco::Timespan receive_timeout; ThreadSafeResponseQueuePtr responses; diff --git a/src/Server/KeeperTCPHandlerFactory.h b/src/Server/KeeperTCPHandlerFactory.h index 58dc73d7c27..76309ffc119 100644 --- a/src/Server/KeeperTCPHandlerFactory.h +++ b/src/Server/KeeperTCPHandlerFactory.h @@ -10,11 +10,17 @@ namespace DB { +using ConfigGetter = std::function; + class KeeperTCPHandlerFactory : public TCPServerConnectionFactory { private: - IServer & server; + ConfigGetter config_getter; + std::shared_ptr keeper_dispatcher; Poco::Logger * log; + Poco::Timespan receive_timeout; + Poco::Timespan send_timeout; + class DummyTCPHandler : public Poco::Net::TCPServerConnection { public: @@ -23,9 +29,17 @@ private: }; public: - KeeperTCPHandlerFactory(IServer & server_, bool secure) - : server(server_) + KeeperTCPHandlerFactory( + ConfigGetter config_getter_, + std::shared_ptr keeper_dispatcher_, + Poco::Timespan receive_timeout_, + Poco::Timespan send_timeout_, + bool secure) + : config_getter(config_getter_) + , keeper_dispatcher(keeper_dispatcher_) , log(&Poco::Logger::get(std::string{"KeeperTCP"} + (secure ? "S" : "") + "HandlerFactory")) + , receive_timeout(receive_timeout_) + , send_timeout(send_timeout_) { } @@ -34,7 +48,7 @@ public: try { LOG_TRACE(log, "Keeper request. Address: {}", socket.peerAddress().toString()); - return new KeeperTCPHandler(server, socket); + return new KeeperTCPHandler(config_getter(), keeper_dispatcher, receive_timeout, send_timeout, socket); } catch (const Poco::Net::NetException &) {