/// ClickHouse/src/Interpreters/InterpreterSystemQuery.cpp
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterSystemQuery.h>
#include <Common/DNSResolver.h>
#include <Common/ActionLock.h>
#include <Common/typeid_cast.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/SymbolIndex.h>
#include <Common/ThreadPool.h>
#include <Common/escapeForFileName.h>
#include <Common/ShellCommand.h>
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Common/PageCache.h>
#include <Common/HostResolvePool.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ActionLocksManager.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/BackupLog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <BridgeHelper/CatBoostLibraryBridgeHelper.h>
#include <Access/AccessControl.h>
#include <Access/ContextAccess.h>
#include <Access/Common/AllowedClientHosts.h>
#include <Databases/DatabaseReplicated.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/StorageAzureBlob.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/System/StorageSystemFilesystemCache.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Common/ThreadFuzzer.h>
#include <base/coverage.h>
#include <csignal>
#include <algorithm>
#include <unistd.h>
#if USE_PROTOBUF
#include <Formats/ProtobufSchemas.h>
#endif
#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
#if USE_JEMALLOC
#include <Common/Jemalloc.h>
#endif
#include "config.h"
namespace CurrentMetrics
{
extern const Metric RestartReplicaThreads;
extern const Metric RestartReplicaThreadsActive;
extern const Metric RestartReplicaThreadsScheduled;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_KILL;
extern const int NOT_IMPLEMENTED;
extern const int TIMEOUT_EXCEEDED;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int ABORTED;
extern const int SUPPORT_IS_DISABLED;
}
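/// Background activities that SYSTEM START/STOP commands can pause and resume,
/// either for a single table or server-wide.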
namespace ActionLocks
{
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType DistributedSend;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
extern const StorageActionBlockType PullReplicationLog;
extern const StorageActionBlockType Cleanup;
extern const StorageActionBlockType ViewRefresh;
}
namespace
{
/// Sequentially tries to execute all commands and throws exception with info about failed commands
void executeCommandsAndThrowIfError(std::vector<std::function<void()>> commands)
{
ExecutionStatus result(0);
for (auto & command : commands)
{
try
{
command();
}
catch (...)
{
ExecutionStatus current_result = ExecutionStatus::fromCurrentException();
if (result.code == 0)
result.code = current_result.code;
if (!current_result.message.empty())
{
if (!result.message.empty())
result.message += '\n';
result.message += current_result.message;
}
}
}
if (result.code != 0)
throw Exception::createDeprecated(result.message, result.code);
}
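/// Maps an action lock type to the access right required to start/stop it
/// (e.g. PartsMerge -> SYSTEM_MERGES); throws LOGICAL_ERROR for unknown types.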
AccessType getRequiredAccessType(StorageActionBlockType action_type)
{
if (action_type == ActionLocks::PartsMerge)
return AccessType::SYSTEM_MERGES;
else if (action_type == ActionLocks::PartsFetch)
return AccessType::SYSTEM_FETCHES;
else if (action_type == ActionLocks::PartsSend)
return AccessType::SYSTEM_REPLICATED_SENDS;
else if (action_type == ActionLocks::ReplicationQueue)
return AccessType::SYSTEM_REPLICATION_QUEUES;
else if (action_type == ActionLocks::DistributedSend)
return AccessType::SYSTEM_DISTRIBUTED_SENDS;
else if (action_type == ActionLocks::PartsTTLMerge)
return AccessType::SYSTEM_TTL_MERGES;
else if (action_type == ActionLocks::PartsMove)
return AccessType::SYSTEM_MOVES;
else if (action_type == ActionLocks::PullReplicationLog)
return AccessType::SYSTEM_PULLING_REPLICATION_LOG;
else if (action_type == ActionLocks::Cleanup)
return AccessType::SYSTEM_CLEANUP;
else if (action_type == ActionLocks::ViewRefresh)
return AccessType::SYSTEM_VIEWS;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type: {}", std::to_string(action_type));
}
constexpr std::string_view table_is_not_replicated = "Table {} is not replicated";
}
/// Implements SYSTEM [START|STOP] <something action from ActionLocks>
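/// The scope depends on the query, e.g.:
///   SYSTEM STOP MERGES                        -- every table in every database
///   SYSTEM STOP MERGES ON VOLUME policy.vol   -- a single storage volume
///   SYSTEM STOP MERGES db.table               -- a single table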
void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type, bool start)
{
auto manager = getContext()->getActionLocksManager();
manager->cleanExpired();
auto access = getContext()->getAccess();
auto required_access_type = getRequiredAccessType(action_type);
if (volume_ptr && action_type == ActionLocks::PartsMerge)
{
access->checkAccess(required_access_type);
volume_ptr->setAvoidMergesUserOverride(!start);
}
else if (table_id)
{
access->checkAccess(required_access_type, table_id.database_name, table_id.table_name);
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
{
if (start)
{
manager->remove(table, action_type);
table->onActionLockRemove(action_type);
}
else
manager->add(table, action_type);
}
}
else
{
for (auto & elem : DatabaseCatalog::instance().getDatabases())
{
startStopActionInDatabase(action_type, start, elem.first, elem.second, getContext(), log);
}
}
}
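/// Same as startStopAction, but applied to every table of one database. Tables for
/// which the required right is not granted are skipped with an INFO log entry
/// instead of failing the whole statement.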
void InterpreterSystemQuery::startStopActionInDatabase(StorageActionBlockType action_type, bool start,
const String & database_name, const DatabasePtr & database,
const ContextPtr & local_context, LoggerPtr log)
{
auto manager = local_context->getActionLocksManager();
auto access = local_context->getAccess();
auto required_access_type = getRequiredAccessType(action_type);
for (auto iterator = database->getTablesIterator(local_context); iterator->isValid(); iterator->next())
{
StoragePtr table = iterator->table();
if (!table)
continue;
if (!access->isGranted(required_access_type, database_name, iterator->name()))
{
LOG_INFO(log, "Access {} denied, skipping {}.{}", toString(required_access_type), database_name, iterator->name());
continue;
}
if (start)
{
manager->remove(table, action_type);
table->onActionLockRemove(action_type);
}
else
manager->add(table, action_type);
}
}
InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
: WithMutableContext(context_), query_ptr(query_ptr_->clone()), log(getLogger("InterpreterSystemQuery"))
{
}
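/// Dispatches the parsed SYSTEM query to the matching handler below. Queries with
/// an ON CLUSTER clause are first forwarded to all cluster nodes via distributed DDL.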
BlockIO InterpreterSystemQuery::execute()
{
auto & query = query_ptr->as<ASTSystemQuery &>();
if (!query.cluster.empty())
{
DDLQueryOnClusterParams params;
params.access_to_check = getRequiredAccessForDDLOnCluster();
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
using Type = ASTSystemQuery::Type;
/// Use global context with fresh system profile settings
auto system_context = Context::createCopy(getContext()->getGlobalContext());
system_context->setSetting("profile", getContext()->getSystemProfileName());
/// Make canonical query for simpler processing
if (query.type == Type::RELOAD_DICTIONARY)
{
if (query.database)
query.setTable(query.getDatabase() + "." + query.getTable());
}
else if (query.table)
{
table_id = getContext()->resolveStorageID(StorageID(query.getDatabase(), query.getTable()), Context::ResolveOrdinary);
}
BlockIO result;
volume_ptr = {};
if (!query.storage_policy.empty() && !query.volume.empty())
volume_ptr = getContext()->getStoragePolicy(query.storage_policy)->getVolumeByName(query.volume);
switch (query.type)
{
case Type::SHUTDOWN:
{
getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN);
if (kill(0, SIGTERM))
throw ErrnoException(ErrorCodes::CANNOT_KILL, "System call kill(0, SIGTERM) failed");
break;
}
case Type::KILL:
{
getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN);
/// Exit with the same code as it is usually set by shell when process is terminated by SIGKILL.
/// It's better than doing 'raise' or 'kill', because they have no effect for 'init' process (with pid = 0, usually in Docker).
LOG_INFO(log, "Exit immediately as the SYSTEM KILL command has been issued.");
_exit(128 + SIGKILL);
// break; /// unreachable
}
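/// SYSTEM SUSPEND FOR <N> SECOND: pauses the server process with SIGSTOP and
/// resumes it with SIGCONT after the given number of seconds, via a shell one-liner.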
case Type::SUSPEND:
{
getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN);
auto command = fmt::format("kill -STOP {0} && sleep {1} && kill -CONT {0}", getpid(), query.seconds);
LOG_DEBUG(log, "Will run {}", command);
auto res = ShellCommand::execute(command);
res->in.close();
WriteBufferFromOwnString out;
copyData(res->out, out);
copyData(res->err, out);
if (!out.str().empty())
LOG_DEBUG(log, "The command {} returned output: {}", command, out.str());
res->wait();
break;
}
case Type::SYNC_FILE_CACHE:
{
LOG_DEBUG(log, "Will perform 'sync' syscall (it can take time).");
sync();
break;
}
case Type::DROP_DNS_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_DNS_CACHE);
DNSResolver::instance().dropCache();
HostResolversPool::instance().dropCache();
/// Reinitialize clusters to update their resolved_addresses
system_context->reloadClusterConfig();
break;
}
case Type::DROP_CONNECTIONS_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_CONNECTIONS_CACHE);
HTTPConnectionPools::instance().dropCache();
break;
}
case Type::DROP_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->clearMarkCache();
break;
case Type::DROP_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->clearUncompressedCache();
break;
case Type::DROP_INDEX_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->clearIndexMarkCache();
break;
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->clearIndexUncompressedCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->clearMMappedFileCache();
break;
case Type::DROP_QUERY_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_QUERY_CACHE);
getContext()->clearQueryCache();
break;
case Type::DROP_COMPILED_EXPRESSION_CACHE:
#if USE_EMBEDDED_COMPILER
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
if (auto * cache = CompiledExpressionCacheFactory::instance().tryGetCache())
cache->clear();
break;
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for JIT compilation");
#endif
case Type::DROP_S3_CLIENT_CACHE:
#if USE_AWS_S3
getContext()->checkAccess(AccessType::SYSTEM_DROP_S3_CLIENT_CACHE);
S3::ClientCacheRegistry::instance().clearCacheForAll();
break;
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for AWS S3");
#endif
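/// SYSTEM DROP FILESYSTEM CACHE: depending on what the query specifies, drops all
/// releasable segments of every cache, one named cache, one key, or a single file
/// segment at a given offset within a key.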
case Type::DROP_FILESYSTEM_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_FILESYSTEM_CACHE);
const auto user_id = FileCache::getCommonUser().user_id;
if (query.filesystem_cache_name.empty())
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache_data] : caches)
cache_data->cache->removeAllReleasable(user_id);
}
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
if (query.key_to_drop.empty())
{
cache->removeAllReleasable(user_id);
}
else
{
auto key = FileCacheKey::fromKeyString(query.key_to_drop);
if (query.offset_to_drop.has_value())
cache->removeFileSegment(key, query.offset_to_drop.value(), user_id);
else
cache->removeKey(key, user_id);
}
}
break;
}
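/// SYSTEM SYNC FILESYSTEM CACHE: reconciles the in-memory cache state with the data
/// actually on disk and returns the affected segments as a
/// (cache_name, path, size) result table.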
case Type::SYNC_FILESYSTEM_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_SYNC_FILESYSTEM_CACHE);
ColumnsDescription columns{NamesAndTypesList{
{"cache_name", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"size", std::make_shared<DataTypeUInt64>()},
}};
Block sample_block;
for (const auto & column : columns)
sample_block.insert({column.type->createColumn(), column.type, column.name});
MutableColumns res_columns = sample_block.cloneEmptyColumns();
auto fill_data = [&](const std::string & cache_name, const FileCachePtr & cache, const std::vector<FileSegment::Info> & file_segments)
{
for (const auto & file_segment : file_segments)
{
size_t i = 0;
const auto path = cache->getFileSegmentPath(
file_segment.key, file_segment.offset, file_segment.kind,
FileCache::UserInfo(file_segment.user_id, file_segment.user_weight));
res_columns[i++]->insert(cache_name);
res_columns[i++]->insert(path);
res_columns[i++]->insert(file_segment.downloaded_size);
}
};
if (query.filesystem_cache_name.empty())
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [cache_name, cache_data] : caches)
{
auto file_segments = cache_data->cache->sync();
fill_data(cache_name, cache_data->cache, file_segments);
}
}
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
auto file_segments = cache->sync();
fill_data(query.filesystem_cache_name, cache, file_segments);
}
size_t num_rows = res_columns[0]->size();
auto source = std::make_shared<SourceFromSingleChunk>(sample_block, Chunk(std::move(res_columns), num_rows));
result.pipeline = QueryPipeline(std::move(source));
break;
}
case Type::DROP_DISK_METADATA_CACHE:
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
}
case Type::DROP_PAGE_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_PAGE_CACHE);
getContext()->dropPageCache();
break;
}
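/// SYSTEM DROP SCHEMA CACHE: clears cached automatically-inferred schemas, either
/// for a single storage (FILE, S3, HDFS, URL, AZURE) or for all of them.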
case Type::DROP_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE);
std::unordered_set<String> caches_to_drop;
if (query.schema_cache_storage.empty())
caches_to_drop = {"FILE", "S3", "HDFS", "URL", "AZURE"};
else
caches_to_drop = {query.schema_cache_storage};
if (caches_to_drop.contains("FILE"))
StorageFile::getSchemaCache(getContext()).clear();
#if USE_AWS_S3
if (caches_to_drop.contains("S3"))
StorageS3::getSchemaCache(getContext()).clear();
#endif
#if USE_HDFS
if (caches_to_drop.contains("HDFS"))
StorageHDFS::getSchemaCache(getContext()).clear();
#endif
if (caches_to_drop.contains("URL"))
StorageURL::getSchemaCache(getContext()).clear();
#if USE_AZURE_BLOB_STORAGE
if (caches_to_drop.contains("AZURE"))
StorageAzureBlob::getSchemaCache(getContext()).clear();
#endif
break;
}
case Type::DROP_FORMAT_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_FORMAT_SCHEMA_CACHE);
std::unordered_set<String> caches_to_drop;
if (query.schema_cache_format.empty())
caches_to_drop = {"Protobuf"};
else
caches_to_drop = {query.schema_cache_format};
#if USE_PROTOBUF
if (caches_to_drop.contains("Protobuf"))
ProtobufSchemas::instance().clear();
#endif
break;
}
case Type::RELOAD_DICTIONARY:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
auto & external_dictionaries_loader = system_context->getExternalDictionariesLoader();
external_dictionaries_loader.reloadDictionary(query.getTable(), getContext());
ExternalDictionariesLoader::resetAll();
break;
}
case Type::RELOAD_DICTIONARIES:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
executeCommandsAndThrowIfError({
[&] { system_context->getExternalDictionariesLoader().reloadAllTriedToLoad(); },
[&] { system_context->getEmbeddedDictionaries().reload(); }
});
ExternalDictionariesLoader::resetAll();
break;
}
case Type::RELOAD_MODEL:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_MODEL);
auto bridge_helper = std::make_unique<CatBoostLibraryBridgeHelper>(getContext(), query.target_model);
bridge_helper->removeModel();
break;
}
case Type::RELOAD_MODELS:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_MODEL);
auto bridge_helper = std::make_unique<CatBoostLibraryBridgeHelper>(getContext());
bridge_helper->removeAllModels();
break;
}
case Type::RELOAD_FUNCTION:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_FUNCTION);
auto & external_user_defined_executable_functions_loader = system_context->getExternalUserDefinedExecutableFunctionsLoader();
external_user_defined_executable_functions_loader.reloadFunction(query.target_function);
break;
}
case Type::RELOAD_FUNCTIONS:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_FUNCTION);
auto & external_user_defined_executable_functions_loader = system_context->getExternalUserDefinedExecutableFunctionsLoader();
external_user_defined_executable_functions_loader.reloadAllTriedToLoad();
break;
}
case Type::RELOAD_EMBEDDED_DICTIONARIES:
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_EMBEDDED_DICTIONARIES);
system_context->getEmbeddedDictionaries().reload();
break;
case Type::RELOAD_CONFIG:
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_CONFIG);
system_context->reloadConfig();
break;
case Type::RELOAD_USERS:
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_USERS);
system_context->getAccessControl().reload(AccessControl::ReloadMode::ALL);
break;
case Type::RELOAD_ASYNCHRONOUS_METRICS:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_ASYNCHRONOUS_METRICS);
auto * asynchronous_metrics = system_context->getAsynchronousMetrics();
if (asynchronous_metrics)
asynchronous_metrics->update(std::chrono::system_clock::now(), /*force_update*/ true);
break;
}
case Type::STOP_MERGES:
startStopAction(ActionLocks::PartsMerge, false);
break;
case Type::START_MERGES:
startStopAction(ActionLocks::PartsMerge, true);
break;
case Type::STOP_TTL_MERGES:
startStopAction(ActionLocks::PartsTTLMerge, false);
break;
case Type::START_TTL_MERGES:
startStopAction(ActionLocks::PartsTTLMerge, true);
break;
case Type::STOP_MOVES:
startStopAction(ActionLocks::PartsMove, false);
break;
case Type::START_MOVES:
startStopAction(ActionLocks::PartsMove, true);
break;
case Type::STOP_FETCHES:
startStopAction(ActionLocks::PartsFetch, false);
break;
case Type::START_FETCHES:
startStopAction(ActionLocks::PartsFetch, true);
break;
case Type::STOP_REPLICATED_SENDS:
startStopAction(ActionLocks::PartsSend, false);
break;
case Type::START_REPLICATED_SENDS:
startStopAction(ActionLocks::PartsSend, true);
break;
case Type::STOP_REPLICATION_QUEUES:
startStopAction(ActionLocks::ReplicationQueue, false);
break;
case Type::START_REPLICATION_QUEUES:
startStopAction(ActionLocks::ReplicationQueue, true);
break;
case Type::STOP_DISTRIBUTED_SENDS:
startStopAction(ActionLocks::DistributedSend, false);
break;
case Type::START_DISTRIBUTED_SENDS:
startStopAction(ActionLocks::DistributedSend, true);
break;
case Type::STOP_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, false);
break;
case Type::START_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, true);
break;
case Type::STOP_CLEANUP:
startStopAction(ActionLocks::Cleanup, false);
break;
case Type::START_CLEANUP:
startStopAction(ActionLocks::Cleanup, true);
break;
case Type::START_VIEW:
case Type::START_VIEWS:
startStopAction(ActionLocks::ViewRefresh, true);
break;
case Type::STOP_VIEW:
case Type::STOP_VIEWS:
startStopAction(ActionLocks::ViewRefresh, false);
break;
case Type::REFRESH_VIEW:
getRefreshTask()->run();
break;
case Type::CANCEL_VIEW:
getRefreshTask()->cancel();
break;
case Type::TEST_VIEW:
getRefreshTask()->setFakeTime(query.fake_time_for_view);
break;
case Type::DROP_REPLICA:
dropReplica(query);
break;
case Type::DROP_DATABASE_REPLICA:
dropDatabaseReplica(query);
break;
case Type::SYNC_REPLICA:
syncReplica(query);
break;
case Type::SYNC_DATABASE_REPLICA:
syncReplicatedDatabase(query);
break;
case Type::REPLICA_UNREADY:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::REPLICA_READY:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::SYNC_TRANSACTION_LOG:
syncTransactionLog();
break;
case Type::FLUSH_DISTRIBUTED:
flushDistributed(query);
break;
case Type::RESTART_REPLICAS:
restartReplicas(system_context);
break;
case Type::RESTART_REPLICA:
restartReplica(table_id, system_context);
break;
case Type::RESTORE_REPLICA:
restoreReplica();
break;
case Type::WAIT_LOADING_PARTS:
waitLoadingParts();
break;
case Type::RESTART_DISK:
restartDisk(query.disk);
break;
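/// SYSTEM FLUSH LOGS: forces every system log (query_log, text_log, ...) to flush
/// its in-memory buffer to its table; per-log failures are collected and rethrown together.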
case Type::FLUSH_LOGS:
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
auto logs = getContext()->getSystemLogs();
std::vector<std::function<void()>> commands;
commands.reserve(logs.size());
for (auto * system_log : logs)
commands.emplace_back([system_log] { system_log->flush(true); });
executeCommandsAndThrowIfError(commands);
break;
}
case Type::STOP_LISTEN:
getContext()->checkAccess(AccessType::SYSTEM_LISTEN);
getContext()->stopServers(query.server_type);
break;
case Type::START_LISTEN:
getContext()->checkAccess(AccessType::SYSTEM_LISTEN);
getContext()->startServers(query.server_type);
break;
case Type::FLUSH_ASYNC_INSERT_QUEUE:
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_ASYNC_INSERT_QUEUE);
auto * queue = getContext()->tryGetAsynchronousInsertQueue();
if (!queue)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot flush asynchronous insert queue because it is not initialized");
queue->flushAll();
break;
}
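/// STOP/START THREAD FUZZER also toggles the "cannot allocate thread" fault
/// injection, so both kinds of fuzzing are driven by the same pair of commands.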
case Type::STOP_THREAD_FUZZER:
getContext()->checkAccess(AccessType::SYSTEM_THREAD_FUZZER);
ThreadFuzzer::stop();
CannotAllocateThreadFaultInjector::setFaultProbability(0);
break;
case Type::START_THREAD_FUZZER:
getContext()->checkAccess(AccessType::SYSTEM_THREAD_FUZZER);
ThreadFuzzer::start();
CannotAllocateThreadFaultInjector::setFaultProbability(getContext()->getServerSettings().cannot_allocate_thread_fault_injection_probability);
break;
case Type::UNFREEZE:
{
getContext()->checkAccess(AccessType::SYSTEM_UNFREEZE);
/// The result contains information about deleted parts as a table. It is for compatibility with ALTER TABLE UNFREEZE query.
result = Unfreezer(getContext()).systemUnfreeze(query.backup_name);
break;
}
case Type::ENABLE_FAILPOINT:
{
getContext()->checkAccess(AccessType::SYSTEM_FAILPOINT);
FailPointInjection::enableFailPoint(query.fail_point_name);
break;
}
case Type::DISABLE_FAILPOINT:
{
getContext()->checkAccess(AccessType::SYSTEM_FAILPOINT);
FailPointInjection::disableFailPoint(query.fail_point_name);
break;
}
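/// SYSTEM WAIT FAILPOINT <name>: blocks the query on a pausable failpoint
/// (see the surrounding LOG_TRACE messages), which lets tests synchronize
/// with server-side code paths.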
case Type::WAIT_FAILPOINT:
{
getContext()->checkAccess(AccessType::SYSTEM_FAILPOINT);
LOG_TRACE(log, "waiting for failpoint {}", query.fail_point_name);
FailPointInjection::pauseFailPoint(query.fail_point_name);
LOG_TRACE(log, "finished failpoint {}", query.fail_point_name);
break;
}
case Type::RESET_COVERAGE:
{
getContext()->checkAccess(AccessType::SYSTEM);
resetCoverage();
break;
}
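/// The jemalloc commands below exist only in builds with jemalloc; otherwise the
/// same query types fall through to a SUPPORT_IS_DISABLED error.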
#if USE_JEMALLOC
case Type::JEMALLOC_PURGE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
purgeJemallocArenas();
break;
}
case Type::JEMALLOC_ENABLE_PROFILE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
setJemallocProfileActive(true);
break;
}
case Type::JEMALLOC_DISABLE_PROFILE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
setJemallocProfileActive(false);
break;
}
case Type::JEMALLOC_FLUSH_PROFILE:
{
getContext()->checkAccess(AccessType::SYSTEM_JEMALLOC);
flushJemallocProfile("/tmp/jemalloc_clickhouse");
break;
}
#else
case Type::JEMALLOC_PURGE:
case Type::JEMALLOC_ENABLE_PROFILE:
case Type::JEMALLOC_DISABLE_PROFILE:
case Type::JEMALLOC_FLUSH_PROFILE:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without JEMalloc");
#endif
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown type of SYSTEM query");
}
return result;
}
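/// SYSTEM RESTORE REPLICA: recreates the replica's metadata in ZooKeeper from the
/// table's local state; only ReplicatedMergeTree tables qualify.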
void InterpreterSystemQuery::restoreReplica()
{
getContext()->checkAccess(AccessType::SYSTEM_RESTORE_REPLICA, table_id);
const StoragePtr table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext());
auto * const table_replicated_ptr = dynamic_cast<StorageReplicatedMergeTree *>(table_ptr.get());
if (table_replicated_ptr == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
table_replicated_ptr->restoreMetadataInZooKeeper();
}
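/// Detaches the table, waits until it is no longer in use, then re-attaches it from its stored
/// CREATE query and starts it up. Returns nullptr if the table does not exist or is not a ReplicatedMergeTree.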
StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context)
{
LOG_TRACE(log, "Restarting replica {}", replica);
auto table_ddl_guard = DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName());
auto restart_replica_lock = DatabaseCatalog::instance().tryGetLockForRestartReplica(replica.getDatabaseName());
if (!restart_replica_lock)
throw Exception(ErrorCodes::ABORTED, "Database {} is being dropped or detached, will not restart replica {}",
backQuoteIfNeed(replica.getDatabaseName()), replica.getNameForLogs());
auto [database, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable(replica, getContext());
ASTPtr create_ast;
/// Detach actions
if (!table || !dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
return nullptr;
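/// Mark the table as being restarted; the SCOPE_EXIT guard below resets the flag even if one of the following steps throws.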
SCOPE_EXIT({
if (table)
table->is_being_restarted = false;
});
table->is_being_restarted = true;
table->flushAndShutdown();
{
/// If the table has already been dropped by someone else, an exception will be thrown
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
create_ast = database->getCreateTableQuery(replica.table_name, getContext());
database->detachTable(system_context, replica.table_name);
}
UUID uuid = table->getStorageID().uuid;
table.reset();
database->waitDetachedTableNotInUse(uuid);
/// Attach actions
/// getCreateTableQuery must return a canonical CREATE query representation, so there is no need for AST postprocessing
auto & create = create_ast->as<ASTCreateQuery &>();
create.attach = true;
auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, LoadingStrictnessLevel::ATTACH);
auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
auto data_path = database->getTableDataPath(create);
table = StorageFactory::instance().get(create,
data_path,
system_context,
system_context->getGlobalContext(),
columns,
constraints,
LoadingStrictnessLevel::ATTACH);
database->attachTable(system_context, replica.table_name, table, data_path);
table->startup();
LOG_TRACE(log, "Restarted replica {}", replica);
return table;
}
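/// SYSTEM RESTART REPLICA for a single table; throws if the table is not a ReplicatedMergeTree.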
void InterpreterSystemQuery::restartReplica(const StorageID & replica, ContextMutablePtr system_context)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_REPLICA, replica);
if (!tryRestartReplica(replica, system_context))
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), replica.getNameForLogs());
}
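/// SYSTEM RESTART REPLICAS: collects every ReplicatedMergeTree table the current user is allowed to restart and restarts them in parallel.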
void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context)
{
std::vector<StorageID> replica_names;
auto & catalog = DatabaseCatalog::instance();
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_RESTART_REPLICA);
for (auto & elem : catalog.getDatabases())
{
for (auto it = elem.second->getTablesIterator(getContext()); it->isValid(); it->next())
{
if (dynamic_cast<const StorageReplicatedMergeTree *>(it->table().get()))
{
if (!access_is_granted_globally && !access->isGranted(AccessType::SYSTEM_RESTART_REPLICA, elem.first, it->name()))
{
LOG_INFO(log, "Access {} denied, skipping {}.{}", "SYSTEM RESTART REPLICA", elem.first, it->name());
continue;
}
replica_names.emplace_back(it->databaseName(), it->name());
}
}
}
if (replica_names.empty())
return;
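/// Use no more threads than physical CPU cores and never more than the number of replicas to restart.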
size_t threads = std::min(static_cast<size_t>(getNumberOfPhysicalCPUCores()), replica_names.size());
LOG_DEBUG(log, "Will restart {} replicas using {} threads", replica_names.size(), threads);
ThreadPool pool(CurrentMetrics::RestartReplicaThreads, CurrentMetrics::RestartReplicaThreadsActive, CurrentMetrics::RestartReplicaThreadsScheduled, threads);
for (auto & replica : replica_names)
{
pool.scheduleOrThrowOnError([&]() { tryRestartReplica(replica, system_context); });
}
pool.wait();
}
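/// SYSTEM DROP REPLICA: the replica can be addressed by table, by database, as a whole server replica, or by an explicit ZooKeeper path.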
void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
{
if (query.replica.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name is empty");
if (!table_id.empty())
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!dropReplicaImpl(query, table))
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
}
else if (query.database)
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, query.getDatabase());
DatabasePtr database = DatabaseCatalog::instance().getDatabase(query.getDatabase());
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
dropReplicaImpl(query, iterator->table());
LOG_TRACE(log, "Dropped replica {} from database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
else if (query.is_drop_whole_replica)
{
auto databases = DatabaseCatalog::instance().getDatabases();
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_DROP_REPLICA);
for (auto & elem : databases)
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
if (!access_is_granted_globally && !access->isGranted(AccessType::SYSTEM_DROP_REPLICA, elem.first, iterator->name()))
{
LOG_INFO(log, "Access {} denied, skipping {}.{}", "SYSTEM DROP REPLICA", elem.first, iterator->name());
continue;
}
dropReplicaImpl(query, iterator->table());
}
LOG_TRACE(log, "Dropped replica {} from database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
else if (!query.replica_zk_path.empty())
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA);
String remote_replica_path = fs::path(query.replica_zk_path) / "replicas" / query.replica;
/// This check is actually redundant, but it may prevent some user mistakes
for (auto & elem : DatabaseCatalog::instance().getDatabases())
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(iterator->table().get()))
{
ReplicatedTableStatus status;
storage_replicated->getStatus(status);
if (status.zookeeper_path == query.replica_zk_path)
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
"There is a local table {}, which has the same table path in ZooKeeper. "
"Please check the path in query. "
"If you want to drop replica "
"of this table, use `DROP TABLE` "
"or `SYSTEM DROP REPLICA 'name' FROM db.table`",
storage_replicated->getStorageID().getNameForLogs());
}
}
}
auto zookeeper = getContext()->getZooKeeper();
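/// A replicated table's path in ZooKeeper contains either a "replicas" node or a "dropped" marker; refuse paths that have neither.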
bool looks_like_table_path = zookeeper->exists(query.replica_zk_path + "/replicas") ||
zookeeper->exists(query.replica_zk_path + "/dropped");
if (!looks_like_table_path)
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Specified path {} does not look like a table path",
query.replica_zk_path);
if (zookeeper->exists(remote_replica_path + "/is_active"))
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't remove replica: {}, because it's active", query.replica);
StorageReplicatedMergeTree::dropReplica(zookeeper, query.replica_zk_path, query.replica, log);
LOG_INFO(log, "Dropped replica {}", remote_replica_path);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid query");
}
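/// Drops the given replica of a single table. Returns false if the table is not a ReplicatedMergeTree.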
bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const StoragePtr & table)
{
auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
if (!storage_replicated)
return false;
ReplicatedTableStatus status;
storage_replicated->getStatus(status);
/// Do not allow dropping local replicas or active remote replicas
if (query.replica == status.replica_name)
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
"We can't drop local replica, please use `DROP TABLE` if you want "
"to clean the data and drop this replica");
storage_replicated->dropReplica(status.zookeeper_path, query.replica, log);
LOG_TRACE(log, "Dropped replica {} of {}", query.replica, table->getStorageID().getNameForLogs());
return true;
}
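/// SYSTEM DROP DATABASE REPLICA: same addressing modes as SYSTEM DROP REPLICA, but for replicas of Replicated databases.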
void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
{
if (query.replica.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name is empty");
auto check_not_local_replica = [](const DatabaseReplicated * replicated, const ASTSystemQuery & query_)
{
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
return;
String full_replica_name = query_.shard.empty() ? query_.replica
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica);
if (replicated->getFullReplicaName() != full_replica_name)
return;
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "There is a local database {}, which has the same path in ZooKeeper "
"and the same replica name. Please check the path in query. "
"If you want to drop replica of this database, use `DROP DATABASE`", replicated->getDatabaseName());
};
if (query.database)
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, query.getDatabase());
DatabasePtr database = DatabaseCatalog::instance().getDatabase(query.getDatabase());
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
{
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, /*throw_if_noop*/ true);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
else if (query.is_drop_whole_replica)
{
auto databases = DatabaseCatalog::instance().getDatabases();
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_DROP_REPLICA);
for (auto & elem : databases)
{
DatabasePtr & database = elem.second;
auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get());
if (!replicated)
continue;
if (!access_is_granted_globally && !access->isGranted(AccessType::SYSTEM_DROP_REPLICA, elem.first))
{
LOG_INFO(log, "Access {} denied, skipping database {}", "SYSTEM DROP REPLICA", elem.first);
continue;
}
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, /*throw_if_noop*/ false);
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
else if (!query.replica_zk_path.empty())
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA);
/// This check is actually redundant, but it may prevent some user mistakes
for (auto & elem : DatabaseCatalog::instance().getDatabases())
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica, /*throw_if_noop*/ true);
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid query");
}
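/// SYSTEM SYNC REPLICA: waits (up to 'receive_timeout') until the replica's queue has processed the required log entries.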
void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
{
getContext()->checkAccess(AccessType::SYSTEM_SYNC_REPLICA, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for current last entry to be processed");
auto sync_timeout = getContext()->getSettingsRef().receive_timeout.totalMilliseconds();
std::unordered_set<std::string> replicas(query.src_replicas.begin(), query.src_replicas.end());
if (!storage_replicated->waitForProcessingQueue(sync_timeout, query.sync_replica_mode, replicas))
{
LOG_ERROR(log, "SYNC REPLICA {}: Timed out.", table_id.getNameForLogs());
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC REPLICA {}: command timed out. "
"See the 'receive_timeout' setting", table_id.getNameForLogs());
}
LOG_TRACE(log, "SYNC REPLICA {}: OK", table_id.getNameForLogs());
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
}
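/// SYSTEM WAIT LOADING PARTS: blocks until the table has finished loading its outdated data parts.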
void InterpreterSystemQuery::waitLoadingParts()
{
getContext()->checkAccess(AccessType::SYSTEM_WAIT_LOADING_PARTS, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (auto * merge_tree = dynamic_cast<MergeTreeData *>(table.get()))
{
LOG_TRACE(log, "Waiting for loading of parts of table {}", table_id.getFullTableName());
merge_tree->waitForOutdatedPartsToBeLoaded();
LOG_TRACE(log, "Finished waiting for loading of parts of table {}", table_id.getFullTableName());
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Command WAIT LOADING PARTS is supported only for MergeTree table, but got: {}", table->getName());
}
}
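/// SYSTEM SYNC DATABASE REPLICA: waits until this replica of a Replicated database has processed all entries of its DDL log.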
void InterpreterSystemQuery::syncReplicatedDatabase(ASTSystemQuery & query)
{
const auto database_name = query.getDatabase();
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
auto database = DatabaseCatalog::instance().getDatabase(database_name);
if (auto * ptr = typeid_cast<DatabaseReplicated *>(database.get()))
{
LOG_TRACE(log, "Synchronizing entries in the database replica's (name: {}) queue with the log", database_name);
if (!ptr->waitForReplicaToProcessAllEntries(getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC DATABASE REPLICA {}: database is readonly or command timed out. "
"See the 'receive_timeout' setting", database_name);
}
LOG_TRACE(log, "SYNC DATABASE REPLICA {}: OK", database_name);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "SYSTEM SYNC DATABASE REPLICA query is intended to work only with Replicated engine");
}
void InterpreterSystemQuery::syncTransactionLog()
{
getContext()->checkTransactionsAreAllowed(/* explicit_tcl_query */ true);
TransactionLog::instance().sync();
}
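/// SYSTEM FLUSH DISTRIBUTED: synchronously pushes pending data of a Distributed table to the cluster nodes; SETTINGS from the query are forwarded to the flush.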
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);
SettingsChanges settings_changes;
if (query.query_settings)
settings_changes = query.query_settings->as<ASTSetQuery>()->changes;
if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, getContext()).get()))
storage_distributed->flushClusterNodesAllData(getContext(), settings_changes);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table {} is not distributed", table_id.getNameForLogs());
}
[[noreturn]] void InterpreterSystemQuery::restartDisk(String &)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_DISK);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYSTEM RESTART DISK is not supported");
}
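/// Finds the refresh task of a refreshable materialized view; throws if the table has none.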
RefreshTaskHolder InterpreterSystemQuery::getRefreshTask()
{
auto ctx = getContext();
ctx->checkAccess(AccessType::SYSTEM_VIEWS);
auto task = ctx->getRefreshSet().getTask(table_id);
if (!task)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Refreshable view {} doesn't exist", table_id.getNameForLogs());
return task;
}
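/// Maps each SYSTEM query type to the access rights required to execute it ON CLUSTER.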
AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() const
{
const auto & query = query_ptr->as<const ASTSystemQuery &>();
using Type = ASTSystemQuery::Type;
AccessRightsElements required_access;
switch (query.type)
{
case Type::SHUTDOWN:
case Type::KILL:
case Type::SUSPEND:
{
required_access.emplace_back(AccessType::SYSTEM_SHUTDOWN);
break;
}
case Type::DROP_DNS_CACHE:
case Type::DROP_CONNECTIONS_CACHE:
case Type::DROP_MARK_CACHE:
case Type::DROP_MMAP_CACHE:
case Type::DROP_QUERY_CACHE:
case Type::DROP_COMPILED_EXPRESSION_CACHE:
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_PAGE_CACHE:
case Type::DROP_SCHEMA_CACHE:
case Type::DROP_FORMAT_SCHEMA_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;
}
case Type::DROP_DISK_METADATA_CACHE:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::RELOAD_DICTIONARY:
case Type::RELOAD_DICTIONARIES:
case Type::RELOAD_EMBEDDED_DICTIONARIES:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_DICTIONARY);
break;
}
case Type::RELOAD_MODEL:
case Type::RELOAD_MODELS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_MODEL);
break;
}
case Type::RELOAD_FUNCTION:
case Type::RELOAD_FUNCTIONS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_FUNCTION);
break;
}
case Type::RELOAD_CONFIG:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_CONFIG);
break;
}
case Type::RELOAD_USERS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_USERS);
break;
}
case Type::RELOAD_ASYNCHRONOUS_METRICS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_ASYNCHRONOUS_METRICS);
break;
}
case Type::STOP_MERGES:
case Type::START_MERGES:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_MERGES);
else
required_access.emplace_back(AccessType::SYSTEM_MERGES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_TTL_MERGES:
case Type::START_TTL_MERGES:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES);
else
required_access.emplace_back(AccessType::SYSTEM_TTL_MERGES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_MOVES:
case Type::START_MOVES:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_MOVES);
else
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG);
else
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_CLEANUP:
case Type::START_CLEANUP:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG);
else
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_FETCHES:
case Type::START_FETCHES:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_FETCHES);
else
required_access.emplace_back(AccessType::SYSTEM_FETCHES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_DISTRIBUTED_SENDS:
case Type::START_DISTRIBUTED_SENDS:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS);
else
required_access.emplace_back(AccessType::SYSTEM_DISTRIBUTED_SENDS, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_REPLICATED_SENDS:
case Type::START_REPLICATED_SENDS:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS);
else
required_access.emplace_back(AccessType::SYSTEM_REPLICATED_SENDS, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES);
else
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES, query.getDatabase(), query.getTable());
break;
}
case Type::REFRESH_VIEW:
case Type::START_VIEW:
case Type::START_VIEWS:
case Type::STOP_VIEW:
case Type::STOP_VIEWS:
case Type::CANCEL_VIEW:
case Type::TEST_VIEW:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_VIEWS);
else
required_access.emplace_back(AccessType::SYSTEM_VIEWS, query.getDatabase(), query.getTable());
break;
}
case Type::DROP_REPLICA:
case Type::DROP_DATABASE_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_REPLICA, query.getDatabase(), query.getTable());
break;
}
case Type::RESTORE_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_RESTORE_REPLICA, query.getDatabase(), query.getTable());
break;
}
case Type::SYNC_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.getDatabase(), query.getTable());
break;
}
case Type::REPLICA_READY:
case Type::REPLICA_UNREADY:
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Not implemented");
case Type::RESTART_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA, query.getDatabase(), query.getTable());
break;
}
case Type::RESTART_REPLICAS:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_REPLICA);
break;
}
case Type::WAIT_LOADING_PARTS:
{
required_access.emplace_back(AccessType::SYSTEM_WAIT_LOADING_PARTS, query.getDatabase(), query.getTable());
break;
}
case Type::SYNC_DATABASE_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_DATABASE_REPLICA, query.getDatabase());
break;
}
case Type::SYNC_TRANSACTION_LOG:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_TRANSACTION_LOG);
break;
}
case Type::FLUSH_DISTRIBUTED:
{
required_access.emplace_back(AccessType::SYSTEM_FLUSH_DISTRIBUTED, query.getDatabase(), query.getTable());
break;
}
case Type::FLUSH_LOGS:
{
required_access.emplace_back(AccessType::SYSTEM_FLUSH_LOGS);
break;
}
case Type::FLUSH_ASYNC_INSERT_QUEUE:
{
required_access.emplace_back(AccessType::SYSTEM_FLUSH_ASYNC_INSERT_QUEUE);
break;
}
case Type::RESTART_DISK:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_DISK);
break;
}
case Type::UNFREEZE:
{
required_access.emplace_back(AccessType::SYSTEM_UNFREEZE);
break;
}
case Type::SYNC_FILE_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_FILE_CACHE);
break;
}
case Type::STOP_LISTEN:
case Type::START_LISTEN:
{
required_access.emplace_back(AccessType::SYSTEM_LISTEN);
break;
}
case Type::JEMALLOC_PURGE:
case Type::JEMALLOC_ENABLE_PROFILE:
case Type::JEMALLOC_DISABLE_PROFILE:
case Type::JEMALLOC_FLUSH_PROFILE:
{
required_access.emplace_back(AccessType::SYSTEM_JEMALLOC);
break;
}
case Type::STOP_THREAD_FUZZER:
case Type::START_THREAD_FUZZER:
case Type::ENABLE_FAILPOINT:
case Type::WAIT_FAILPOINT:
case Type::DISABLE_FAILPOINT:
case Type::RESET_COVERAGE:
case Type::UNKNOWN:
case Type::END: break;
}
return required_access;
}
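/// Registers this interpreter in the factory so SYSTEM queries can be dispatched to it by name.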
void registerInterpreterSystemQuery(InterpreterFactory & factory)
{
auto create_fn = [] (const InterpreterFactory::Arguments & args)
{
return std::make_unique<InterpreterSystemQuery>(args.query, args.context);
};
factory.registerInterpreter("InterpreterSystemQuery", create_fn);
}
}