Add SYSTEM SUSPEND command #15979

This commit is contained in:
Alexey Milovidov 2021-01-07 22:19:33 +03:00
parent 80d88a7b17
commit ffa5bbd5fd
6 changed files with 67 additions and 3 deletions

View File

@ -6,6 +6,7 @@
#include <Common/SymbolIndex.h> #include <Common/SymbolIndex.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/ShellCommand.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/ExternalDictionariesLoader.h> #include <Interpreters/ExternalDictionariesLoader.h>
@ -221,21 +222,40 @@ BlockIO InterpreterSystemQuery::execute()
switch (query.type) switch (query.type)
{ {
case Type::SHUTDOWN: case Type::SHUTDOWN:
{
context.checkAccess(AccessType::SYSTEM_SHUTDOWN); context.checkAccess(AccessType::SYSTEM_SHUTDOWN);
if (kill(0, SIGTERM)) if (kill(0, SIGTERM))
throwFromErrno("System call kill(0, SIGTERM) failed", ErrorCodes::CANNOT_KILL); throwFromErrno("System call kill(0, SIGTERM) failed", ErrorCodes::CANNOT_KILL);
break; break;
}
case Type::KILL: case Type::KILL:
{
context.checkAccess(AccessType::SYSTEM_SHUTDOWN); context.checkAccess(AccessType::SYSTEM_SHUTDOWN);
if (kill(0, SIGKILL)) if (kill(0, SIGKILL))
throwFromErrno("System call kill(0, SIGKILL) failed", ErrorCodes::CANNOT_KILL); throwFromErrno("System call kill(0, SIGKILL) failed", ErrorCodes::CANNOT_KILL);
break; break;
}
case Type::SUSPEND:
{
auto command = fmt::format("kill -STOP {0} && sleep {1} && kill -CONT {0}", getpid(), query.seconds);
LOG_DEBUG(log, "Will run {}", command);
auto res = ShellCommand::execute(command);
res->in.close();
WriteBufferFromOwnString out;
copyData(res->out, out);
copyData(res->err, out);
if (!out.str().empty())
LOG_DEBUG(log, "The command returned output: {}", command, out.str());
break;
}
case Type::DROP_DNS_CACHE: case Type::DROP_DNS_CACHE:
{
context.checkAccess(AccessType::SYSTEM_DROP_DNS_CACHE); context.checkAccess(AccessType::SYSTEM_DROP_DNS_CACHE);
DNSResolver::instance().dropCache(); DNSResolver::instance().dropCache();
/// Reinitialize clusters to update their resolved_addresses /// Reinitialize clusters to update their resolved_addresses
system_context.reloadClusterConfig(); system_context.reloadClusterConfig();
break; break;
}
case Type::DROP_MARK_CACHE: case Type::DROP_MARK_CACHE:
context.checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE); context.checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context.dropMarkCache(); system_context.dropMarkCache();
@ -251,12 +271,15 @@ BlockIO InterpreterSystemQuery::execute()
break; break;
#endif #endif
case Type::RELOAD_DICTIONARY: case Type::RELOAD_DICTIONARY:
{
context.checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY); context.checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
system_context.getExternalDictionariesLoader().loadOrReload( system_context.getExternalDictionariesLoader().loadOrReload(
DatabaseCatalog::instance().resolveDictionaryName(query.target_dictionary)); DatabaseCatalog::instance().resolveDictionaryName(query.target_dictionary));
ExternalDictionariesLoader::resetAll(); ExternalDictionariesLoader::resetAll();
break; break;
}
case Type::RELOAD_DICTIONARIES: case Type::RELOAD_DICTIONARIES:
{
context.checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY); context.checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
executeCommandsAndThrowIfError( executeCommandsAndThrowIfError(
[&] () { system_context.getExternalDictionariesLoader().reloadAllTriedToLoad(); }, [&] () { system_context.getExternalDictionariesLoader().reloadAllTriedToLoad(); },
@ -264,6 +287,7 @@ BlockIO InterpreterSystemQuery::execute()
); );
ExternalDictionariesLoader::resetAll(); ExternalDictionariesLoader::resetAll();
break; break;
}
case Type::RELOAD_EMBEDDED_DICTIONARIES: case Type::RELOAD_EMBEDDED_DICTIONARIES:
context.checkAccess(AccessType::SYSTEM_RELOAD_EMBEDDED_DICTIONARIES); context.checkAccess(AccessType::SYSTEM_RELOAD_EMBEDDED_DICTIONARIES);
system_context.getEmbeddedDictionaries().reload(); system_context.getEmbeddedDictionaries().reload();
@ -273,6 +297,7 @@ BlockIO InterpreterSystemQuery::execute()
system_context.reloadConfig(); system_context.reloadConfig();
break; break;
case Type::RELOAD_SYMBOLS: case Type::RELOAD_SYMBOLS:
{
#if defined(__ELF__) && !defined(__FreeBSD__) #if defined(__ELF__) && !defined(__FreeBSD__)
context.checkAccess(AccessType::SYSTEM_RELOAD_SYMBOLS); context.checkAccess(AccessType::SYSTEM_RELOAD_SYMBOLS);
(void)SymbolIndex::instance(true); (void)SymbolIndex::instance(true);
@ -280,6 +305,7 @@ BlockIO InterpreterSystemQuery::execute()
#else #else
throw Exception("SYSTEM RELOAD SYMBOLS is not supported on current platform", ErrorCodes::NOT_IMPLEMENTED); throw Exception("SYSTEM RELOAD SYMBOLS is not supported on current platform", ErrorCodes::NOT_IMPLEMENTED);
#endif #endif
}
case Type::STOP_MERGES: case Type::STOP_MERGES:
startStopAction(ActionLocks::PartsMerge, false); startStopAction(ActionLocks::PartsMerge, false);
break; break;
@ -340,6 +366,7 @@ BlockIO InterpreterSystemQuery::execute()
ErrorCodes::BAD_ARGUMENTS); ErrorCodes::BAD_ARGUMENTS);
break; break;
case Type::FLUSH_LOGS: case Type::FLUSH_LOGS:
{
context.checkAccess(AccessType::SYSTEM_FLUSH_LOGS); context.checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
executeCommandsAndThrowIfError( executeCommandsAndThrowIfError(
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(true); }, [&] () { if (auto query_log = context.getQueryLog()) query_log->flush(true); },
@ -352,6 +379,7 @@ BlockIO InterpreterSystemQuery::execute()
[&] () { if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); } [&] () { if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
); );
break; break;
}
case Type::STOP_LISTEN_QUERIES: case Type::STOP_LISTEN_QUERIES:
case Type::START_LISTEN_QUERIES: case Type::START_LISTEN_QUERIES:
throw Exception(String(ASTSystemQuery::typeToString(query.type)) + " is not supported yet", ErrorCodes::NOT_IMPLEMENTED); throw Exception(String(ASTSystemQuery::typeToString(query.type)) + " is not supported yet", ErrorCodes::NOT_IMPLEMENTED);
@ -586,7 +614,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
switch (query.type) switch (query.type)
{ {
case Type::SHUTDOWN: [[fallthrough]]; case Type::SHUTDOWN: [[fallthrough]];
case Type::KILL: case Type::KILL: [[fallthrough]];
case Type::SUSPEND:
{ {
required_access.emplace_back(AccessType::SYSTEM_SHUTDOWN); required_access.emplace_back(AccessType::SYSTEM_SHUTDOWN);
break; break;

View File

@ -22,6 +22,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "SHUTDOWN"; return "SHUTDOWN";
case Type::KILL: case Type::KILL:
return "KILL"; return "KILL";
case Type::SUSPEND:
return "SUSPEND";
case Type::DROP_DNS_CACHE: case Type::DROP_DNS_CACHE:
return "DROP DNS CACHE"; return "DROP DNS CACHE";
case Type::DROP_MARK_CACHE: case Type::DROP_MARK_CACHE:
@ -146,7 +148,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
auto print_on_volume = [&] auto print_on_volume = [&]
{ {
settings.ostr << " ON VOLUME " settings.ostr << (settings.hilite ? hilite_keyword : "") << " ON VOLUME "
<< (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(storage_policy) << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(storage_policy)
<< (settings.hilite ? hilite_none : "") << (settings.hilite ? hilite_none : "")
<< "." << "."
@ -182,9 +184,20 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
print_database_table(); print_database_table();
} }
else if (type == Type::RELOAD_DICTIONARY) else if (type == Type::RELOAD_DICTIONARY)
{
print_database_dictionary(); print_database_dictionary();
}
else if (type == Type::DROP_REPLICA) else if (type == Type::DROP_REPLICA)
{
print_drop_replica(); print_drop_replica();
}
else if (type == Type::SUSPEND)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FOR "
<< (settings.hilite ? hilite_none : "") << seconds
<< (settings.hilite ? hilite_keyword : "") << " SECOND"
<< (settings.hilite ? hilite_none : "");
}
} }

View File

@ -20,6 +20,7 @@ public:
UNKNOWN, UNKNOWN,
SHUTDOWN, SHUTDOWN,
KILL, KILL,
SUSPEND,
DROP_DNS_CACHE, DROP_DNS_CACHE,
DROP_MARK_CACHE, DROP_MARK_CACHE,
DROP_UNCOMPRESSED_CACHE, DROP_UNCOMPRESSED_CACHE,
@ -65,9 +66,10 @@ public:
String table; String table;
String replica; String replica;
String replica_zk_path; String replica_zk_path;
bool is_drop_whole_replica; bool is_drop_whole_replica{};
String storage_policy; String storage_policy;
String volume; String volume;
UInt64 seconds{};
String getID(char) const override { return "SYSTEM query"; } String getID(char) const override { return "SYSTEM query"; }

View File

@ -169,6 +169,20 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
parseDatabaseAndTableName(pos, expected, res->database, res->table); parseDatabaseAndTableName(pos, expected, res->database, res->table);
break; break;
case Type::SUSPEND:
{
ASTPtr seconds;
if (!(ParserKeyword{"FOR"}.ignore(pos, expected)
&& ParserUnsignedInteger().parse(pos, seconds, expected)
&& ParserKeyword{"SECOND"}.ignore(pos, expected))) /// SECOND, not SECONDS to be consistent with INTERVAL parsing in SQL
{
return false;
}
res->seconds = seconds->as<ASTLiteral>()->value.get<UInt64>();
break;
}
default: default:
/// There are no [db.table] after COMMAND NAME /// There are no [db.table] after COMMAND NAME
break; break;

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1,5 @@
CREATE TEMPORARY TABLE t (x DateTime);
INSERT INTO t VALUES (now());
SYSTEM SUSPEND FOR 1 SECOND;
INSERT INTO t VALUES (now());
SELECT max(x) - min(x) >= 1 FROM t;