add SYSTEM DROP REPLICA

sundy-li 2020-05-17 20:44:22 +08:00 committed by amudong
parent 15ad830290
commit 0a4af8f0a7
8 changed files with 117 additions and 6 deletions
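
For context, a minimal sketch of the syntax this commit introduces (identifiers here are hypothetical, mirroring the integration test at the bottom of the diff):

SYSTEM DROP REPLICA 'replica_name' FROM database.table
SYSTEM DROP REPLICA 'replica_name' FROM '/path/to/table/in/zookeeper'

The first form drops a replica of a ReplicatedMergeTree table known to the local server; the second removes a replica's metadata directly from ZooKeeper, for the case where no local table exists for it.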

View File

@@ -133,6 +133,7 @@ enum class AccessType
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP REPLICATED SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \
M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP REPLICATION QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \
M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \
M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \
M(SYSTEM_RESTART_REPLICA, "RESTART REPLICA", TABLE, SYSTEM) \
M(SYSTEM_FLUSH_DISTRIBUTED, "FLUSH DISTRIBUTED", TABLE, SYSTEM_FLUSH) \

View File

@@ -285,6 +285,9 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_DISTRIBUTED_SENDS:
startStopAction(ActionLocks::DistributedSend, true);
break;
case Type::DROP_REPLICA:
dropReplica(query);
break;
case Type::SYNC_REPLICA:
syncReplica(query);
break;
@@ -400,6 +403,54 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
pool.wait();
}
void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
{
if (!table_id.empty())
{
context.checkAccess(AccessType::SYSTEM_DROP_REPLICA, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
storage_replicated->dropReplica(query.replica);
LOG_TRACE(log, "DROP REPLICA " << table_id.getNameForLogs() << " [" << query.replica << "]: OK");
}
else
throw Exception("Table " + table_id.getNameForLogs() + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
else
{
context.checkAccess(AccessType::SYSTEM_DROP_REPLICA);
auto zookeeper = context.getZooKeeper();
auto to_drop_path = query.replica_zk_path + "/replicas/" + query.replica;
// TODO: check whether the local table has this replica_path
// Refuse to drop a replica that is still active
if (zookeeper->exists(to_drop_path + "/is_active"))
{
throw Exception("Can't remove replica: " + query.replica + ", because it's active",
ErrorCodes::LOGICAL_ERROR);
}
/// It may leave some garbage if the to_drop_path subtree is modified concurrently
zookeeper->tryRemoveRecursive(to_drop_path);
if (zookeeper->exists(to_drop_path))
LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, "
<< to_drop_path << " still exists and may contain some garbage.");
/// Check that `query.replica_zk_path` still exists: it could have been deleted by another replica after execution of the previous line.
Strings replicas;
if (zookeeper->tryGetChildren(query.replica_zk_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
{
LOG_INFO(log, "Removing zookeeper path " << query.replica_zk_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(query.replica_zk_path);
if (zookeeper->exists(query.replica_zk_path))
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, "
<< query.replica_zk_path << " still exists and may contain some garbage.");
}
}
}
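
A hedged sketch of the guard above, as exercised by the integration test at the end of this commit (the replica and table names come from that test):

-- rejected while the replica's ephemeral is_active node still exists in ZooKeeper:
SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table
-- succeeds once the replica process is dead and its ZooKeeper session has expired:
SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table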
void InterpreterSystemQuery::syncReplica(ASTSystemQuery &)
{
context.checkAccess(AccessType::SYSTEM_SYNC_REPLICA, table_id);
@@ -530,6 +581,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_REPLICATION_QUEUES, query.database, query.table);
break;
}
case Type::DROP_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_REPLICA, query.database, query.table);
break;
}
case Type::SYNC_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_REPLICA, query.database, query.table);

View File

@@ -51,6 +51,7 @@ private:
void restartReplicas(Context & system_context);
void syncReplica(ASTSystemQuery & query);
void dropReplica(ASTSystemQuery & query);
void flushDistributed(ASTSystemQuery & query);
AccessRightsElements getRequiredAccessForDDLOnCluster() const;

View File

@@ -39,6 +39,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RESTART REPLICAS";
case Type::RESTART_REPLICA:
return "RESTART REPLICA";
case Type::DROP_REPLICA:
return "DROP REPLICA";
case Type::SYNC_REPLICA:
return "SYNC REPLICA";
case Type::FLUSH_DISTRIBUTED:
@@ -116,6 +118,19 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
<< (settings.hilite ? hilite_none : "");
};
auto print_drop_replica = [&] {
settings.ostr << " " << (settings.hilite ? hilite_identifier : "")
<< quoteString(replica) << (settings.hilite ? hilite_none : "")
<< " FROM ";
if (!table.empty())
print_database_table();
else
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << quoteString(replica_zk_path)
<< (settings.hilite ? hilite_none : "");
}
};
if (!cluster.empty())
formatOnCluster(settings);
@@ -143,6 +158,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
}
else if (type == Type::RELOAD_DICTIONARY)
print_database_dictionary();
else if (type == Type::DROP_REPLICA)
print_drop_replica();
}

View File

@@ -30,6 +30,7 @@ public:
START_LISTEN_QUERIES,
RESTART_REPLICAS,
RESTART_REPLICA,
DROP_REPLICA,
SYNC_REPLICA,
RELOAD_DICTIONARY,
RELOAD_DICTIONARIES,
@@ -61,6 +62,8 @@ public:
String target_dictionary;
String database;
String table;
String replica;
String replica_zk_path;
String getID(char) const override { return "SYSTEM query"; }

View File

@@ -57,6 +57,27 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
break;
}
case Type::DROP_REPLICA:
{
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
res->replica = ast->as<ASTLiteral &>().value.safeGet<String>();
if (!ParserKeyword{"FROM"}.ignore(pos, expected))
return false;
// Form 1: a database and table name
// Form 2: a replica ZooKeeper path given as a string literal
if (!parseDatabaseAndTableName(pos, expected, res->database, res->table))
{
ASTPtr path_ast;
if (!ParserStringLiteral{}.parse(pos, path_ast, expected))
return false;
res->replica_zk_path = path_ast->as<ASTLiteral &>().value.safeGet<String>();
}
break;
}
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
if (!parseDatabaseAndTableName(pos, expected, res->database, res->table))

View File

@@ -11,6 +11,10 @@
<host>node_1_2</host>
<port>9000</port>
</replica>
<replica>
<host>node_1_3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>

View File

@@ -20,6 +20,7 @@ cluster = ClickHouseCluster(__file__)
node_1_1 = cluster.add_instance('node_1_1', with_zookeeper=True, main_configs=['configs/remote_servers.xml'])
node_1_2 = cluster.add_instance('node_1_2', with_zookeeper=True, main_configs=['configs/remote_servers.xml'])
node_1_3 = cluster.add_instance('node_1_3', with_zookeeper=True, main_configs=['configs/remote_servers.xml'])
@pytest.fixture(scope="module")
@@ -43,17 +44,24 @@ def test_drop_replica(start_cluster):
zk = cluster.get_kazoo_client('zoo1')
assert "can't drop local replica" in node_1_1.query_and_get_error("ALTER TABLE test.test_table drop replica 'node_1_1'")
assert "can't drop local replica" in node_1_2.query_and_get_error("ALTER TABLE test.test_table drop replica 'node_1_2'")
assert "it's active" in node_1_1.query_and_get_error("ALTER TABLE test.test_table drop replica 'node_1_2'")
assert "can't drop local replica" in node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM test.test_table")
assert "can't drop local replica" in node_1_2.query_and_get_error("SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table")
assert "it's active" in node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table")
with PartitionManager() as pm:
node_1_2.kill_clickhouse()
pm.drop_instance_zk_connections(node_1_2)
## make node_1_2 dead
node_1_2.kill_clickhouse()
time.sleep(120)
node_1_1.query("ALTER TABLE test.test_table drop replica 'node_1_2'")
node_1_1.query("SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table")
exists_replica_1_2 = zk.exists("/clickhouse/tables/test/{shard}/replicated/replicas/{replica}".format(shard=1, replica='node_1_2'))
assert exists_replica_1_2 is None
node_1_1.query("DROP TABLE test.test_table")
## make node_1_1 dead
node_1_1.kill_clickhouse()
time.sleep(120)
node_1_3.query("SYSTEM DROP REPLICA 'node_1_1' FROM '/clickhouse/tables/test/{shard}/replicated'".format(shard=1))
exists_base_path = zk.exists("/clickhouse/tables/test/{shard}/replicated".format(shard=1))
assert exists_base_path is None