Compare commits

6 Commits

Author             SHA1         Message                                                        Date
Michael Kolupaev   343edf0c33   Merge 9102a6f119 into 733c57dae7                               2024-09-16 00:29:30 +02:00
Michael Kolupaev   9102a6f119   Merge remote-tracking branch 'origin/master' into zznode      2024-09-14 00:57:56 +00:00
Michael Kolupaev   59d1f9a6b1   Fix test                                                       2024-09-13 20:04:28 +00:00
Michael Kolupaev   adb905a692   Small improvement                                              2024-09-13 04:12:28 +00:00
Michael Kolupaev   e6ec9eaad3   Conflict                                                       2024-09-13 02:18:24 +00:00
Michael Kolupaev   9a3adc70bd   Don't leave an empty znode when replicated table is dropped   2024-09-12 21:20:25 +00:00
15 changed files with 341 additions and 208 deletions

View File

@@ -1723,11 +1723,10 @@ std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts
String extractZooKeeperName(const String & path)
{
static constexpr auto default_zookeeper_name = "default";
if (path.empty())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path should not be empty");
if (path[0] == '/')
return default_zookeeper_name;
return String(DEFAULT_ZOOKEEPER_NAME);
auto pos = path.find(":/");
if (pos != String::npos && pos < path.find('/'))
{
@@ -1736,7 +1735,7 @@ String extractZooKeeperName(const String & path)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'");
return zookeeper_name;
}
return default_zookeeper_name;
return String(DEFAULT_ZOOKEEPER_NAME);
}
String extractZooKeeperPath(const String & path, bool check_starts_with_slash, LoggerPtr log)
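
As an aside, here is a minimal standalone sketch of the naming convention extractZooKeeperName implements, re-implemented for illustration with a hypothetical name and std exceptions (this is not the actual zkutil code):

#include <cassert>
#include <stdexcept>
#include <string>

// Illustrative re-implementation: "/foo" and "default:/foo" refer to the default
// ZooKeeper, "other:/foo" to the auxiliary ZooKeeper named "other".
std::string extractZooKeeperNameSketch(const std::string & path)
{
    if (path.empty())
        throw std::invalid_argument("ZooKeeper path should not be empty");
    if (path[0] == '/')
        return "default";
    size_t pos = path.find(":/");
    if (pos != std::string::npos && pos < path.find('/'))
    {
        std::string name = path.substr(0, pos);
        if (name.empty())
            throw std::invalid_argument("Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'");
        return name;
    }
    return "default";
}

int main()
{
    assert(extractZooKeeperNameSketch("/clickhouse/tables/t") == "default");
    assert(extractZooKeeperNameSketch("default:/clickhouse/tables/t") == "default");
    assert(extractZooKeeperNameSketch("aux1:/clickhouse/tables/t") == "aux1");
}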

View File

@@ -47,6 +47,10 @@ namespace zkutil
/// Preferred size of multi command (in the number of operations)
constexpr size_t MULTI_BATCH_SIZE = 100;
/// Path "default:/foo" refers to znode "/foo" in the default zookeeper,
/// path "other:/foo" refers to znode "/foo" in auxiliary zookeeper named "other".
constexpr std::string_view DEFAULT_ZOOKEEPER_NAME = "default";
struct ShuffleHost
{
enum AvailabilityZoneInfo

View File

@@ -3743,6 +3743,11 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
return zookeeper->second;
}
std::shared_ptr<zkutil::ZooKeeper> Context::getDefaultOrAuxiliaryZooKeeper(const String & name) const
{
return name == zkutil::DEFAULT_ZOOKEEPER_NAME ? getZooKeeper() : getAuxiliaryZooKeeper(name);
}
std::map<String, zkutil::ZooKeeperPtr> Context::getAuxiliaryZooKeepers() const
{

View File

@@ -1004,6 +1004,8 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
/// If name == "default", same as getZooKeeper(), otherwise same as getAuxiliaryZooKeeper().
std::shared_ptr<zkutil::ZooKeeper> getDefaultOrAuxiliaryZooKeeper(const String & name) const;
/// return Auxiliary Zookeeper map
std::map<String, zkutil::ZooKeeperPtr> getAuxiliaryZooKeepers() const;

View File

@@ -1469,7 +1469,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
if (!getContext()->getSettingsRef().allow_experimental_refreshable_materialized_view)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use.");
"Refreshable materialized views are experimental. Enable allow_experimental_refreshable_materialized_view to use");
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
visitor.visit(*create.refresh_strategy);

View File

@@ -1005,7 +1005,7 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
{
ReplicatedTableStatus status;
storage_replicated->getStatus(status);
if (status.zookeeper_path == query.replica_zk_path)
if (status.zookeeper_info.path == query.replica_zk_path)
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
"There is a local table {}, which has the same table path in ZooKeeper. "
"Please check the path in query. "
@@ -1028,7 +1028,10 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
if (zookeeper->exists(remote_replica_path + "/is_active"))
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't remove replica: {}, because it's active", query.replica);
StorageReplicatedMergeTree::dropReplica(zookeeper, query.replica_zk_path, query.replica, log);
TableZnodeInfo info;
info.path = query.replica_zk_path;
info.replica_name = query.replica;
StorageReplicatedMergeTree::dropReplica(zookeeper, info, log);
LOG_INFO(log, "Dropped replica {}", remote_replica_path);
}
else
@@ -1045,12 +1048,12 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
storage_replicated->getStatus(status);
/// Do not allow to drop local replicas and active remote replicas
if (query.replica == status.replica_name)
if (query.replica == status.zookeeper_info.replica_name)
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED,
"We can't drop local replica, please use `DROP TABLE` if you want "
"to clean the data and drop this replica");
storage_replicated->dropReplica(status.zookeeper_path, query.replica, log);
storage_replicated->dropReplica(query.replica, log);
LOG_TRACE(log, "Dropped replica {} of {}", query.replica, table->getStorageID().getNameForLogs());
return true;

View File

@@ -1,6 +1,7 @@
#pragma once
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/TableZnodeInfo.h>
#include <Core/Types.h>
namespace DB
@@ -16,9 +17,7 @@ struct ReplicatedTableStatus
ReplicatedMergeTreeQueue::Status queue;
UInt32 parts_to_check;
String zookeeper_name;
String zookeeper_path;
String replica_name;
TableZnodeInfo zookeeper_info;
String replica_path;
Int32 columns_version;
UInt64 log_max_index;

View File

@@ -6,6 +6,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/TableZnodeInfo.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
@@ -88,32 +89,23 @@ See details in documentation: https://clickhouse.com/docs/en/engines/table-engin
If you use the Replicated version of engines, see https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication/.
)";
static ColumnsDescription getColumnsDescriptionFromZookeeper(const String & raw_zookeeper_path, ContextMutablePtr context)
static ColumnsDescription getColumnsDescriptionFromZookeeper(const TableZnodeInfo & zookeeper_info, ContextMutablePtr context)
{
String zookeeper_name = zkutil::extractZooKeeperName(raw_zookeeper_path);
String zookeeper_path = zkutil::extractZooKeeperPath(raw_zookeeper_path, true);
if (!context->hasZooKeeper() && !context->hasAuxiliaryZooKeeper(zookeeper_name))
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure without zookeeper, you must specify the structure manually"};
zkutil::ZooKeeperPtr zookeeper;
try
{
if (zookeeper_name == StorageReplicatedMergeTree::getDefaultZooKeeperName())
zookeeper = context->getZooKeeper();
else
zookeeper = context->getAuxiliaryZooKeeper(zookeeper_name);
zookeeper = context->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
}
catch (...)
{
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure from zookeeper, because cannot get zookeeper: {}. You must specify structure manually", getCurrentExceptionMessage(false)};
}
if (!zookeeper->exists(zookeeper_path + "/replicas"))
if (!zookeeper->exists(zookeeper_info.path + "/replicas"))
throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot get replica structure, because there are no other replicas in zookeeper. You must specify the structure manually"};
Coordination::Stat columns_stat;
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_path) / "columns", &columns_stat));
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_info.path) / "columns", &columns_stat));
}
/// Returns whether a new syntax is used to define a table engine, i.e. MergeTree() PRIMARY KEY ... PARTITION BY ... SETTINGS ...
@@ -184,23 +176,16 @@ static std::string_view getNamePart(const String & engine_name)
/// Extracts zookeeper path and replica name from the table engine's arguments.
/// The function can modify those arguments (that's why they're passed separately in `engine_args`) and also determines RenamingRestrictions.
/// The function assumes the table engine is Replicated.
static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
static TableZnodeInfo extractZooKeeperPathAndReplicaNameFromEngineArgs(
const ASTCreateQuery & query,
const StorageID & table_id,
const String & engine_name,
ASTs & engine_args,
LoadingStrictnessLevel mode,
const ContextPtr & local_context,
String & zookeeper_path,
String & replica_name,
RenamingRestrictions & renaming_restrictions)
const ContextPtr & local_context)
{
chassert(isReplicated(engine_name));
zookeeper_path = "";
replica_name = "";
renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
bool is_extended_storage_def = isExtendedStorageDef(query);
if (is_extended_storage_def)
@@ -210,62 +195,12 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
evaluateEngineArgs(engine_args, local_context);
}
bool is_on_cluster = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name)
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name, String zookeeper_path, String replica_name) -> TableZnodeInfo
{
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
if (mode < LoadingStrictnessLevel::ATTACH)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = table_id;
/// Avoid unfolding {uuid} macro on this step.
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
info.level = 0;
replica_name = local_context->getMacros()->expand(replica_name, info);
}
ast_zk_path->value = zookeeper_path;
ast_replica_name->value = replica_name;
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
/// to make possible copying metadata files between replicas.
Macros::MacroExpansionInfo info;
info.table_id = table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = local_context->getMacros()->expand(zookeeper_path, info);
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
replica_name = local_context->getMacros()->expand(replica_name, info);
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
/// or if one of these macros is recursively expanded from some other macro.
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
if (info.expanded_database || info.expanded_table)
renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
else if (info.expanded_uuid)
renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
TableZnodeInfo res = TableZnodeInfo::resolve(zookeeper_path, replica_name, table_id, query, mode, local_context);
ast_zk_path->value = res.full_path_for_metadata;
ast_replica_name->value = res.replica_name_for_metadata;
return res;
};
size_t arg_num = 0;
@@ -277,6 +212,9 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
if (has_valid_arguments)
{
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -293,27 +231,22 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
/// Get path and name from engine arguments
auto * ast_zk_path = engine_args[arg_num]->as<ASTLiteral>();
if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
zookeeper_path = ast_zk_path->value.safeGet<String>();
else
if (!ast_zk_path || ast_zk_path->value.getType() != Field::Types::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path in ZooKeeper must be a string literal{}", verbose_help_message);
auto * ast_replica_name = engine_args[arg_num + 1]->as<ASTLiteral>();
if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
replica_name = ast_replica_name->value.safeGet<String>();
else
if (!ast_replica_name || ast_replica_name->value.getType() != Field::Types::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
if (!query.attach && is_replicated_database && local_context->getSettingsRef().database_replicated_allow_replicated_engine_arguments == 2)
{
LOG_WARNING(&Poco::Logger::get("registerStorageMergeTree"), "Replacing user-provided ZooKeeper path and replica name ({}, {}) "
"with default arguments", zookeeper_path, replica_name);
engine_args[arg_num]->as<ASTLiteral>()->value = zookeeper_path = server_settings.default_replica_path;
engine_args[arg_num + 1]->as<ASTLiteral>()->value = replica_name = server_settings.default_replica_name;
"with default arguments", ast_zk_path->value.safeGet<String>(), ast_replica_name->value.safeGet<String>());
ast_zk_path->value = server_settings.default_replica_path;
ast_replica_name->value = server_settings.default_replica_name;
}
expand_macro(ast_zk_path, ast_replica_name);
return expand_macro(ast_zk_path, ast_replica_name, ast_zk_path->value.safeGet<String>(), ast_replica_name->value.safeGet<String>());
}
else if (is_extended_storage_def
&& (arg_cnt == 0
@@ -322,24 +255,24 @@ static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
{
/// Try use default values if arguments are not specified.
/// Note: {uuid} macro works for ON CLUSTER queries when database engine is Atomic.
zookeeper_path = server_settings.default_replica_path;
/// TODO maybe use hostname if {replica} is not defined?
replica_name = server_settings.default_replica_name;
/// Modify query, so default values will be written to metadata
assert(arg_num == 0);
ASTs old_args;
std::swap(engine_args, old_args);
auto path_arg = std::make_shared<ASTLiteral>(zookeeper_path);
auto name_arg = std::make_shared<ASTLiteral>(replica_name);
auto path_arg = std::make_shared<ASTLiteral>("");
auto name_arg = std::make_shared<ASTLiteral>("");
auto * ast_zk_path = path_arg.get();
auto * ast_replica_name = name_arg.get();
expand_macro(ast_zk_path, ast_replica_name);
auto res = expand_macro(ast_zk_path, ast_replica_name, server_settings.default_replica_path, server_settings.default_replica_name);
engine_args.emplace_back(std::move(path_arg));
engine_args.emplace_back(std::move(name_arg));
std::move(std::begin(old_args), std::end(old_args), std::back_inserter(engine_args));
return res;
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected two string literal arguments: zookeeper_path and replica_name");
@@ -363,15 +296,11 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
for (auto & engine_arg : engine_args)
engine_arg = engine_arg->clone();
LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE;
String zookeeper_path;
String replica_name;
RenamingRestrictions renaming_restrictions;
try
{
extractZooKeeperPathAndReplicaNameFromEngineArgs(query, table_id, engine_name, engine_args, mode, local_context,
zookeeper_path, replica_name, renaming_restrictions);
auto res = extractZooKeeperPathAndReplicaNameFromEngineArgs(
query, table_id, engine_name, engine_args, LoadingStrictnessLevel::CREATE, local_context);
return res.full_path;
}
catch (Exception & e)
{
@@ -382,8 +311,6 @@ std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreate
}
throw;
}
return zookeeper_path;
}
static StoragePtr create(const StorageFactory::Arguments & args)
@@ -551,19 +478,17 @@ static StoragePtr create(const StorageFactory::Arguments & args)
}
/// Extract zookeeper path and replica name from engine arguments.
String zookeeper_path;
String replica_name;
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
TableZnodeInfo zookeeper_info;
if (replicated)
{
extractZooKeeperPathAndReplicaNameFromEngineArgs(args.query, args.table_id, args.engine_name, args.engine_args, args.mode,
args.getLocalContext(), zookeeper_path, replica_name, renaming_restrictions);
zookeeper_info = extractZooKeeperPathAndReplicaNameFromEngineArgs(
args.query, args.table_id, args.engine_name, args.engine_args, args.mode, args.getLocalContext());
if (replica_name.empty())
if (zookeeper_info.replica_name.empty())
throw Exception(ErrorCodes::NO_REPLICA_NAME_GIVEN, "No replica name in config{}", verbose_help_message);
// '\t' and '\n' will interrupt parsing 'source replica' in ReplicatedMergeTreeLogEntryData::readText
if (replica_name.find('\t') != String::npos || replica_name.find('\n') != String::npos)
if (zookeeper_info.replica_name.find('\t') != String::npos || zookeeper_info.replica_name.find('\n') != String::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must not contain '\\t' or '\\n'");
arg_cnt = engine_args.size(); /// Update `arg_cnt` here because extractZooKeeperPathAndReplicaNameFromEngineArgs() could add arguments.
@@ -649,7 +574,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ColumnsDescription columns;
if (args.columns.empty() && replicated)
columns = getColumnsDescriptionFromZookeeper(zookeeper_path, context);
columns = getColumnsDescriptionFromZookeeper(zookeeper_info, context);
else
columns = args.columns;
@@ -879,8 +804,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
need_check_table_structure = txn->isInitialQuery();
return std::make_shared<StorageReplicatedMergeTree>(
zookeeper_path,
replica_name,
zookeeper_info,
args.mode,
args.table_id,
args.relative_data_path,
@@ -889,7 +813,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
date_column_name,
merging_params,
std::move(storage_settings),
renaming_restrictions,
need_check_table_structure);
}
else
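
A note on the write-back in expand_macro above (a sketch of the observable effect; the values assume the stock server defaults for default_replica_path and default_replica_name):

// CREATE TABLE t (x Int8) ENGINE = ReplicatedMergeTree ORDER BY x;  (no engine args)
//
// expand_macro() injects the defaults and stores the metadata form, with
// {uuid}, {shard} and {replica} left unexpanded so the definition stays portable:
//     ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
//
// The returned TableZnodeInfo carries the fully expanded runtime form instead,
// e.g. path = "/clickhouse/tables/<uuid>/s1", replica_name = "r1".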

View File

@@ -211,7 +211,6 @@ namespace ActionLocks
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
const String StorageReplicatedMergeTree::default_zookeeper_name = "default";
void StorageReplicatedMergeTree::setZooKeeper()
{
@@ -221,18 +220,9 @@ void StorageReplicatedMergeTree::setZooKeeper()
/// strange effects. So we always use only one session for all tables.
/// (excluding auxiliary zookeepers)
if (zookeeper_name == default_zookeeper_name)
{
auto new_keeper = getContext()->getZooKeeper();
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
}
else
{
auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
}
auto new_keeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
std::lock_guard lock(current_zookeeper_mutex);
current_zookeeper = new_keeper;
}
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const
@@ -263,7 +253,7 @@ String StorageReplicatedMergeTree::getEndpointName() const
{
const MergeTreeSettings & settings = getContext()->getReplicatedMergeTreeSettings();
if (settings.enable_the_endpoint_id_with_zookeeper_name_prefix)
return zookeeper_name + ":" + replica_path;
return zookeeper_info.zookeeper_name + ":" + replica_path;
return replica_path;
}
@@ -294,8 +284,7 @@ static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom
}
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
const TableZnodeInfo & zookeeper_info_,
LoadingStrictnessLevel mode,
const StorageID & table_id_,
const String & relative_data_path_,
@@ -304,7 +293,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
RenamingRestrictions renaming_restrictions_,
bool need_check_structure)
: MergeTreeData(table_id_,
metadata_,
@@ -315,11 +303,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
mode,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, full_zookeeper_path(zookeeper_path_)
, zookeeper_name(zkutil::extractZooKeeperName(full_zookeeper_path))
, zookeeper_path(zkutil::extractZooKeeperPath(full_zookeeper_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, log.load()))
, replica_name(replica_name_)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
, zookeeper_info(zookeeper_info_)
, zookeeper_path(zookeeper_info.path)
, replica_name(zookeeper_info.replica_name)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name)
, reader(*this)
, writer(*this)
, merger_mutator(*this)
@@ -331,7 +318,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, part_check_thread(*this)
, restarting_thread(*this)
, part_moves_between_shards_orchestrator(*this)
, renaming_restrictions(renaming_restrictions_)
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
{
@@ -365,7 +351,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// Will be activated by restarting thread.
mutations_finalizing_task->deactivate();
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name);
bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
if (has_zookeeper)
{
/// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
@@ -845,7 +831,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, log.load()))
if (!removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, log.load()))
{
/// Someone is recursively removing table right now, we cannot create new table until old one is removed
continue;
@@ -1107,11 +1093,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperIfTableShutDown() const
{
zkutil::ZooKeeperPtr maybe_new_zookeeper;
if (zookeeper_name == default_zookeeper_name)
maybe_new_zookeeper = getContext()->getZooKeeper();
else
maybe_new_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
zkutil::ZooKeeperPtr maybe_new_zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(zookeeper_info.zookeeper_name);
maybe_new_zookeeper->sync(zookeeper_path);
return maybe_new_zookeeper;
}
@@ -1226,7 +1208,7 @@ void StorageReplicatedMergeTree::drop()
LOG_INFO(log, "Dropping table with non-zero lost_part_count equal to {}", lost_part_count);
}
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_path, replica_name, log.load(), getSettings(), &has_metadata_in_zookeeper);
bool last_replica_dropped = dropReplica(zookeeper, zookeeper_info, log.load(), getSettings(), &has_metadata_in_zookeeper);
if (last_replica_dropped)
{
dropZookeeperZeroCopyLockPaths(zookeeper, zero_copy_locks_paths, log.load());
@@ -1235,13 +1217,15 @@ void StorageReplicatedMergeTree::drop()
}
bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
LoggerPtr logger, MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
bool StorageReplicatedMergeTree::dropReplica(
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info, LoggerPtr logger,
MergeTreeSettingsPtr table_settings, std::optional<bool> * has_metadata_out)
{
if (zookeeper->expired())
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Table was not dropped because ZooKeeper session has expired.");
auto remote_replica_path = zookeeper_path + "/replicas/" + replica;
const String & zookeeper_path = zookeeper_info.path;
auto remote_replica_path = zookeeper_path + "/replicas/" + zookeeper_info.replica_name;
LOG_INFO(logger, "Removing replica {}, marking it as lost", remote_replica_path);
/// Mark itself lost before removing, because the following recursive removal may fail
@@ -1352,30 +1336,33 @@ bool StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zookeeper);
LOG_INFO(logger, "Removing table {} (this might take several minutes)", zookeeper_path);
removeTableNodesFromZooKeeper(zookeeper, zookeeper_path, metadata_drop_lock, logger);
removeTableNodesFromZooKeeper(zookeeper, zookeeper_info, metadata_drop_lock, logger);
}
return true;
}
bool StorageReplicatedMergeTree::dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger)
bool StorageReplicatedMergeTree::dropReplica(const String & drop_replica, LoggerPtr logger)
{
zkutil::ZooKeeperPtr zookeeper = getZooKeeperIfTableShutDown();
/// NOTE it's not atomic: replica may become active after this check, but before dropReplica(...)
/// However, the main use case is to drop dead replica, which cannot become active.
/// This check prevents only from accidental drop of some other replica.
if (zookeeper->exists(drop_zookeeper_path + "/replicas/" + drop_replica + "/is_active"))
if (zookeeper->exists(zookeeper_info.path + "/replicas/" + drop_replica + "/is_active"))
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "Can't drop replica: {}, because it's active", drop_replica);
return dropReplica(zookeeper, drop_zookeeper_path, drop_replica, logger);
TableZnodeInfo info = zookeeper_info;
info.replica_name = drop_replica;
return dropReplica(zookeeper, info, logger);
}
bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper,
const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
const TableZnodeInfo & zookeeper_info2, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger)
{
const String & zookeeper_path = zookeeper_info2.path;
bool completely_removed = false;
/// NOTE /block_numbers/ actually is not flat, because /block_numbers/<partition_id>/ may have ephemeral children,
@@ -1443,6 +1430,15 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
metadata_drop_lock->setAlreadyRemoved();
completely_removed = true;
LOG_INFO(logger, "Table {} was successfully removed from ZooKeeper", zookeeper_path);
try
{
zookeeper_info2.dropAncestorZnodesIfNeeded(zookeeper);
}
catch (...)
{
LOG_WARNING(logger, "Failed to drop ancestor znodes {} - {} after dropping table: {}", zookeeper_info2.path_prefix_for_drop, zookeeper_info2.path, getCurrentExceptionMessage(false));
}
}
return completely_removed;
@@ -2295,7 +2291,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;
if (!fetchPart(part_name,
metadata_snapshot,
zookeeper_name,
zookeeper_info.zookeeper_name,
source_replica_path,
/* to_detached= */ false,
entry.quorum,
@@ -2858,7 +2854,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
interserver_scheme, address.scheme, address.host);
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_name, source_replica_path,
metadata_snapshot, getContext(), part_desc->found_new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
part_desc->res_part = fetched_part;
@@ -2980,7 +2976,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
interserver_scheme, address.scheme, address.host);
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_name, source_replica_path,
metadata_snapshot, getContext(), entry.new_part_name, zookeeper_info.zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
replicated_fetches_throttler, true);
@@ -5076,7 +5072,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
currently_fetching_parts.erase(part_name);
});
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
LOG_DEBUG(log, "Fetching already known part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@@ -5109,7 +5105,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
"'{}' != '{}', can't fetch part from {}", interserver_scheme, address.scheme, address.host);
auto [fetched_part, lock] = fetcher.fetchSelectedPart(
metadata_snapshot, getContext(), part_name, zookeeper_name, source_replica_path,
metadata_snapshot, getContext(), part_name, zookeeper_info.zookeeper_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(),
interserver_scheme, replicated_fetches_throttler, false, "", nullptr, true,
@@ -5148,7 +5144,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_info.zookeeper_name, source_replica_path);
return part;
}
@@ -6653,10 +6649,10 @@ void StorageReplicatedMergeTree::checkTableCanBeDropped(ContextPtr query_context
void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_name) const
{
if (renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_ANY)
return;
if (renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
if (zookeeper_info.renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
{
auto old_name = getStorageID();
bool is_server_startup = Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER
@@ -6680,7 +6676,7 @@ void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_na
"If you really want to rename table, you should edit metadata file first and restart server or reattach the table.");
}
assert(renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
assert(zookeeper_info.renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
if (!new_name.hasUUID() && getStorageID().hasUUID())
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Cannot move Replicated table to Ordinary database, because zookeeper_path contains implicit "
@@ -7039,9 +7035,7 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
/// NOTE: consider convert to UInt64
res.parts_to_check = static_cast<UInt32>(part_check_thread.size());
res.zookeeper_name = zookeeper_name;
res.zookeeper_path = zookeeper_path;
res.replica_name = replica_name;
res.zookeeper_info = zookeeper_info;
res.replica_path = replica_path;
res.columns_version = -1;
@@ -7250,11 +7244,7 @@ void StorageReplicatedMergeTree::fetchPartition(
}
}
zkutil::ZooKeeperPtr zookeeper;
if (from_zookeeper_name != default_zookeeper_name)
zookeeper = getContext()->getAuxiliaryZooKeeper(from_zookeeper_name);
else
zookeeper = getZooKeeper();
zkutil::ZooKeeperPtr zookeeper = getContext()->getDefaultOrAuxiliaryZooKeeper(from_zookeeper_name);
if (from.back() == '/')
from.resize(from.size() - 1);
@@ -10540,7 +10530,7 @@ void StorageReplicatedMergeTree::backupData(
auto coordination = backup_entries_collector.getBackupCoordination();
coordination->addReplicatedDataPath(full_zookeeper_path, data_path_in_backup);
coordination->addReplicatedDataPath(zookeeper_info.full_path, data_path_in_backup);
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
std::vector<PartNameAndChecksum> part_names_with_hashes;
@@ -10549,7 +10539,7 @@ void StorageReplicatedMergeTree::backupData(
part_names_with_hashes.emplace_back(PartNameAndChecksum{part_backup_entries.part_name, part_backup_entries.part_checksum});
/// Send our list of part names to the coordination (to compare with other replicas).
coordination->addReplicatedPartNames(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
coordination->addReplicatedPartNames(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
/// Send a list of mutations to the coordination too (we need to find the mutations which are not finished for added part names).
{
@@ -10591,13 +10581,13 @@ void StorageReplicatedMergeTree::backupData(
}
if (!mutation_infos.empty())
coordination->addReplicatedMutations(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
coordination->addReplicatedMutations(zookeeper_info.full_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
}
}
/// This task will be executed after all replicas have collected their parts and the coordination is ready to
/// give us the final list of parts to add to the BackupEntriesCollector.
auto post_collecting_task = [my_full_zookeeper_path = full_zookeeper_path,
auto post_collecting_task = [my_full_zookeeper_path = zookeeper_info.full_path,
my_replica_name = getReplicaName(),
coordination,
my_parts_backup_entries = std::move(parts_backup_entries),
@@ -10636,7 +10626,7 @@ void StorageReplicatedMergeTree::backupData(
void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(full_zookeeper_path))
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(zookeeper_info.full_path))
{
/// Other replica is already restoring the data of this table.
/// We'll get them later due to replication, it's not necessary to read it from the backup.

View File

@@ -27,6 +27,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedTableStatus.h>
#include <Storages/RenamingRestrictions.h>
#include <Storages/TableZnodeInfo.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
@@ -98,8 +99,7 @@ public:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
const TableZnodeInfo & zookeeper_info_,
LoadingStrictnessLevel mode,
const StorageID & table_id_,
const String & relative_data_path_,
@@ -108,7 +108,6 @@ public:
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
RenamingRestrictions renaming_restrictions_,
bool need_check_structure);
void startup() override;
@@ -244,14 +243,15 @@ public:
/** Remove a specific replica from zookeeper.
* returns true if there are no replicas left
*/
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info,
LoggerPtr logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);
bool dropReplica(const String & drop_zookeeper_path, const String & drop_replica, LoggerPtr logger);
bool dropReplica(const String & drop_replica, LoggerPtr logger);
/// Removes table from ZooKeeper after the last replica was dropped
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
static bool removeTableNodesFromZooKeeper(
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info2,
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
@@ -330,17 +330,15 @@ public:
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
// Return default or custom zookeeper name for table
const String & getZooKeeperName() const { return zookeeper_name; }
const String & getZooKeeperPath() const { return zookeeper_path; }
const String & getFullZooKeeperPath() const { return full_zookeeper_path; }
const String & getZooKeeperName() const { return zookeeper_info.zookeeper_name; }
const String & getZooKeeperPath() const { return zookeeper_info.path; }
const String & getFullZooKeeperPath() const { return zookeeper_info.full_path; }
// Return table id, common for different replicas
String getTableSharedID() const override;
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
static const String & getDefaultZooKeeperName() { return default_zookeeper_name; }
/// Check if there are new broken disks and enqueue part recovery tasks.
void checkBrokenDisks();
@@ -418,12 +416,10 @@ private:
bool is_readonly_metric_set = false;
const String full_zookeeper_path;
static const String default_zookeeper_name;
const String zookeeper_name;
const String zookeeper_path;
const TableZnodeInfo zookeeper_info;
const String zookeeper_path; // shorthand for zookeeper_info.path
const String replica_name;
const String replica_name; // shorthand for zookeeper_info.replica_name
const String replica_path;
/** /replicas/me/is_active.
@@ -519,9 +515,6 @@ private:
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
const RenamingRestrictions renaming_restrictions;
/// Throttlers used in DataPartsExchange to lower maximum fetch/sends
/// speed.
ThrottlerPtr replicated_fetches_throttler;

View File

@@ -530,9 +530,9 @@ Chunk SystemReplicasSource::generate()
res_columns[col_num++]->insert(status.is_session_expired);
res_columns[col_num++]->insert(status.queue.future_parts);
res_columns[col_num++]->insert(status.parts_to_check);
res_columns[col_num++]->insert(status.zookeeper_name);
res_columns[col_num++]->insert(status.zookeeper_path);
res_columns[col_num++]->insert(status.replica_name);
res_columns[col_num++]->insert(status.zookeeper_info.zookeeper_name);
res_columns[col_num++]->insert(status.zookeeper_info.path);
res_columns[col_num++]->insert(status.zookeeper_info.replica_name);
res_columns[col_num++]->insert(status.replica_path);
res_columns[col_num++]->insert(status.columns_version);
res_columns[col_num++]->insert(status.queue.queue_size);

View File

@@ -0,0 +1,135 @@
#include <Storages/TableZnodeInfo.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Databases/DatabaseReplicatedHelpers.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/StorageID.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
TableZnodeInfo TableZnodeInfo::resolve(const String & requested_path, const String & requested_replica_name, const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode, const ContextPtr & context)
{
bool is_on_cluster = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
TableZnodeInfo res;
res.full_path = requested_path;
res.replica_name = requested_replica_name;
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
if (mode < LoadingStrictnessLevel::ATTACH)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = table_id;
/// Avoid unfolding {uuid} macro on this step.
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
info.table_id.uuid = UUIDHelpers::Nil;
res.full_path = context->getMacros()->expand(res.full_path, info);
info.level = 0;
res.replica_name = context->getMacros()->expand(res.replica_name, info);
}
res.full_path_for_metadata = res.full_path;
res.replica_name_for_metadata = res.replica_name;
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
/// to make possible copying metadata files between replicas.
Macros::MacroExpansionInfo info;
info.table_id = table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
res.full_path = context->getMacros()->expand(res.full_path, info);
bool expanded_uuid_in_path = info.expanded_uuid;
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
res.replica_name = context->getMacros()->expand(res.replica_name, info);
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
/// or if one of these macros is recursively expanded from some other macro.
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
if (info.expanded_database || info.expanded_table)
res.renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
else if (info.expanded_uuid)
res.renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
res.zookeeper_name = zkutil::extractZooKeeperName(res.full_path);
res.path = zkutil::extractZooKeeperPath(res.full_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, getLogger(table_id.getNameForLogs()));
res.path_prefix_for_drop = res.path;
if (expanded_uuid_in_path)
{
/// When dropping table with znode path "/foo/{uuid}/bar/baz", delete not only
/// "/foo/{uuid}/bar/baz" but also "/foo/{uuid}/bar" and "/foo/{uuid}" if they became empty.
///
/// (We find the uuid substring by searching instead of keeping track of it when expanding
/// the macro. So in principle we may find a uuid substring that wasn't expanded from a
/// macro. This should be ok because we're searching for the *last* occurrence, so we'll get
/// a prefix at least as long as the correct one, so we won't delete znodes outside the
/// {uuid} path component. This sounds sketchy, but propagating string indices through macro
/// expansion passes is sketchy too (error-prone and more complex), and on balance this seems
/// better.)
String uuid_str = toString(table_id.uuid);
size_t i = res.path.rfind(uuid_str);
if (i == String::npos)
/// Possible if the macro is in the "<auxiliary_zookeeper_name>:/" prefix, but we probably
/// don't want to allow that.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find uuid in zookeeper path after expanding {{uuid}} macro: {} (uuid {})", res.path, uuid_str);
i += uuid_str.size();
/// In case the path is "/foo/pika{uuid}chu/bar" (or "/foo/{uuid}{replica}/bar").
while (i < res.path.size() && res.path[i] != '/')
i += 1;
res.path_prefix_for_drop = res.path.substr(0, i);
}
return res;
}
void TableZnodeInfo::dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const
{
chassert(path.starts_with(path_prefix_for_drop));
if (path_prefix_for_drop.empty() || path_prefix_for_drop.size() == path.size())
return;
chassert(path[path_prefix_for_drop.size()] == '/');
String path_to_remove = path;
while (path_to_remove.size() > path_prefix_for_drop.size())
{
size_t i = path_to_remove.find_last_of('/');
chassert(i != String::npos && i >= path_prefix_for_drop.size());
path_to_remove = path_to_remove.substr(0, i);
Coordination::Error rc = zookeeper->tryRemove(path_to_remove);
if (rc != Coordination::Error::ZOK)
/// Znode not empty or already removed by someone else.
break;
}
}
}
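
To make the prefix computation and the ancestor-trimming loop above concrete, here is a hypothetical standalone sketch (ZooKeeper removal is stubbed out; the real loop stops at the first znode that is non-empty or already removed):

#include <cassert>
#include <string>
#include <vector>

// Mirrors the {uuid}-prefix search in TableZnodeInfo::resolve (illustrative only).
static std::string prefixForDrop(const std::string & path, const std::string & uuid_str)
{
    size_t i = path.rfind(uuid_str);
    assert(i != std::string::npos);  // the real code throws LOGICAL_ERROR instead
    i += uuid_str.size();
    while (i < path.size() && path[i] != '/')  // handles "/foo/pika{uuid}chu/bar"
        ++i;
    return path.substr(0, i);
}

int main()
{
    const std::string path = "/foo/123e4567/bar/baz";  // pretend "123e4567" is the expanded {uuid}
    const std::string prefix = prefixForDrop(path, "123e4567");
    assert(prefix == "/foo/123e4567");

    // Ancestor trimming as in dropAncestorZnodesIfNeeded: try to remove
    // "/foo/123e4567/bar", then "/foo/123e4567", never anything above the prefix.
    std::vector<std::string> attempts;
    std::string path_to_remove = path;
    while (path_to_remove.size() > prefix.size())
    {
        path_to_remove = path_to_remove.substr(0, path_to_remove.find_last_of('/'));
        attempts.push_back(path_to_remove);  // real code: zookeeper->tryRemove(path_to_remove)
    }
    assert((attempts == std::vector<std::string>{"/foo/123e4567/bar", "/foo/123e4567"}));
}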

View File

@@ -0,0 +1,62 @@
#pragma once
#include <base/types.h>
#include <Storages/RenamingRestrictions.h>
#include <Databases/LoadingStrictnessLevel.h>
namespace zkutil
{
class ZooKeeper;
using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
namespace DB
{
struct StorageID;
class ASTCreateQuery;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Helper for replicated tables that use zookeeper for coordination among replicas.
/// Handles things like:
/// * Expanding macros like {table} and {uuid} in zookeeper path. Some macros are expanded+saved once
/// on table creation (e.g. {table}, to avoid changing the path if the table is later renamed),
/// others are expanded on each server startup and each replica (e.g. {replica} because it's
/// different on different replicas).
/// * When dropping table with znode path (say) "/clickhouse/tables/{uuid}/{shard}", delete not only
/// the znode at this path but also the parent znode "/clickhouse/tables/{uuid}" if it became empty.
/// Otherwise each created+dropped table would leave behind an empty znode.
struct TableZnodeInfo
{
String path;
String replica_name;
/// Which zookeeper cluster to use ("default" or one of auxiliary zookeepers listed in config).
String zookeeper_name = "default";
/// Path with optional zookeeper_name prefix: "<auxiliary_zookeeper_name>:<path>".
String full_path;
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro.
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
/// Information to save in table metadata and send to replicas (if ON CLUSTER or DatabaseReplicated).
/// Has some macros expanded (e.g. {table}), others left unexpanded (e.g. {replica}).
String full_path_for_metadata;
String replica_name_for_metadata;
/// Path to an ancestor of `path` that should be considered "owned" by this table (shared among
/// replicas of the table). When table is dropped, this znode will be removed if it became empty.
/// E.g. path = "/clickhouse/tables/{uuid}/{shard}", path_prefix_for_drop = "/clickhouse/tables/{uuid}".
String path_prefix_for_drop;
static TableZnodeInfo resolve(
const String & requested_path, const String & requested_replica_name,
const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode,
const ContextPtr & context);
void dropAncestorZnodesIfNeeded(const zkutil::ZooKeeperPtr & zookeeper) const;
};
}
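
As a hedged illustration of how these fields relate (hypothetical values; assumes the stock default path "/clickhouse/tables/{uuid}/{shard}" and replica name "{replica}"):

// Input (engine args or server defaults):
//   requested_path         = "/clickhouse/tables/{uuid}/{shard}"
//   requested_replica_name = "{replica}"
//
// Possible result of TableZnodeInfo::resolve (illustrative):
//   full_path_for_metadata    = "/clickhouse/tables/{uuid}/{shard}"  // macros kept for metadata
//   replica_name_for_metadata = "{replica}"
//   full_path      = "/clickhouse/tables/123e4567-e89b-12d3-a456-426614174000/s1"
//   zookeeper_name = "default"     // no "<name>:/" prefix in the path
//   path           = full_path     // same string once any zookeeper_name prefix is stripped
//   replica_name   = "r1"
//   renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID  // {uuid} was expanded
//   path_prefix_for_drop  = "/clickhouse/tables/123e4567-e89b-12d3-a456-426614174000"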

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
db="rdb_$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -nq "
create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1');
create table $db.a (x Int8) engine ReplicatedMergeTree order by x;"
uuid=`$CLICKHOUSE_CLIENT -q "select uuid from system.tables where database = '$db' and name = 'a'"`
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -nq "
select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';
drop table $db.a sync;
select count() from system.zookeeper where path = '/clickhouse/tables' and name = '$uuid';"
$CLICKHOUSE_CLIENT -q "drop database $db"
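
The two count() queries look for the parent znode /clickhouse/tables/<uuid>; the accompanying .reference file (not shown in this compare view) presumably expects 1 while the table exists and 0 after the drop, i.e. the ancestor znode no longer leaks.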