Building small keeper binary

This commit is contained in:
alesapin 2022-03-03 21:27:46 +01:00
parent 6bbea953d6
commit 0eb7d28192
14 changed files with 306 additions and 54 deletions

View File

@ -1,5 +1,13 @@
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(loggers .)
# Standart version depends on DBMS and works with text log
add_library(loggers ${loggers_sources} ${loggers_headers})
target_compile_definitions (loggers PRIVATE -DWITH_TEXT_LOG)
target_link_libraries(loggers PRIVATE dbms clickhouse_common_io)
target_include_directories(loggers PUBLIC ..)
# Lightweight version doesn't work with textlog and also doesn't depend on DBMS
add_library(loggers_no_text_log ${loggers_sources} ${loggers_headers})
target_link_libraries(loggers_no_text_log PRIVATE clickhouse_common_io)
target_include_directories(loggers PUBLIC ..)

View File

@ -9,7 +9,11 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/Net/RemoteSyslogChannel.h>
#ifdef WITH_TEXT_LOG
#include <Interpreters/TextLog.h>
#endif
#include <filesystem>
namespace fs = std::filesystem;
@ -30,17 +34,21 @@ static std::string createDirectory(const std::string & file)
return path;
};
#ifdef WITH_TEXT_LOG
void Loggers::setTextLog(std::shared_ptr<DB::TextLog> log, int max_priority)
{
text_log = log;
text_log_max_priority = max_priority;
}
#endif
void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Logger & logger /*_root*/, const std::string & cmd_name)
{
#ifdef WITH_TEXT_LOG
if (split)
if (auto log = text_log.lock())
split->addTextLog(log, text_log_max_priority);
#endif
auto current_logger = config.getString("logger", "");
if (config_logger == current_logger) //-V1051

View File

@ -7,10 +7,12 @@
#include <Poco/Util/Application.h>
#include "OwnSplitChannel.h"
#ifdef WITH_TEXT_LOG
namespace DB
{
class TextLog;
}
#endif
namespace Poco::Util
{
@ -27,7 +29,9 @@ public:
/// Close log files. On next log write files will be reopened.
void closeLogs(Poco::Logger & logger);
#ifdef WITH_TEXT_LOG
void setTextLog(std::shared_ptr<DB::TextLog> log, int max_priority);
#endif
private:
Poco::AutoPtr<Poco::FileChannel> log_file;
@ -37,8 +41,10 @@ private:
/// Previous value of logger element in config. It is used to reinitialize loggers whenever the value changed.
std::string config_logger;
#ifdef WITH_TEXT_LOG
std::weak_ptr<DB::TextLog> text_log;
int text_log_max_priority = -1;
#endif
Poco::AutoPtr<DB::OwnSplitChannel> split;
};

View File

@ -20,10 +20,13 @@ namespace DB
{
void OwnSplitChannel::log(const Poco::Message & msg)
{
#ifdef WITH_TEXT_LOG
auto logs_queue = CurrentThread::getInternalTextLogsQueue();
if (channels.empty() && (logs_queue == nullptr || msg.getPriority() > logs_queue->max_priority))
return;
#endif
if (auto * masker = SensitiveDataMasker::getInstance())
{
@ -86,6 +89,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
channel.first->log(msg); // ordinary child
}
#ifdef WITH_TEXT_LOG
auto logs_queue = CurrentThread::getInternalTextLogsQueue();
/// Log to "TCP queue" if message is not too noisy
@ -137,6 +141,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
if (text_log_locked)
text_log_locked->add(elem);
}
#endif
}
@ -145,12 +150,14 @@ void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel, const std
channels.emplace(name, ExtendedChannelPtrPair(std::move(channel), dynamic_cast<ExtendedLogChannel *>(channel.get())));
}
#ifdef WITH_TEXT_LOG
void OwnSplitChannel::addTextLog(std::shared_ptr<DB::TextLog> log, int max_priority)
{
std::lock_guard<std::mutex> lock(text_log_mutex);
text_log = log;
text_log_max_priority.store(max_priority, std::memory_order_relaxed);
}
#endif
void OwnSplitChannel::setLevel(const std::string & name, int level)
{

View File

@ -7,10 +7,12 @@
#include <Poco/Channel.h>
#include "ExtendedLogChannel.h"
#ifdef WITH_TEXT_LOG
namespace DB
{
class TextLog;
}
#endif
namespace DB
{
@ -25,7 +27,9 @@ public:
/// Adds a child channel
void addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name);
#ifdef WITH_TEXT_LOG
void addTextLog(std::shared_ptr<DB::TextLog> log, int max_priority);
#endif
void setLevel(const std::string & name, int level);
@ -40,8 +44,10 @@ private:
std::mutex text_log_mutex;
#ifdef WITH_TEXT_LOG
std::weak_ptr<DB::TextLog> text_log;
std::atomic<int> text_log_max_priority = -1;
#endif
};
}

