From ffa5bbd5fdf88a9dda6c715542173fdccb7c6e7d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 7 Jan 2021 22:19:33 +0300 Subject: [PATCH] Add SYSTEM SUSPEND command #15979 --- src/Interpreters/InterpreterSystemQuery.cpp | 31 ++++++++++++++++++- src/Parsers/ASTSystemQuery.cpp | 15 ++++++++- src/Parsers/ASTSystemQuery.h | 4 ++- src/Parsers/ParserSystemQuery.cpp | 14 +++++++++ .../01643_system_suspend.reference | 1 + .../0_stateless/01643_system_suspend.sql | 5 +++ 6 files changed, 67 insertions(+), 3 deletions(-) create mode 100644 tests/queries/0_stateless/01643_system_suspend.reference create mode 100644 tests/queries/0_stateless/01643_system_suspend.sql diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index fd36f3a6fd6..2a9c4cdb259 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -221,21 +222,40 @@ BlockIO InterpreterSystemQuery::execute() switch (query.type) { case Type::SHUTDOWN: + { context.checkAccess(AccessType::SYSTEM_SHUTDOWN); if (kill(0, SIGTERM)) throwFromErrno("System call kill(0, SIGTERM) failed", ErrorCodes::CANNOT_KILL); break; + } case Type::KILL: + { context.checkAccess(AccessType::SYSTEM_SHUTDOWN); if (kill(0, SIGKILL)) throwFromErrno("System call kill(0, SIGKILL) failed", ErrorCodes::CANNOT_KILL); break; + } + case Type::SUSPEND: + { + auto command = fmt::format("kill -STOP {0} && sleep {1} && kill -CONT {0}", getpid(), query.seconds); + LOG_DEBUG(log, "Will run {}", command); + auto res = ShellCommand::execute(command); + res->in.close(); + WriteBufferFromOwnString out; + copyData(res->out, out); + copyData(res->err, out); + if (!out.str().empty()) + LOG_DEBUG(log, "The command returned output: {}", command, out.str()); + break; + } case Type::DROP_DNS_CACHE: + { context.checkAccess(AccessType::SYSTEM_DROP_DNS_CACHE); DNSResolver::instance().dropCache(); /// Reinitialize clusters to update their resolved_addresses system_context.reloadClusterConfig(); break; + } case Type::DROP_MARK_CACHE: context.checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE); system_context.dropMarkCache(); @@ -251,12 +271,15 @@ BlockIO InterpreterSystemQuery::execute() break; #endif case Type::RELOAD_DICTIONARY: + { context.checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY); system_context.getExternalDictionariesLoader().loadOrReload( DatabaseCatalog::instance().resolveDictionaryName(query.target_dictionary)); ExternalDictionariesLoader::resetAll(); break; + } case Type::RELOAD_DICTIONARIES: + { context.checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY); executeCommandsAndThrowIfError( [&] () { system_context.getExternalDictionariesLoader().reloadAllTriedToLoad(); }, @@ -264,6 +287,7 @@ BlockIO InterpreterSystemQuery::execute() ); ExternalDictionariesLoader::resetAll(); break; + } case Type::RELOAD_EMBEDDED_DICTIONARIES: context.checkAccess(AccessType::SYSTEM_RELOAD_EMBEDDED_DICTIONARIES); system_context.getEmbeddedDictionaries().reload(); @@ -273,6 +297,7 @@ BlockIO InterpreterSystemQuery::execute() system_context.reloadConfig(); break; case Type::RELOAD_SYMBOLS: + { #if defined(__ELF__) && !defined(__FreeBSD__) context.checkAccess(AccessType::SYSTEM_RELOAD_SYMBOLS); (void)SymbolIndex::instance(true); @@ -280,6 +305,7 @@ BlockIO InterpreterSystemQuery::execute() #else throw Exception("SYSTEM RELOAD SYMBOLS is not supported on current platform", ErrorCodes::NOT_IMPLEMENTED); #endif + } case Type::STOP_MERGES: startStopAction(ActionLocks::PartsMerge, false); break; @@ -340,6 +366,7 @@ BlockIO InterpreterSystemQuery::execute() ErrorCodes::BAD_ARGUMENTS); break; case Type::FLUSH_LOGS: + { context.checkAccess(AccessType::SYSTEM_FLUSH_LOGS); executeCommandsAndThrowIfError( [&] () { if (auto query_log = context.getQueryLog()) query_log->flush(true); }, @@ -352,6 +379,7 @@ BlockIO InterpreterSystemQuery::execute() [&] () { if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); } ); break; + } case Type::STOP_LISTEN_QUERIES: case Type::START_LISTEN_QUERIES: throw Exception(String(ASTSystemQuery::typeToString(query.type)) + " is not supported yet", ErrorCodes::NOT_IMPLEMENTED); @@ -586,7 +614,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() switch (query.type) { case Type::SHUTDOWN: [[fallthrough]]; - case Type::KILL: + case Type::KILL: [[fallthrough]]; + case Type::SUSPEND: { required_access.emplace_back(AccessType::SYSTEM_SHUTDOWN); break; diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 0d6e15a3d8c..f3a43d7f3fd 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -22,6 +22,8 @@ const char * ASTSystemQuery::typeToString(Type type) return "SHUTDOWN"; case Type::KILL: return "KILL"; + case Type::SUSPEND: + return "SUSPEND"; case Type::DROP_DNS_CACHE: return "DROP DNS CACHE"; case Type::DROP_MARK_CACHE: @@ -146,7 +148,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, auto print_on_volume = [&] { - settings.ostr << " ON VOLUME " + settings.ostr << (settings.hilite ? hilite_keyword : "") << " ON VOLUME " << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(storage_policy) << (settings.hilite ? hilite_none : "") << "." @@ -182,9 +184,20 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, print_database_table(); } else if (type == Type::RELOAD_DICTIONARY) + { print_database_dictionary(); + } else if (type == Type::DROP_REPLICA) + { print_drop_replica(); + } + else if (type == Type::SUSPEND) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " FOR " + << (settings.hilite ? hilite_none : "") << seconds + << (settings.hilite ? hilite_keyword : "") << " SECOND" + << (settings.hilite ? hilite_none : ""); + } } diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index 756b5b52600..ad7eb664659 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -20,6 +20,7 @@ public: UNKNOWN, SHUTDOWN, KILL, + SUSPEND, DROP_DNS_CACHE, DROP_MARK_CACHE, DROP_UNCOMPRESSED_CACHE, @@ -65,9 +66,10 @@ public: String table; String replica; String replica_zk_path; - bool is_drop_whole_replica; + bool is_drop_whole_replica{}; String storage_policy; String volume; + UInt64 seconds{}; String getID(char) const override { return "SYSTEM query"; } diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp index b6a90b348a0..491037da9a9 100644 --- a/src/Parsers/ParserSystemQuery.cpp +++ b/src/Parsers/ParserSystemQuery.cpp @@ -169,6 +169,20 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & parseDatabaseAndTableName(pos, expected, res->database, res->table); break; + case Type::SUSPEND: + { + ASTPtr seconds; + if (!(ParserKeyword{"FOR"}.ignore(pos, expected) + && ParserUnsignedInteger().parse(pos, seconds, expected) + && ParserKeyword{"SECOND"}.ignore(pos, expected))) /// SECOND, not SECONDS to be consistent with INTERVAL parsing in SQL + { + return false; + } + + res->seconds = seconds->as()->value.get(); + break; + } + default: /// There are no [db.table] after COMMAND NAME break; diff --git a/tests/queries/0_stateless/01643_system_suspend.reference b/tests/queries/0_stateless/01643_system_suspend.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/tests/queries/0_stateless/01643_system_suspend.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/01643_system_suspend.sql b/tests/queries/0_stateless/01643_system_suspend.sql new file mode 100644 index 00000000000..c2cd37e6156 --- /dev/null +++ b/tests/queries/0_stateless/01643_system_suspend.sql @@ -0,0 +1,5 @@ +CREATE TEMPORARY TABLE t (x DateTime); +INSERT INTO t VALUES (now()); +SYSTEM SUSPEND FOR 1 SECOND; +INSERT INTO t VALUES (now()); +SELECT max(x) - min(x) >= 1 FROM t;