diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md
index bd8e1da2f1e..fa9f8e70692 100644
--- a/docs/en/operations/server-configuration-parameters/settings.md
+++ b/docs/en/operations/server-configuration-parameters/settings.md
@@ -1396,6 +1396,23 @@ For more information, see the section [Creating replicated tables](../../engines
```
+## replica_group_name {#replica_group_name}
+
+Replica group name for database Replicated.
+
+The cluster created by a Replicated database will consist only of replicas in the same group.
+DDL queries will wait only for the replicas in the same group.
+
+Empty by default.
+
+**Example**
+
+``` xml
+<replica_group_name>backups</replica_group_name>
+```
+
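+For illustration, assume four replicas of the same `Replicated` database named `db`, two of which set `replica_group_name` to `backups` (a hypothetical layout). Each server then sees only the replicas of its own group in the auto-created cluster:
+
+``` sql
+-- On a replica with an empty replica_group_name this returns only the ungrouped replicas;
+-- on a replica configured with 'backups' it returns only the 'backups' replicas.
+SELECT host_name FROM system.clusters WHERE cluster = 'db';
+```
+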
+Default value: `""`.
+
## max_open_files {#max-open-files}
The maximum number of open files.
diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md
index 1558e64f99b..578ff38574a 100644
--- a/docs/en/sql-reference/statements/system.md
+++ b/docs/en/sql-reference/statements/system.md
@@ -97,12 +97,12 @@ The fourth one is useful to remove metadata of dead replica when all other repli
Dead replicas of `Replicated` databases can be dropped using following syntax:
``` sql
-SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM DATABASE database;
-SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'];
-SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM ZKPATH '/path/to/table/in/zk';
+SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM DATABASE database;
+SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'];
+SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM ZKPATH '/path/to/table/in/zk';
```
-Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format.
+Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in the `Replicated` engine arguments when creating the database. These names can also be obtained from the `database_shard_name` and `database_replica_name` columns in `system.clusters`. The replica group name is the name defined by the `replica_group_name` [setting](../../operations/server-configuration-parameters/settings.md#replica_group_name) in the server configuration. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format if replica groups are not used, and in `shard_name|replica_name|group_name` format otherwise.
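+
+For example, assuming a replica named `r1` in shard `s1` of the `backups` group (illustrative names), the following two statements are equivalent:
+
+``` sql
+SYSTEM DROP DATABASE REPLICA 'r1' FROM SHARD 's1' FROM GROUP 'backups' FROM DATABASE db;
+SYSTEM DROP DATABASE REPLICA 's1|r1|backups' FROM DATABASE db;
+```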
## DROP UNCOMPRESSED CACHE
diff --git a/programs/server/config.xml b/programs/server/config.xml
index 1dd527805fd..a1e2907f6b6 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -926,6 +926,15 @@
-->
+
+    <!-- Replica group name for database Replicated.
+         The cluster created by Replicated database will consist of replicas in the same group.
+         DDL queries will only wait for the replicas in the same group.
+         Empty by default.
+    -->
+    <!--
+    <replica_group_name>backups</replica_group_name>
+    -->
    <builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>
diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp
index 91153f2302f..1c44a074c96 100644
--- a/src/Databases/DatabaseReplicated.cpp
+++ b/src/Databases/DatabaseReplicated.cpp
@@ -116,28 +116,52 @@ DatabaseReplicated::DatabaseReplicated(
if (!db_settings.collection_name.value.empty())
fillClusterAuthInfo(db_settings.collection_name.value, context_->getConfigRef());
+
+ replica_group_name = context_->getConfigRef().getString("replica_group_name", "");
+
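+ /// '|' and '/' are reserved: '|' separates the parts of the full replica name and '/' is the ZooKeeper path separator.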
+ if (replica_group_name.find('/') != std::string::npos)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '/': {}", replica_group_name);
+ if (replica_group_name.find('|') != std::string::npos)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '|': {}", replica_group_name);
}
-String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica)
+String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica, const String & replica_group)
{
- return shard + '|' + replica;
+ if (replica_group.empty())
+ return shard + '|' + replica;
+ else
+ return shard + '|' + replica + '|' + replica_group;
}
String DatabaseReplicated::getFullReplicaName() const
{
- return getFullReplicaName(shard_name, replica_name);
+ return getFullReplicaName(shard_name, replica_name, replica_group_name);
}
-std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String & name)
+DatabaseReplicated::NameParts DatabaseReplicated::parseFullReplicaName(const String & name)
{
- String shard;
- String replica;
- auto pos = name.find('|');
- if (pos == std::string::npos || name.find('|', pos + 1) != std::string::npos)
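+ /// The full replica name is either 'shard|replica' (when replica groups are not used)
+ /// or 'shard|replica|replica_group'.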
+ NameParts parts;
+
+ auto pos_first = name.find('|');
+ if (pos_first == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
- shard = name.substr(0, pos);
- replica = name.substr(pos + 1);
- return {shard, replica};
+
+ parts.shard = name.substr(0, pos_first);
+
+ auto pos_second = name.find('|', pos_first + 1);
+ if (pos_second == std::string::npos)
+ {
+ parts.replica = name.substr(pos_first + 1);
+ return parts;
+ }
+
+ if (name.find('|', pos_second + 1) != std::string::npos)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
+
+ parts.replica = name.substr(pos_first + 1, pos_second - pos_first - 1);
+ parts.replica_group = name.substr(pos_second + 1);
+
+ return parts;
}
ClusterPtr DatabaseReplicated::tryGetCluster() const
@@ -175,6 +199,7 @@ void DatabaseReplicated::setCluster(ClusterPtr && new_cluster)
ClusterPtr DatabaseReplicated::getClusterImpl() const
{
+ Strings unfiltered_hosts;
Strings hosts;
Strings host_ids;
@@ -186,11 +211,18 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
{
host_ids.resize(0);
Coordination::Stat stat;
- hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
- if (hosts.empty())
+ unfiltered_hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
+ if (unfiltered_hosts.empty())
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", zookeeper_path);
+
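+ /// The cluster consists only of the replicas of our replica group; skip the rest.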
+ for (const auto & host : unfiltered_hosts)
+ {
+ if (replica_group_name == parseFullReplicaName(host).replica_group)
+ hosts.push_back(host);
+ }
+
Int32 cversion = stat.cversion;
::sort(hosts.begin(), hosts.end());
@@ -221,7 +253,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
assert(!hosts.empty());
assert(hosts.size() == host_ids.size());
- String current_shard = parseFullReplicaName(hosts.front()).first;
+ String current_shard = parseFullReplicaName(hosts.front()).shard;
std::vector<std::vector<DatabaseReplicaInfo>> shards;
shards.emplace_back();
for (size_t i = 0; i < hosts.size(); ++i)
@@ -229,17 +261,17 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
const auto & id = host_ids[i];
if (id == DROPPED_MARK)
continue;
- auto [shard, replica] = parseFullReplicaName(hosts[i]);
+ auto parts = parseFullReplicaName(hosts[i]);
auto pos = id.rfind(':');
String host_port = id.substr(0, pos);
- if (shard != current_shard)
+ if (parts.shard != current_shard)
{
- current_shard = shard;
+ current_shard = parts.shard;
if (!shards.back().empty())
shards.emplace_back();
}
String hostname = unescapeForFileName(host_port);
- shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(shard), std::move(replica)});
+ shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(parts.shard), std::move(parts.replica), std::move(parts.replica_group)});
}
UInt16 default_port = getContext()->getTCPPort();
@@ -269,7 +301,7 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
{
for (const auto & replica : addresses_with_failover[shard_index])
{
- String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
+ String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name, replica.database_replica_group_name);
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
}
}
@@ -309,6 +341,7 @@ void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco:
cluster_auth_info.cluster_secure_connection = config_ref.getBool(config_prefix + ".cluster_secure_connection", false);
}
+
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel mode)
{
try
@@ -464,8 +497,26 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
for (int attempts = 10; attempts > 0; --attempts)
{
- Coordination::Stat stat;
- String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
+ Coordination::Stat stat_max_log_ptr;
+ Coordination::Stat stat_replicas;
+ String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat_max_log_ptr);
+ Strings replicas = current_zookeeper->getChildren(zookeeper_path + "/replicas", &stat_replicas);
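+ /// A replica with the same shard and replica name must not already exist in any replica group.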
+ for (const auto & replica : replicas)
+ {
+ NameParts parts = parseFullReplicaName(replica);
+ if (parts.shard == shard_name && parts.replica == replica_name)
+ {
+ throw Exception(
+ ErrorCodes::REPLICA_ALREADY_EXISTS,
+ "Replica {} of shard {} of replicated database already exists in the replica group {} at {}",
+ replica_name, shard_name, parts.replica_group, zookeeper_path);
+ }
+ }
+
+ /// Writing this value with a version check on the "/replicas" node makes sure that another replica
+ /// with the same replica_name and shard_name but a different replica_group_name
+ /// is not created at the same time.
+ String replica_value = "Last added replica: " + getFullReplicaName();
+
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
@@ -473,7 +524,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
/// notify other nodes that issued new queries while this node was recovering.
- ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version));
+ ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat_max_log_ptr.version));
+ ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", replica_value, stat_replicas.version));
Coordination::Responses responses;
const auto code = current_zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
@@ -704,7 +756,15 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
entry.tracing_context = OpenTelemetry::CurrentContext();
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
- Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
+ Strings hosts_to_wait;
+ Strings unfiltered_hosts = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
+
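+ /// DDL queries should wait only for the replicas of our replica group.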
+ for (const auto & host : unfiltered_hosts)
+ {
+ if (replica_group_name == parseFullReplicaName(host).replica_group)
+ hosts_to_wait.push_back(host);
+ }
+
return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait);
}
@@ -1112,11 +1172,11 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
}
void DatabaseReplicated::dropReplica(
- DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica)
+ DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group)
{
assert(!database || database_zookeeper_path == database->zookeeper_path);
- String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica);
+ String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica, replica_group);
if (full_replica_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name);
diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h
index 7ba91e48085..1622578f3d9 100644
--- a/src/Databases/DatabaseReplicated.h
+++ b/src/Databases/DatabaseReplicated.h
@@ -54,11 +54,19 @@ public:
void stopReplication() override;
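+ /// The constituent parts of a full replica name: shard, replica and (optionally) replica group.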
+ struct NameParts
+ {
+ String shard;
+ String replica;
+ String replica_group;
+ };
+
String getShardName() const { return shard_name; }
String getReplicaName() const { return replica_name; }
+ String getReplicaGroupName() const { return replica_group_name; }
String getFullReplicaName() const;
- static String getFullReplicaName(const String & shard, const String & replica);
- static std::pair<String, String> parseFullReplicaName(const String & name);
+ static String getFullReplicaName(const String & shard, const String & replica, const String & replica_group);
+ static NameParts parseFullReplicaName(const String & name);
const String & getZooKeeperPath() const { return zookeeper_path; }
@@ -80,7 +88,7 @@ public:
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
- static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica);
+ static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group);
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
@@ -126,6 +134,7 @@ private:
String zookeeper_path;
String shard_name;
String replica_name;
+ String replica_group_name;
String replica_path;
DatabaseReplicatedSettings db_settings;
diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp
index 82c3d48bc05..fbc760bc486 100644
--- a/src/Interpreters/Cluster.cpp
+++ b/src/Interpreters/Cluster.cpp
@@ -159,6 +159,7 @@ Cluster::Address::Address(
host_name = parsed_host_port.first;
database_shard_name = info.shard_name;
database_replica_name = info.replica_name;
+ database_replica_group_name = info.replica_group_name;
port = parsed_host_port.second;
secure = params.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = params.priority;
@@ -516,7 +517,7 @@ Cluster::Cluster(
Addresses current;
for (const auto & replica : shard)
current.emplace_back(
- DatabaseReplicaInfo{replica, "", ""},
+ DatabaseReplicaInfo{replica, "", "", ""},
params,
current_shard_num,
current.size() + 1);
diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h
index b2bc03dd74d..acda6d9afec 100644
--- a/src/Interpreters/Cluster.h
+++ b/src/Interpreters/Cluster.h
@@ -35,6 +35,7 @@ struct DatabaseReplicaInfo
String hostname;
String shard_name;
String replica_name;
+ String replica_group_name;
};
struct ClusterConnectionParameters
@@ -111,6 +112,7 @@ public:
String host_name;
String database_shard_name;
String database_replica_name;
+ String database_replica_group_name;
UInt16 port{0};
String user;
String password;
diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index 07a1ae7d170..d11c2d9a969 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -927,7 +927,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
return;
String full_replica_name = query_.shard.empty() ? query_.replica
- : DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica);
+ : DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica, query_.replica_group);
if (replicated->getFullReplicaName() != full_replica_name)
return;
@@ -943,7 +943,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
{
check_not_local_replica(replicated, query);
- DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
+ DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
@@ -968,7 +968,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
}
check_not_local_replica(replicated, query);
- DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
+ DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
@@ -981,7 +981,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query);
- DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica);
+ DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica, query.replica_group);
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
}
else
diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp
index 750affdfe71..188865cb35c 100644
--- a/src/Interpreters/executeDDLQueryOnCluster.cpp
+++ b/src/Interpreters/executeDDLQueryOnCluster.cpp
@@ -357,9 +357,9 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
size_t num = 0;
if (is_replicated_database)
{
- auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
- columns[num++]->insert(shard);
- columns[num++]->insert(replica);
+ auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
+ columns[num++]->insert(parts.shard);
+ columns[num++]->insert(parts.replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
@@ -511,9 +511,9 @@ Chunk DDLQueryStatusSource::generate()
{
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
- auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
- columns[num++]->insert(shard);
- columns[num++]->insert(replica);
+ auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
+ columns[num++]->insert(parts.shard);
+ columns[num++]->insert(parts.replica);
columns[num++]->insert(OK);
}
else
diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h
index cc06e0fdcb5..3315d7dd3b6 100644
--- a/src/Parsers/ASTSystemQuery.h
+++ b/src/Parsers/ASTSystemQuery.h
@@ -107,6 +107,7 @@ public:
String replica;
String shard;
String replica_zk_path;
+ String replica_group;
bool is_drop_whole_replica{};
String storage_policy;
String volume;
diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp
index a26fdc1396b..979debeb75f 100644
--- a/src/Parsers/ParserSystemQuery.cpp
+++ b/src/Parsers/ParserSystemQuery.cpp
@@ -165,6 +165,14 @@ enum class SystemQueryTargetType
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
res->shard = ast->as<ASTLiteral &>().value.safeGet<String>();
+
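+ /// Optional 'FROM GROUP' clause; it may only follow 'FROM SHARD' and is allowed only for Replicated database replicas.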
+ if (database && ParserKeyword{"FROM GROUP"}.ignore(pos, expected))
+ {
+ ASTPtr group_ast;
+ if (!ParserStringLiteral{}.parse(pos, group_ast, expected))
+ return false;
+ res->replica_group = group_ast->as<ASTLiteral &>().value.safeGet<String>();
+ }
}
if (ParserKeyword{"FROM"}.ignore(pos, expected))
diff --git a/tests/integration/test_replicated_database_cluster_groups/__init__.py b/tests/integration/test_replicated_database_cluster_groups/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_replicated_database_cluster_groups/configs/backup_group.xml b/tests/integration/test_replicated_database_cluster_groups/configs/backup_group.xml
new file mode 100644
index 00000000000..3df343bbc9e
--- /dev/null
+++ b/tests/integration/test_replicated_database_cluster_groups/configs/backup_group.xml
@@ -0,0 +1,3 @@
+<clickhouse>
+    <replica_group_name>backups</replica_group_name>
+</clickhouse>
diff --git a/tests/integration/test_replicated_database_cluster_groups/configs/settings.xml b/tests/integration/test_replicated_database_cluster_groups/configs/settings.xml
new file mode 100644
index 00000000000..5666ffeace8
--- /dev/null
+++ b/tests/integration/test_replicated_database_cluster_groups/configs/settings.xml
@@ -0,0 +1,14 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <allow_experimental_database_replicated>1</allow_experimental_database_replicated>
+            <allow_experimental_alter_materialized_view_structure>1</allow_experimental_alter_materialized_view_structure>
+            <allow_experimental_object_type>1</allow_experimental_object_type>
+        </default>
+    </profiles>
+    <users>
+        <default>
+            <profile>default</profile>
+        </default>
+    </users>
+</clickhouse>
diff --git a/tests/integration/test_replicated_database_cluster_groups/test.py b/tests/integration/test_replicated_database_cluster_groups/test.py
new file mode 100644
index 00000000000..db43c37bc6e
--- /dev/null
+++ b/tests/integration/test_replicated_database_cluster_groups/test.py
@@ -0,0 +1,129 @@
+import re
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import assert_eq_with_retry
+
+cluster = ClickHouseCluster(__file__)
+
+main_node_1 = cluster.add_instance(
+ "main_node_1",
+ user_configs=["configs/settings.xml"],
+ with_zookeeper=True,
+ stay_alive=True,
+ macros={"shard": 1, "replica": 1},
+)
+main_node_2 = cluster.add_instance(
+ "main_node_2",
+ user_configs=["configs/settings.xml"],
+ with_zookeeper=True,
+ stay_alive=True,
+ macros={"shard": 1, "replica": 2},
+)
+backup_node_1 = cluster.add_instance(
+ "backup_node_1",
+ main_configs=["configs/backup_group.xml"],
+ user_configs=["configs/settings.xml"],
+ with_zookeeper=True,
+ stay_alive=True,
+ macros={"shard": 1, "replica": 3},
+)
+backup_node_2 = cluster.add_instance(
+ "backup_node_2",
+ main_configs=["configs/backup_group.xml"],
+ user_configs=["configs/settings.xml"],
+ with_zookeeper=True,
+ stay_alive=True,
+ macros={"shard": 1, "replica": 4},
+)
+
+all_nodes = [
+ main_node_1,
+ main_node_2,
+ backup_node_1,
+ backup_node_2,
+]
+
+uuid_regex = re.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
+
+
+def assert_create_query(nodes, table_name, expected):
+ replace_uuid = lambda x: re.sub(uuid_regex, "uuid", x)
+ query = "show create table {}".format(table_name)
+ for node in nodes:
+ assert_eq_with_retry(node, query, expected, get_result=replace_uuid)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+ try:
+ cluster.start()
+ yield cluster
+
+ finally:
+ cluster.shutdown()
+
+
+def test_cluster_groups(started_cluster):
+ for node in all_nodes:
+ node.query(
+ f"CREATE DATABASE cluster_groups ENGINE = Replicated('/test/cluster_groups', '{node.macros['shard']}', '{node.macros['replica']}');"
+ )
+
+ # 1. system.clusters
+
+ cluster_query = "SELECT host_name from system.clusters WHERE cluster = 'cluster_groups' ORDER BY host_name"
+ expected_main = "main_node_1\nmain_node_2\n"
+ expected_backup = "backup_node_1\nbackup_node_2\n"
+
+ for node in [main_node_1, main_node_2]:
+ assert_eq_with_retry(node, cluster_query, expected_main)
+
+ for node in [backup_node_1, backup_node_2]:
+ assert_eq_with_retry(node, cluster_query, expected_backup)
+
+ # 2. Query execution depends only on your cluster group
+
+ backup_node_1.stop_clickhouse()
+ backup_node_2.stop_clickhouse()
+
+ # OK
+ main_node_1.query(
+ "CREATE TABLE cluster_groups.table_1 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);"
+ )
+
+ # Exception
+ main_node_2.stop_clickhouse()
+ settings = {"distributed_ddl_task_timeout": 5}
+ assert (
+ "There are 1 unfinished hosts (0 of them are currently active)"
+ in main_node_1.query_and_get_error(
+ "CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
+ settings=settings,
+ )
+ )
+
+ # 3. After start both groups are synced
+
+ backup_node_1.start_clickhouse()
+ backup_node_2.start_clickhouse()
+ main_node_2.start_clickhouse()
+
+ expected_1 = "CREATE TABLE cluster_groups.table_1\\n(\\n `d` Date,\\n `k` UInt64\\n)\\nENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nPARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"
+ expected_2 = "CREATE TABLE cluster_groups.table_2\\n(\\n `d` Date,\\n `k` UInt64\\n)\\nENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nPARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"
+
+ assert_create_query(all_nodes, "cluster_groups.table_1", expected_1)
+ assert_create_query(all_nodes, "cluster_groups.table_2", expected_2)
+
+ # 4. SYSTEM DROP DATABASE REPLICA
+ backup_node_2.stop_clickhouse()
+ backup_node_1.query(
+ "SYSTEM DROP DATABASE REPLICA '4' FROM SHARD '1' FROM GROUP 'backups' FROM DATABASE cluster_groups"
+ )
+
+ assert_eq_with_retry(backup_node_1, cluster_query, "backup_node_1\n")
+
+ main_node_2.stop_clickhouse()
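+    # Main replicas are not in a replica group, so the full replica name is 'shard|replica'.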
+ main_node_1.query("SYSTEM DROP DATABASE REPLICA '1|2' FROM DATABASE cluster_groups")
+
+ assert_eq_with_retry(main_node_1, cluster_query, "main_node_1\n")