This reverts commit 3e60451ac7.
This commit is contained in:
Nikolay Degterinsky 2023-10-14 19:04:01 +00:00
parent 2bb5b79f02
commit c1fba94d67
15 changed files with 296 additions and 43 deletions

View File

@ -1396,6 +1396,23 @@ For more information, see the section [Creating replicated tables](../../engines
<macros incl="macros" optional="true" /> <macros incl="macros" optional="true" />
``` ```
## replica_group_name {#replica_group_name}
Replica group name for database Replicated.
The cluster created by Replicated database will consist of replicas in the same group.
DDL queries will only wait for the replicas in the same group.
Empty by default.
**Example**
``` xml
<replica_group_name>backups</replica_group_name>
```
Default value: empty string.
## max_open_files {#max-open-files} ## max_open_files {#max-open-files}
The maximum number of open files. The maximum number of open files.

View File

@ -97,12 +97,12 @@ The fourth one is useful to remove metadata of dead replica when all other repli
Dead replicas of `Replicated` databases can be dropped using following syntax: Dead replicas of `Replicated` databases can be dropped using following syntax:
``` sql ``` sql
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM DATABASE database; SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM DATABASE database;
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name']; SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'];
SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] FROM ZKPATH '/path/to/table/in/zk'; SYSTEM DROP DATABASE REPLICA 'replica_name' [FROM SHARD 'shard_name'] [FROM GROUP 'group_name'] FROM ZKPATH '/path/to/table/in/zk';
``` ```
Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format. Similar to `SYSTEM DROP REPLICA`, but removes the `Replicated` database replica path from ZooKeeper when there's no database to run `DROP DATABASE`. Please note that it does not remove `ReplicatedMergeTree` replicas (so you may need `SYSTEM DROP REPLICA` as well). Shard and replica names are the names that were specified in `Replicated` engine arguments when creating the database. Also, these names can be obtained from `database_shard_name` and `database_replica_name` columns in `system.clusters`. Replica group name is the name defined by `replica_group_name` [setting](../../operations/server-configuration-parameters/settings.md#replica_group_name) in the server configuration. If the `FROM SHARD` clause is missing, then `replica_name` must be a full replica name in `shard_name|replica_name` format if replica groups are not used and in `shard_name|replica_name|group_name` otherwise.
## DROP UNCOMPRESSED CACHE ## DROP UNCOMPRESSED CACHE

View File

@ -926,6 +926,15 @@
</macros> </macros>
--> -->
<!-- Replica group name for database Replicated.
The cluster created by Replicated database will consist of replicas in the same group.
DDL queries will only wait for the replicas in the same group.
Empty by default.
-->
<!--
<replica_group_name></replica_group_name>
-->
<!-- Reloading interval for embedded dictionaries, in seconds. Default: 3600. --> <!-- Reloading interval for embedded dictionaries, in seconds. Default: 3600. -->
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval> <builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>

View File

