Simplify the implementation

This commit is contained in:
Nikolay Degterinsky 2023-10-25 19:02:48 +00:00
parent b737866d02
commit 97c99132e0
11 changed files with 75 additions and 103 deletions

View File

@ -97,12 +97,12 @@ The fourth one is useful to remove metadata of dead replica when all other repli
Dead replicas of `Replicated` databases can be dropped using following syntax:
``` sql
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM DATABASE database;
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'];
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM ZKPATH '/path/to/table/in/zk';
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM DATABASE database;
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'];
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM ZKPATH '/path/to/table/in/zk';
```
Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. Replica group name is the name defined by `replica_group_name` [setting](../../operations/server-configuration-parameters/settings.md#replica_group_name) in the server configuration. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format if replica groups are not used and in `shard_name|replica_name|group_name` otherwise.
Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format.
## DROP UNCOMPRESSED CACHE

View File

@ -118,50 +118,28 @@ DatabaseReplicated::DatabaseReplicated(
fillClusterAuthInfo(db_settings.collection_name.value, context_->getConfigRef());
replica_group_name = context_->getConfigRef().getString("replica_group_name", "");
if (replica_group_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '/': {}", replica_group_name);
if (replica_group_name.find('|') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '|': {}", replica_group_name);
}
String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica, const String & replica_group)
String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica)
{
if (replica_group.empty())
return shard + '|' + replica;
else
return shard + '|' + replica + '|' + replica_group;
return shard + '|' + replica;
}
String DatabaseReplicated::getFullReplicaName() const
{
return getFullReplicaName(shard_name, replica_name, replica_group_name);
return getFullReplicaName(shard_name, replica_name);
}
DatabaseReplicated::NameParts DatabaseReplicated::parseFullReplicaName(const String & name)
std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String & name)
{
NameParts parts;
auto pos_first = name.find('|');
if (pos_first == std::string::npos)
String shard;
String replica;
auto pos = name.find('|');
if (pos == std::string::npos || name.find('|', pos + 1) != std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
parts.shard = name.substr(0, pos_first);
auto pos_second = name.find('|', pos_first + 1);
if (pos_second == std::string::npos)
{
parts.replica = name.substr(pos_first + 1);
return parts;
}
if (name.find('|', pos_second + 1) != std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
parts.replica = name.substr(pos_first + 1, pos_second - pos_first - 1);
parts.replica_group = name.substr(pos_second + 1);
return parts;
shard = name.substr(0, pos);
replica = name.substr(pos + 1);
return {shard, replica};
}
ClusterPtr DatabaseReplicated::tryGetCluster() const
@ -217,10 +195,17 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", zookeeper_path);
hosts.clear();
std::vector<String> paths;
for (const auto & host : unfiltered_hosts)
paths.push_back(zookeeper_path + "/replicas/" + host + "/replica_group");
auto replica_groups = zookeeper->tryGet(paths);
for (size_t i = 0; i < paths.size(); ++i)
{
if (replica_group_name == parseFullReplicaName(host).replica_group)
hosts.push_back(host);
if (replica_groups[i].data == replica_group_name)
hosts.push_back(unfiltered_hosts[i]);
}
Int32 cversion = stat.cversion;
@ -253,7 +238,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
assert(!hosts.empty());
assert(hosts.size() == host_ids.size());
String current_shard = parseFullReplicaName(hosts.front()).shard;
String current_shard = parseFullReplicaName(hosts.front()).first;
std::vector<std::vector<DatabaseReplicaInfo>> shards;
shards.emplace_back();
for (size_t i = 0; i < hosts.size(); ++i)
@ -261,17 +246,17 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
const auto & id = host_ids[i];
if (id == DROPPED_MARK)
continue;
auto parts = parseFullReplicaName(hosts[i]);
auto [shard, replica] = parseFullReplicaName(hosts[i]);
auto pos = id.rfind(':');
String host_port = id.substr(0, pos);
if (parts.shard != current_shard)
if (shard != current_shard)
{
current_shard = parts.shard;
current_shard = shard;
if (!shards.back().empty())
shards.emplace_back();
}
String hostname = unescapeForFileName(host_port);
shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(parts.shard), std::move(parts.replica), std::move(parts.replica_group)});
shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(shard), std::move(replica)});
}
UInt16 default_port = getContext()->getTCPPort();
@ -301,7 +286,7 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
{
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name, replica.database_replica_group_name);
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
}
}
@ -381,6 +366,21 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
"Replica {} of shard {} of replicated database at {} already exists. Replica host ID: '{}', current host ID: '{}'",
replica_name, shard_name, zookeeper_path, replica_host_id, host_id);
}
/// Check that replica_group_name in ZooKeeper matches the local one and change it if necessary.
String zk_replica_group_name;
if (!current_zookeeper->tryGet(replica_path + "/replica_group", zk_replica_group_name))
{
/// Replica groups were introduced in 23.10, so the node might not exist
current_zookeeper->create(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent);
if (!replica_group_name.empty())
createEmptyLogEntry(current_zookeeper);
}
else if (zk_replica_group_name != replica_group_name)
{
current_zookeeper->set(replica_path + "/replica_group", replica_group_name, -1);
createEmptyLogEntry(current_zookeeper);
}
}
else if (is_create_query)
{
@ -497,21 +497,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
for (int attempts = 10; attempts > 0; --attempts)
{
Coordination::Stat stat_max_log_ptr;
Coordination::Stat stat_replicas;
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat_max_log_ptr);
Strings replicas = current_zookeeper->getChildren(zookeeper_path + "/replicas", &stat_replicas);
for (const auto & replica : replicas)
{
NameParts parts = parseFullReplicaName(replica);
if (parts.shard == shard_name && parts.replica == replica_name)
{
throw Exception(
ErrorCodes::REPLICA_ALREADY_EXISTS,
"Replica {} of shard {} of replicated database already exists in the replica group {} at {}",
replica_name, shard_name, parts.replica_group, zookeeper_path);
}
}
Coordination::Stat stat;
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
/// This way we make sure that other replica with the same replica_name and shard_name
/// but with a different replica_group_name was not created at the same time.
@ -521,11 +508,12 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/digest", "0", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent));
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
/// notify other nodes that issued new queries while this node was recovering.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat_max_log_ptr.version));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", replica_value, stat_replicas.version));
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version));
Coordination::Responses responses;
const auto code = current_zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
@ -759,10 +747,16 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
Strings hosts_to_wait;
Strings unfiltered_hosts = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
std::vector<String> paths;
for (const auto & host : unfiltered_hosts)
paths.push_back(zookeeper_path + "/replicas/" + host + "/replica_group");
auto replica_groups = getZooKeeper()->tryGet(paths);
for (size_t i = 0; i < paths.size(); ++i)
{
if (replica_group_name == parseFullReplicaName(host).replica_group)
hosts_to_wait.push_back(host);
if (replica_groups[i].data == replica_group_name)
hosts_to_wait.push_back(unfiltered_hosts[i]);
}
return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait);
@ -1172,11 +1166,11 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
}
void DatabaseReplicated::dropReplica(
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group)
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica)
{
assert(!database || database_zookeeper_path == database->zookeeper_path);
String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica, replica_group);
String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica);
if (full_replica_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name);

