mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Allow clickhouse-keeper to serve 4-letter commands before quorum (#35992)
* Use async start for clichkouse-keeper * Return non active server for 4lw commands if quorum not achieved
This commit is contained in:
parent
89d0c30f38
commit
3e77340a81
@ -341,7 +341,7 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
|
||||
auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
|
||||
|
||||
/// Initialize keeper RAFT. Do nothing if no keeper_server in config.
|
||||
tiny_context.initializeKeeperDispatcher(/* start_async = */false);
|
||||
tiny_context.initializeKeeperDispatcher(/* start_async = */ true);
|
||||
FourLetterCommandFactory::registerCommands(*tiny_context.getKeeperDispatcher());
|
||||
|
||||
auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration &
|
||||
|
@ -29,8 +29,8 @@ struct Settings;
|
||||
M(UInt64, reserved_log_items, 100000, "How many log items to store (don't remove during compaction)", 0) \
|
||||
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
|
||||
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
|
||||
M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \
|
||||
M(Milliseconds, startup_timeout, 180000, "How many time we will until RAFT to start", 0) \
|
||||
M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \
|
||||
M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \
|
||||
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
|
||||
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
|
||||
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
|
||||
|
@ -44,6 +44,11 @@ int32_t IFourLetterCommand::toCode(const String & name)
|
||||
return __builtin_bswap32(res);
|
||||
}
|
||||
|
||||
bool IFourLetterCommand::serverIsActive() const
|
||||
{
|
||||
return keeper_dispatcher.hasLeader();
|
||||
}
|
||||
|
||||
IFourLetterCommand::~IFourLetterCommand() = default;
|
||||
|
||||
FourLetterCommandFactory & FourLetterCommandFactory::instance()
|
||||
@ -198,6 +203,8 @@ void print(IFourLetterCommand::StringBuffer & buf, const String & key, uint64_t
|
||||
print(buf, key, toString(value));
|
||||
}
|
||||
|
||||
constexpr auto * SERVER_NOT_ACTIVE_MSG = "This instance is not currently serving requests";
|
||||
|
||||
}
|
||||
|
||||
String MonitorCommand::run()
|
||||
@ -205,11 +212,11 @@ String MonitorCommand::run()
|
||||
auto & stats = keeper_dispatcher.getKeeperConnectionStats();
|
||||
Keeper4LWInfo keeper_info = keeper_dispatcher.getKeeper4LWInfo();
|
||||
|
||||
if (!keeper_info.has_leader)
|
||||
return "This instance is not currently serving requests";
|
||||
|
||||
const auto & state_machine = keeper_dispatcher.getStateMachine();
|
||||
|
||||
if (!keeper_info.has_leader)
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer ret;
|
||||
print(ret, "version", String(VERSION_DESCRIBE) + "-" + VERSION_GITHASH);
|
||||
|
||||
@ -247,6 +254,9 @@ String MonitorCommand::run()
|
||||
|
||||
String StatResetCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
keeper_dispatcher.resetConnectionStats();
|
||||
return "Server stats reset.\n";
|
||||
}
|
||||
@ -258,6 +268,9 @@ String NopCommand::run()
|
||||
|
||||
String ConfCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
keeper_dispatcher.getKeeperConfigurationAndSettings()->dump(buf);
|
||||
return buf.str();
|
||||
@ -265,6 +278,9 @@ String ConfCommand::run()
|
||||
|
||||
String ConsCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
KeeperTCPHandler::dumpConnections(buf, false);
|
||||
return buf.str();
|
||||
@ -272,12 +288,18 @@ String ConsCommand::run()
|
||||
|
||||
String RestConnStatsCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
KeeperTCPHandler::resetConnsStats();
|
||||
return "Connection stats reset.\n";
|
||||
}
|
||||
|
||||
String ServerStatCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
|
||||
auto write = [&buf](const String & key, const String & value)
|
||||
@ -310,6 +332,9 @@ String ServerStatCommand::run()
|
||||
|
||||
String StatCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
|
||||
auto write = [&buf] (const String & key, const String & value) { buf << key << ": " << value << '\n'; };
|
||||
@ -340,6 +365,9 @@ String StatCommand::run()
|
||||
|
||||
String BriefWatchCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
const auto & state_machine = keeper_dispatcher.getStateMachine();
|
||||
buf << state_machine.getSessionsWithWatchesCount() << " connections watching "
|
||||
@ -350,6 +378,9 @@ String BriefWatchCommand::run()
|
||||
|
||||
String WatchCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
const auto & state_machine = keeper_dispatcher.getStateMachine();
|
||||
state_machine.dumpWatches(buf);
|
||||
@ -358,6 +389,9 @@ String WatchCommand::run()
|
||||
|
||||
String WatchByPathCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
const auto & state_machine = keeper_dispatcher.getStateMachine();
|
||||
state_machine.dumpWatchesByPath(buf);
|
||||
@ -366,6 +400,9 @@ String WatchByPathCommand::run()
|
||||
|
||||
String DataSizeCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
buf << "snapshot_dir_size: " << keeper_dispatcher.getSnapDirSize() << '\n';
|
||||
buf << "log_dir_size: " << keeper_dispatcher.getLogDirSize() << '\n';
|
||||
@ -374,6 +411,9 @@ String DataSizeCommand::run()
|
||||
|
||||
String DumpCommand::run()
|
||||
{
|
||||
if (!serverIsActive())
|
||||
return SERVER_NOT_ACTIVE_MSG;
|
||||
|
||||
StringBuffer buf;
|
||||
const auto & state_machine = keeper_dispatcher.getStateMachine();
|
||||
state_machine.dumpSessionsAndEphemerals(buf);
|
||||
|
@ -32,6 +32,9 @@ public:
|
||||
static String toName(int32_t code);
|
||||
static inline int32_t toCode(const String & name);
|
||||
|
||||
// Return true if server is running and serving requests
|
||||
bool serverIsActive() const;
|
||||
|
||||
protected:
|
||||
KeeperDispatcher & keeper_dispatcher;
|
||||
};
|
||||
|
@ -345,7 +345,11 @@ void KeeperTCPHandler::runImpl()
|
||||
return;
|
||||
}
|
||||
|
||||
if (keeper_dispatcher->checkInit() && keeper_dispatcher->hasLeader())
|
||||
// we store the checks because they can change during the execution
|
||||
// leading to weird results
|
||||
const auto is_initialized = keeper_dispatcher->checkInit();
|
||||
const auto has_leader = keeper_dispatcher->hasLeader();
|
||||
if (is_initialized && has_leader)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -366,9 +370,9 @@ void KeeperTCPHandler::runImpl()
|
||||
else
|
||||
{
|
||||
String reason;
|
||||
if (!keeper_dispatcher->checkInit() && !keeper_dispatcher->hasLeader())
|
||||
if (!is_initialized && !has_leader)
|
||||
reason = "server is not initialized yet and no alive leader exists";
|
||||
else if (!keeper_dispatcher->checkInit())
|
||||
else if (!is_initialized)
|
||||
reason = "server is not initialized yet";
|
||||
else
|
||||
reason = "no alive leader exists";
|
||||
|
Loading…
Reference in New Issue
Block a user