Add parmeters to SYSTEM queries

This commit is contained in:
Nikolay Degterinsky 2021-10-12 21:11:00 +03:00
parent 2421cf29de
commit 2b7e1e2949
4 changed files with 104 additions and 48 deletions

View File

@ -219,12 +219,12 @@ BlockIO InterpreterSystemQuery::execute()
/// Make canonical query for simpler processing /// Make canonical query for simpler processing
if (query.type == Type::RELOAD_DICTIONARY) if (query.type == Type::RELOAD_DICTIONARY)
{ {
if (!query.database.empty()) if (!query.getDatabase().empty())
query.table = query.database + "." + query.table; query.setTable(query.getDatabase() + "." + query.getTable());
} }
else if (!query.table.empty()) else if (!query.getTable().empty())
{ {
table_id = getContext()->resolveStorageID(StorageID(query.database, query.table), Context::ResolveOrdinary); table_id = getContext()->resolveStorageID(StorageID(query.getDatabase(), query.getTable()), Context::ResolveOrdinary);
} }
@ -296,7 +296,7 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY); getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
auto & external_dictionaries_loader = system_context->getExternalDictionariesLoader(); auto & external_dictionaries_loader = system_context->getExternalDictionariesLoader();
external_dictionaries_loader.reloadDictionary(query.table, getContext()); external_dictionaries_loader.reloadDictionary(query.getTable(), getContext());
ExternalDictionariesLoader::resetAll(); ExternalDictionariesLoader::resetAll();
break; break;
@ -588,10 +588,10 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
if (!dropReplicaImpl(query, table)) if (!dropReplicaImpl(query, table))
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs()); throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
} }
else if (!query.database.empty()) else if (!query.getDatabase().empty())
{ {
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, query.database); getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, query.getDatabase());
DatabasePtr database = DatabaseCatalog::instance().getDatabase(query.database); DatabasePtr database = DatabaseCatalog::instance().getDatabase(query.getDatabase());
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next()) for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
dropReplicaImpl(query, iterator->table()); dropReplicaImpl(query, iterator->table());
LOG_TRACE(log, "Dropped replica {} from database {}", query.replica, backQuoteIfNeed(database->getDatabaseName())); LOG_TRACE(log, "Dropped replica {} from database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
@ -782,84 +782,84 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::STOP_MERGES: [[fallthrough]]; case Type::STOP_MERGES: [[fallthrough]];
case Type::START_MERGES: case Type::START_MERGES:
{ {
if (query.table.empty()) if (query.getTable().empty())
required_access.emplace_back(AccessType::SYSTEM_MERGES); required_access.emplace_back(AccessType::SYSTEM_MERGES);
else else
required_access.emplace_back(AccessType::SYSTEM_MERGES, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_MERGES, query.getDatabase(), query.getTable());
break; break;
} }
case Type::STOP_TTL_MERGES: [[fallthrough]]; case Type::STOP_TTL_MERGES: [[fallthrough]];
case Type::START_TTL_MERGES: case Type::START_TTL_MERGES:
{ {
if (query.table.empty()) if (query.getTable().empty())
required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES); required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES);
else else
required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES, query.getDatabase(), query.getTable());
break; break;
} }
case Type::STOP_MOVES: [[fallthrough]]; case Type::STOP_MOVES: [[fallthrough]];
case Type::START_MOVES: case Type::START_MOVES:
{ {
if (query.table.empty()) if (query.getTable().empty())
required_access.emplace_back(AccessType::SYSTEM_MOVES); required_access.emplace_back(AccessType::SYSTEM_MOVES);
else else
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
break; break;
} }
case Type::STOP_FETCHES: [[fallthrough]]; case Type::STOP_FETCHES: [[fallthrough]];
case Type::START_FETCHES: case Type::START_FETCHES:
{ {
if (query.table.empty()) if (query.getTable().empty())
required_access.emplace_back(AccessType::SYSTEM_FETCHES); required_access.emplace_back(AccessType::SYSTEM_FETCHES);
else else
required_access.emplace_back(AccessType::SYSTEM_FETCHES, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_FETCHES, query.getDatabase(), query.getTable());
break; break;
} }
case Type::STOP_DISTRIBUTED_SENDS: [[fallthrough]]; case Type::STOP_DISTRIBUTED_SENDS: [[fallthrough]];
case Type::START_DISTRIBUTED_SENDS: case Type::START_DISTRIBUTED_SENDS:
{ {
if (query.table.empty()) if (query.getTable().empty())
required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS); required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS);
else else
required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS, query.getDatabase(), query.getTable());
break; break;
} }
case Type::STOP_REPLICATED_SENDS: [[fallthrough]]; case Type::STOP_REPLICATED_SENDS: [[fallthrough]];
case Type::START_REPLICATED_SENDS: case Type::START_REPLICATED_SENDS:
{ {
if (query.table.empty()) if (query.getTable().empty())
required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS); required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS);
else else
required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS, query.getDatabase(), query.getTable());
break; break;
} }
case Type::STOP_REPLICATION_QUEUES: [[fallthrough]]; case Type::STOP_REPLICATION_QUEUES: [[fallthrough]];
case Type::START_REPLICATION_QUEUES: case Type::START_REPLICATION_QUEUES:
{ {
if (query.table.empty()) if (query.getTable().empty())
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES); required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES);
else else
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES, query.getDatabase(), query.getTable());
break; break;
} }
case Type::DROP_REPLICA: case Type::DROP_REPLICA:
{ {
required_access.emplace_back(AccessType::SYSTEM_DROP_REPLICA, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_DROP_REPLICA, query.getDatabase(), query.getTable());
break; break;
} }
case Type::RESTORE_REPLICA: case Type::RESTORE_REPLICA:
{ {
required_access.emplace_back(AccessType::SYSTEM_RESTORE_REPLICA, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_RESTORE_REPLICA, query.getDatabase(), query.getTable());
break; break;
} }
case Type::SYNC_REPLICA: case Type::SYNC_REPLICA:
{ {
required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.getDatabase(), query.getTable());
break; break;
} }
case Type::RESTART_REPLICA: case Type::RESTART_REPLICA:
{ {
required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA, query.getDatabase(), query.getTable());
break; break;
} }
case Type::RESTART_REPLICAS: case Type::RESTART_REPLICAS:
@ -869,7 +869,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
} }
case Type::FLUSH_DISTRIBUTED: case Type::FLUSH_DISTRIBUTED:
{ {
required_access.emplace_back(AccessType::SYSTEM_FLUSH_DISTRIBUTED, query.database, query.table); required_access.emplace_back(AccessType::SYSTEM_FLUSH_DISTRIBUTED, query.getDatabase(), query.getTable());
break; break;
} }
case Type::FLUSH_LOGS: case Type::FLUSH_LOGS:

