Merge pull request #54421 from evillique/cluster-groups

Add replica groups to Replicated database engine
This commit is contained in:
robot-clickhouse-ci-1 2023-10-12 18:14:42 +02:00 committed by GitHub
commit 211c80e1a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 296 additions and 43 deletions

View File

@ -1396,6 +1396,23 @@ For more information, see the section [Creating replicated tables](../../engines
<macros incl="macros" optional="true" />
```
## replica_group_name {#replica_group_name}
Replica group name for database Replicated.
The cluster created by Replicated database will consist of replicas in the same group.
DDL queries will only wail for the replicas in the same group.
Empty by default.
**Example**
``` xml
<replica_group_name>backups</replica_group_name>
```
Default value: ``.
## max_open_files {#max-open-files}
The maximum number of open files.

View File

@ -97,12 +97,12 @@ The fourth one is useful to remove metadata of dead replica when all other repli
Dead replicas of `Replicated` databases can be dropped using following syntax:
``` sql
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM DATABASE database;
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'];
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM ZKPATH '/path/to/table/in/zk';
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM DATABASE database;
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'];
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM ZKPATH '/path/to/table/in/zk';
```
Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format.
Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. Replica group name is the name defined by `replica_group_name` [setting](../../operations/server-configuration-parameters/settings.md#replica_group_name) in the server configuration. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format if replica groups are not used and in `shard_name|replica_name|group_name` otherwise.
## DROP UNCOMPRESSED CACHE

View File

@ -926,6 +926,15 @@
</macros>
-->
<!-- Replica group name for database Replicated.
The cluster created by Replicated database will consist of replicas in the same group.
DDL queries will only wail for the replicas in the same group.
Empty by default.
-->
<!--
<replica_group_name><replica_group_name>
-->
<!-- Reloading interval for embedded dictionaries, in seconds. Default: 3600. -->
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>

View File

@ -116,28 +116,52 @@ DatabaseReplicated::DatabaseReplicated(
if (!db_settings.collection_name.value.empty())
fillClusterAuthInfo(db_settings.collection_name.value, context_->getConfigRef());
replica_group_name = context_->getConfigRef().getString("replica_group_name", "");
if (replica_group_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '/': {}", replica_group_name);
if (replica_group_name.find('|') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '|': {}", replica_group_name);
}
String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica)
String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica, const String & replica_group)
{
return shard + '|' + replica;
if (replica_group.empty())
return shard + '|' + replica;
else
return shard + '|' + replica + '|' + replica_group;
}
String DatabaseReplicated::getFullReplicaName() const
{
return getFullReplicaName(shard_name, replica_name);
return getFullReplicaName(shard_name, replica_name, replica_group_name);
}
std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String & name)
DatabaseReplicated::NameParts DatabaseReplicated::parseFullReplicaName(const String & name)
{
String shard;
String replica;
auto pos = name.find('|');
if (pos == std::string::npos || name.find('|', pos + 1) != std::string::npos)
NameParts parts;
auto pos_first = name.find('|');
if (pos_first == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
shard = name.substr(0, pos);
replica = name.substr(pos + 1);
return {shard, replica};
parts.shard = name.substr(0, pos_first);
auto pos_second = name.find('|', pos_first + 1);
if (pos_second == std::string::npos)
{
parts.replica = name.substr(pos_first + 1);
return parts;
}
if (name.find('|', pos_second + 1) != std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
parts.replica = name.substr(pos_first + 1, pos_second - pos_first - 1);
parts.replica_group = name.substr(pos_second + 1);
return parts;
}
ClusterPtr DatabaseReplicated::tryGetCluster() const
@ -175,6 +199,7 @@ void DatabaseReplicated::setCluster(ClusterPtr && new_cluster)
ClusterPtr DatabaseReplicated::getClusterImpl() const
{
Strings unfiltered_hosts;
Strings hosts;
Strings host_ids;
@ -186,11 +211,18 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
{
host_ids.resize(0);
Coordination::Stat stat;
hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
if (hosts.empty())
unfiltered_hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
if (unfiltered_hosts.empty())
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", zookeeper_path);
for (const auto & host : unfiltered_hosts)
{
if (replica_group_name == parseFullReplicaName(host).replica_group)
hosts.push_back(host);
}
Int32 cversion = stat.cversion;
::sort(hosts.begin(), hosts.end());
@ -221,7 +253,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
assert(!hosts.empty());
assert(hosts.size() == host_ids.size());
String current_shard = parseFullReplicaName(hosts.front()).first;
String current_shard = parseFullReplicaName(hosts.front()).shard;
std::vector<std::vector<DatabaseReplicaInfo>> shards;
shards.emplace_back();
for (size_t i = 0; i < hosts.size(); ++i)
@ -229,17 +261,17 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
const auto & id = host_ids[i];
if (id == DROPPED_MARK)
continue;
auto [shard, replica] = parseFullReplicaName(hosts[i]);
auto parts = parseFullReplicaName(hosts[i]);
auto pos = id.rfind(':');
String host_port = id.substr(0, pos);
if (shard != current_shard)
if (parts.shard != current_shard)
{
current_shard = shard;
current_shard = parts.shard;
if (!shards.back().empty())
shards.emplace_back();
}
String hostname = unescapeForFileName(host_port);
shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(shard), std::move(replica)});
shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(parts.shard), std::move(parts.replica), std::move(parts.replica_group)});
}
UInt16 default_port = getContext()->getTCPPort();
@ -269,7 +301,7 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
{
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name, replica.database_replica_name);
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
}
}
@ -309,6 +341,7 @@ void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco:
cluster_auth_info.cluster_secure_connection = config_ref.getBool(config_prefix + ".cluster_secure_connection", false);
}
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel mode)
{
try
@ -464,8 +497,26 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
for (int attempts = 10; attempts > 0; --attempts)
{
Coordination::Stat stat;
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
Coordination::Stat stat_max_log_ptr;
Coordination::Stat stat_replicas;
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat_max_log_ptr);
Strings replicas = current_zookeeper->getChildren(zookeeper_path + "/replicas", &stat_replicas);
for (const auto & replica : replicas)
{
NameParts parts = parseFullReplicaName(replica);
if (parts.shard == shard_name && parts.replica == replica_name)
{
throw Exception(
ErrorCodes::REPLICA_ALREADY_EXISTS,
"Replica {} of shard {} of replicated database already exists in the replica group {} at {}",
replica_name, shard_name, parts.replica_group, zookeeper_path);
}
}
/// This way we make sure that other replica with the same replica_name and shard_name
/// but with a different replica_group_name was not created at the same time.
String replica_value = "Last added replica: " + getFullReplicaName();
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
@ -473,7 +524,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
/// notify other nodes that issued new queries while this node was recovering.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version));
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat_max_log_ptr.version));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", replica_value, stat_replicas.version));
Coordination::Responses responses;
const auto code = current_zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
@ -704,7 +756,15 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
entry.tracing_context = OpenTelemetry::CurrentContext();
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
Strings hosts_to_wait;
Strings unfiltered_hosts = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
for (const auto & host : unfiltered_hosts)
{
if (replica_group_name == parseFullReplicaName(host).replica_group)
hosts_to_wait.push_back(host);
}
return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait);
}
@ -1112,11 +1172,11 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
}
void DatabaseReplicated::dropReplica(
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica)
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group)
{
assert(!database || database_zookeeper_path == database->zookeeper_path);
String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica);
String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica, replica_group);
if (full_replica_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name);

View File

@ -54,11 +54,19 @@ public:
void stopReplication() override;
struct NameParts
{
String shard;
String replica;
String replica_group;
};
String getShardName() const { return shard_name; }
String getReplicaName() const { return replica_name; }
String getReplicaGroupName() const { return replica_group_name; }
String getFullReplicaName() const;
static String getFullReplicaName(const String & shard, const String & replica);
static std::pair<String, String> parseFullReplicaName(const String & name);
static String getFullReplicaName(const String & shard, const String & replica, const String & replica_group);
static NameParts parseFullReplicaName(const String & name);
const String & getZooKeeperPath() const { return zookeeper_path; }
@ -80,7 +88,7 @@ public:
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica);
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group);
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
@ -126,6 +134,7 @@ private:
String zookeeper_path;
String shard_name;
String replica_name;
String replica_group_name;
String replica_path;
DatabaseReplicatedSettings db_settings;

View File

@ -159,6 +159,7 @@ Cluster::Address::Address(
host_name = parsed_host_port.first;
database_shard_name = info.shard_name;
database_replica_name = info.replica_name;
database_replica_group_name = info.replica_group_name;
port = parsed_host_port.second;
secure = params.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = params.priority;
@ -516,7 +517,7 @@ Cluster::Cluster(
Addresses current;
for (const auto & replica : shard)
current.emplace_back(
DatabaseReplicaInfo{replica, "", ""},
DatabaseReplicaInfo{replica, "", "", ""},
params,
current_shard_num,
current.size() + 1);

View File

@ -35,6 +35,7 @@ struct DatabaseReplicaInfo
String hostname;
String shard_name;
String replica_name;
String replica_group_name;
};
struct ClusterConnectionParameters
@ -111,6 +112,7 @@ public:
String host_name;
String database_shard_name;
String database_replica_name;
String database_replica_group_name;
UInt16 port{0};
String user;
String password;

View File

@ -927,7 +927,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
return;
String full_replica_name = query_.shard.empty() ? query_.replica
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica);
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica, query_.replica_group);
if (replicated->getFullReplicaName() != full_replica_name)
return;
@ -943,7 +943,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
{
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
@ -968,7 +968,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
}
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
@ -981,7 +981,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica, query.replica_group);
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
}
else

View File

@ -357,9 +357,9 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
size_t num = 0;
if (is_replicated_database)
{
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(parts.shard);
columns[num++]->insert(parts.replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
@ -511,9 +511,9 @@ Chunk DDLQueryStatusSource::generate()
{
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(parts.shard);
columns[num++]->insert(parts.replica);
columns[num++]->insert(OK);
}
else

View File

@ -107,6 +107,7 @@ public:
String replica;
String shard;
String replica_zk_path;
String replica_group;
bool is_drop_whole_replica{};
String storage_policy;
String volume;

View File

@ -165,6 +165,14 @@ enum class SystemQueryTargetType
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
res->shard = ast->as<ASTLiteral &>().value.safeGet<String>();
if (database && ParserKeyword{"FROM GROUP"}.ignore(pos, expected))
{
ASTPtr group_ast;
if (!ParserStringLiteral{}.parse(pos, group_ast, expected))
return false;
res->replica_group = group_ast->as<ASTLiteral &>().value.safeGet<String>();
}
}
if (ParserKeyword{"FROM"}.ignore(pos, expected))

View File

@ -0,0 +1,3 @@
<clickhouse>
<replica_group_name>backups</replica_group_name>
</clickhouse>

View File

@ -0,0 +1,14 @@
<clickhouse>
<profiles>
<default>
<allow_drop_detached>1</allow_drop_detached>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_experimental_alter_materialized_view_structure>1</allow_experimental_alter_materialized_view_structure>
</default>
</profiles>
<users>
<default>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,129 @@
import re
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
main_node_1 = cluster.add_instance(
"main_node_1",
user_configs=["configs/settings.xml"],
with_zookeeper=True,
stay_alive=True,
macros={"shard": 1, "replica": 1},
)
main_node_2 = cluster.add_instance(
"main_node_2",
user_configs=["configs/settings.xml"],
with_zookeeper=True,
stay_alive=True,
macros={"shard": 1, "replica": 2},
)
backup_node_1 = cluster.add_instance(
"backup_node_1",
main_configs=["configs/backup_group.xml"],
user_configs=["configs/settings.xml"],
with_zookeeper=True,
stay_alive=True,
macros={"shard": 1, "replica": 3},
)
backup_node_2 = cluster.add_instance(
"backup_node_2",
main_configs=["configs/backup_group.xml"],
user_configs=["configs/settings.xml"],
with_zookeeper=True,
stay_alive=True,
macros={"shard": 1, "replica": 4},
)
all_nodes = [
main_node_1,
main_node_2,
backup_node_1,
backup_node_2,
]
uuid_regex = re.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
def assert_create_query(nodes, table_name, expected):
replace_uuid = lambda x: re.sub(uuid_regex, "uuid", x)
query = "show create table {}".format(table_name)
for node in nodes:
assert_eq_with_retry(node, query, expected, get_result=replace_uuid)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_cluster_groups(started_cluster):
for node in all_nodes:
node.query(
f"CREATE DATABASE cluster_groups ENGINE = Replicated('/test/cluster_groups', '{node.macros['shard']}', '{node.macros['replica']}');"
)
# 1. system.clusters
cluster_query = "SELECT host_name from system.clusters WHERE cluster = 'cluster_groups' ORDER BY host_name"
expected_main = "main_node_1\nmain_node_2\n"
expected_backup = "backup_node_1\nbackup_node_2\n"
for node in [main_node_1, main_node_2]:
assert_eq_with_retry(node, cluster_query, expected_main)
for node in [backup_node_1, backup_node_2]:
assert_eq_with_retry(node, cluster_query, expected_backup)
# 2. Query execution depends only on your cluster group
backup_node_1.stop_clickhouse()
backup_node_2.stop_clickhouse()
# OK
main_node_1.query(
"CREATE TABLE cluster_groups.table_1 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);"
)
# Exception
main_node_2.stop_clickhouse()
settings = {"distributed_ddl_task_timeout": 5}
assert (
"There are 1 unfinished hosts (0 of them are currently active)"
in main_node_1.query_and_get_error(
"CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
settings=settings,
)
)
# 3. After start both groups are synced
backup_node_1.start_clickhouse()
backup_node_2.start_clickhouse()
main_node_2.start_clickhouse()
expected_1 = "CREATE TABLE cluster_groups.table_1\\n(\\n `d` Date,\\n `k` UInt64\\n)\\nENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nPARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"
expected_2 = "CREATE TABLE cluster_groups.table_2\\n(\\n `d` Date,\\n `k` UInt64\\n)\\nENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nPARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"
assert_create_query(all_nodes, "cluster_groups.table_1", expected_1)
assert_create_query(all_nodes, "cluster_groups.table_2", expected_2)
# 4. SYSTEM DROP DATABASE REPLICA
backup_node_2.stop_clickhouse()
backup_node_1.query(
"SYSTEM DROP DATABASE REPLICA '4' FROM SHARD '1' FROM GROUP 'backups' FROM DATABASE cluster_groups"
)
assert_eq_with_retry(backup_node_1, cluster_query, "backup_node_1\n")
main_node_2.stop_clickhouse()
main_node_1.query("SYSTEM DROP DATABASE REPLICA '1|2' FROM DATABASE cluster_groups")
assert_eq_with_retry(main_node_1, cluster_query, "main_node_1\n")