@ -116,28 +116,52 @@ DatabaseReplicated::DatabaseReplicated(
if (!db_settings.collection_name.value.empty()) if (!db_settings.collection_name.value.empty())
fillClusterAuthInfo(db_settings.collection_name.value, context_->getConfigRef()); fillClusterAuthInfo(db_settings.collection_name.value, context_->getConfigRef());
replica_group_name = context_->getConfigRef().getString("replica_group_name", "");
if (replica_group_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '/': {}", replica_group_name);
if (replica_group_name.find('|') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica group name should not contain '|': {}", replica_group_name);
} }
/// Builds the identifier of a replica: "shard|replica" when the replica group
/// is empty, otherwise "shard|replica|replica_group".
String DatabaseReplicated::getFullReplicaName(const String & shard, const String & replica, const String & replica_group)
{
    String full_name = shard + '|' + replica;

    /// The group suffix is appended only for replicas that belong to a named group.
    if (!replica_group.empty())
        full_name = full_name + '|' + replica_group;

    return full_name;
}

/// Identifier of this database replica, built from its own shard, replica and replica group names.
String DatabaseReplicated::getFullReplicaName() const
{
    return getFullReplicaName(shard_name, replica_name, replica_group_name);
}
std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String & name) DatabaseReplicated::NameParts DatabaseReplicated::parseFullReplicaName(const String & name)
{ {
String shard; NameParts parts;
String replica;
auto pos = name.find('|'); auto pos_first = name.find('|');
if (pos == std::string::npos || name.find('|', pos + 1) != std::string::npos) if (pos_first == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name); throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
shard = name.substr(0, pos);
replica = name.substr(pos + 1); parts.shard = name.substr(0, pos_first);
return {shard, replica};
auto pos_second = name.find('|', pos_first + 1);
if (pos_second == std::string::npos)
{
parts.replica = name.substr(pos_first + 1);
return parts;
}
if (name.find('|', pos_second + 1) != std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect replica identifier: {}", name);
parts.replica = name.substr(pos_first + 1, pos_second - pos_first - 1);
parts.replica_group = name.substr(pos_second + 1);
return parts;
} }
ClusterPtr DatabaseReplicated::tryGetCluster() const ClusterPtr DatabaseReplicated::tryGetCluster() const
@ -175,6 +199,7 @@ void DatabaseReplicated::setCluster(ClusterPtr && new_cluster)
ClusterPtr DatabaseReplicated::getClusterImpl() const ClusterPtr DatabaseReplicated::getClusterImpl() const
{ {
Strings unfiltered_hosts;
Strings hosts; Strings hosts;
Strings host_ids; Strings host_ids;
@ -186,11 +211,18 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
{ {
host_ids.resize(0); host_ids.resize(0);
Coordination::Stat stat; Coordination::Stat stat;
hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat); unfiltered_hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
if (hosts.empty()) if (unfiltered_hosts.empty())
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. " throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
"It's possible if the first replica is not fully created yet " "It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", zookeeper_path); "or if the last replica was just dropped or due to logical error", zookeeper_path);
for (const auto & host : unfiltered_hosts)
{
if (replica_group_name == parseFullReplicaName(host).replica_group)
hosts.push_back(host);
}
Int32 cversion = stat.cversion; Int32 cversion = stat.cversion;
::sort(hosts.begin(), hosts.end()); ::sort(hosts.begin(), hosts.end());
@ -221,7 +253,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
assert(!hosts.empty()); assert(!hosts.empty());
assert(hosts.size() == host_ids.size()); assert(hosts.size() == host_ids.size());
String current_shard = parseFullReplicaName(hosts.front()).first; String current_shard = parseFullReplicaName(hosts.front()).shard;
std::vector<std::vector<DatabaseReplicaInfo>> shards; std::vector<std::vector<DatabaseReplicaInfo>> shards;
shards.emplace_back(); shards.emplace_back();
for (size_t i = 0; i < hosts.size(); ++i) for (size_t i = 0; i < hosts.size(); ++i)
@ -229,17 +261,17 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
const auto & id = host_ids[i]; const auto & id = host_ids[i];
if (id == DROPPED_MARK) if (id == DROPPED_MARK)
continue; continue;
auto [shard, replica] = parseFullReplicaName(hosts[i]); auto parts = parseFullReplicaName(hosts[i]);
auto pos = id.rfind(':'); auto pos = id.rfind(':');
String host_port = id.substr(0, pos); String host_port = id.substr(0, pos);
if (shard != current_shard) if (parts.shard != current_shard)
{ {
current_shard = shard; current_shard = parts.shard;
if (!shards.back().empty()) if (!shards.back().empty())
shards.emplace_back(); shards.emplace_back();
} }
String hostname = unescapeForFileName(host_port); String hostname = unescapeForFileName(host_port);
shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(shard), std::move(replica)}); shards.back().push_back(DatabaseReplicaInfo{std::move(hostname), std::move(parts.shard), std::move(parts.replica), std::move(parts.replica_group)});
} }
UInt16 default_port = getContext()->getTCPPort(); UInt16 default_port = getContext()->getTCPPort();
@ -269,7 +301,7 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
{ {
for (const auto & replica : addresses_with_failover[shard_index])
{
    /// The replica's node name under "<zookeeper_path>/replicas" is its full replica name,
    /// which ends with the replica group (if any). Pass the group name as the third argument:
    /// passing the replica name again would produce "shard|replica|replica", a path that
    /// does not correspond to any registered replica.
    String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name, replica.database_replica_group_name);
    paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
}
} }
@ -309,6 +341,7 @@ void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco:
cluster_auth_info.cluster_secure_connection = config_ref.getBool(config_prefix + ".cluster_secure_connection", false); cluster_auth_info.cluster_secure_connection = config_ref.getBool(config_prefix + ".cluster_secure_connection", false);
} }
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel mode) void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel mode)
{ {
try try
@ -464,8 +497,26 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
for (int attempts = 10; attempts > 0; --attempts) for (int attempts = 10; attempts > 0; --attempts)
{ {
Coordination::Stat stat; Coordination::Stat stat_max_log_ptr;
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat); Coordination::Stat stat_replicas;
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat_max_log_ptr);
Strings replicas = current_zookeeper->getChildren(zookeeper_path + "/replicas", &stat_replicas);
for (const auto & replica : replicas)
{
NameParts parts = parseFullReplicaName(replica);
if (parts.shard == shard_name && parts.replica == replica_name)
{
throw Exception(
ErrorCodes::REPLICA_ALREADY_EXISTS,
"Replica {} of shard {} of replicated database already exists in the replica group {} at {}",
replica_name, shard_name, parts.replica_group, zookeeper_path);
}
}
/// This way we make sure that other replica with the same replica_name and shard_name
/// but with a different replica_group_name was not created at the same time.
String replica_value = "Last added replica: " + getFullReplicaName();
Coordination::Requests ops; Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
@ -473,7 +524,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where /// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to /// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
/// notify other nodes that issued new queries while this node was recovering. /// notify other nodes that issued new queries while this node was recovering.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version)); ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat_max_log_ptr.version));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", replica_value, stat_replicas.version));
Coordination::Responses responses; Coordination::Responses responses;
const auto code = current_zookeeper->tryMulti(ops, responses); const auto code = current_zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
@ -704,7 +756,15 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
entry.tracing_context = OpenTelemetry::CurrentContext(); entry.tracing_context = OpenTelemetry::CurrentContext();
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context); String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); Strings hosts_to_wait;
Strings unfiltered_hosts = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
for (const auto & host : unfiltered_hosts)
{
if (replica_group_name == parseFullReplicaName(host).replica_group)
hosts_to_wait.push_back(host);
}
return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait); return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait);
} }
@ -1112,11 +1172,11 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
} }
void DatabaseReplicated::dropReplica( void DatabaseReplicated::dropReplica(
DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica) DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group)
{ {
assert(!database || database_zookeeper_path == database->zookeeper_path); assert(!database || database_zookeeper_path == database->zookeeper_path);
String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica); String full_replica_name = shard.empty() ? replica : getFullReplicaName(shard, replica, replica_group);
if (full_replica_name.find('/') != std::string::npos) if (full_replica_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name);

View File

@ -54,11 +54,19 @@ public:
void stopReplication() override; void stopReplication() override;
/// Components of a full replica name of the form
/// "shard|replica" or "shard|replica|replica_group".
struct NameParts
{
    String shard;
    String replica;
    /// Empty when the full name has only two components.
    String replica_group;
};
String getShardName() const { return shard_name; } String getShardName() const { return shard_name; }
String getReplicaName() const { return replica_name; } String getReplicaName() const { return replica_name; }
String getReplicaGroupName() const { return replica_group_name; }
String getFullReplicaName() const; String getFullReplicaName() const;
static String getFullReplicaName(const String & shard, const String & replica); static String getFullReplicaName(const String & shard, const String & replica, const String & replica_group);
static std::pair<String, String> parseFullReplicaName(const String & name); static NameParts parseFullReplicaName(const String & name);
const String & getZooKeeperPath() const { return zookeeper_path; } const String & getZooKeeperPath() const { return zookeeper_path; }
@ -80,7 +88,7 @@ public:
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override; bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica); static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, const String & replica_group);
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const; std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
@ -126,6 +134,7 @@ private:
String zookeeper_path; String zookeeper_path;
String shard_name; String shard_name;
String replica_name; String replica_name;
String replica_group_name;
String replica_path; String replica_path;
DatabaseReplicatedSettings db_settings; DatabaseReplicatedSettings db_settings;

