ClickHouse/src/Interpreters/InterpreterSystemQuery.cpp

817 lines
32 KiB
C++
Raw Normal View History

#include <Interpreters/InterpreterSystemQuery.h>
2018-04-19 13:56:14 +00:00
#include <Common/DNSResolver.h>
#include <Common/ActionLock.h>
#include <Common/typeid_cast.h>
2018-06-05 19:46:49 +00:00
#include <Common/getNumberOfPhysicalCPUCores.h>
2020-11-30 14:30:55 +00:00
#include <Common/SymbolIndex.h>
#include <Common/ThreadPool.h>
2019-10-25 19:07:47 +00:00
#include <Common/escapeForFileName.h>
2021-01-07 19:19:33 +00:00
#include <Common/ShellCommand.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/ExternalDictionariesLoader.h>
2021-04-16 09:56:40 +00:00
#include <Interpreters/ExternalModelsLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ActionLocksManager.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/QueryLog.h>
2020-11-03 13:47:26 +00:00
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
2019-02-03 21:30:45 +00:00
#include <Interpreters/TraceLog.h>
#include <Interpreters/TextLog.h>
2019-08-13 14:31:46 +00:00
#include <Interpreters/MetricLog.h>
2020-06-10 19:17:30 +00:00
#include <Interpreters/AsynchronousMetricLog.h>
2020-10-22 16:47:20 +00:00
#include <Interpreters/OpenTelemetrySpanLog.h>
2021-03-04 17:38:12 +00:00
#include <Interpreters/ExpressionJIT.h>
#include <Access/ContextAccess.h>
2020-04-30 22:29:47 +00:00
#include <Access/AllowedClientHosts.h>
#include <Databases/IDatabase.h>
2021-04-20 19:23:54 +00:00
#include <Disks/DiskRestartProxy.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageFactory.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <csignal>
2019-01-09 15:44:20 +00:00
#include <algorithm>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
namespace DB
{
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_KILL;
extern const int NOT_IMPLEMENTED;
2019-09-19 11:04:57 +00:00
extern const int TIMEOUT_EXCEEDED;
2020-06-23 12:01:51 +00:00
extern const int TABLE_WAS_NOT_DROPPED;
}
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
2019-04-22 15:11:16 +00:00
extern StorageActionBlockType DistributedSend;
2019-08-01 15:36:12 +00:00
extern StorageActionBlockType PartsTTLMerge;
2019-09-03 14:50:49 +00:00
extern StorageActionBlockType PartsMove;
}
namespace
{
ExecutionStatus getOverallExecutionStatusOfCommands()
{
return ExecutionStatus(0);
}
2019-01-22 19:56:53 +00:00
/// Consequently tries to execute all commands and generates final exception message for failed commands
template <typename Callable, typename ... Callables>
ExecutionStatus getOverallExecutionStatusOfCommands(Callable && command, Callables && ... commands)
{
ExecutionStatus status_head(0);
try
{
command();
}
catch (...)
{
status_head = ExecutionStatus::fromCurrentException();
}
ExecutionStatus status_tail = getOverallExecutionStatusOfCommands(std::forward<Callables>(commands)...);
auto res_status = status_head.code != 0 ? status_head.code : status_tail.code;
auto res_message = status_head.message + (status_tail.message.empty() ? "" : ("\n" + status_tail.message));
return ExecutionStatus(res_status, res_message);
}
/// Consequently tries to execute all commands and throws exception with info about failed commands
template <typename ... Callables>
void executeCommandsAndThrowIfError(Callables && ... commands)
{
auto status = getOverallExecutionStatusOfCommands(std::forward<Callables>(commands)...);
if (status.code != 0)
throw Exception(status.message, status.code);
}
2020-01-24 16:20:36 +00:00
AccessType getRequiredAccessType(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
return AccessType::SYSTEM_MERGES;
2020-01-24 16:20:36 +00:00
else if (action_type == ActionLocks::PartsFetch)
return AccessType::SYSTEM_FETCHES;
2020-01-24 16:20:36 +00:00
else if (action_type == ActionLocks::PartsSend)
return AccessType::SYSTEM_REPLICATED_SENDS;
2020-01-24 16:20:36 +00:00
else if (action_type == ActionLocks::ReplicationQueue)
return AccessType::SYSTEM_REPLICATION_QUEUES;
2020-01-24 16:20:36 +00:00
else if (action_type == ActionLocks::DistributedSend)
return AccessType::SYSTEM_DISTRIBUTED_SENDS;
2020-01-24 16:20:36 +00:00
else if (action_type == ActionLocks::PartsTTLMerge)
return AccessType::SYSTEM_TTL_MERGES;
2020-01-24 16:20:36 +00:00
else if (action_type == ActionLocks::PartsMove)
return AccessType::SYSTEM_MOVES;
2020-01-24 16:20:36 +00:00
else
throw Exception("Unknown action type: " + std::to_string(action_type), ErrorCodes::LOGICAL_ERROR);
}
2020-03-04 20:29:52 +00:00
}
2020-01-24 16:20:36 +00:00
/// Implements SYSTEM [START|STOP] <something action from ActionLocks>
2020-03-04 20:29:52 +00:00
void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type, bool start)
{
auto manager = getContext()->getActionLocksManager();
manager->cleanExpired();
if (volume_ptr && action_type == ActionLocks::PartsMerge)
{
volume_ptr->setAvoidMergesUserOverride(!start);
}
else if (table_id)
{
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
2020-10-15 19:23:46 +00:00
if (table)
2020-10-15 16:10:22 +00:00
{
2020-10-15 19:23:46 +00:00
if (start)
{
manager->remove(table, action_type);
table->onActionLockRemove(action_type);
}
else
manager->add(table, action_type);
2020-10-15 16:10:22 +00:00
}
}
else
{
auto access = getContext()->getAccess();
for (auto & elem : DatabaseCatalog::instance().getDatabases())
2020-01-24 16:20:36 +00:00
{
for (auto iterator = elem.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
2020-01-24 16:20:36 +00:00
{
StoragePtr table = iterator->table();
if (!table)
continue;
2020-06-20 22:44:52 +00:00
if (!access->isGranted(getRequiredAccessType(action_type), elem.first, iterator->name()))
{
LOG_INFO(
log,
"Access {} denied, skipping {}.{}",
toString(getRequiredAccessType(action_type)),
elem.first,
iterator->name());
continue;
2020-06-20 22:44:52 +00:00
}
if (start)
2020-10-15 16:10:22 +00:00
{
manager->remove(table, action_type);
2020-10-15 16:10:22 +00:00
table->onActionLockRemove(action_type);
}
else
manager->add(table, action_type);
2020-01-24 16:20:36 +00:00
}
}
}
}
2020-01-24 16:20:36 +00:00
2020-03-04 20:29:52 +00:00
InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_), query_ptr(query_ptr_->clone()), log(&Poco::Logger::get("InterpreterSystemQuery"))
{
}
BlockIO InterpreterSystemQuery::execute()
{
auto & query = query_ptr->as<ASTSystemQuery &>();
if (!query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext(), getRequiredAccessForDDLOnCluster());
using Type = ASTSystemQuery::Type;
/// Use global context with fresh system profile settings
auto system_context = Context::createCopy(getContext()->getGlobalContext());
system_context->setSetting("profile", getContext()->getSystemProfileName());
/// Make canonical query for simpler processing
2020-03-04 20:29:52 +00:00
if (!query.table.empty())
table_id = getContext()->resolveStorageID(StorageID(query.database, query.table), Context::ResolveOrdinary);
2019-12-23 03:48:34 +00:00
if (!query.target_dictionary.empty() && !query.database.empty())
query.target_dictionary = query.database + "." + query.target_dictionary;
volume_ptr = {};
if (!query.storage_policy.empty() && !query.volume.empty())
volume_ptr = getContext()->getStoragePolicy(query.storage_policy)->getVolumeByName(query.volume);
switch (query.type)
{
case Type::SHUTDOWN:
2021-01-07 19:19:33 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN);
if (kill(0, SIGTERM))
throwFromErrno("System call kill(0, SIGTERM) failed", ErrorCodes::CANNOT_KILL);
break;
2021-01-07 19:19:33 +00:00
}
case Type::KILL:
2021-01-07 19:19:33 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN);
2021-01-07 18:36:38 +00:00
/// Exit with the same code as it is usually set by shell when process is terminated by SIGKILL.
/// It's better than doing 'raise' or 'kill', because they have no effect for 'init' process (with pid = 0, usually in Docker).
LOG_INFO(log, "Exit immediately as the SYSTEM KILL command has been issued.");
_exit(128 + SIGKILL);
// break; /// unreachable
2021-01-07 19:19:33 +00:00
}
case Type::SUSPEND:
{
auto command = fmt::format("kill -STOP {0} && sleep {1} && kill -CONT {0}", getpid(), query.seconds);
LOG_DEBUG(log, "Will run {}", command);
auto res = ShellCommand::execute(command);
res->in.close();
WriteBufferFromOwnString out;
copyData(res->out, out);
copyData(res->err, out);
if (!out.str().empty())
LOG_DEBUG(log, "The command returned output: {}", command, out.str());
res->wait();
break;
2021-01-07 19:19:33 +00:00
}
case Type::DROP_DNS_CACHE:
2021-01-07 19:19:33 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_DNS_CACHE);
2018-04-19 13:56:14 +00:00
DNSResolver::instance().dropCache();
/// Reinitialize clusters to update their resolved_addresses
system_context->reloadClusterConfig();
break;
2021-01-07 19:19:33 +00:00
}
case Type::DROP_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropMarkCache();
break;
case Type::DROP_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropUncompressedCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->dropMMappedFileCache();
break;
#if USE_EMBEDDED_COMPILER
case Type::DROP_COMPILED_EXPRESSION_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
2021-03-04 17:38:12 +00:00
if (auto * cache = CompiledExpressionCacheFactory::instance().tryGetCache())
cache->reset();
break;
#endif
case Type::RELOAD_DICTIONARY:
2021-01-07 19:19:33 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
2021-03-19 12:47:27 +00:00
auto & external_dictionaries_loader = system_context->getExternalDictionariesLoader();
external_dictionaries_loader.reloadDictionary(query.target_dictionary, getContext());
2021-03-19 12:47:27 +00:00
2021-04-16 09:56:40 +00:00
2020-03-03 08:32:58 +00:00
ExternalDictionariesLoader::resetAll();
break;
2021-01-07 19:19:33 +00:00
}
case Type::RELOAD_DICTIONARIES:
2021-01-07 19:19:33 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
executeCommandsAndThrowIfError(
[&] () { system_context->getExternalDictionariesLoader().reloadAllTriedToLoad(); },
[&] () { system_context->getEmbeddedDictionaries().reload(); }
);
2020-03-03 08:32:58 +00:00
ExternalDictionariesLoader::resetAll();
break;
2021-01-07 19:19:33 +00:00
}
2021-04-16 09:56:40 +00:00
case Type::RELOAD_MODEL:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_MODEL);
auto & external_models_loader = system_context->getExternalModelsLoader();
external_models_loader.reloadModel(query.target_model);
break;
}
case Type::RELOAD_MODELS:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_MODEL);
auto & external_models_loader = system_context->getExternalModelsLoader();
external_models_loader.reloadAllTriedToLoad();
break;
}
case Type::RELOAD_EMBEDDED_DICTIONARIES:
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_EMBEDDED_DICTIONARIES);
system_context->getEmbeddedDictionaries().reload();
break;
case Type::RELOAD_CONFIG:
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_CONFIG);
system_context->reloadConfig();
break;
2020-11-30 14:30:55 +00:00
case Type::RELOAD_SYMBOLS:
2021-01-07 19:19:33 +00:00
{
2020-12-07 17:40:49 +00:00
#if defined(__ELF__) && !defined(__FreeBSD__)
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_SYMBOLS);
2020-11-30 14:30:55 +00:00
(void)SymbolIndex::instance(true);
2020-12-07 21:12:09 +00:00
break;
2020-12-07 17:40:49 +00:00
#else
throw Exception("SYSTEM RELOAD SYMBOLS is not supported on current platform", ErrorCodes::NOT_IMPLEMENTED);
#endif
2021-01-07 19:19:33 +00:00
}
case Type::STOP_MERGES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsMerge, false);
break;
case Type::START_MERGES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsMerge, true);
break;
2019-08-01 15:36:12 +00:00
case Type::STOP_TTL_MERGES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsTTLMerge, false);
2019-08-01 15:36:12 +00:00
break;
case Type::START_TTL_MERGES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsTTLMerge, true);
2019-08-01 15:36:12 +00:00
break;
2019-09-03 14:50:49 +00:00
case Type::STOP_MOVES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsMove, false);
2019-09-03 14:50:49 +00:00
break;
case Type::START_MOVES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsMove, true);
2019-09-03 14:50:49 +00:00
break;
case Type::STOP_FETCHES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsFetch, false);
break;
case Type::START_FETCHES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsFetch, true);
break;
case Type::STOP_REPLICATED_SENDS:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsSend, false);
break;
2019-02-02 11:28:43 +00:00
case Type::START_REPLICATED_SENDS:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::PartsSend, true);
break;
case Type::STOP_REPLICATION_QUEUES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::ReplicationQueue, false);
break;
case Type::START_REPLICATION_QUEUES:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::ReplicationQueue, true);
break;
2019-04-22 15:11:16 +00:00
case Type::STOP_DISTRIBUTED_SENDS:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::DistributedSend, false);
2019-04-22 15:11:16 +00:00
break;
case Type::START_DISTRIBUTED_SENDS:
2020-03-04 20:29:52 +00:00
startStopAction(ActionLocks::DistributedSend, true);
2019-04-22 15:11:16 +00:00
break;
2020-05-17 12:44:22 +00:00
case Type::DROP_REPLICA:
dropReplica(query);
break;
case Type::SYNC_REPLICA:
syncReplica(query);
break;
case Type::FLUSH_DISTRIBUTED:
flushDistributed(query);
2019-04-22 15:11:16 +00:00
break;
case Type::RESTART_REPLICAS:
restartReplicas(system_context);
break;
case Type::RESTART_REPLICA:
2020-03-04 20:29:52 +00:00
if (!tryRestartReplica(table_id, system_context))
throw Exception("There is no " + query.database + "." + query.table + " replicated table",
ErrorCodes::BAD_ARGUMENTS);
break;
2021-04-20 19:23:54 +00:00
case Type::RESTART_DISK:
restartDisk(query.disk);
break;
case Type::FLUSH_LOGS:
2021-01-07 19:19:33 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
executeCommandsAndThrowIfError(
[&] () { if (auto query_log = getContext()->getQueryLog()) query_log->flush(true); },
[&] () { if (auto part_log = getContext()->getPartLog("")) part_log->flush(true); },
[&] () { if (auto query_thread_log = getContext()->getQueryThreadLog()) query_thread_log->flush(true); },
[&] () { if (auto trace_log = getContext()->getTraceLog()) trace_log->flush(true); },
[&] () { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); },
[&] () { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); },
[&] () { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
[&] () { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
);
break;
2021-01-07 19:19:33 +00:00
}
case Type::STOP_LISTEN_QUERIES:
case Type::START_LISTEN_QUERIES:
throw Exception(String(ASTSystemQuery::typeToString(query.type)) + " is not supported yet", ErrorCodes::NOT_IMPLEMENTED);
default:
throw Exception("Unknown type of SYSTEM query", ErrorCodes::BAD_ARGUMENTS);
}
return BlockIO();
}
StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextPtr system_context, bool need_ddl_guard)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_REPLICA, replica);
2020-01-24 16:20:36 +00:00
auto table_ddl_guard = need_ddl_guard ? DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName()) : nullptr;
auto [database, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable(replica, getContext());
ASTPtr create_ast;
/// Detach actions
2020-03-04 20:29:52 +00:00
if (!table || !dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
return nullptr;
2020-03-04 20:29:52 +00:00
table->shutdown();
{
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
create_ast = database->getCreateTableQuery(replica.table_name, getContext());
2020-03-04 20:29:52 +00:00
database->detachTable(replica.table_name);
}
2020-03-04 20:29:52 +00:00
table.reset();
/// Attach actions
2020-03-04 20:29:52 +00:00
/// getCreateTableQuery must return canonical CREATE query representation, there are no need for AST postprocessing
auto & create = create_ast->as<ASTCreateQuery &>();
create.attach = true;
auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, true);
2020-03-04 20:29:52 +00:00
auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
2020-04-02 01:00:21 +00:00
auto data_path = database->getTableDataPath(create);
2020-03-04 20:29:52 +00:00
table = StorageFactory::instance().get(create,
2020-04-02 01:00:21 +00:00
data_path,
2020-03-04 20:29:52 +00:00
system_context,
system_context->getGlobalContext(),
2020-03-04 20:29:52 +00:00
columns,
constraints,
false);
2020-04-02 01:00:21 +00:00
database->attachTable(replica.table_name, table, data_path);
2020-03-04 20:29:52 +00:00
table->startup();
return table;
}
void InterpreterSystemQuery::restartReplicas(ContextPtr system_context)
{
2020-03-04 20:29:52 +00:00
std::vector<StorageID> replica_names;
auto & catalog = DatabaseCatalog::instance();
for (auto & elem : catalog.getDatabases())
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
2020-06-02 02:06:16 +00:00
if (auto table = iterator->table())
{
if (dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
2020-07-07 12:11:58 +00:00
replica_names.emplace_back(StorageID{iterator->databaseName(), iterator->name()});
2020-06-02 02:06:16 +00:00
}
}
}
if (replica_names.empty())
return;
TableGuards guards;
for (const auto & name : replica_names)
guards.emplace(UniqueTableName{name.database_name, name.table_name}, nullptr);
for (auto & guard : guards)
guard.second = catalog.getDDLGuard(guard.first.database_name, guard.first.table_name);
2019-01-09 15:44:20 +00:00
ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size()));
for (auto & replica : replica_names)
{
LOG_TRACE(log, "Restarting replica on {}", replica.getNameForLogs());
pool.scheduleOrThrowOnError([&]() { tryRestartReplica(replica, system_context, false); });
}
pool.wait();
}
2020-05-17 12:44:22 +00:00
void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
{
2020-06-23 12:01:51 +00:00
if (query.replica.empty())
throw Exception("Replica name is empty", ErrorCodes::BAD_ARGUMENTS);
2020-05-17 12:44:22 +00:00
if (!table_id.empty())
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
2020-05-17 12:44:22 +00:00
2020-06-23 12:01:51 +00:00
if (!dropReplicaImpl(query, table))
2020-05-17 12:44:22 +00:00
throw Exception("Table " + table_id.getNameForLogs() + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
else if (!query.database.empty())
2020-05-17 12:44:22 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, query.database);
2020-06-23 12:01:51 +00:00
DatabasePtr database = DatabaseCatalog::instance().getDatabase(query.database);
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
2020-06-23 12:01:51 +00:00
dropReplicaImpl(query, iterator->table());
LOG_TRACE(log, "Dropped replica {} from database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
else if (query.is_drop_whole_replica)
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA);
2020-06-23 12:01:51 +00:00
auto databases = DatabaseCatalog::instance().getDatabases();
for (auto & elem : databases)
{
2020-06-23 12:01:51 +00:00
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
2020-06-23 12:01:51 +00:00
dropReplicaImpl(query, iterator->table());
LOG_TRACE(log, "Dropped replica {} from database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
else if (!query.replica_zk_path.empty())
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA);
auto remote_replica_path = query.replica_zk_path + "/replicas/" + query.replica;
2020-06-23 12:01:51 +00:00
/// This check is actually redundant, but it may prevent from some user mistakes
for (auto & elem : DatabaseCatalog::instance().getDatabases())
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(iterator->table().get()))
{
2020-06-23 12:01:51 +00:00
StorageReplicatedMergeTree::Status status;
storage_replicated->getStatus(status);
2020-06-23 12:01:51 +00:00
if (status.zookeeper_path == query.replica_zk_path)
throw Exception("There is a local table " + storage_replicated->getStorageID().getNameForLogs() +
", which has the same table path in ZooKeeper. Please check the path in query. "
"If you want to drop replica of this table, use `DROP TABLE` "
"or `SYSTEM DROP REPLICA 'name' FROM db.table`", ErrorCodes::TABLE_WAS_NOT_DROPPED);
}
}
}
auto zookeeper = getContext()->getZooKeeper();
2020-06-23 12:01:51 +00:00
bool looks_like_table_path = zookeeper->exists(query.replica_zk_path + "/replicas") ||
zookeeper->exists(query.replica_zk_path + "/dropped");
if (!looks_like_table_path)
throw Exception("Specified path " + query.replica_zk_path + " does not look like a table path",
ErrorCodes::TABLE_WAS_NOT_DROPPED);
if (zookeeper->exists(remote_replica_path + "/is_active"))
throw Exception("Can't remove replica: " + query.replica + ", because it's active",
2020-06-23 12:01:51 +00:00
ErrorCodes::TABLE_WAS_NOT_DROPPED);
2020-06-23 12:01:51 +00:00
StorageReplicatedMergeTree::dropReplica(zookeeper, query.replica_zk_path, query.replica, log);
LOG_INFO(log, "Dropped replica {}", remote_replica_path);
}
2020-06-23 12:01:51 +00:00
else
throw Exception("Invalid query", ErrorCodes::LOGICAL_ERROR);
}
2020-06-23 12:01:51 +00:00
bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const StoragePtr & table)
{
auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
if (!storage_replicated)
return false;
StorageReplicatedMergeTree::Status status;
auto zookeeper = getContext()->getZooKeeper();
2020-06-23 12:01:51 +00:00
storage_replicated->getStatus(status);
/// Do not allow to drop local replicas and active remote replicas
if (query.replica == status.replica_name)
throw Exception("We can't drop local replica, please use `DROP TABLE` "
"if you want to clean the data and drop this replica", ErrorCodes::TABLE_WAS_NOT_DROPPED);
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
/// However, the main usecase is to drop dead replica, which cannot become active.
/// This check prevents only from accidental drop of some other replica.
if (zookeeper->exists(status.zookeeper_path + "/replicas/" + query.replica + "/is_active"))
throw Exception("Can't drop replica: " + query.replica + ", because it's active",
ErrorCodes::TABLE_WAS_NOT_DROPPED);
storage_replicated->dropReplica(zookeeper, status.zookeeper_path, query.replica, log);
LOG_TRACE(log, "Dropped replica {} of {}", query.replica, table->getStorageID().getNameForLogs());
return true;
2020-05-17 12:44:22 +00:00
}
2020-03-04 20:29:52 +00:00
void InterpreterSystemQuery::syncReplica(ASTSystemQuery &)
{
getContext()->checkAccess(AccessType::SYSTEM_SYNC_REPLICA, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
2020-04-22 06:01:33 +00:00
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
2019-09-19 11:04:57 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
if (!storage_replicated->waitForShrinkingQueueSize(0, getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
{
2020-05-23 22:24:01 +00:00
LOG_ERROR(log, "SYNC REPLICA {}: Timed out!", table_id.getNameForLogs());
2019-09-19 11:04:57 +00:00
throw Exception(
2020-03-04 20:29:52 +00:00
"SYNC REPLICA " + table_id.getNameForLogs() + ": command timed out! "
2019-09-19 11:04:57 +00:00
"See the 'receive_timeout' setting", ErrorCodes::TIMEOUT_EXCEEDED);
}
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "SYNC REPLICA {}: OK", table_id.getNameForLogs());
2019-09-19 11:04:57 +00:00
}
else
2020-03-04 20:29:52 +00:00
throw Exception("Table " + table_id.getNameForLogs() + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
2020-03-04 20:29:52 +00:00
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
2019-04-22 15:11:16 +00:00
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);
2019-04-22 15:11:16 +00:00
if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, getContext()).get()))
storage_distributed->flushClusterNodesAllData(getContext());
2019-04-22 15:11:16 +00:00
else
2020-03-04 20:29:52 +00:00
throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}
2021-04-20 19:23:54 +00:00
void InterpreterSystemQuery::restartDisk(String & name)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_DISK);
auto disk = getContext()->getDisk(name);
if (DiskRestartProxy * restart_proxy = dynamic_cast<DiskRestartProxy*>(disk.get()))
2021-04-29 20:32:19 +00:00
restart_proxy->restart();
2021-04-20 19:23:54 +00:00
else
throw Exception("Disk " + name + " doesn't have possibility to restart", ErrorCodes::BAD_ARGUMENTS);
}
2020-01-24 16:20:36 +00:00
AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() const
{
const auto & query = query_ptr->as<const ASTSystemQuery &>();
using Type = ASTSystemQuery::Type;
AccessRightsElements required_access;
switch (query.type)
{
case Type::SHUTDOWN: [[fallthrough]];
2021-01-07 19:19:33 +00:00
case Type::KILL: [[fallthrough]];
case Type::SUSPEND:
2020-01-24 16:20:36 +00:00
{
required_access.emplace_back(AccessType::SYSTEM_SHUTDOWN);
2020-01-24 16:20:36 +00:00
break;
}
case Type::DROP_DNS_CACHE: [[fallthrough]];
case Type::DROP_MARK_CACHE: [[fallthrough]];
case Type::DROP_MMAP_CACHE: [[fallthrough]];
2020-01-24 16:20:36 +00:00
#if USE_EMBEDDED_COMPILER
case Type::DROP_COMPILED_EXPRESSION_CACHE: [[fallthrough]];
#endif
case Type::DROP_UNCOMPRESSED_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
2020-01-24 16:20:36 +00:00
break;
}
case Type::RELOAD_DICTIONARY: [[fallthrough]];
case Type::RELOAD_DICTIONARIES: [[fallthrough]];
case Type::RELOAD_EMBEDDED_DICTIONARIES:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_DICTIONARY);
2020-01-24 16:20:36 +00:00
break;
}
2021-04-16 09:56:40 +00:00
case Type::RELOAD_MODEL: [[fallthrough]];
case Type::RELOAD_MODELS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_MODEL);
break;
}
2020-01-24 16:20:36 +00:00
case Type::RELOAD_CONFIG:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_CONFIG);
2020-01-24 16:20:36 +00:00
break;
}
2020-11-30 14:30:55 +00:00
case Type::RELOAD_SYMBOLS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_SYMBOLS);
break;
}
2020-01-24 16:20:36 +00:00
case Type::STOP_MERGES: [[fallthrough]];
case Type::START_MERGES:
{
if (query.table.empty())
required_access.emplace_back(AccessType::SYSTEM_MERGES);
2020-01-24 16:20:36 +00:00
else
required_access.emplace_back(AccessType::SYSTEM_MERGES, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::STOP_TTL_MERGES: [[fallthrough]];
case Type::START_TTL_MERGES:
{
if (query.table.empty())
required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES);
2020-01-24 16:20:36 +00:00
else
required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::STOP_MOVES: [[fallthrough]];
case Type::START_MOVES:
{
if (query.table.empty())
required_access.emplace_back(AccessType::SYSTEM_MOVES);
2020-01-24 16:20:36 +00:00
else
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::STOP_FETCHES: [[fallthrough]];
case Type::START_FETCHES:
{
if (query.table.empty())
required_access.emplace_back(AccessType::SYSTEM_FETCHES);
2020-01-24 16:20:36 +00:00
else
required_access.emplace_back(AccessType::SYSTEM_FETCHES, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::STOP_DISTRIBUTED_SENDS: [[fallthrough]];
case Type::START_DISTRIBUTED_SENDS:
{
if (query.table.empty())
required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS);
2020-01-24 16:20:36 +00:00
else
required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::STOP_REPLICATED_SENDS: [[fallthrough]];
case Type::START_REPLICATED_SENDS:
{
if (query.table.empty())
required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS);
2020-01-24 16:20:36 +00:00
else
required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::STOP_REPLICATION_QUEUES: [[fallthrough]];
case Type::START_REPLICATION_QUEUES:
{
if (query.table.empty())
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES);
2020-01-24 16:20:36 +00:00
else
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
2020-05-17 12:44:22 +00:00
case Type::DROP_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_REPLICA, query.database, query.table);
break;
}
2020-01-24 16:20:36 +00:00
case Type::SYNC_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::RESTART_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::RESTART_REPLICAS:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA);
2020-01-24 16:20:36 +00:00
break;
}
case Type::FLUSH_DISTRIBUTED:
{
required_access.emplace_back(AccessType::SYSTEM_FLUSH_DISTRIBUTED, query.database, query.table);
2020-01-24 16:20:36 +00:00
break;
}
case Type::FLUSH_LOGS:
{
required_access.emplace_back(AccessType::SYSTEM_FLUSH_LOGS);
2020-01-24 16:20:36 +00:00
break;
}
2021-04-20 19:23:54 +00:00
case Type::RESTART_DISK:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_DISK);
break;
}
2020-01-24 16:20:36 +00:00
case Type::STOP_LISTEN_QUERIES: break;
case Type::START_LISTEN_QUERIES: break;
case Type::UNKNOWN: break;
case Type::END: break;
}
return required_access;
}
void InterpreterSystemQuery::extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & /*ast*/, ContextPtr) const
{
elem.query_kind = "System";
}
}