View File

@ -28,3 +28,93 @@ clickhouse_embed_binaries(
RESOURCES keeper_config.xml keeper_embedded.xml
)
add_dependencies(clickhouse-keeper-lib clickhouse_keeper_configs)
set(CLICKHOUSE_KEEPER_STANDALONE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/ACLMap.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/Changelog.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/CoordinationSettings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/FourLetterCommand.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/InMemoryLogStore.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperConnectionStats.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperDispatcher.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperLogStore.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperServer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperSnapshotManager.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateMachine.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateManager.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/pathUtils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/SessionExpiryQueue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/SummingStateMachine.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/WriteBufferFromNuraftBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/ZooKeeperDataReader.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/SettingsFields.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/BaseSettings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/Field.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/SettingsEnums.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/ServerUUID.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Core/UUID.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/KeeperTCPHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/TCPServer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/ProtocolServerAdapter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CachedCompressedReadBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CheckingCompressedReadBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBufferBase.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBufferFromFile.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedWriteBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDelta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDoubleDelta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecEncrypted.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecGorilla.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecLZ4.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecMultiple.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecNone.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecT64.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecZSTD.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionFactory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/getCompressionCodecForFile.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/ICompressionCodec.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/LZ4_decompress_faster.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/IKeeper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/TestKeeper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperCommon.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperConstants.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperImpl.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperIO.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperLock.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Common/ZooKeeper/ZooKeeperNodeCache.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../base/daemon/BaseDaemon.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../base/daemon/SentryWriter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../base/daemon/GraphiteWriter.cpp
Keeper.cpp
TinyContext.cpp
clickhouse-keeper.cpp
)
add_executable(clickhouse-keeper-standalone ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES})
target_compile_definitions (clickhouse-keeper-standalone PRIVATE -DENABLE_SENTRY=0 -DENABLE_GRPC=0)
target_include_directories(clickhouse-keeper-standalone PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src" )
target_include_directories(clickhouse-keeper-standalone PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/../../src/Core/include") # uses some includes from core
target_include_directories(clickhouse-keeper-standalone PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/../../src") # uses some includes from common
target_link_libraries(clickhouse-keeper-standalone PUBLIC
ch_contrib::abseil_swiss_tables ch_contrib::nuraft ch_contrib::lz4 ch_contrib::zstd ch_contrib::cityhash
common ch_contrib::double_conversion ch_contrib::dragonbox_to_chars pcg_random ch_contrib::pdqsort
ch_contrib::miniselect clickhouse_common_config_no_zookeeper_log loggers_no_text_log clickhouse_common_io clickhouse_parsers
${LINK_RESOURCE_LIB})
add_dependencies(clickhouse-keeper-standalone clickhouse_keeper_configs)
set_target_properties(clickhouse-keeper-standalone PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)

View File

@ -270,16 +270,9 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
LOG_WARNING(log, "Keeper was built with sanitizer. It will work slowly.");
#endif
auto shared_context = Context::createShared();
global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::KEEPER);
if (!config().has("keeper_server"))
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Keeper configuration (<keeper_server> section) not found in config");
std::string path;
if (config().has("keeper_server.storage_path"))
@ -348,8 +341,13 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
/// Initialize keeper RAFT. Do nothing if no keeper_server in config.
global_context->initializeKeeperDispatcher(/* start_async = */false);
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
tiny_context.initializeKeeperDispatcher(/* start_async = */false);
FourLetterCommandFactory::registerCommands(*tiny_context.getKeeperDispatcher());
auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration &
{
return tiny_context.getConfigRef();
};
for (const auto & listen_host : listen_hosts)
{
@ -366,7 +364,10 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
port_name,
"Keeper (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new KeeperTCPHandlerFactory(*this, false), server_pool, socket));
new KeeperTCPHandlerFactory(
config_getter, tiny_context.getKeeperDispatcher(),
config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC),
config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), false), server_pool, socket));
});
const char * secure_port_name = "keeper_server.tcp_port_secure";
@ -382,7 +383,10 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
secure_port_name,
"Keeper with secure protocol (tcp_secure): " + address.toString(),
std::make_unique<TCPServer>(
new KeeperTCPHandlerFactory(*this, true), server_pool, socket));
new KeeperTCPHandlerFactory(
config_getter, tiny_context.getKeeperDispatcher(),
config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC),
config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), true), server_pool, socket));
#else
UNUSED(port);
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
@ -409,18 +413,14 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
[&](ConfigurationPtr config, bool /* initial_loading */)
{
if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);
tiny_context.updateKeeperConfiguration(*config);
},
/* already_loaded = */ false); /// Reload it right now (initial loading)
SCOPE_EXIT({
LOG_INFO(log, "Shutting down.");
/// Stop reloading of the main config. This must be done before `global_context->shutdown()` because
/// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart.
main_config_reloader.reset();
global_context->shutdown();
LOG_DEBUG(log, "Waiting for current connections to Keeper to finish.");
int current_connections = 0;
for (auto & server : *servers)
@ -442,17 +442,11 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
else
LOG_INFO(log, "Closed connections to Keeper.");
global_context->shutdownKeeperDispatcher();
tiny_context.shutdownKeeperDispatcher();
/// Wait server pool to avoid use-after-free of destroyed context in the handlers
server_pool.joinAll();
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
global_context.reset();
shared_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
if (current_connections)

View File

@ -2,6 +2,7 @@
#include <Server/IServer.h>
#include <daemon/BaseDaemon.h>
#include "TinyContext.h"
namespace Poco
{
@ -17,27 +18,22 @@ namespace DB
/// standalone clickhouse-keeper server (replacement for ZooKeeper). Uses the same
/// config as clickhouse-server. Serves requests on TCP ports with or without
/// SSL using ZooKeeper protocol.
class Keeper : public BaseDaemon, public IServer
class Keeper : public BaseDaemon
{
public:
using ServerApplication::run;
Poco::Util::LayeredConfiguration & config() const override
Poco::Util::LayeredConfiguration & config() const
{
return BaseDaemon::config();
}
Poco::Logger & logger() const override
Poco::Logger & logger() const
{
return BaseDaemon::logger();
}
ContextMutablePtr context() const override
{
return global_context;
}
bool isCancelled() const override
bool isCancelled() const
{
return BaseDaemon::isCancelled();
}
@ -58,7 +54,7 @@ protected:
std::string getDefaultConfigFileName() const override;
private:
ContextMutablePtr global_context;
TinyContext tiny_context;
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const;

View File

@ -0,0 +1,71 @@
#include "TinyContext.h"
#include <Common/Exception.h>
#include <Coordination/KeeperDispatcher.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void TinyContext::setConfig(const ConfigurationPtr & config_)
{
std::lock_guard lock(keeper_dispatcher_mutex);
config = config_;
}
const Poco::Util::AbstractConfiguration & TinyContext::getConfigRef() const
{
std::lock_guard lock(keeper_dispatcher_mutex);
return config ? *config : Poco::Util::Application::instance().config();
}
void TinyContext::initializeKeeperDispatcher([[maybe_unused]] bool start_async) const
{
const auto & config_ref = getConfigRef();
std::lock_guard lock(keeper_dispatcher_mutex);
if (keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times");
if (config_ref.has("keeper_server"))
{
keeper_dispatcher = std::make_shared<KeeperDispatcher>();
keeper_dispatcher->initialize(config_ref, true, start_async);
}
}
std::shared_ptr<KeeperDispatcher> TinyContext::getKeeperDispatcher() const
{
std::lock_guard lock(keeper_dispatcher_mutex);
if (!keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Keeper must be initialized before requests");
return keeper_dispatcher;
}
void TinyContext::shutdownKeeperDispatcher() const
{
std::lock_guard lock(keeper_dispatcher_mutex);
if (keeper_dispatcher)
{
keeper_dispatcher->shutdown();
keeper_dispatcher.reset();
}
}
void TinyContext::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::AbstractConfiguration & config_)
{
std::lock_guard lock(keeper_dispatcher_mutex);
if (!keeper_dispatcher)
return;
keeper_dispatcher->updateConfiguration(config_);
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <memory>
#include <mutex>
#include <Poco/Util/Application.h>
namespace DB
{
class KeeperDispatcher;
class TinyContext: public std::enable_shared_from_this<TinyContext>
{
public:
std::shared_ptr<KeeperDispatcher> getKeeperDispatcher() const;
void initializeKeeperDispatcher(bool start_async) const;
void shutdownKeeperDispatcher() const;
void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
void setConfig(const ConfigurationPtr & config);
const Poco::Util::AbstractConfiguration & getConfigRef() const;
private:
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
ConfigurationPtr config;
};
}

View File

@ -995,6 +995,11 @@ if (ThreadFuzzer::instance().isEffective())
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration &
{
return global_context->getConfigRef();
};
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper
@ -1013,7 +1018,11 @@ if (ThreadFuzzer::instance().isEffective())
port_name,
"Keeper (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new KeeperTCPHandlerFactory(*this, false), server_pool, socket));
new KeeperTCPHandlerFactory(
config_getter, global_context->getKeeperDispatcher(),
global_context->getSettingsRef().receive_timeout,
global_context->getSettingsRef().send_timeout,
false), server_pool, socket));
});
const char * secure_port_name = "keeper_server.tcp_port_secure";
@ -1032,7 +1041,10 @@ if (ThreadFuzzer::instance().isEffective())
secure_port_name,
"Keeper with secure protocol (tcp_secure): " + address.toString(),
std::make_unique<TCPServer>(
new KeeperTCPHandlerFactory(*this, true), server_pool, socket));
new KeeperTCPHandlerFactory(
config_getter, global_context->getKeeperDispatcher(),
global_context->getSettingsRef().receive_timeout,
global_context->getSettingsRef().send_timeout, true), server_pool, socket));
#else
UNUSED(port);
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",