View File

@ -159,6 +159,7 @@ Cluster::Address::Address(
host_name = parsed_host_port.first; host_name = parsed_host_port.first;
database_shard_name = info.shard_name; database_shard_name = info.shard_name;
database_replica_name = info.replica_name; database_replica_name = info.replica_name;
database_replica_group_name = info.replica_group_name;
port = parsed_host_port.second; port = parsed_host_port.second;
secure = params.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable; secure = params.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = params.priority; priority = params.priority;
@ -516,7 +517,7 @@ Cluster::Cluster(
Addresses current; Addresses current;
for (const auto & replica : shard) for (const auto & replica : shard)
current.emplace_back( current.emplace_back(
DatabaseReplicaInfo{replica, "", ""}, DatabaseReplicaInfo{replica, "", "", ""},
params, params,
current_shard_num, current_shard_num,
current.size() + 1); current.size() + 1);

View File

@ -35,6 +35,7 @@ struct DatabaseReplicaInfo
String hostname; String hostname;
String shard_name; String shard_name;
String replica_name; String replica_name;
String replica_group_name;
}; };
struct ClusterConnectionParameters struct ClusterConnectionParameters
@ -111,6 +112,7 @@ public:
String host_name; String host_name;
String database_shard_name; String database_shard_name;
String database_replica_name; String database_replica_name;
String database_replica_group_name;
UInt16 port{0}; UInt16 port{0};
String user; String user;
String password; String password;