View File

@ -1,3 +1,4 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
#include <Parsers/ASTSystemQuery.h> #include <Parsers/ASTSystemQuery.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
@ -39,6 +40,36 @@ const char * ASTSystemQuery::typeToString(Type type)
return type_name.data(); return type_name.data();
} }
String ASTSystemQuery::getDatabase() const
{
String name;
tryGetIdentifierNameInto(database, name);
return name;
}
String ASTSystemQuery::getTable() const
{
String name;
tryGetIdentifierNameInto(table, name);
return name;
}
void ASTSystemQuery::setDatabase(const String & name)
{
if (name.empty())
database.reset();
else
database = std::make_shared<ASTIdentifier>(name);
}
void ASTSystemQuery::setTable(const String & name)
{
if (name.empty())
table.reset();
else
table = std::make_shared<ASTIdentifier>(name);
}
void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const
{ {
settings.ostr << (settings.hilite ? hilite_keyword : "") << "SYSTEM "; settings.ostr << (settings.hilite ? hilite_keyword : "") << "SYSTEM ";
@ -47,19 +78,19 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
auto print_database_table = [&] auto print_database_table = [&]
{ {
settings.ostr << " "; settings.ostr << " ";
if (!database.empty()) if (!getDatabase().empty())
{ {
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(database) settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(getDatabase())
<< (settings.hilite ? hilite_none : "") << "."; << (settings.hilite ? hilite_none : "") << ".";
} }
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(table) settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(getTable())
<< (settings.hilite ? hilite_none : ""); << (settings.hilite ? hilite_none : "");
}; };
auto print_drop_replica = [&] auto print_drop_replica = [&]
{ {
settings.ostr << " " << quoteString(replica); settings.ostr << " " << quoteString(replica);
if (!table.empty()) if (!getTable().empty())
{ {
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM TABLE" settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM TABLE"
<< (settings.hilite ? hilite_none : ""); << (settings.hilite ? hilite_none : "");
@ -70,11 +101,11 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM ZKPATH " settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM ZKPATH "
<< (settings.hilite ? hilite_none : "") << quoteString(replica_zk_path); << (settings.hilite ? hilite_none : "") << quoteString(replica_zk_path);
} }
else if (!database.empty()) else if (!getDatabase().empty())
{ {
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM DATABASE " settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM DATABASE "
<< (settings.hilite ? hilite_none : ""); << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(database) settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(getDatabase())
<< (settings.hilite ? hilite_none : ""); << (settings.hilite ? hilite_none : "");
} }
}; };
@ -107,7 +138,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_DISTRIBUTED_SENDS || type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS) || type == Type::START_DISTRIBUTED_SENDS)
{ {
if (!table.empty()) if (!getTable().empty())
print_database_table(); print_database_table();
else if (!volume.empty()) else if (!volume.empty())
print_on_volume(); print_on_volume();

View File

@ -70,10 +70,17 @@ public:
Type type = Type::UNKNOWN; Type type = Type::UNKNOWN;
ASTPtr database;
ASTPtr table;
String getDatabase() const;
String getTable() const;
void setDatabase(const String & name);
void setTable(const String & name);
String target_model; String target_model;
String target_function; String target_function;
String database;
String table;
String replica; String replica;
String replica_zk_path; String replica_zk_path;
bool is_drop_whole_replica{}; bool is_drop_whole_replica{};
@ -84,11 +91,20 @@ public:
String getID(char) const override { return "SYSTEM query"; } String getID(char) const override { return "SYSTEM query"; }
ASTPtr clone() const override { return std::make_shared<ASTSystemQuery>(*this); } ASTPtr clone() const override
{
auto res = std::make_shared<ASTSystemQuery>(*this);
res->children.clear();
if (database) { res->database = database->clone(); res->children.push_back(res->database); }
if (table) { res->table = table->clone(); res->children.push_back(res->table); }
return res;
}
ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override
{ {
return removeOnClusterSystem<ASTSystemQuery>(clone(), new_database); return removeOnCluster<ASTSystemQuery>(clone(), new_database);
} }
const char * getQueryKindString() const override { return "System"; } const char * getQueryKindString() const override { return "System"; }

View File

@ -39,14 +39,14 @@ static bool parseQueryWithOnClusterAndMaybeTable(std::shared_ptr<ASTSystemQuery>
ASTPtr ast; ASTPtr ast;
if (ParserStringLiteral{}.parse(pos, ast, expected)) if (ParserStringLiteral{}.parse(pos, ast, expected))
{ {
res->database = {}; res->setDatabase("");
res->table = ast->as<ASTLiteral &>().value.safeGet<String>(); res->setTable(ast->as<ASTLiteral &>().value.safeGet<String>());
parsed_table = true; parsed_table = true;
} }
} }
if (!parsed_table) if (!parsed_table)
parsed_table = parseDatabaseAndTableName(pos, expected, res->database, res->table); parsed_table = parseDatabaseAndTableASTPtr(pos, expected, res->database, res->table);
if (!parsed_table && require_table) if (!parsed_table && require_table)
return false; return false;
@ -56,6 +56,12 @@ static bool parseQueryWithOnClusterAndMaybeTable(std::shared_ptr<ASTSystemQuery>
return false; return false;
res->cluster = cluster; res->cluster = cluster;
if (res->database)
res->children.push_back(res->database);
if (res->table)
res->children.push_back(res->table);
return true; return true;
} }
@ -163,14 +169,12 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
if (ParserKeyword{"DATABASE"}.ignore(pos, expected)) if (ParserKeyword{"DATABASE"}.ignore(pos, expected))
{ {
ParserIdentifier database_parser; ParserIdentifier database_parser;
ASTPtr database; if (!database_parser.parse(pos, res->database, expected))
if (!database_parser.parse(pos, database, expected))
return false; return false;
tryGetIdentifierNameInto(database, res->database);
} }
else if (ParserKeyword{"TABLE"}.ignore(pos, expected)) else if (ParserKeyword{"TABLE"}.ignore(pos, expected))
{ {
parseDatabaseAndTableName(pos, expected, res->database, res->table); parseDatabaseAndTableASTPtr(pos, expected, res->database, res->table);
} }
else if (ParserKeyword{"ZKPATH"}.ignore(pos, expected)) else if (ParserKeyword{"ZKPATH"}.ignore(pos, expected))
{ {
@ -193,7 +197,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA: case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA: case Type::SYNC_REPLICA:
if (!parseDatabaseAndTableName(pos, expected, res->database, res->table)) if (!parseDatabaseAndTableASTPtr(pos, expected, res->database, res->table))
return false; return false;
break; break;
@ -251,7 +255,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
res->storage_policy = storage_policy_str; res->storage_policy = storage_policy_str;
res->volume = volume_str; res->volume = volume_str;
if (res->volume.empty() && res->storage_policy.empty()) if (res->volume.empty() && res->storage_policy.empty())
parseDatabaseAndTableName(pos, expected, res->database, res->table); parseDatabaseAndTableASTPtr(pos, expected, res->database, res->table);
break; break;
} }
@ -265,7 +269,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS: case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES: case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES: case Type::START_REPLICATION_QUEUES:
parseDatabaseAndTableName(pos, expected, res->database, res->table); parseDatabaseAndTableASTPtr(pos, expected, res->database, res->table);
break; break;
case Type::SUSPEND: case Type::SUSPEND:
@ -287,6 +291,11 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
break; break;
} }
if (res->database)
res->children.push_back(res->database);
if (res->table)
res->children.push_back(res->table);
node = std::move(res); node = std::move(res);
return true; return true;
} }