View File

@ -202,25 +202,30 @@ struct SocketInterruptablePollWrapper
#endif
};
KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
KeeperTCPHandler::KeeperTCPHandler(
const Poco::Util::AbstractConfiguration & config_ref,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher_,
Poco::Timespan receive_timeout_,
Poco::Timespan send_timeout_,
const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Poco::Logger::get("KeeperTCPHandler"))
, global_context(Context::createCopy(server.context()))
, keeper_dispatcher(global_context->getKeeperDispatcher())
, keeper_dispatcher(keeper_dispatcher_)
, operation_timeout(
0,
global_context->getConfigRef().getUInt(
config_ref.getUInt(
"keeper_server.coordination_settings.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, min_session_timeout(
0,
global_context->getConfigRef().getUInt(
config_ref.getUInt(
"keeper_server.coordination_settings.min_session_timeout_ms", Coordination::DEFAULT_MIN_SESSION_TIMEOUT_MS) * 1000)
, max_session_timeout(
0,
global_context->getConfigRef().getUInt(
config_ref.getUInt(
"keeper_server.coordination_settings.session_timeout_ms", Coordination::DEFAULT_MAX_SESSION_TIMEOUT_MS) * 1000)
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, responses(std::make_unique<ThreadSafeResponseQueue>(std::numeric_limits<size_t>::max()))
, last_op(std::make_unique<LastOp>(EMPTY_LAST_OP))
{
@ -289,11 +294,9 @@ void KeeperTCPHandler::runImpl()
{
setThreadName("KeeperHandler");
ThreadStatus thread_status;
auto global_receive_timeout = global_context->getSettingsRef().receive_timeout;
auto global_send_timeout = global_context->getSettingsRef().send_timeout;
socket().setReceiveTimeout(global_receive_timeout);
socket().setSendTimeout(global_send_timeout);
socket().setReceiveTimeout(receive_timeout);
socket().setSendTimeout(send_timeout);
socket().setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(socket());

View File

@ -48,7 +48,12 @@ private:
static std::unordered_set<KeeperTCPHandler *> connections;
public:
KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_);
KeeperTCPHandler(
const Poco::Util::AbstractConfiguration & config_ref,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher_,
Poco::Timespan receive_timeout_,
Poco::Timespan send_timeout_,
const Poco::Net::StreamSocket & socket_);
void run() override;
KeeperConnectionStats getConnectionStats() const;
@ -58,9 +63,7 @@ public:
~KeeperTCPHandler() override;
private:
IServer & server;
Poco::Logger * log;
ContextPtr global_context;
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
Poco::Timespan operation_timeout;
Poco::Timespan min_session_timeout;
@ -69,6 +72,8 @@ private:
int64_t session_id{-1};
Stopwatch session_stopwatch;
SocketInterruptablePollWrapperPtr poll_wrapper;
Poco::Timespan send_timeout;
Poco::Timespan receive_timeout;
ThreadSafeResponseQueuePtr responses;

View File

@ -10,11 +10,17 @@
namespace DB
{
using ConfigGetter = std::function<const Poco::Util::AbstractConfiguration & ()>;
class KeeperTCPHandlerFactory : public TCPServerConnectionFactory
{
private:
IServer & server;
ConfigGetter config_getter;
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
Poco::Logger * log;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;
class DummyTCPHandler : public Poco::Net::TCPServerConnection
{
public:
@ -23,9 +29,17 @@ private:
};
public:
KeeperTCPHandlerFactory(IServer & server_, bool secure)
: server(server_)
KeeperTCPHandlerFactory(
ConfigGetter config_getter_,
std::shared_ptr<KeeperDispatcher> keeper_dispatcher_,
Poco::Timespan receive_timeout_,
Poco::Timespan send_timeout_,
bool secure)
: config_getter(config_getter_)
, keeper_dispatcher(keeper_dispatcher_)
, log(&Poco::Logger::get(std::string{"KeeperTCP"} + (secure ? "S" : "") + "HandlerFactory"))
, receive_timeout(receive_timeout_)
, send_timeout(send_timeout_)
{
}
@ -34,7 +48,7 @@ public:
try
{
LOG_TRACE(log, "Keeper request. Address: {}", socket.peerAddress().toString());
return new KeeperTCPHandler(server, socket);
return new KeeperTCPHandler(config_getter(), keeper_dispatcher, receive_timeout, send_timeout, socket);
}
catch (const Poco::Net::NetException &)
{