View File

@ -927,7 +927,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path)) if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
return; return;
String full_replica_name = query_.shard.empty() ? query_.replica String full_replica_name = query_.shard.empty() ? query_.replica
: DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica); : DatabaseReplicated::getFullReplicaName(query_.shard, query_.replica, query_.replica_group);
if (replicated->getFullReplicaName() != full_replica_name) if (replicated->getFullReplicaName() != full_replica_name)
return; return;
@ -943,7 +943,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get())) if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
{ {
check_not_local_replica(replicated, query); check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica); DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
} }
else else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase()); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
@ -968,7 +968,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
} }
check_not_local_replica(replicated, query); check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica); DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.shard, query.replica, query.replica_group);
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName())); LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
} }
} }
@ -981,7 +981,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get())) if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query); check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica); DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.shard, query.replica, query.replica_group);
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path); LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
} }
else else

View File

@ -357,9 +357,9 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
size_t num = 0; size_t num = 0;
if (is_replicated_database) if (is_replicated_database)
{ {
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id); auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard); columns[num++]->insert(parts.shard);
columns[num++]->insert(replica); columns[num++]->insert(parts.replica);
if (active_hosts_set.contains(host_id)) if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS); columns[num++]->insert(IN_PROGRESS);
else else
@ -511,9 +511,9 @@ Chunk DDLQueryStatusSource::generate()
{ {
if (status.code != 0) if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message); throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id); auto parts = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard); columns[num++]->insert(parts.shard);
columns[num++]->insert(replica); columns[num++]->insert(parts.replica);
columns[num++]->insert(OK); columns[num++]->insert(OK);
} }
else else

View File

@ -107,6 +107,7 @@ public:
String replica; String replica;
String shard; String shard;
String replica_zk_path; String replica_zk_path;
String replica_group;
bool is_drop_whole_replica{}; bool is_drop_whole_replica{};
String storage_policy; String storage_policy;
String volume; String volume;

View File

@ -165,6 +165,14 @@ enum class SystemQueryTargetType
if (!ParserStringLiteral{}.parse(pos, ast, expected)) if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false; return false;
res->shard = ast->as<ASTLiteral &>().value.safeGet<String>(); res->shard = ast->as<ASTLiteral &>().value.safeGet<String>();
if (database && ParserKeyword{"FROM GROUP"}.ignore(pos, expected))
{
ASTPtr group_ast;
if (!ParserStringLiteral{}.parse(pos, group_ast, expected))
return false;
res->replica_group = group_ast->as<ASTLiteral &>().value.safeGet<String>();
}
} }
if (ParserKeyword{"FROM"}.ignore(pos, expected)) if (ParserKeyword{"FROM"}.ignore(pos, expected))

View File

@ -0,0 +1,3 @@
<clickhouse>
<replica_group_name>backups</replica_group_name>
</clickhouse>

View File

@ -0,0 +1,14 @@
<clickhouse>
<profiles>
<default>
<allow_drop_detached>1</allow_drop_detached>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_experimental_alter_materialized_view_structure>1</allow_experimental_alter_materialized_view_structure>
</default>
</profiles>
<users>
<default>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,129 @@
import re
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)

# Two instances started without an extra main config.
# They use shard 1, replicas 1 and 2.
main_node_1 = cluster.add_instance(
    "main_node_1",
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 1},
)
main_node_2 = cluster.add_instance(
    "main_node_2",
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 2},
)

