mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-09 17:14:47 +00:00
Don't leave an empty znode when replicated table is dropped
This commit is contained in:
parent
09f22920d9
commit
9a3adc70bd
@ -1723,11 +1723,10 @@ std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts
|
||||
|
||||
String extractZooKeeperName(const String & path)
|
||||
{
|
||||
static constexpr auto default_zookeeper_name = "default";
|
||||
if (path.empty())
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path should not be empty");
|
||||
if (path[0] == '/')
|
||||
return default_zookeeper_name;
|
||||
return String(DEFAULT_ZOOKEEPER_NAME);
|
||||
auto pos = path.find(":/");
|
||||
if (pos != String::npos && pos < path.find('/'))
|
||||
{
|
||||
@ -1736,7 +1735,7 @@ String extractZooKeeperName(const String & path)
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'");
|
||||
return zookeeper_name;
|
||||
}
|
||||
return default_zookeeper_name;
|
||||
return String(DEFAULT_ZOOKEEPER_NAME);
|
||||
}
|
||||
|
||||
String extractZooKeeperPath(const String & path, bool check_starts_with_slash, LoggerPtr log)
|
||||
|
@ -47,6 +47,10 @@ namespace zkutil
|
||||
/// Preferred size of multi command (in the number of operations)
|
||||
constexpr size_t MULTI_BATCH_SIZE = 100;
|
||||
|
||||
/// Path "default:/foo" refers to znode "/foo" in the default zookeeper,
|
||||
/// path "other:/foo" refers to znode "/foo" in auxiliary zookeeper named "other".
|
||||
constexpr std::string_view DEFAULT_ZOOKEEPER_NAME = "default";
|
||||
|
||||
struct ShuffleHost
|
||||
{
|
||||
enum AvailabilityZoneInfo
|
||||
|
@ -3743,6 +3743,11 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
|
||||
return zookeeper->second;
|
||||
}
|
||||
|
||||
std::shared_ptr<zkutil::ZooKeeper> Context::getDefaultOrAuxiliaryZooKeeper(const String & name) const
|
||||
{
|
||||
return name == zkutil::DEFAULT_ZOOKEEPER_NAME ? getZooKeeper() : getAuxiliaryZooKeeper(name);
|
||||
}
|
||||
|
||||
|
||||
std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
|
||||
{
|
||||
|
@ -1004,6 +1004,8 @@ public:
|
||||
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
|
||||
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
|
||||
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
|
||||
/// If name == "default", same as getZooKeeper(), otherwise same as getAuxiliaryZooKeeper().
|
||||
std::shared_ptr<zkutil::ZooKeeper> getDefaultOrAuxiliaryZooKeeper(const String & name) const;
|
||||
/// return Auxiliary Zookeeper map
|
||||
std::map<String, zkutil::ZooKeeperPtr> getAuxiliaryZooKeepers() const;
|
||||
|
||||
|
@ -1464,7 +1464,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
{
|
||||
if (!getContext()->getSettingsRef().allow_experimental_refreshable_materialized_view)
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use.");
|
||||
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use");
|
||||
|
||||
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
|
||||
visitor.visit(*create.refresh_strategy);
|
||||
|
@ -1005,7 +1005,7 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
|
||||
{
|
||||
ReplicatedTableStatus status;
|
||||
storage_replicated->getStatus(status);
|
||||
if (status.zookeeper_path == query.replica_zk_path)
|
||||
if (status.zookeeper_info.path == query.replica_zk_path)
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
|
||||
"There is a local table {}, which has the same table path in ZooKeeper. "
|
||||
"Please check the path in query. "
|
||||
@ -1028,7 +1028,10 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
|
||||
if (zookeeper->exists(remote_replica_path + "/is_active"))
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't remove replica: {}, because it's active", query.replica);
|
||||
|
||||
StorageReplicatedMergeTree::dropReplica(zookeeper, query.replica_zk_path, query.replica, log);
|
||||
TableZnodeInfo info;
|
||||
info.path = query.replica_zk_path;
|
||||
info.replica_name = query.replica;
|
||||
StorageReplicatedMergeTree::dropReplica(zookeeper, info, log);
|
||||
LOG_INFO(log, "Dropped replica {}", remote_replica_path);
|
||||
}
|
||||
else
|
||||
@ -1045,12 +1048,12 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
|
||||
storage_replicated->getStatus(status);
|
||||
|
||||
/// Do not allow to drop local replicas and active remote replicas
|
||||
if (query.replica == status.replica_name)
|
||||
if (query.replica == status.zookeeper_info.replica_name)
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
|
||||
"We can't drop local replica, please use `DROP TABLE` if you want "
|
||||
"to clean the data and drop this replica");
|
||||
|
||||
storage_replicated->dropReplica(status.zookeeper_path, query.replica, log);
|
||||
storage_replicated->dropReplica(query.replica, log);
|
||||
LOG_TRACE(log, "Dropped replica {} of {}", query.replica, table->getStorageID().getNameForLogs());
|
||||
|
||||
return true;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
namespace DB
|
||||
@ -16,9 +17,7 @@ struct ReplicatedTableStatus
|
||||
|
||||
ReplicatedMergeTreeQueue::Status queue;
|
||||
UInt32 parts_to_check;
|
||||
String zookeeper_name;
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
TableZnodeInfo zookeeper_info;
|
||||
String replica_path;
|
||||
Int32 columns_version;
|
||||
UInt64 log_max_index;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Core/Settings.h>
|
||||
@ -88,32 +89,23 @@ See details in documentation: https://clickhouse.com/docs/en/engines/table-engin
|
||||
If you use the Replicated version of engines, see https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication/.
|
||||
)";
|
||||
|
||||
static ColumnsDescription getColumnsDescriptionFromZookeeper(const String & raw_zookeeper_path, ContextMutablePtr context)
|
||||
static ColumnsDescription getColumnsDescriptionFromZookeeper(const TableZnodeInfo & zookeeper_info, ContextMutablePtr context)
|
||||
{
|
||||
String zookeeper_name = zkutil::extractZooKeeperName(raw_zookeeper_path);
|
||||
String zookeeper_path = zkutil::extractZooKeeperPath(raw_zookeeper_path, true);
|
||||
|
||||
if (!context->hasZooKeeper() && !context->hasAuxiliaryZooKeeper(zookeeper_name))
|
||||
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure without zookeeper, you must specify the structure manually"};
|
||||
|
||||
zkutil::ZooKeeperPtr zookeeper;
|
||||
try
|
||||
{
|
||||
if (zookeeper_name == StorageReplicatedMergeTree::getDefaultZooKeeperName())
|
||||
zookeeper = context->getZooKeeper();
|
||||
else
|
||||
zookeeper = context->getAuxiliaryZooKeeper(zookeeper_name);
|
||||
zookeeper = context->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure from zookeeper, because cannot get zookeeper: {}. You must specify structure manually", getCurrentExceptionMessage(false)};
|
||||
}
|
||||
|
||||
if (!zookeeper->exists(zookeeper_path + "/replicas"))
|
||||
if (!zookeeper->exists(zookeeper_info.path + "/replicas"))
|
||||
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure, because there no other replicas in zookeeper. You must specify the structure manually"};
|
||||
|
||||
Coordination::Stat columns_stat;
|
||||
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_path) / "columns", &columns_stat));
|
||||
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_info.path) / "columns", &columns_stat));
|
||||
}
|
||||
|
||||
/// Returns whether a new syntax is used to define a table engine, i.e. MergeTree() PRIMARY KEY ... PARTITION BY ... SETTINGS ...
|
||||
@ -184,23 +176,16 @@ static std::string_view getNamePart(const String & engine_name)
|
||||
/// Extracts zookeeper path and replica name from the table engine's arguments.
|
||||
/// The function can modify those arguments (that's why they're passed separately in `engine_args`) and also determines RenamingRestrictions.
|
||||
/// The function assumes the table engine is Replicated.
|
||||
static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
static TableZnodeInfo extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
const ASTCreateQuery & query,
|
||||
const StorageID & table_id,
|
||||
const String & engine_name,
|
||||
ASTs & engine_args,
|
||||
LoadingStrictnessLevel mode,
|
||||
const ContextPtr & local_context,
|
||||
String & zookeeper_path,
|
||||
String & replica_name,
|
||||
RenamingRestrictions & renaming_restrictions)
|
||||
const ContextPtr & local_context)
|
||||
{
|
||||
chassert(isReplicated(engine_name));
|
||||
|
||||
zookeeper_path = "";
|
||||
replica_name = "";
|
||||
renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
|
||||
|
||||
bool is_extended_storage_def = isExtendedStorageDef(query);
|
||||
|
||||
if (is_extended_storage_def)
|
||||
@ -210,62 +195,12 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
evaluateEngineArgs(engine_args, local_context);
|
||||
}
|
||||
|
||||
bool is_on_cluster = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
|
||||
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
|
||||
|
||||
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
|
||||
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
|
||||
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
|
||||
|
||||
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name)
|
||||
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name, String zookeeper_path, String replica_name) -> TableZnodeInfo
|
||||
{
|
||||
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
Macros::MacroExpansionInfo info;
|
||||
/// NOTE: it's not recursive
|
||||
info.expand_special_macros_only = true;
|
||||
info.table_id = table_id;
|
||||
/// Avoid unfolding {uuid} macro on this step.
|
||||
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
|
||||
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
|
||||
|
||||
info.level = 0;
|
||||
replica_name = local_context->getMacros()->expand(replica_name, info);
|
||||
}
|
||||
|
||||
ast_zk_path->value = zookeeper_path;
|
||||
ast_replica_name->value = replica_name;
|
||||
|
||||
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
|
||||
/// to make possible copying metadata files between replicas.
|
||||
Macros::MacroExpansionInfo info;
|
||||
info.table_id = table_id;
|
||||
if (is_replicated_database)
|
||||
{
|
||||
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
info.shard = getReplicatedDatabaseShardName(database);
|
||||
info.replica = getReplicatedDatabaseReplicaName(database);
|
||||
}
|
||||
if (!allow_uuid_macro)
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
|
||||
|
||||
info.level = 0;
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
replica_name = local_context->getMacros()->expand(replica_name, info);
|
||||
|
||||
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
|
||||
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
|
||||
/// or if one of these macros is recursively expanded from some other macro.
|
||||
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
|
||||
if (info.expanded_database || info.expanded_table)
|
||||
renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
|
||||
else if (info.expanded_uuid)
|
||||
renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
|
||||
TableZnodeInfo res = TableZnodeInfo::resolve(zookeeper_path, replica_name, table_id, query, mode, local_context);
|
||||
ast_zk_path->value = res.full_path_for_metadata;
|
||||
ast_replica_name->value = res.replica_name_for_metadata;
|
||||
return res;
|
||||
};
|
||||
|
||||
size_t arg_num = 0;
|
||||
@ -293,27 +228,22 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
|
||||
/// Get path and name from engine arguments
|
||||
auto * ast_zk_path = engine_args[arg_num]->as<ASTLiteral>();
|
||||
if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
|
||||
zookeeper_path = ast_zk_path->value.safeGet<String>();
|
||||
else
|
||||
if (!ast_zk_path || ast_zk_path->value.getType() != Field::Types::String)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path in ZooKeeper must be a string literal{}", verbose_help_message);
|
||||
|
||||
auto * ast_replica_name = engine_args[arg_num + 1]->as<ASTLiteral>();
|
||||
if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
|
||||
replica_name = ast_replica_name->value.safeGet<String>();
|
||||
else
|
||||
if (!ast_replica_name || ast_replica_name->value.getType() != Field::Types::String)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
|
||||
|
||||
|
||||
if (is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2)
|
||||
{
|
||||
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) "
|
||||
"with default arguments", zookeeper_path, replica_name);
|
||||
engine_args[arg_num]->as<ASTLiteral>()->value = zookeeper_path = server_settings.default_replica_path;
|
||||
engine_args[arg_num + 1]->as<ASTLiteral>()->value = replica_name = server_settings.default_replica_name;
|
||||
ast_zk_path->value = server_settings.default_replica_path;
|
||||
ast_replica_name->value = server_settings.default_replica_name;
|
||||
}
|
||||
|
||||
expand_macro(ast_zk_path, ast_replica_name);
|
||||
return expand_macro(ast_zk_path, ast_replica_name, ast_zk_path->value.safeGet<String>(), ast_replica_name->value.safeGet<String>());
|
||||
}
|
||||
else if (is_extended_storage_def
|
||||
&& (arg_cnt == 0
|
||||
@ -322,24 +252,24 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
{
|
||||
/// Try use default values if arguments are not specified.
|
||||
/// Note: {uuid} macro works for ON CLUSTER queries when database engine is Atomic.
|
||||
zookeeper_path = server_settings.default_replica_path;
|
||||
/// TODO maybe use hostname if {replica} is not defined?
|
||||
replica_name = server_settings.default_replica_name;
|
||||
|
||||
/// Modify query, so default values will be written to metadata
|
||||
assert(arg_num == 0);
|
||||
ASTs old_args;
|
||||
std::swap(engine_args, old_args);
|
||||
auto path_arg = std::make_shared<ASTLiteral>(zookeeper_path);
|
||||
auto name_arg = std::make_shared<ASTLiteral>(replica_name);
|
||||
auto path_arg = std::make_shared<ASTLiteral>("");
|
||||
auto name_arg = std::make_shared<ASTLiteral>("");
|
||||
auto * ast_zk_path = path_arg.get();
|
||||
auto * ast_replica_name = name_arg.get();
|
||||
|
||||
expand_macro(ast_zk_path, ast_replica_name);
|
||||
auto res = expand_macro(ast_zk_path, ast_replica_name, server_settings.default_replica_path, server_settings.default_replica_name);
|
||||
|
||||
engine_args.emplace_back(std::move(path_arg));
|
||||
engine_args.emplace_back(std::move(name_arg));
|
||||
std::move(std::begin(old_args), std::end(old_args), std::back_inserter(engine_args));
|
||||
|
||||
return res;
|
||||
}
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected two string literal arguments: zookeeper_path and replica_name");
|
||||
@ -363,15 +293,11 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
|
||||
for (auto & engine_arg : engine_args)
|
||||
engine_arg = engine_arg->clone();
|
||||
|
||||
LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE;
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
RenamingRestrictions renaming_restrictions;
|
||||
|
||||
try
|
||||
{
|
||||
extractZooKeeperPathAndReplicaNameFromEngineArgs(query, table_id, engine_name, engine_args, mode, local_context,
|
||||
zookeeper_path, replica_name, renaming_restrictions);
|
||||
auto res = extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
query, table_id, engine_name, engine_args, LoadingStrictnessLevel::CREATE, local_context);
|
||||
return res.full_path;
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -382,8 +308,6 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
||||
return zookeeper_path;
|
||||
}
|
||||
|
||||
static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
@ -551,19 +475,17 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
}
|
||||
|
||||
/// Extract zookeeper path and replica name from engine arguments.
|
||||
String zookeeper_path;
|
||||
String replica_name;
|
||||
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
|
||||
TableZnodeInfo zookeeper_info;
|
||||
|
||||
if (replicated)
|
||||
{
|
||||
extractZooKeeperPathAndReplicaNameFromEngineArgs(args.query, args.table_id, args.engine_name, args.engine_args, args.mode,
|
||||
args.getLocalContext(), zookeeper_path, replica_name, renaming_restrictions);
|
||||
zookeeper_info = extractZooKeeperPathAndReplicaNameFromEngineArgs(
|
||||
args.query, args.table_id, args.engine_name, args.engine_args, args.mode, args.getLocalContext());
|
||||
|
||||
if (replica_name.empty())
|
||||
if (zookeeper_info.replica_name.empty())
|
||||
throw Exception(ErrorCodes::NO_REPLICA_NAME_GIVEN, "No replica name in config{}", verbose_help_message);
|
||||
// '\t' and '\n' will interrupt parsing 'source replica' in ReplicatedMergeTreeLogEntryData::readText
|
||||
if (replica_name.find('\t') != String::npos || replica_name.find('\n') != String::npos)
|
||||
if (zookeeper_info.replica_name.find('\t') != String::npos || zookeeper_info.replica_name.find('\n') != String::npos)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must not contain '\\t' or '\\n'");
|
||||
|
||||
arg_cnt = engine_args.size(); /// Update `arg_cnt` here because extractZooKeeperPathAndReplicaNameFromEngineArgs() could add arguments.
|
||||
@ -649,7 +571,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
|
||||
ColumnsDescription columns;
|
||||
if (args.columns.empty() && replicated)
|
||||
columns = getColumnsDescriptionFromZookeeper(zookeeper_path, context);
|
||||
columns = getColumnsDescriptionFromZookeeper(zookeeper_info, context);
|
||||
else
|
||||
columns = args.columns;
|
||||
|
||||
@ -879,8 +801,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
need_check_table_structure = txn->isInitialQuery();
|
||||
|
||||
return std::make_shared<StorageReplicatedMergeTree>(
|
||||
zookeeper_path,
|
||||
replica_name,
|
||||
zookeeper_info,
|
||||
args.mode,
|
||||
args.table_id,
|
||||
args.relative_data_path,
|
||||
@ -889,7 +810,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
date_column_name,
|
||||
merging_params,
|
||||
std::move(storage_settings),
|
||||
renaming_restrictions,
|
||||
need_check_table_structure);
|
||||
}
|
||||
else
|
||||
|
@ -211,7 +211,6 @@ namespace ActionLocks
|
||||
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
|
||||
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
|
||||
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
|
||||
const String StorageReplicatedMergeTree::default_zookeeper_name = "default";
|
||||
|
||||
void StorageReplicatedMergeTree::setZooKeeper()
|
||||
{
|
||||
@ -221,19 +220,10 @@ void StorageReplicatedMergeTree::setZooKeeper()
|
||||
/// strange effects. So we always use only one session for all tables.
|
||||
/// (excluding auxiliary zookeepers)
|
||||
|
||||
if (zookeeper_name == default_zookeeper_name)
|
||||
{
|
||||
auto new_keeper = getContext()->getZooKeeper();
|
||||
auto new_keeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
std::lock_guard lock(current_zookeeper_mutex);
|
||||
current_zookeeper = new_keeper;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
|
||||
std::lock_guard lock(current_zookeeper_mutex);
|
||||
current_zookeeper = new_keeper;
|
||||
}
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
|
||||
{
|
||||
@ -263,7 +253,7 @@ String StorageReplicatedMergeTree::getEndpointName() const
|
||||
{
|
||||
const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
|
||||
if (settings.enable_the_endpoint_id_with_zookeeper_name_prefix)
|
||||
return zookeeper_name + ":" + replica_path;
|
||||
return zookeeper_info.zookeeper_name + ":" + replica_path;
|
||||
|
||||
return replica_path;
|
||||
}
|
||||
@ -294,8 +284,7 @@ static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom
|
||||
}
|
||||
|
||||
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
const TableZnodeInfo & zookeeper_info_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
@ -304,7 +293,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
RenamingRestrictions renaming_restrictions_,
|
||||
bool need_check_structure)
|
||||
: MergeTreeData(table_id_,
|
||||
metadata_,
|
||||
@ -315,11 +303,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
true, /// require_part_metadata
|
||||
mode,
|
||||
[this] (const std::string & name) { enqueuePartForCheck(name); })
|
||||
, full_zookeeper_path(zookeeper_path_)
|
||||
, zookeeper_name(zkutil::extractZooKeeperName(full_zookeeper_path))
|
||||
, zookeeper_path(zkutil::extractZooKeeperPath(full_zookeeper_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, log.load()))
|
||||
, replica_name(replica_name_)
|
||||
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
|
||||
, zookeeper_info(zookeeper_info_)
|
||||
, zookeeper_path(zookeeper_info.path)
|
||||
, replica_name(zookeeper_info.replica_name)
|
||||
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name)
|
||||
, reader(*this)
|
||||
, writer(*this)
|
||||
, merger_mutator(*this)
|
||||
@ -331,7 +318,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
, part_check_thread(*this)
|
||||
, restarting_thread(*this)
|
||||
, part_moves_between_shards_orchestrator(*this)
|
||||
, renaming_restrictions(renaming_restrictions_)
|
||||
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
|
||||
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
|
||||
{
|
||||
@ -365,7 +351,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
/// Will be activated by restarting thread.
|
||||
mutations_finalizing_task->deactivate();
|
||||
|
||||
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name);
|
||||
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
if (has_zookeeper)
|
||||
{
|
||||
/// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
|
||||
@ -845,7 +831,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
|
||||
else
|
||||
{
|
||||
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
|
||||
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log.load()))
|
||||
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, log.load()))
|
||||
{
|
||||
/// Someone is recursively removing table right now, we cannot create new table until old one is removed
|
||||
continue;
|
||||
@ -1107,11 +1093,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
|
||||
|
||||
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfTableShutDown() const
|
||||
{
|
||||
zkutil::ZooKeeperPtr maybe_new_zookeeper;
|
||||
if (zookeeper_name == default_zookeeper_name)
|
||||
maybe_new_zookeeper = getContext()->getZooKeeper();
|
||||
else
|
||||
maybe_new_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
|
||||
zkutil::ZooKeeperPtr maybe_new_zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
|
||||
maybe_new_zookeeper->sync(zookeeper_path);
|
||||
return maybe_new_zookeeper;
|
||||
}
|
||||
@ -1226,7 +1208,7 @@ void StorageReplicatedMergeTree::drop()
|
||||
LOG_INFO(log, "Dropping table with non-zero lost_part_count equal to {}", lost_part_count);
|
||||
}
|
||||
|
||||
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_path, replica_name, log.load(), getSettings(), &has_metadata_in_zookeeper);
|
||||
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_info, log.load(), getSettings(), &has_metadata_in_zookeeper);
|
||||
if (last_replica_dropped)
|
||||
{
|
||||
dropZookeeperZeroCopyLockPaths(zookeeper, zero_copy_locks_paths, log.load());
|
||||
@ -1235,13 +1217,15 @@ void StorageReplicatedMergeTree::drop()
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
||||
LoggerPtr logger, MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
|
||||
bool StorageReplicatedMergeTree::dropReplica(
|
||||
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info, LoggerPtr logger,
|
||||
MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
|
||||
{
|
||||
if (zookeeper->expired())
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Table was not dropped because ZooKeeper session has expired.");
|
||||
|
||||
auto remote_replica_path = zookeeper_path + "/replicas/" + replica;
|
||||
String zookeeper_path = zookeeper_info.path;
|
||||
auto remote_replica_path = zookeeper_path + "/replicas/" + zookeeper_info.replica_name;
|
||||
|
||||
LOG_INFO(logger, "Removing replica {}, marking it as lost", remote_replica_path);
|
||||
/// Mark itself lost before removing, because the following recursive removal may fail
|
||||
@ -1352,30 +1336,33 @@ bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
|
||||
{
|
||||
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
|
||||
LOG_INFO(logger, "Removing table {} (this might take several minutes)", zookeeper_path);
|
||||
removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, logger);
|
||||
removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, logger);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger)
|
||||
bool StorageReplicatedMergeTree::dropReplica(const String & drop_replica, LoggerPtr logger)
|
||||
{
|
||||
zkutil::ZooKeeperPtr zookeeper = getZooKeeperIfTableShutDown();
|
||||
|
||||
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
|
||||
/// However, the main use case is to drop dead replica, which cannot become active.
|
||||
/// This check prevents only from accidental drop of some other replica.
|
||||
if (zookeeper->exists(drop_zookeeper_path + "/replicas/" + drop_replica + "/is_active"))
|
||||
if (zookeeper->exists(zookeeper_info.path + "/replicas/" + drop_replica + "/is_active"))
|
||||
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't drop replica: {}, because it's active", drop_replica);
|
||||
|
||||
return dropReplica(zookeeper, drop_zookeeper_path, drop_replica, logger);
|
||||
TableZnodeInfo info = zookeeper_info;
|
||||
info.replica_name = drop_replica;
|
||||
return dropReplica(zookeeper, info, logger);
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper,
|
||||
const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
|
||||
const TableZnodeInfo & zookeeper_info2, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
|
||||
{
|
||||
String zookeeper_path = zookeeper_info2.path;
|
||||
bool completely_removed = false;
|
||||
|
||||
/// NOTE /block_numbers/ actually is not flat, because /block_numbers/<partition_id>/ may have ephemeral children,
|
||||
@ -1445,6 +1432,15 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
|
||||
LOG_INFO(logger, "Table {} was successfully removed from ZooKeeper", zookeeper_path);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
zookeeper_info2.dropAncestorZnodesIfNeeded(zookeeper);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_WARNING(logger, "Failed to drop ancestor znodes {} - {} after dropping table: {}", zookeeper_info2.path_prefix_for_drop, zookeeper_info2.path, getCurrentExceptionMessage(false));
|
||||
}
|
||||
|
||||
return completely_removed;
|
||||
}
|
||||
|
||||
@ -2295,7 +2291,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
|
||||
String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;
|
||||
if (!fetchPart(part_name,
|
||||
metadata_snapshot,
|
||||
zookeeper_name,
|
||||
zookeeper_info.zookeeper_name,
|
||||
source_replica_path,
|
||||
/* to_detached= */ false,
|
||||
entry.quorum,
|
||||
@ -2858,7 +2854,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
|
||||
interserver_scheme, address.scheme, address.host);
|
||||
|
||||
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
|
||||
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_name, source_replica_path,
|
||||
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
|
||||
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
|
||||
interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
|
||||
part_desc->res_part = fetched_part;
|
||||
@ -2980,7 +2976,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
|
||||
interserver_scheme, address.scheme, address.host);
|
||||
|
||||
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
|
||||
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_name, source_replica_path,
|
||||
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
|
||||
address.host, address.replication_port,
|
||||
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
|
||||
replicated_fetches_throttler, true);
|
||||
@ -5076,7 +5072,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
|
||||
currently_fetching_parts.erase(part_name);
|
||||
});
|
||||
|
||||
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
|
||||
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
|
||||
|
||||
TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
|
||||
|
||||
@ -5109,7 +5105,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
|
||||
"'{}' != '{}', can't fetch part from {}", interserver_scheme, address.scheme, address.host);
|
||||
|
||||
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
|
||||
metadata_snapshot, getContext(), part_name, zookeeper_name, source_replica_path,
|
||||
metadata_snapshot, getContext(), part_name, zookeeper_info.zookeeper_name, source_replica_path,
|
||||
address.host, address.replication_port,
|
||||
timeouts, credentials->getUser(), credentials->getPassword(),
|
||||
interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
|
||||
@ -5148,7 +5144,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
|
||||
|
||||
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
|
||||
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
|
||||
return part;
|
||||
}
|
||||
|
||||
@ -6653,10 +6649,10 @@ void StorageReplicatedMergeTree::checkTableCanBeDropped(ContextPtr query_context
|
||||
|
||||
void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_name) const
|
||||
{
|
||||
if (renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
|
||||
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
|
||||
return;
|
||||
|
||||
if (renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
|
||||
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
|
||||
{
|
||||
auto old_name = getStorageID();
|
||||
bool is_server_startup = Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER
|
||||
@ -6680,7 +6676,7 @@ void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_na
|
||||
"If you really want to rename table, you should edit metadata file first and restart server or reattach the table.");
|
||||
}
|
||||
|
||||
assert(renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
|
||||
assert(zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
|
||||
if (!new_name.hasUUID() && getStorageID().hasUUID())
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
|
||||
"Cannot move Replicated table to Ordinary database, because zookeeper_path contains implicit "
|
||||
@ -7039,9 +7035,7 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
|
||||
/// NOTE: consider convert to UInt64
|
||||
res.parts_to_check = static_cast<UInt32>(part_check_thread.size());
|
||||
|
||||
res.zookeeper_name = zookeeper_name;
|
||||
res.zookeeper_path = zookeeper_path;
|
||||
res.replica_name = replica_name;
|
||||
res.zookeeper_info = zookeeper_info;
|
||||
res.replica_path = replica_path;
|
||||
res.columns_version = -1;
|
||||
|
||||
@ -7250,11 +7244,7 @@ void StorageReplicatedMergeTree::fetchPartition(
|
||||
}
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr zookeeper;
|
||||
if (from_zookeeper_name != default_zookeeper_name)
|
||||
zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
|
||||
else
|
||||
zookeeper = getZooKeeper();
|
||||
zkutil::ZooKeeperPtr zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(from_zookeeper_name);
|
||||
|
||||
if (from.back() == '/')
|
||||
from.resize(from.size() - 1);
|
||||
@ -10540,7 +10530,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
|
||||
auto coordination = backup_entries_collector.getBackupCoordination();
|
||||
|
||||
coordination->addReplicatedDataPath(full_zookeeper_path, data_path_in_backup);
|
||||
coordination->addReplicatedDataPath(zookeeper_info.full_path, data_path_in_backup);
|
||||
|
||||
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
|
||||
std::vector<PartNameAndChecksum> part_names_with_hashes;
|
||||
@ -10549,7 +10539,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
part_names_with_hashes.emplace_back(PartNameAndChecksum{part_backup_entries.part_name, part_backup_entries.part_checksum});
|
||||
|
||||
/// Send our list of part names to the coordination (to compare with other replicas).
|
||||
coordination->addReplicatedPartNames(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
|
||||
coordination->addReplicatedPartNames(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
|
||||
|
||||
/// Send a list of mutations to the coordination too (we need to find the mutations which are not finished for added part names).
|
||||
{
|
||||
@ -10591,13 +10581,13 @@ void StorageReplicatedMergeTree::backupData(
|
||||
}
|
||||
|
||||
if (!mutation_infos.empty())
|
||||
coordination->addReplicatedMutations(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
|
||||
coordination->addReplicatedMutations(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
|
||||
}
|
||||
}
|
||||
|
||||
/// This task will be executed after all replicas have collected their parts and the coordination is ready to
|
||||
/// give us the final list of parts to add to the BackupEntriesCollector.
|
||||
auto post_collecting_task = [my_full_zookeeper_path = full_zookeeper_path,
|
||||
auto post_collecting_task = [my_full_zookeeper_path = zookeeper_info.full_path,
|
||||
my_replica_name = getReplicaName(),
|
||||
coordination,
|
||||
my_parts_backup_entries = std::move(parts_backup_entries),
|
||||
@ -10636,7 +10626,7 @@ void StorageReplicatedMergeTree::backupData(
|
||||
|
||||
void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
|
||||
{
|
||||
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(full_zookeeper_path))
|
||||
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(zookeeper_info.full_path))
|
||||
{
|
||||
/// Other replica is already restoring the data of this table.
|
||||
/// We'll get them later due to replication, it's not necessary to read it from the backup.
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
|
||||
#include <Storages/MergeTree/ReplicatedTableStatus.h>
|
||||
#include <Storages/RenamingRestrictions.h>
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
#include <Interpreters/PartLog.h>
|
||||
@ -98,8 +99,7 @@ public:
|
||||
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
|
||||
*/
|
||||
StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
const TableZnodeInfo & zookeeper_info_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
@ -108,7 +108,6 @@ public:
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
RenamingRestrictions renaming_restrictions_,
|
||||
bool need_check_structure);
|
||||
|
||||
void startup() override;
|
||||
@ -244,13 +243,14 @@ public:
|
||||
/** Remove a specific replica from zookeeper.
|
||||
* returns true if there are no replicas left
|
||||
*/
|
||||
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
|
||||
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info,
|
||||
LoggerPtr logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);
|
||||
|
||||
bool dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger);
|
||||
bool dropReplica(const String & drop_replica, LoggerPtr logger);
|
||||
|
||||
/// Removes table from ZooKeeper after the last replica was dropped
|
||||
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
|
||||
static bool removeTableNodesFromZooKeeper(
|
||||
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info2,
|
||||
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
|
||||
|
||||
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
|
||||
@ -330,17 +330,15 @@ public:
|
||||
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
|
||||
|
||||
// Return default or custom zookeeper name for table
|
||||
const String & getZooKeeperName() const { return zookeeper_name; }
|
||||
const String & getZooKeeperPath() const { return zookeeper_path; }
|
||||
const String & getFullZooKeeperPath() const { return full_zookeeper_path; }
|
||||
const String & getZooKeeperName() const { return zookeeper_info.zookeeper_name; }
|
||||
const String & getZooKeeperPath() const { return zookeeper_info.path; }
|
||||
const String & getFullZooKeeperPath() const { return zookeeper_info.full_path; }
|
||||
|
||||
// Return table id, common for different replicas
|
||||
String getTableSharedID() const override;
|
||||
|
||||
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
|
||||
|
||||
static const String & getDefaultZooKeeperName() { return default_zookeeper_name; }
|
||||
|
||||
/// Check if there are new broken disks and enqueue part recovery tasks.
|
||||
void checkBrokenDisks();
|
||||
|
||||
@ -418,12 +416,10 @@ private:
|
||||
|
||||
bool is_readonly_metric_set = false;
|
||||
|
||||
const String full_zookeeper_path;
|
||||
static const String default_zookeeper_name;
|
||||
const String zookeeper_name;
|
||||
const String zookeeper_path;
|
||||
const TableZnodeInfo zookeeper_info;
|
||||
const String zookeeper_path; // shorthand for zookeeper_info.path
|
||||
|
||||
const String replica_name;
|
||||
const String replica_name; // shorthand for zookeeper_info.replica_name
|
||||
const String replica_path;
|
||||
|
||||
/** /replicas/me/is_active.
|
||||
@ -519,9 +515,6 @@ private:
|
||||
/// True if replica was created for existing table with fixed granularity
|
||||
bool other_replicas_fixed_granularity = false;
|
||||
|
||||
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
|
||||
const RenamingRestrictions renaming_restrictions;
|
||||
|
||||
/// Throttlers used in DataPartsExchange to lower maximum fetch/sends
|
||||
/// speed.
|
||||
ThrottlerPtr replicated_fetches_throttler;
|
||||
|
@ -530,9 +530,9 @@ Chunk SystemReplicasSource::generate()
|
||||
res_columns[col_num++]->insert(status.is_session_expired);
|
||||
res_columns[col_num++]->insert(status.queue.future_parts);
|
||||
res_columns[col_num++]->insert(status.parts_to_check);
|
||||
res_columns[col_num++]->insert(status.zookeeper_name);
|
||||
res_columns[col_num++]->insert(status.zookeeper_path);
|
||||
res_columns[col_num++]->insert(status.replica_name);
|
||||
res_columns[col_num++]->insert(status.zookeeper_info.zookeeper_name);
|
||||
res_columns[col_num++]->insert(status.zookeeper_info.path);
|
||||
res_columns[col_num++]->insert(status.zookeeper_info.replica_name);
|
||||
res_columns[col_num++]->insert(status.replica_path);
|
||||
res_columns[col_num++]->insert(status.columns_version);
|
||||
res_columns[col_num++]->insert(status.queue.queue_size);
|
||||
|
135
src/Storages/TableZnodeInfo.cpp
Normal file
135
src/Storages/TableZnodeInfo.cpp
Normal file
@ -0,0 +1,135 @@
|
||||
#include <Storages/TableZnodeInfo.h>
|
||||
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Databases/DatabaseReplicatedHelpers.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/StorageID.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
TableZnodeInfo TableZnodeInfo::resolve(const String & requested_path, const String & requested_replica_name, const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode, const ContextPtr & context)
|
||||
{
|
||||
bool is_on_cluster = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
|
||||
bool is_replicated_database = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
|
||||
|
||||
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
|
||||
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
|
||||
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
|
||||
|
||||
TableZnodeInfo res;
|
||||
res.full_path = requested_path;
|
||||
res.replica_name = requested_replica_name;
|
||||
|
||||
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
Macros::MacroExpansionInfo info;
|
||||
/// NOTE: it's not recursive
|
||||
info.expand_special_macros_only = true;
|
||||
info.table_id = table_id;
|
||||
/// Avoid unfolding {uuid} macro on this step.
|
||||
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
|
||||
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
res.full_path = context->getMacros()->expand(res.full_path, info);
|
||||
|
||||
info.level = 0;
|
||||
res.replica_name = context->getMacros()->expand(res.replica_name, info);
|
||||
}
|
||||
|
||||
res.full_path_for_metadata = res.full_path;
|
||||
res.replica_name_for_metadata = res.replica_name;
|
||||
|
||||
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
|
||||
/// to make possible copying metadata files between replicas.
|
||||
Macros::MacroExpansionInfo info;
|
||||
info.table_id = table_id;
|
||||
if (is_replicated_database)
|
||||
{
|
||||
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
info.shard = getReplicatedDatabaseShardName(database);
|
||||
info.replica = getReplicatedDatabaseReplicaName(database);
|
||||
}
|
||||
if (!allow_uuid_macro)
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
res.full_path = context->getMacros()->expand(res.full_path, info);
|
||||
bool expanded_uuid_in_path = info.expanded_uuid;
|
||||
|
||||
info.level = 0;
|
||||
info.table_id.uuid = UUIDHelpers::Nil;
|
||||
res.replica_name = context->getMacros()->expand(res.replica_name, info);
|
||||
|
||||
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
|
||||
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
|
||||
/// or if one of these macros is recursively expanded from some other macro.
|
||||
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
|
||||
if (info.expanded_database || info.expanded_table)
|
||||
res.renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
|
||||
else if (info.expanded_uuid)
|
||||
res.renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
|
||||
|
||||
res.zookeeper_name = zkutil::extractZooKeeperName(res.full_path);
|
||||
res.path = zkutil::extractZooKeeperPath(res.full_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, getLogger(table_id.getNameForLogs()));
|
||||
res.path_prefix_for_drop = res.path;
|
||||
|
||||
if (expanded_uuid_in_path)
|
||||
{
|
||||
/// When dropping table with znode path "/foo/{uuid}/bar/baz", delete not only
|
||||
/// "/foo/{uuid}/bar/baz" but also "/foo/{uuid}/bar" and "/foo/{uuid}" if they became empty.
|
||||
///
|
||||
/// (We find the uuid substring by searching instead of keeping track of it when expanding
|
||||
/// the macro. So in principle we may find a uuid substring that wasn't expanded from a
|
||||
/// macro. This should be ok because we're searching for the *last* occurrence, so we'll get
|
||||
/// a prefix at least as long as the correct one, so we won't delete znodes outside the
|
||||
/// {uuid} path component. This sounds sketchy, but propagating string indices through macro
|
||||
/// expansion passes is sketchy too (error-prone and more complex), and on balance this seems
|
||||
/// better.)
|
||||
String uuid_str = toString(table_id.uuid);
|
||||
size_t i = res.path.rfind(uuid_str);
|
||||
if (i == String::npos)
|
||||
/// Possible if the macro is in the "<auxiliary_zookeeper_name>:/" prefix, but we probably
|
||||
/// don't want to allow that.
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find uuid in zookeeper path after expanding {{uuid}} macro: {} (uuid {})", res.path, uuid_str);
|
||||
i += uuid_str.size();
|
||||
/// In case the path is "/foo/pika{uuid}chu/bar" (or "/foo/{uuid}{replica}/bar").
|
||||
while (i < res.path.size() && res.path[i] != '/')
|
||||
i += 1;
|
||||
res.path_prefix_for_drop = res.path.substr(0, i);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void TableZnodeInfo::dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const
|
||||
{
|
||||
chassert(path.starts_with(path_prefix_for_drop));
|
||||
if (path_prefix_for_drop.empty() || path_prefix_for_drop.size() == path.size())
|
||||
return;
|
||||
chassert(path[path_prefix_for_drop.size()] == '/');
|
||||
|
||||
String path_to_remove = path;
|
||||
while (path_to_remove.size() > path_prefix_for_drop.size())
|
||||
{
|
||||
size_t i = path_to_remove.find_last_of('/');
|
||||
chassert(i != String::npos && i >= path_prefix_for_drop.size());
|
||||
path_to_remove = path_to_remove.substr(0, i);
|
||||
|
||||
Coordination::Error rc = zookeeper->tryRemove(path_to_remove);
|
||||
if (rc != Coordination::Error::ZOK)
|
||||
/// Znode not empty or already removed by someone else.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
62
src/Storages/TableZnodeInfo.h
Normal file
62
src/Storages/TableZnodeInfo.h
Normal file
@ -0,0 +1,62 @@
|
||||
#pragma once
|
||||
|
||||
#include <base/types.h>
|
||||
#include <Storages/RenamingRestrictions.h>
|
||||
#include <Databases/LoadingStrictnessLevel.h>
|
||||
|
||||
namespace zkutil
|
||||
{
|
||||
class ZooKeeper;
|
||||
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct StorageID;
|
||||
class ASTCreateQuery;
|
||||
class Context;
|
||||
using ContextPtr = std::shared_ptr<const Context>;
|
||||
|
||||
/// Helper for replicated tables that use zookeeper for coordination among replicas.
|
||||
/// Handles things like:
|
||||
/// * Expanding macros like {table} and {uuid} in zookeeper path. Some macros are expanded+saved once
|
||||
/// on table creation (e.g. {table}, to avoid changing the path if the table is later renamed),
|
||||
/// others are expanded on each server startup and each replica (e.g. {replica} because it's
|
||||
/// different on different replicas).
|
||||
/// * When dropping table with znode path (say) "/clickhouse/tables/{uuid}/{shard}", delete not only
|
||||
/// the znode at this path but also the parent znode "/clickhouse/tables/{uuid}" if it became empty.
|
||||
/// Otherwise each created+dropped table would leave behind an empty znode.
|
||||
|
||||
struct TableZnodeInfo
|
||||
{
|
||||
String path;
|
||||
String replica_name;
|
||||
/// Which zookeeper cluster to use ("default" or one of auxiliary zookeepers listed in config).
|
||||
String zookeeper_name = "default";
|
||||
|
||||
/// Path with optional zookeeper_name prefix: "<auxiliary_zookeeper_name>:<path>".
|
||||
String full_path;
|
||||
|
||||
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro.
|
||||
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
|
||||
|
||||
/// Information to save in table metadata and send to replicas (if ON CLUSTER or DatabaseReplicated).
|
||||
/// Has some macros expanded (e.g. {table}), others left unexpanded (e.g. {replica}).
|
||||
String full_path_for_metadata;
|
||||
String replica_name_for_metadata;
|
||||
|
||||
/// Path to an ancestor of `path` that should be considered "owned" by this table (shared among
|
||||
/// replicas of the table). When table is dropped, this znode will be removed if it became empty.
|
||||
/// E.g. path = "/clickhouse/tables/{uuid}/{shard}", path_prefix_to_drop = "/clickhouse/tables/{uuid}".
|
||||
String path_prefix_for_drop;
|
||||
|
||||
static TableZnodeInfo resolve(
|
||||
const String & requested_path, const String & requested_replica_name,
|
||||
const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode,
|
||||
const ContextPtr & context);
|
||||
|
||||
void dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const;
|
||||
};
|
||||
|
||||
}
|
@ -0,0 +1,4 @@
|
||||
s1 r1 OK 0 0
|
||||
1
|
||||
s1 r1 OK 0 0
|
||||
0
|
16
tests/queries/0_stateless/03234_replicated_table_parent_znode_cleanup.sh
Executable file
16
tests/queries/0_stateless/03234_replicated_table_parent_znode_cleanup.sh
Executable file
@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env bash
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
db="rdb_$CLICKHOUSE_DATABASE"
|
||||
|
||||
$CLICKHOUSE_CLIENT -nq "
|
||||
create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1');
|
||||
create table $db.a (x Int8) engine ReplicatedMergeTree order by x;"
|
||||
uuid=`$CLICKHOUSE_CLIENT -q "select uuid from system.tables where database = '$db' and name = 'a'"`
|
||||
$CLICKHOUSE_CLIENT -nq "
|
||||
select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';
|
||||
drop table $db.a sync;
|
||||
select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';"
|
||||
$CLICKHOUSE_CLIENT -q "drop database $db"
|
Loading…
Reference in New Issue
Block a user