mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Fixes & improvements
This commit is contained in:
parent
483184cbdb
commit
20e16be27b
@ -1330,6 +1330,23 @@ For more information, see the section [Creating replicated tables](../../engines
|
||||
<macros incl="macros" optional="true" />
|
||||
```
|
||||
|
||||
## replica_group_name {#replica_group_name}
|
||||
|
||||
Replica group name for database Replicated.
|
||||
|
||||
The cluster created by Replicated database will consist of replicas in the same group.
|
||||
DDL queries will only wail for the replicas in the same group.
|
||||
|
||||
Empty by default.
|
||||
|
||||
**Example**
|
||||
|
||||
``` xml
|
||||
<replica_group_name>backups</replica_group_name>
|
||||
```
|
||||
|
||||
Default value: ``.
|
||||
|
||||
## max_open_files {#max-open-files}
|
||||
|
||||
The maximum number of open files.
|
||||
|
@ -97,12 +97,12 @@ The fourth one is useful to remove metadata of dead replica when all other repli
|
||||
Dead replicas of `Replicated` databases can be dropped using following syntax:
|
||||
|
||||
``` sql
|
||||
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM DATABASE database;
|
||||
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'];
|
||||
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM ZKPATH '/path/to/table/in/zk';
|
||||
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM DATABASE database;
|
||||
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'];
|
||||
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM ZKPATH '/path/to/table/in/zk';
|
||||
```
|
||||
|
||||
Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format.
|
||||
Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. Replica group name is the name defined by `replica_group_name` [setting](../../operations/server-configuration-parameters/settings.md#replica_group_name) in the server configuration. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format if replica groups are not used and in `shard_name|replica_name|group_name` otherwise.
|
||||
|
||||
## DROP UNCOMPRESSED CACHE
|
||||
|
||||
|
@ -914,6 +914,15 @@
|
||||
</macros>
|
||||
-->
|
||||
|
||||
<!-- Replica group name for database Replicated.
|
||||
The cluster created by Replicated database will consist of replicas in the same group.
|
||||
DDL queries will only wail for the replicas in the same group.
|
||||
Empty by default.
|
||||
-->
|
||||
<!--
|
||||
<replica_group_name><replica_group_name>
|
||||
-->
|
||||
|
||||
|
||||
<!-- Reloading interval for embedded dictionaries, in seconds. Default: 3600. -->
|
||||
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>
|
||||
|
@ -117,29 +117,51 @@ DatabaseReplicated::DatabaseReplicated(
|
||||
if (!db_settings.collection_name.value.empty())
|
||||
fillClusterAuthInfo(db_settings.collection_name.value, context_->getConfigRef());
|
||||
|
||||
cluster_path = zookeeper_path + "/" + getClusterGroup(context_->getConfigRef());
|
||||
replica_group_name = context_->getConfigRef().getString("replica_group_name", "");
|
||||
|
||||
if (replica_group_name.find('/') != std::string::npos)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '/': {}", replica_group_name);
|
||||
if (replica_group_name.find('|') != std::string::npos)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '|': {}", replica_group_name);
|
||||
}
|
||||
|
||||
String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica)
|
||||
String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica, const String & replica_group)
|
||||
{
|
||||
return shard + '|' + replica;
|
||||
if (replica_group.empty())
|
||||
return shard + '|' + replica;
|
||||
else
|
||||
return shard + '|' + replica + '|' + replica_group;
|
||||
}
|
||||
|
||||
String DatabaseReplicated::getFullReplicaName() const
|
||||
{
|
||||
return getFullReplicaName(shard_name, replica_name);
|
||||
return getFullReplicaName(shard_name, replica_name, replica_group_name);
|
||||
}
|
||||
|
||||
std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String & name)
|
||||
DatabaseReplicated::NameParts DatabaseReplicated::parseFullReplicaName(const String & name)
|
||||
{
|
||||
String shard;
|
||||
String replica;
|
||||
auto pos = name.find('|');
|
||||
if (pos == std::string::npos || name.find('|', pos + 1) != std::string::npos)
|
||||
NameParts parts;
|
||||
|
||||
auto pos_first = name.find('|');
|
||||
if (pos_first == std::string::npos)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
|
||||
shard = name.substr(0, pos);
|
||||
replica = name.substr(pos + 1);
|
||||
return {shard, replica};
|
||||
|
||||
parts.shard = name.substr(0, pos_first);
|
||||
|
||||
auto pos_second = name.find('|', pos_first + 1);
|
||||
if (pos_second == std::string::npos)
|
||||
{
|
||||
parts.replica = name.substr(pos_first + 1);
|
||||
return parts;
|
||||
}
|
||||
|
||||
if (name.find('|', pos_second + 1) != std::string::npos)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
|
||||
|
||||
parts.replica = name.substr(pos_first + 1, pos_second - pos_first - 1);
|
||||
parts.replica_group = name.substr(pos_second + 1);
|
||||
|
||||
return parts;
|
||||
}
|
||||
|
||||
ClusterPtr DatabaseReplicated::tryGetCluster() const
|
||||
@ -177,6 +199,7 @@ void DatabaseReplicated::setCluster(ClusterPtr && new_cluster)
|
||||
|
||||
ClusterPtr DatabaseReplicated::getClusterImpl() const
|
||||
{
|
||||
Strings unfiltered_hosts;
|
||||
Strings hosts;
|
||||
Strings host_ids;
|
||||
|
||||
@ -188,11 +211,18 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
|
||||
{
|
||||
host_ids.resize(0);
|
||||
Coordination::Stat stat;
|
||||
hosts = zookeeper->getChildren(cluster_path, &stat);
|
||||
if (hosts.empty())
|
||||
unfiltered_hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
|
||||
if (unfiltered_hosts.empty())
|
||||
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
|
||||
"It's possible if the first replica is not fully created yet "
|
||||
"or if the last replica was just dropped or due to logical error", zookeeper_path);
|
||||
|
||||
for (const auto & host : unfiltered_hosts)
|
||||
{
|
||||
if (replica_group_name == parseFullReplicaName(host).replica_group)
|
||||
hosts.push_back(host);
|
||||
}
|
||||
|
||||
Int32 cversion = stat.cversion;
|
||||
::sort(hosts.begin(), hosts.end());
|
||||
|
||||
@ -200,7 +230,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
|
||||
futures.reserve(hosts.size());
|
||||
host_ids.reserve(hosts.size());
|
||||
for (const auto & host : hosts)
|
||||
futures.emplace_back(zookeeper->asyncTryGet(cluster_path + "/" + host));
|
||||
futures.emplace_back(zookeeper->asyncTryGet(zookeeper_path + "/replicas/" + host));
|
||||
|
||||
success = true;
|
||||
for (auto & future : futures)
|
||||
@ -211,7 +241,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
|
||||
host_ids.emplace_back(res.data);
|
||||
}
|
||||
|
||||
zookeeper->get(cluster_path, &stat);
|
||||
zookeeper->get(zookeeper_path + "/replicas", &stat);
|
||||
if (cversion != stat.cversion)
|
||||
success = false;
|
||||
if (success)
|
||||
@ -223,7 +253,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
|
||||
|
||||
assert(!hosts.empty());
|
||||
assert(hosts.size() == host_ids.size());
|
||||
String current_shard = parseFullReplicaName(hosts.front()).first;
|
||||
String current_shard = parseFullReplicaName(hosts.front()).shard;
|
||||
std::vector<std::vector<DatabaseReplicaInfo>> shards;
|
||||
shards.emplace_back();
|
||||
for (size_t i = 0; i < hosts.size(); ++i)
|
||||
@ -231,17 +261,17 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
|
||||
const auto & id = host_ids[i];
|
||||
if (id == DROPPED_MARK)
|
||||
continue;
|
||||
auto [shard, replica] = parseFullReplicaName(hosts[i]);
|
||||
auto parts = parseFullReplicaName(hosts[i]);
|
||||
auto pos = id.rfind(':');
|
||||
String host_port = id.substr(0, pos);
|
||||
if (shard != current_shard)
|
||||
if (parts.shard != current_shard)
|
||||
{
|
||||
current_shard = shard;
|
||||
current_shard = parts.shard;
|
||||
if (!shards.back().empty())
|
||||
shards.emplace_back();
|
||||
}
|
||||
String hostname = unescapeForFileName(host_port);
|
||||
shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(shard), std::move(replica)});
|
||||
shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(parts.shard), std::move(parts.replica), std::move(parts.replica_group)});
|
||||
}
|
||||
|
||||
UInt16 default_port = getContext()->getTCPPort();
|
||||
@ -271,8 +301,8 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
|
||||
{
|
||||
for (const auto & replica : addresses_with_failover[shard_index])
|
||||
{
|
||||
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
|
||||
paths.emplace_back(fs::path(cluster_path) / full_name / "active");
|
||||
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name, replica.database_replica_name);
|
||||
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
|
||||
}
|
||||
}
|
||||
|
||||
@ -311,15 +341,6 @@ void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco:
|
||||
cluster_auth_info.cluster_secure_connection = config_ref.getBool(config_prefix + ".cluster_secure_connection", false);
|
||||
}
|
||||
|
||||
String DatabaseReplicated::getClusterGroup(const Poco::Util::AbstractConfiguration & config_ref)
|
||||
{
|
||||
const auto cluster_group = config_ref.getString("database_replicated_cluster_group", "");
|
||||
|
||||
if (cluster_group.empty())
|
||||
return "replicas";
|
||||
|
||||
return "replicas_" + cluster_group;
|
||||
}
|
||||
|
||||
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel mode)
|
||||
{
|
||||
@ -338,16 +359,7 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
|
||||
createDatabaseNodesInZooKeeper(current_zookeeper);
|
||||
}
|
||||
|
||||
if (!current_zookeeper->exists(cluster_path))
|
||||
{
|
||||
/// Create new cluster group, multiple nodes can execute it concurrently
|
||||
auto code = current_zookeeper->tryCreate(cluster_path, "", zkutil::CreateMode::Persistent);
|
||||
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
|
||||
throw Coordination::Exception(code);
|
||||
}
|
||||
|
||||
replica_path = fs::path(cluster_path) / getFullReplicaName();
|
||||
replica_path = fs::path(zookeeper_path) / "replicas" / getFullReplicaName();
|
||||
bool is_create_query = mode == LoadingStrictnessLevel::CREATE;
|
||||
|
||||
String replica_host_id;
|
||||
@ -485,8 +497,26 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
|
||||
|
||||
for (int attempts = 10; attempts > 0; --attempts)
|
||||
{
|
||||
Coordination::Stat stat;
|
||||
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
|
||||
Coordination::Stat stat_max_log_ptr;
|
||||
Coordination::Stat stat_replicas;
|
||||
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat_max_log_ptr);
|
||||
Strings replicas = current_zookeeper->getChildren(zookeeper_path + "/replicas", &stat_replicas);
|
||||
for (const auto & replica : replicas)
|
||||
{
|
||||
NameParts parts = parseFullReplicaName(replica);
|
||||
if (parts.shard == shard_name && parts.replica == replica_name)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::REPLICA_ALREADY_EXISTS,
|
||||
"Replica {} of shard {} of replicated database already exists in the replica group {} at {}",
|
||||
replica_name, shard_name, parts.replica_group, zookeeper_path);
|
||||
}
|
||||
}
|
||||
|
||||
/// This way we make sure that other replica with the same replica_name and shard_name
|
||||
/// but with a different replica_group_name was not created at the same time.
|
||||
String replica_value = "Last added replica: " + getFullReplicaName();
|
||||
|
||||
Coordination::Requests ops;
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
|
||||
@ -494,7 +524,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
|
||||
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
|
||||
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
|
||||
/// notify other nodes that issued new queries while this node was recovering.
|
||||
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version));
|
||||
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat_max_log_ptr.version));
|
||||
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", replica_value, stat_replicas.version));
|
||||
Coordination::Responses responses;
|
||||
const auto code = current_zookeeper->tryMulti(ops, responses);
|
||||
if (code == Coordination::Error::ZOK)
|
||||
@ -725,7 +756,15 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
|
||||
entry.tracing_context = OpenTelemetry::CurrentContext();
|
||||
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
|
||||
|
||||
Strings hosts_to_wait = getZooKeeper()->getChildren(cluster_path);
|
||||
Strings hosts_to_wait;
|
||||
Strings unfiltered_hosts = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
|
||||
|
||||
for (const auto & host : unfiltered_hosts)
|
||||
{
|
||||
if (replica_group_name == parseFullReplicaName(host).replica_group)
|
||||
hosts_to_wait.push_back(host);
|
||||
}
|
||||
|
||||
return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait);
|
||||
}
|
||||
|
||||
@ -1107,11 +1146,11 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
|
||||
}
|
||||
|
||||
void DatabaseReplicated::dropReplica(
|
||||
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica)
|
||||
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group)
|
||||
{
|
||||
assert(!database || database_zookeeper_path == database->zookeeper_path);
|
||||
|
||||
String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica);
|
||||
String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica, replica_group);
|
||||
|
||||
if (full_replica_name.find('/') != std::string::npos)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name);
|
||||
@ -1161,7 +1200,7 @@ void DatabaseReplicated::drop(ContextPtr context_)
|
||||
|
||||
current_zookeeper->tryRemoveRecursive(replica_path);
|
||||
/// TODO it may leave garbage in ZooKeeper if the last node lost connection here
|
||||
if (current_zookeeper->tryRemove(cluster_path) == Coordination::Error::ZOK)
|
||||
if (current_zookeeper->tryRemove(zookeeper_path + "/replicas") == Coordination::Error::ZOK)
|
||||
{
|
||||
/// It was the last replica, remove all metadata
|
||||
current_zookeeper->tryRemoveRecursive(zookeeper_path);
|
||||
|
@ -54,11 +54,19 @@ public:
|
||||
|
||||
void stopReplication() override;
|
||||
|
||||
struct NameParts
|
||||
{
|
||||
String shard;
|
||||
String replica;
|
||||
String replica_group;
|
||||
};
|
||||
|
||||
String getShardName() const { return shard_name; }
|
||||
String getReplicaName() const { return replica_name; }
|
||||
String getReplicaGroupName() const { return replica_group_name; }
|
||||
String getFullReplicaName() const;
|
||||
static String getFullReplicaName(const String & shard, const String & replica);
|
||||
static std::pair<String, String> parseFullReplicaName(const String & name);
|
||||
static String getFullReplicaName(const String & shard, const String & replica, const String & replica_group);
|
||||
static NameParts parseFullReplicaName(const String & name);
|
||||
|
||||
const String & getZooKeeperPath() const { return zookeeper_path; }
|
||||
|
||||
@ -80,7 +88,7 @@ public:
|
||||
|
||||
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
|
||||
|
||||
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica);
|
||||
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group);
|
||||
|
||||
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
|
||||
|
||||
@ -101,7 +109,6 @@ private:
|
||||
} cluster_auth_info;
|
||||
|
||||
void fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config);
|
||||
String getClusterGroup(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void checkQueryValid(const ASTPtr & query, ContextPtr query_context) const;
|
||||
|
||||
@ -127,8 +134,8 @@ private:
|
||||
String zookeeper_path;
|
||||
String shard_name;
|
||||
String replica_name;
|
||||
String replica_group_name;
|
||||
String replica_path;
|
||||
String cluster_path;
|
||||
DatabaseReplicatedSettings db_settings;
|
||||
|
||||
zkutil::ZooKeeperPtr getZooKeeper() const;
|
||||
|
@ -159,6 +159,7 @@ Cluster::Address::Address(
|
||||
host_name = parsed_host_port.first;
|
||||
database_shard_name = info.shard_name;
|
||||
database_replica_name = info.replica_name;
|
||||
database_replica_group_name = info.replica_group_name;
|
||||
port = parsed_host_port.second;
|
||||
secure = params.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
|
||||
priority = params.priority;
|
||||
@ -518,7 +519,7 @@ Cluster::Cluster(
|
||||
Addresses current;
|
||||
for (const auto & replica : shard)
|
||||
current.emplace_back(
|
||||
DatabaseReplicaInfo{replica, "", ""},
|
||||
DatabaseReplicaInfo{replica, "", "", ""},
|
||||
params,
|
||||
current_shard_num,
|
||||
current.size() + 1);
|
||||
|
@ -35,6 +35,7 @@ struct DatabaseReplicaInfo
|
||||
String hostname;
|
||||
String shard_name;
|
||||
String replica_name;
|
||||
String replica_group_name;
|
||||
};
|
||||
|
||||
struct ClusterConnectionParameters
|
||||
@ -111,6 +112,7 @@ public:
|
||||
String host_name;
|
||||
String database_shard_name;
|
||||
String database_replica_name;
|
||||
String database_replica_group_name;
|
||||
UInt16 port{0};
|
||||
String user;
|
||||
String password;
|
||||
|
@ -844,7 +844,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
|
||||
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
|
||||
return;
|
||||
String full_replica_name = query_.shard.empty() ? query_.replica
|
||||
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica);
|
||||
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica, query_.replica_group);
|
||||
if (replicated->getFullReplicaName() != full_replica_name)
|
||||
return;
|
||||
|
||||
@ -860,7 +860,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
|
||||
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
|
||||
{
|
||||
check_not_local_replica(replicated, query);
|
||||
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
|
||||
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
|
||||
}
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
|
||||
@ -885,7 +885,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
|
||||
}
|
||||
|
||||
check_not_local_replica(replicated, query);
|
||||
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
|
||||
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
|
||||
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
|
||||
}
|
||||
}
|
||||
@ -898,7 +898,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
|
||||
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
|
||||
check_not_local_replica(replicated, query);
|
||||
|
||||
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica);
|
||||
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica, query.replica_group);
|
||||
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
|
||||
}
|
||||
else
|
||||
|
@ -357,9 +357,9 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
|
||||
size_t num = 0;
|
||||
if (is_replicated_database)
|
||||
{
|
||||
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
|
||||
columns[num++]->insert(shard);
|
||||
columns[num++]->insert(replica);
|
||||
auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
|
||||
columns[num++]->insert(parts.shard);
|
||||
columns[num++]->insert(parts.replica);
|
||||
if (active_hosts_set.contains(host_id))
|
||||
columns[num++]->insert(IN_PROGRESS);
|
||||
else
|
||||
@ -511,9 +511,9 @@ Chunk DDLQueryStatusSource::generate()
|
||||
{
|
||||
if (status.code != 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
|
||||
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
|
||||
columns[num++]->insert(shard);
|
||||
columns[num++]->insert(replica);
|
||||
auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
|
||||
columns[num++]->insert(parts.shard);
|
||||
columns[num++]->insert(parts.replica);
|
||||
columns[num++]->insert(OK);
|
||||
}
|
||||
else
|
||||
|
@ -101,6 +101,7 @@ public:
|
||||
String replica;
|
||||
String shard;
|
||||
String replica_zk_path;
|
||||
String replica_group;
|
||||
bool is_drop_whole_replica{};
|
||||
String storage_policy;
|
||||
String volume;
|
||||
|
@ -165,6 +165,14 @@ enum class SystemQueryTargetType
|
||||
if (!ParserStringLiteral{}.parse(pos, ast, expected))
|
||||
return false;
|
||||
res->shard = ast->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
if (database && ParserKeyword{"FROM GROUP"}.ignore(pos, expected))
|
||||
{
|
||||
ASTPtr group_ast;
|
||||
if (!ParserStringLiteral{}.parse(pos, group_ast, expected))
|
||||
return false;
|
||||
res->replica_group = group_ast->as<ASTLiteral &>().value.safeGet<String>();
|
||||
}
|
||||
}
|
||||
|
||||
if (ParserKeyword{"FROM"}.ignore(pos, expected))
|
||||
|
@ -1,3 +1,3 @@
|
||||
<clickhouse>
|
||||
<database_replicated_cluster_group>backups</database_replicated_cluster_group>
|
||||
<replica_group_name>backups</replica_group_name>
|
||||
</clickhouse>
|
||||
|
@ -72,15 +72,15 @@ def test_cluster_groups(started_cluster):
|
||||
|
||||
# 1. system.clusters
|
||||
|
||||
query = "SELECT host_name from system.clusters WHERE cluster = 'cluster_groups' ORDER BY host_name"
|
||||
cluster_query = "SELECT host_name from system.clusters WHERE cluster = 'cluster_groups' ORDER BY host_name"
|
||||
expected_main = "main_node_1\nmain_node_2\n"
|
||||
expected_backup = "backup_node_1\nbackup_node_2\n"
|
||||
|
||||
for node in [main_node_1, main_node_2]:
|
||||
assert_eq_with_retry(node, query, expected_main)
|
||||
assert_eq_with_retry(node, cluster_query, expected_main)
|
||||
|
||||
for node in [backup_node_1, backup_node_2]:
|
||||
assert_eq_with_retry(node, query, expected_backup)
|
||||
assert_eq_with_retry(node, cluster_query, expected_backup)
|
||||
|
||||
# 2. Query execution depends only on your cluster group
|
||||
|
||||
@ -114,3 +114,16 @@ def test_cluster_groups(started_cluster):
|
||||
|
||||
assert_create_query(all_nodes, "cluster_groups.table_1", expected_1)
|
||||
assert_create_query(all_nodes, "cluster_groups.table_2", expected_2)
|
||||
|
||||
# 4. SYSTEM DROP DATABASE REPLICA
|
||||
backup_node_2.stop_clickhouse()
|
||||
backup_node_1.query(
|
||||
"SYSTEM DROP DATABASE REPLICA '4' FROM SHARD '1' FROM GROUP 'backups' FROM DATABASE cluster_groups"
|
||||
)
|
||||
|
||||
assert_eq_with_retry(backup_node_1, cluster_query, "backup_node_1\n")
|
||||
|
||||
main_node_2.stop_clickhouse()
|
||||
main_node_1.query("SYSTEM DROP DATABASE REPLICA '1|2' FROM DATABASE cluster_groups")
|
||||
|
||||
assert_eq_with_retry(main_node_1, cluster_query, "main_node_1\n")
|
||||
|
Loading…
Reference in New Issue
Block a user