# Two instances that additionally load configs/backup_group.xml
# (presumably the config that sets replica_group_name; the test below refers to group 'backups').
# They use shard 1, replicas 3 and 4.
backup_node_1 = cluster.add_instance(
    "backup_node_1",
    main_configs=["configs/backup_group.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 3},
)
backup_node_2 = cluster.add_instance(
    "backup_node_2",
    main_configs=["configs/backup_group.xml"],
    user_configs=["configs/settings.xml"],
    with_zookeeper=True,
    stay_alive=True,
    macros={"shard": 1, "replica": 4},
)

all_nodes = [
    main_node_1,
    main_node_2,
    backup_node_1,
    backup_node_2,
]

# Matches a lowercase hexadecimal UUID (8-4-4-4-12); used to mask UUIDs in query output.
uuid_regex = re.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
def assert_create_query(nodes, table_name, expected):
    """Assert, via assert_eq_with_retry, that `show create table` matches `expected` on every node.

    Any UUID in the query result is replaced with the literal "uuid" before comparison.
    """

    def mask_uuid(text):
        return uuid_regex.sub("uuid", text)

    show_query = f"show create table {table_name}"
    for instance in nodes:
        assert_eq_with_retry(instance, show_query, expected, get_result=mask_uuid)
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, yield it to the tests, then shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even if startup or a test raised.
        cluster.shutdown()
def test_cluster_groups(started_cluster):
    """End-to-end scenario for replica groups of a Replicated database.

    Steps: create the same database on all four nodes, check what each node sees in
    system.clusters, check which stopped nodes make a DDL query fail, check that
    all nodes end up with the same tables, and drop replicas with and without
    the FROM GROUP clause.
    """
    # Every node creates the database at the same ZooKeeper path, using its own shard/replica macros.
    for node in all_nodes:
        node.query(
            f"CREATE DATABASE cluster_groups ENGINE = Replicated('/test/cluster_groups', '{node.macros['shard']}', '{node.macros['replica']}');"
        )

    # 1. system.clusters
    # Each node is expected to list only the hosts of its own pair (main or backup).
    cluster_query = "SELECT host_name from system.clusters WHERE cluster = 'cluster_groups' ORDER BY host_name"
    expected_main = "main_node_1\nmain_node_2\n"
    expected_backup = "backup_node_1\nbackup_node_2\n"

    for node in [main_node_1, main_node_2]:
        assert_eq_with_retry(node, cluster_query, expected_main)

    for node in [backup_node_1, backup_node_2]:
        assert_eq_with_retry(node, cluster_query, expected_backup)

    # 2. Query execution depends only on your cluster group
    backup_node_1.stop_clickhouse()
    backup_node_2.stop_clickhouse()

    # With only the backup nodes stopped, the DDL query on a main node is expected to succeed.
    main_node_1.query(
        "CREATE TABLE cluster_groups.table_1 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);"
    )

    # With the other main node stopped as well, the query is expected to fail
    # with an "unfinished hosts" error under the 5-second distributed_ddl_task_timeout.
    main_node_2.stop_clickhouse()
    settings = {"distributed_ddl_task_timeout": 5}
    assert (
        "There are 1 unfinished hosts (0 of them are currently active)"
        in main_node_1.query_and_get_error(
            "CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
            settings=settings,
        )
    )

    # 3. After start both groups are synced
    backup_node_1.start_clickhouse()
    backup_node_2.start_clickhouse()
    main_node_2.start_clickhouse()

    expected_1 = "CREATE TABLE cluster_groups.table_1\\n(\\n `d` Date,\\n `k` UInt64\\n)\\nENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nPARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"
    expected_2 = "CREATE TABLE cluster_groups.table_2\\n(\\n `d` Date,\\n `k` UInt64\\n)\\nENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nPARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"

    # Both tables must be visible on all four nodes, including table_2 whose creation timed out above.
    assert_create_query(all_nodes, "cluster_groups.table_1", expected_1)
    assert_create_query(all_nodes, "cluster_groups.table_2", expected_2)

    # 4. SYSTEM DROP DATABASE REPLICA
    # Drop a stopped backup replica, naming shard and group explicitly.
    backup_node_2.stop_clickhouse()
    backup_node_1.query(
        "SYSTEM DROP DATABASE REPLICA '4' FROM SHARD '1' FROM GROUP 'backups' FROM DATABASE cluster_groups"
    )

    assert_eq_with_retry(backup_node_1, cluster_query, "backup_node_1\n")

    # Drop a stopped main replica using the full 'shard|replica' name (no group).
    main_node_2.stop_clickhouse()
    main_node_1.query("SYSTEM DROP DATABASE REPLICA '1|2' FROM DATABASE cluster_groups")

    assert_eq_with_retry(main_node_1, cluster_query, "main_node_1\n")