View File

@ -54,19 +54,12 @@ public:
void stopReplication() override;
struct NameParts
{
String shard;
String replica;
String replica_group;
};
String getShardName() const { return shard_name; }
String getReplicaName() const { return replica_name; }
String getReplicaGroupName() const { return replica_group_name; }
String getFullReplicaName() const;
static String getFullReplicaName(const String & shard, const String & replica, const String & replica_group);
static NameParts parseFullReplicaName(const String & name);
static String getFullReplicaName(const String & shard, const String & replica);
static std::pair<String, String> parseFullReplicaName(const String & name);
const String & getZooKeeperPath() const { return zookeeper_path; }
@ -88,7 +81,7 @@ public:
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group);
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica);
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;

View File

@ -159,7 +159,6 @@ Cluster::Address::Address(
host_name = parsed_host_port.first;
database_shard_name = info.shard_name;
database_replica_name = info.replica_name;
database_replica_group_name = info.replica_group_name;
port = parsed_host_port.second;
secure = params.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = params.priority;
@ -517,7 +516,7 @@ Cluster::Cluster(
Addresses current;
for (const auto & replica : shard)
current.emplace_back(
DatabaseReplicaInfo{replica, "", "", ""},
DatabaseReplicaInfo{replica, "", ""},
params,
current_shard_num,
current.size() + 1);

View File

@ -35,7 +35,6 @@ struct DatabaseReplicaInfo
String hostname;
String shard_name;
String replica_name;
String replica_group_name;
};
struct ClusterConnectionParameters
@ -112,7 +111,6 @@ public:
String host_name;
String database_shard_name;
String database_replica_name;
String database_replica_group_name;
UInt16 port{0};
String user;
String password;

View File

@ -927,7 +927,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
return;
String full_replica_name = query_.shard.empty() ? query_.replica
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica, query_.replica_group);
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica);
if (replicated->getFullReplicaName() != full_replica_name)
return;
@ -943,7 +943,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
{
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
@ -968,7 +968,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
}
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
@ -981,7 +981,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica, query.replica_group);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica);
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
}
else

View File

@ -357,9 +357,9 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
size_t num = 0;
if (is_replicated_database)
{
auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(parts.shard);
columns[num++]->insert(parts.replica);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
@ -511,9 +511,9 @@ Chunk DDLQueryStatusSource::generate()
{
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(parts.shard);
columns[num++]->insert(parts.replica);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
columns[num++]->insert(OK);
}
else

View File

@ -116,9 +116,6 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!shard.empty())
print_keyword(" FROM SHARD ") << quoteString(shard);
if (!replica_group.empty())
print_keyword(" FROM GROUP ") << quoteString(replica_group);
if (table)
{
print_keyword(" FROM TABLE ");

View File

@ -107,7 +107,6 @@ public:
String replica;
String shard;
String replica_zk_path;
String replica_group;
bool is_drop_whole_replica{};
String storage_policy;
String volume;

View File

@ -165,14 +165,6 @@ enum class SystemQueryTargetType
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
res->shard = ast->as<ASTLiteral &>().value.safeGet<String>();
if (database && ParserKeyword{"FROM GROUP"}.ignore(pos, expected))
{
ASTPtr group_ast;
if (!ParserStringLiteral{}.parse(pos, group_ast, expected))
return false;
res->replica_group = group_ast->as<ASTLiteral &>().value.safeGet<String>();
}
}
if (ParserKeyword{"FROM"}.ignore(pos, expected))

View File

@ -118,7 +118,7 @@ def test_cluster_groups(started_cluster):
# 4. SYSTEM DROP DATABASE REPLICA
backup_node_2.stop_clickhouse()
backup_node_1.query(
"SYSTEM DROP DATABASE REPLICA '4' FROM SHARD '1' FROM GROUP 'backups' FROM DATABASE cluster_groups"
"SYSTEM DROP DATABASE REPLICA '1|4' FROM DATABASE cluster_groups"
)
assert_eq_with_retry(backup_node_1, cluster_query, "backup_node_1\n")