add drop replica database and the whole replica

fix removeReplicaByZKPath

fix bug: add zkpath empty judge

fix: rewrite code

delete useless code.

fix: AST format

fix bug

add test_drop_replica

add drop_replica doc

add drop database checkAccess

refactor dropReplica

update tests

add static method StorageReplicatedMergeTree::dropReplicaByZkPath

update doc and delete useless code

fix conflict

fix doc

fix doc

fix StorageReplicatedMergeTree::dropReplica

fix bug

delete useless code
amudong 2020-06-05 15:03:51 +08:00
parent 2e6a3eff8e
commit 7723dc4935
9 changed files with 308 additions and 272 deletions


@@ -12,7 +12,7 @@ toc_title: SYSTEM
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache)
- [DROP REPLICA TABLE](#query_language-system-drop-replica-table)
- [DROP REPLICA](#query_language-system-drop-replica)
- [FLUSH LOGS](#query_language-system-flush_logs)
- [RELOAD CONFIG](#query_language-system-reload-config)
- [SHUTDOWN](#query_language-system-shutdown)
@@ -68,6 +68,26 @@ For more convenient (automatic) cache management, see disable\_internal\_dns\_ca
Resets the mark cache. Used in development of ClickHouse and performance tests.
## DROP REPLICA {#query_language-system-drop-replica}
Replicas can be dropped using the following syntax:
```sql
SYSTEM DROP REPLICA 'replica_name';
SYSTEM DROP REPLICA 'replica_name' FROM DATABASE database;
SYSTEM DROP REPLICA 'replica_name' FROM TABLE database.table;
```
Queries will remove the replica path in ZooKeeper. This is useful when you want to remove the metadata of a dead replica. They will only drop an inactive/stale replica, and they cannot drop the local replica; use `DROP TABLE` for that.
If you want to drop the metadata of an inactive/stale replicated table that has no local replica, use the following syntax:
```sql
SYSTEM DROP REPLICA 'replica_name' FROM ZKPATH '/path/to/table/in/zk';
```
This is useful for removing the metadata of a dead replica from ZooKeeper. The right way to decrease the replication factor is `DROP TABLE`.
## DROP UNCOMPRESSED CACHE {#query_language-system-drop-uncompressed-cache}
Reset the uncompressed data cache. Used in development of ClickHouse and performance tests.
@@ -78,17 +98,6 @@ For manage uncompressed data cache parameters use following server level setting
Reset the compiled expression cache. Used in development of ClickHouse and performance tests.
The compiled expression cache is used when the query/user/profile enables the option [compile](../../operations/settings/settings.md#compile)
## DROP REPLICA TABLE {query_language-system-drop-replica-table}
Replicas can be dropped using following syntax:
```sql
SYSTEM DROP REPLICA replica_name FROM [db].name;
SYSTEM DROP REPLICA replica_name '/path/to/table/in/zk';
```
Queries will remove the replica path in zookeeper, it's useful when you want to decrease your replica factor. It will only drop the inactive/stale replica, and it can't drop local replica, please use `SYSTEM DROP REPLICA` for that.
## FLUSH LOGS {#query_language-system-flush_logs}
Flushes buffers of log messages to system tables (e.g. system.query\_log). Allows you to not wait 7.5 seconds when debugging.


@@ -185,7 +185,7 @@ BlockIO InterpreterSystemQuery::execute()
/// Make canonical query for simpler processing
if (!query.table.empty())
table_id = context.resolveStorageID(StorageID(query.database, query.table), Context::ResolveOrdinary);
table_id = context.resolveStorageID(StorageID(query.database, query.table), Context::ResolveOrdinary);
if (!query.target_dictionary.empty() && !query.database.empty())
query.target_dictionary = query.database + "." + query.target_dictionary;
@@ -408,65 +408,57 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
if (!table_id.empty())
{
context.checkAccess(AccessType::SYSTEM_DROP_REPLICA, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
storage_replicated->dropReplica(query.replica);
storage_replicated->dropReplica(query.replica, false);
LOG_TRACE(log, "DROP REPLICA " + table_id.getNameForLogs() + " [" + query.replica + "]: OK");
}
else
throw Exception("Table " + table_id.getNameForLogs() + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
}
else
else if (!query.database.empty())
{
context.checkAccess(AccessType::SYSTEM_DROP_REPLICA);
auto to_drop_path = query.replica_zk_path + "/replicas/" + query.replica;
auto & catalog = DatabaseCatalog::instance();
StorageReplicatedMergeTree::Status status;
for (auto & elem : catalog.getDatabases())
DatabasePtr database = DatabaseCatalog::instance().tryGetDatabase(query.database);
if (database)
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(); iterator->isValid(); iterator->next())
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(iterator->table().get()))
{
storage_replicated->getStatus(status);
if (to_drop_path.compare(status.replica_path) == 0)
throw Exception("We can't drop local replica, please use `DROP TABLE` if you want to clean the data and drop this replica",
ErrorCodes::LOGICAL_ERROR);
context.checkAccess(AccessType::SYSTEM_DROP_REPLICA, iterator->table()->getStorageID());
storage_replicated->dropReplica(query.replica, false);
}
}
LOG_TRACE(log, "DROP REPLICA " + query.replica + " DATABSE " + database->getDatabaseName() + ": OK");
}
else
throw Exception("DATABSE " + query.database + " doesn't exist", ErrorCodes::BAD_ARGUMENTS);
}
else if (!query.replica_zk_path.empty())
{
StorageReplicatedMergeTree::dropReplicaByZkPath(context, query.replica_zk_path, query.replica);
LOG_INFO(log, "Removing replica {}", query.replica_zk_path + "/replicas/" + query.replica);
}
else if (query.is_drop_whole_replica)
{
auto databases = DatabaseCatalog::instance().getDatabases();
auto zookeeper = context.getZooKeeper();
// TODO: check whether a local table has this replica_path
//check if is active replica if we drop other replicas
if (zookeeper->exists(to_drop_path + "/is_active"))
for (auto & elem : databases)
{
throw Exception("Can't remove replica: " + query.replica + ", because it's active",
ErrorCodes::LOGICAL_ERROR);
}
/// It may left some garbage if to_drop_path subtree are concurently modified
zookeeper->tryRemoveRecursive(to_drop_path);
if (zookeeper->exists(to_drop_path))
LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, "
<< to_drop_path << " still exists and may contain some garbage.");
/// Check that `query.replica_zk_path` exists: it could have been deleted by another replica after execution of previous line.
Strings replicas;
if (zookeeper->tryGetChildren(query.replica_zk_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
{
LOG_INFO(log, "Removing zookeeper path " << query.replica_zk_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(query.replica_zk_path);
if (zookeeper->exists(query.replica_zk_path))
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, "
<< query.replica_zk_path << " still exists and may contain some garbage.");
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(iterator->table().get()))
{
context.checkAccess(AccessType::SYSTEM_DROP_REPLICA, iterator->table()->getStorageID());
storage_replicated->dropReplica(query.replica, false);
}
}
LOG_TRACE(log, "DROP REPLICA " + query.replica + " DATABSE " + database->getDatabaseName() + ": OK");
}
}
}
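Taken together, the rewritten branch dispatches on whichever fields of the AST are populated: an explicit table wins, then a database, then a raw ZooKeeper path, and the bare form scans every database. A minimal standalone sketch of that precedence, using plain stand-in types rather than ClickHouse's real `ASTSystemQuery` (only the field names are taken from the diff above; everything else is illustrative):
```cpp
#include <stdexcept>
#include <string>

// Illustrative stand-in for the DROP REPLICA fields of ASTSystemQuery.
struct DropReplicaQuery
{
    std::string replica;                 // replica name, always required
    std::string database;                // FROM DATABASE <db>
    std::string table;                   // FROM TABLE <db>.<table>
    std::string replica_zk_path;         // FROM ZKPATH '<path>'
    bool is_drop_whole_replica = false;  // bare form: drop from every database
};

enum class DropReplicaMode { Table, Database, ZkPath, WholeReplica };

/// Dispatch precedence mirroring InterpreterSystemQuery::dropReplica above.
DropReplicaMode resolveMode(const DropReplicaQuery & query)
{
    if (!query.table.empty())
        return DropReplicaMode::Table;
    if (!query.database.empty())
        return DropReplicaMode::Database;
    if (!query.replica_zk_path.empty())
        return DropReplicaMode::ZkPath;
    if (query.is_drop_whole_replica)
        return DropReplicaMode::WholeReplica;
    throw std::invalid_argument("SYSTEM DROP REPLICA: nothing to drop");
}
```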
void InterpreterSystemQuery::syncReplica(ASTSystemQuery &)


@@ -119,16 +119,23 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
};
auto print_drop_replica = [&] {
settings.ostr << " " << (settings.hilite ? hilite_identifier : "")
<< quoteString(replica) << (settings.hilite ? hilite_none : "")
<< " FROM ";
settings.ostr << " " << quoteString(replica) << (settings.hilite ? hilite_none : "");
if (!table.empty())
print_database_table();
else
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << quoteString(replica_zk_path)
settings.ostr << " FROM TABLE";
print_database_table();
}
else if (!replica_zk_path.empty())
{
settings.ostr << " FROM ZKPATH " << (settings.hilite ? hilite_identifier : "") << quoteString(replica_zk_path)
<< (settings.hilite ? hilite_none : "");
}
else if (!database.empty())
{
settings.ostr << " FROM DATABASE ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(database)
<< (settings.hilite ? hilite_none : "");
}
};
if (!cluster.empty())


@@ -64,6 +64,7 @@ public:
String table;
String replica;
String replica_zk_path;
bool is_drop_whole_replica = false;
String getID(char) const override { return "SYSTEM query"; }


@@ -2,6 +2,7 @@
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/parseDatabaseAndTableName.h>
@@ -63,20 +64,38 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
res->replica = ast->as<ASTLiteral &>().value.safeGet<String>();
if (!ParserKeyword{"FROM"}.ignore(pos, expected))
return false;
// way 1. parse database and tables
// way 2. parse replica zk path
if (!parseDatabaseAndTableName(pos, expected, res->database, res->table))
if (ParserKeyword{"FROM"}.ignore(pos, expected))
{
ASTPtr path_ast;
if (!ParserStringLiteral{}.parse(pos, path_ast, expected))
// Variant 1: FROM DATABASE <database>
// Variant 2: FROM TABLE <database>.<table>
// Variant 3: FROM ZKPATH '<path>'
if (ParserKeyword{"DATABASE"}.ignore(pos, expected))
{
ParserIdentifier database_parser;
ASTPtr database;
if (!database_parser.parse(pos, database, expected))
return false;
tryGetIdentifierNameInto(database, res->database);
}
else if (ParserKeyword{"TABLE"}.ignore(pos, expected))
{
if (!parseDatabaseAndTableName(pos, expected, res->database, res->table))
return false;
}
else if (ParserKeyword{"ZKPATH"}.ignore(pos, expected))
{
ASTPtr path_ast;
if (!ParserStringLiteral{}.parse(pos, path_ast, expected))
return false;
String zk_path = path_ast->as<ASTLiteral &>().value.safeGet<String>();
if (!zk_path.empty() && zk_path[zk_path.size() - 1] == '/')
zk_path.pop_back();
res->replica_zk_path = zk_path;
}
else
return false;
String zk_path = path_ast->as<ASTLiteral &>().value.safeGet<String>();
if (zk_path[zk_path.size()-1] == '/')
zk_path.pop_back();
res->replica_zk_path = zk_path;
}
else
res->is_drop_whole_replica = true;
break;
}
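One detail worth noting in the new parser branch: a single trailing slash is stripped from the ZKPATH argument so that `'/path/'` and `'/path'` address the same node, and the added empty-string guard fixes the unchecked indexing of the earlier revision (the removed lines just above). That normalization in isolation, as a sketch (the function name is illustrative, not from the commit):
```cpp
#include <string>

/// Strip one trailing '/' from a user-supplied ZooKeeper path, guarding
/// against the empty string (the "add zkpath empty judge" fix).
std::string normalizeZkPath(std::string zk_path)
{
    if (!zk_path.empty() && zk_path.back() == '/')
        zk_path.pop_back();
    return zk_path;
}
```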


@@ -620,110 +620,20 @@ void StorageReplicatedMergeTree::createReplica()
void StorageReplicatedMergeTree::drop()
{
/// There is also the case when the user has configured ClickHouse to a wrong ZooKeeper cluster;
/// in that case, has_metadata_in_zookeeper = false, and we also permit dropping the table.
if (has_metadata_in_zookeeper)
{
auto zookeeper = tryGetZooKeeper();
/// If there may still be metadata in ZooKeeper, we don't allow dropping the table.
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
LOG_INFO(log, "Removing replica {}", replica_path);
replica_is_active_node = nullptr;
/// It may left some garbage if replica_path subtree are concurently modified
zookeeper->tryRemoveRecursive(replica_path);
if (zookeeper->exists(replica_path))
LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", replica_path);
/// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
Strings replicas;
if (Coordination::Error::ZOK == zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) && replicas.empty())
{
LOG_INFO(log, "{} is the last replica, will remove table", replica_path);
/** At this moment, another replica can be created and we cannot remove the table.
* Try to remove /replicas node first. If we successfully removed it,
* it guarantees that we are the only replica that proceed to remove the table
* and no new replicas can be created after that moment (it requires the existence of /replicas node).
* and table cannot be recreated with new /replicas node on another servers while we are removing data,
* because table creation is executed in single transaction that will conflict with remaining nodes.
*/
Coordination::Requests ops;
Coordination::Responses responses;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/replicas", -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/dropped", "", zkutil::CreateMode::Persistent));
Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
LOG_WARNING(log, "Table {} is already started to be removing by another replica right now", replica_path);
}
else if (code == Coordination::Error::ZNOTEMPTY)
{
LOG_WARNING(log, "Another replica was suddenly created, will keep the table {}", replica_path);
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, ops, responses);
}
else
{
LOG_INFO(log, "Removing table {} (this might take several minutes)", zookeeper_path);
Strings children;
code = zookeeper->tryGetChildren(zookeeper_path, children);
if (code == Coordination::Error::ZNONODE)
{
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
}
else
{
for (const auto & child : children)
if (child != "dropped")
zookeeper->tryRemoveRecursive(zookeeper_path + "/" + child);
ops.clear();
responses.clear();
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/dropped", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1));
code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNONODE)
{
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
}
else if (code == Coordination::Error::ZNOTEMPTY)
{
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, {} still exists and may contain some garbage.",
zookeeper_path);
}
else if (code != Coordination::Error::ZOK)
{
/// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation.
zkutil::KeeperMultiException::check(code, ops, responses);
}
else
{
LOG_INFO(log, "Table {} was successfully removed from ZooKeeper", zookeeper_path);
}
}
}
}
dropReplica(replica_name, true);
}
dropAllData();
}
/** Verify that list of columns and table storage_settings_ptr match those specified in ZK (/ metadata).
* If not, throw an exception.
*/
@@ -836,81 +746,146 @@ static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const Strin
return res;
}
void StorageReplicatedMergeTree::createReplica()
{
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Creating replica " << replica_path);
int32_t code;
do
{
Coordination::Stat replicas_stat;
String last_added_replica = zookeeper->get(zookeeper_path + "/replicas", &replicas_stat);
/// If it is not the first replica, we will mark it as "lost", to immediately repair (clone) from existing replica.
String is_lost_value = last_added_replica.empty() ? "0" : "1";
Coordination::Requests ops;
Coordination::Responses responses;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", ReplicatedMergeTreeTableMetadata(*this).toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent));
/// Check version of /replicas to see if there are any replicas created at the same moment of time.
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
else if (code == Coordination::Error::ZBADVERSION)
LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
else
zkutil::KeeperMultiException::check(code, ops, responses);
} while (code == Coordination::Error::ZBADVERSION);
}
void StorageReplicatedMergeTree::removeReplica(const String & replica)
void StorageReplicatedMergeTree::dropReplica(const String & replica, bool is_drop_table)
{
auto zookeeper = tryGetZooKeeper();
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
auto to_drop_path = zookeeper_path + "/replicas/" + replica;
//check if is active replica if we drop other replicas
if (replica != replica_name && zookeeper->exists(to_drop_path + "/is_active"))
if (!is_drop_table)
{
throw Exception("Can't remove replica: " + replica + ", because it's active",
if (replica == replica_name)
throw Exception("We can't drop local replica, please use `DROP TABLE` if you want to clean the data and drop this replica", ErrorCodes::LOGICAL_ERROR);
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
throw Exception("Can't drop replica: " + replica + ", because it's active",
ErrorCodes::LOGICAL_ERROR);
}
LOG_INFO(log, "Removing replica " << to_drop_path);
/// It may left some garbage if to_drop_path subtree are concurently modified
zookeeper->tryRemoveRecursive(to_drop_path);
if (zookeeper->exists(to_drop_path))
LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, "
<< to_drop_path << " still exists and may contain some garbage.");
auto remote_replica_path = zookeeper_path + "/replicas" + "/" + replica;
LOG_INFO(log, "Removing replica {}", remote_replica_path);
/// It may leave some garbage if the replica's subtree is concurrently modified
zookeeper->tryRemoveRecursive(remote_replica_path);
if (zookeeper->exists(remote_replica_path))
LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", remote_replica_path);
/// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
Strings replicas;
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
if (Coordination::Error::ZOK == zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) && replicas.empty())
{
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(zookeeper_path);
if (zookeeper->exists(zookeeper_path))
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, "
<< zookeeper_path << " still exists and may contain some garbage.");
LOG_INFO(log, "{} is the last replica, will remove table", remote_replica_path);
/** At this moment, another replica can be created and we cannot remove the table.
* Try to remove the /replicas node first. If we successfully removed it,
* it guarantees that we are the only replica that proceeds to remove the table,
* and no new replicas can be created after that moment (it requires the existence of the /replicas node).
* The table also cannot be recreated with a new /replicas node on other servers while we are removing data,
* because table creation is executed in a single transaction that will conflict with the remaining nodes.
*/
Coordination::Requests ops;
Coordination::Responses responses;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/replicas", -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/dropped", "", zkutil::CreateMode::Persistent));
Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
LOG_WARNING(log, "Table {} is already started to be removing by another replica right now", remote_replica_path);
}
else if (code == Coordination::Error::ZNOTEMPTY)
{
LOG_WARNING(log, "Another replica was suddenly created, will keep the table {}", remote_replica_path);
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, ops, responses);
}
else
{
LOG_INFO(log, "Removing table {} (this might take several minutes)", zookeeper_path);
Strings children;
code = zookeeper->tryGetChildren(zookeeper_path, children);
if (code == Coordination::Error::ZNONODE)
{
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", remote_replica_path);
}
else
{
for (const auto & child : children)
if (child != "dropped")
zookeeper->tryRemoveRecursive(zookeeper_path + "/" + child);
ops.clear();
responses.clear();
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/dropped", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1));
code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNONODE)
{
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", remote_replica_path);
}
else if (code == Coordination::Error::ZNOTEMPTY)
{
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, {} still exists and may contain some garbage.",
zookeeper_path);
}
else if (code != Coordination::Error::ZOK)
{
/// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation.
zkutil::KeeperMultiException::check(code, ops, responses);
}
else
{
LOG_INFO(log, "Table {} was successfully removed from ZooKeeper", zookeeper_path);
}
}
}
}
}
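The block above implements the "last replica removes the table" protocol described in the comment: atomically remove `/replicas` and create a `/dropped` marker, and only the replica that wins that transaction proceeds to delete the rest. A compressed standalone sketch of the protocol against a hypothetical minimal ZooKeeper interface (`ZkClient`, `ZkOp` and the return codes are stand-ins, not zkutil's real API):
```cpp
#include <string>
#include <vector>

// Hypothetical minimal ZooKeeper client; names are illustrative stand-ins.
enum class ZkCode { Ok, NoNode, NodeExists, NotEmpty };
struct ZkOp { enum Kind { Remove, Create } kind; std::string path; };

struct ZkClient
{
    virtual ZkCode tryMulti(const std::vector<ZkOp> & ops) = 0;
    virtual std::vector<std::string> getChildren(const std::string & path) = 0;
    virtual void tryRemoveRecursive(const std::string & path) = 0;
    virtual ~ZkClient() = default;
};

/// Phase 1: atomically remove <table>/replicas and create <table>/dropped.
/// Winning this multi-op proves we are the only remover; no new replica can
/// register without the /replicas node, and table re-creation will conflict
/// with the nodes that are still left.
bool tryBecomeTableRemover(ZkClient & zk, const std::string & table_path)
{
    ZkCode code = zk.tryMulti({
        {ZkOp::Remove, table_path + "/replicas"},
        {ZkOp::Create, table_path + "/dropped"},
    });
    // NoNode / NodeExists: another replica already started the removal.
    // NotEmpty: a new replica appeared in the meantime; keep the table.
    return code == ZkCode::Ok;
}

/// Phase 2: delete everything except the marker, then the marker and root.
void removeTable(ZkClient & zk, const std::string & table_path)
{
    if (!tryBecomeTableRemover(zk, table_path))
        return;
    for (const auto & child : zk.getChildren(table_path))
        if (child != "dropped")
            zk.tryRemoveRecursive(table_path + "/" + child);
    zk.tryMulti({
        {ZkOp::Remove, table_path + "/dropped"},
        {ZkOp::Remove, table_path},
    });
}
```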
void StorageReplicatedMergeTree::dropReplicaByZkPath(Context & context, const String & replica_zk_path, const String & replica)
{
auto remote_replica_path = replica_zk_path + "/replicas/" + replica;
auto & catalog = DatabaseCatalog::instance();
StorageReplicatedMergeTree::Status status;
for (auto & elem : catalog.getDatabases())
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(iterator->table().get()))
{
storage_replicated->getStatus(status);
if (status.replica_path == remote_replica_path)
throw Exception("We can't drop local replica, please use `DROP TABLE` if you want to clean the data and drop this replica",
ErrorCodes::LOGICAL_ERROR);
if (status.replica_path == replica_zk_path + "/replicas/" + status.replica_name)
{
storage_replicated->dropReplica(replica, false);
return;
}
}
}
}
/// It may leave some garbage if the remote_replica_path subtree is concurrently modified
auto zookeeper = context.getZooKeeper();
/// Check that the replica is not active before removing it
if (zookeeper->exists(remote_replica_path + "/is_active"))
throw Exception("Can't remove replica: " + replica + ", because it's active",
ErrorCodes::LOGICAL_ERROR);
zookeeper->tryRemoveRecursive(remote_replica_path);
}
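Design note on the ZKPATH form: dropReplicaByZkPath first tries to route through a local StorageReplicatedMergeTree bound to that path, so the usual local-replica protection applies; only when no local table matches does it fall back to raw removal, guarded by the ephemeral `is_active` check. That guard distills to something like the following sketch (the `exists` callback stands in for `zookeeper->exists`):
```cpp
#include <functional>
#include <stdexcept>
#include <string>

/// Refuse the raw-path removal while the replica still holds its ephemeral
/// is_active node, i.e. while a live server owns it.
void assertReplicaInactive(
    const std::function<bool(const std::string &)> & exists,
    const std::string & replica_zk_path,
    const std::string & replica)
{
    if (exists(replica_zk_path + "/replicas/" + replica + "/is_active"))
        throw std::runtime_error("Can't remove replica " + replica + ": it's still active");
}
```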
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
auto zookeeper = getZooKeeper();
@@ -4112,32 +4087,6 @@ void StorageReplicatedMergeTree::checkPartitionCanBeDropped(const ASTPtr & parti
global_context.checkPartitionCanBeDropped(table_id.database_name, table_id.table_name, partition_size);
}
void StorageReplicatedMergeTree::drop()
{
{
auto zookeeper = tryGetZooKeeper();
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
}
shutdown();
replica_is_active_node = nullptr;
removeReplica(replica_name);
dropAllData();
}
void StorageReplicatedMergeTree::dropReplica(const String & replica)
{
if (replica_name == replica)
{
throw Exception("We can't drop local replica, please use `DROP TABLE` if you want to clean the data and drop this replica",
ErrorCodes::LOGICAL_ERROR);
}
// remove other replicas
removeReplica(replica);
}
void StorageReplicatedMergeTree::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
{
MergeTreeData::rename(new_path_to_table_data, new_table_id);


@@ -117,9 +117,9 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
/** Removes a specific replica from Zookeeper.
/** Remove a specific replica from ZooKeeper.
* If is_drop_table is true, the local replica is removed as part of dropping the whole table;
* otherwise dropping the local replica or an active replica is rejected.
*/
void dropReplica(const String & replica_name);
void dropReplica(const String & replica, bool is_drop_table);
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
@@ -184,6 +184,10 @@ public:
int getMetadataVersion() const { return metadata_version; }
/** Remove a specific replica from ZooKeeper by the table's zkpath.
*/
static void dropReplicaByZkPath(Context & context, const String & replica_zk_path, const String & replica);
private:
/// Get a sequential consistent view of current parts.
@@ -307,10 +311,6 @@ private:
*/
void createReplica();
/** Remove replica by name
*/
void removeReplica(const String & replica);
/** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running.
*/
void createNewZooKeeperNodes();


@@ -13,7 +13,40 @@ def fill_nodes(nodes, shard):
CREATE DATABASE test;
CREATE TABLE test.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
'''.format(shard=shard, replica=node.name))
node.query(
'''
CREATE DATABASE test1;
CREATE TABLE test1.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test1/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
'''.format(shard=shard, replica=node.name))
node.query(
'''
CREATE DATABASE test2;
CREATE TABLE test2.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test2/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
'''.format(shard=shard, replica=node.name))
node.query(
'''
CREATE DATABASE test3;
CREATE TABLE test3.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test3/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
'''.format(shard=shard, replica=node.name))
node.query(
'''
CREATE DATABASE test4;
CREATE TABLE test4.test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test4/{shard}/replicated/test_table', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
'''.format(shard=shard, replica=node.name))
cluster = ClickHouseCluster(__file__)
@@ -28,7 +61,7 @@ def start_cluster():
try:
cluster.start()
fill_nodes([node_1_1, node_1_2, node_1_3], 1)
fill_nodes([node_1_1, node_1_2], 1)
yield cluster
@@ -41,28 +74,53 @@ def test_drop_replica(start_cluster):
def test_drop_replica(start_cluster):
for i in range(100):
node_1_1.query("INSERT INTO test.test_table VALUES (1, {})".format(i))
node_1_1.query("INSERT INTO test1.test_table VALUES (1, {})".format(i))
node_1_1.query("INSERT INTO test2.test_table VALUES (1, {})".format(i))
node_1_1.query("INSERT INTO test3.test_table VALUES (1, {})".format(i))
node_1_1.query("INSERT INTO test4.test_table VALUES (1, {})".format(i))
zk = cluster.get_kazoo_client('zoo1')
assert "can't drop local replica" in node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM test.test_table")
assert "can't drop local replica" in node_1_2.query_and_get_error("SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table")
assert "can't drop local replica" in node_1_3.query_and_get_error("SYSTEM DROP REPLICA 'node_1_3' FROM '/clickhouse/tables/test/{shard}/replicated'".format(shard=1))
assert "it's active" in node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table")
assert "can't drop local replica" in node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1'")
assert "can't drop local replica" in node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM DATABASE test")
assert "can't drop local replica" in node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM TABLE test.test_table")
assert "can't drop local replica" in \
node_1_1.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM ZKPATH '/clickhouse/tables/test/{shard}/replicated/test_table'".format(shard=1))
assert "it's active" in node_1_2.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1'")
assert "it's active" in node_1_2.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM DATABASE test")
assert "it's active" in node_1_2.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM TABLE test.test_table")
assert "it's active" in \
node_1_2.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM ZKPATH '/clickhouse/tables/test/{shard}/replicated/test_table'".format(shard=1))
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node_1_2)
## make node_1_2 dead
node_1_2.kill_clickhouse()
time.sleep(120)
node_1_1.query("SYSTEM DROP REPLICA 'node_1_2' FROM test.test_table")
exists_replica_1_2 = zk.exists("/clickhouse/tables/test/{shard}/replicated/replicas/{replica}".format(shard=1, replica='node_1_2'))
assert (exists_replica_1_2 == None)
## make node_1_1 dead
node_1_1.kill_clickhouse()
time.sleep(120)
pm.drop_instance_zk_connections(node_1_1)
time.sleep(10)
node_1_3.query("SYSTEM DROP REPLICA 'node_1_1' FROM '/clickhouse/tables/test/{shard}/replicated'".format(shard=1))
exists_base_path = zk.exists("/clickhouse/tables/test/{shard}/replicated".format(shard=1))
assert(exists_base_path == None)
assert "doesn't exist" in node_1_3.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM TABLE test.test_table")
assert "doesn't exist" in node_1_3.query_and_get_error("SYSTEM DROP REPLICA 'node_1_1' FROM DATABASE test1")
node_1_3.query("SYSTEM DROP REPLICA 'node_1_1'")
exists_replica_1_1 = zk.exists("/clickhouse/tables/test3/{shard}/replicated/test_table/replicas/{replica}".format(shard=1, replica='node_1_1'))
assert (exists_replica_1_1 != None)
## To drop an inactive/stale replicated table that has no local replica, use the ZKPATH syntax:
node_1_3.query("SYSTEM DROP REPLICA 'node_1_1' FROM ZKPATH '/clickhouse/tables/test2/{shard}/replicated/test_table'".format(shard=1))
exists_replica_1_1 = zk.exists("/clickhouse/tables/test2/{shard}/replicated/test_table/replicas/{replica}".format(shard=1, replica='node_1_1'))
assert (exists_replica_1_1 == None)
node_1_2.query("SYSTEM DROP REPLICA 'node_1_1' FROM TABLE test.test_table")
exists_replica_1_1 = zk.exists("/clickhouse/tables/test/{shard}/replicated/test_table/replicas/{replica}".format(shard=1, replica='node_1_1'))
assert (exists_replica_1_1 == None)
node_1_2.query("SYSTEM DROP REPLICA 'node_1_1' FROM DATABASE test1")
exists_replica_1_1 = zk.exists("/clickhouse/tables/test1/{shard}/replicated/test_table/replicas/{replica}".format(shard=1, replica='node_1_1'))
assert (exists_replica_1_1 == None)
node_1_2.query("SYSTEM DROP REPLICA 'node_1_1' FROM ZKPATH '/clickhouse/tables/test3/{shard}/replicated/test_table'".format(shard=1))
exists_replica_1_1 = zk.exists("/clickhouse/tables/test3/{shard}/replicated/test_table/replicas/{replica}".format(shard=1, replica='node_1_1'))
assert (exists_replica_1_1 == None)
node_1_2.query("SYSTEM DROP REPLICA 'node_1_1'")
exists_replica_1_1 = zk.exists("/clickhouse/tables/test4/{shard}/replicated/test_table/replicas/{replica}".format(shard=1, replica='node_1_1'))
assert (exists_replica_1_1 == None)


@@ -89,6 +89,7 @@ SYSTEM DISTRIBUTED SENDS ['SYSTEM STOP DISTRIBUTED SENDS','SYSTEM START DISTRIBU
SYSTEM REPLICATED SENDS ['SYSTEM STOP REPLICATED SENDS','SYSTEM START REPLICATED SENDS','STOP_REPLICATED_SENDS','START REPLICATED SENDS'] TABLE SYSTEM SENDS
SYSTEM SENDS ['SYSTEM STOP SENDS','SYSTEM START SENDS','STOP SENDS','START SENDS'] \N SYSTEM
SYSTEM REPLICATION QUEUES ['SYSTEM STOP REPLICATION QUEUES','SYSTEM START REPLICATION QUEUES','STOP_REPLICATION_QUEUES','START REPLICATION QUEUES'] TABLE SYSTEM
SYSTEM DROP REPLICA ['DROP REPLICA'] TABLE SYSTEM
SYSTEM SYNC REPLICA ['SYNC REPLICA'] TABLE SYSTEM
SYSTEM RESTART REPLICA ['RESTART REPLICA'] TABLE SYSTEM
SYSTEM FLUSH DISTRIBUTED ['FLUSH DISTRIBUTED'] TABLE SYSTEM FLUSH