Add drop replica alter support

This commit is contained in:
sundy-li 2020-05-05 22:44:46 +08:00 committed by amudong
parent d147cd646a
commit 906a43e4a8
9 changed files with 233 additions and 0 deletions

View File

@ -9,6 +9,8 @@
#include <Storages/AlterCommands.h>
#include <Storages/MutationCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/ReplicaCommands.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/LiveView/LiveViewCommands.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Access/AccessRightsElement.h>
@ -52,6 +54,7 @@ BlockIO InterpreterAlterQuery::execute()
AlterCommands alter_commands;
PartitionCommands partition_commands;
ReplicaCommands replica_commands;
MutationCommands mutation_commands;
LiveViewCommands live_view_commands;
for (ASTAlterCommand * command_ast : alter.command_list->commands)
@ -66,6 +69,10 @@ BlockIO InterpreterAlterQuery::execute()
"(see allow_drop_detached setting)", ErrorCodes::SUPPORT_IS_DISABLED);
partition_commands.emplace_back(std::move(*partition_command));
}
else if (auto replica_command = ReplicaCommand::parse(command_ast))
{
replica_commands.emplace_back(std::move(*replica_command));
}
else if (auto mut_command = MutationCommand::parse(command_ast))
{
if (mut_command->type == MutationCommand::MATERIALIZE_TTL && !table->hasAnyTTL())
@ -94,6 +101,18 @@ BlockIO InterpreterAlterQuery::execute()
table->alterPartition(query_ptr, partition_commands, context);
}
if (!replica_commands.empty())
{
replica_commands.validate(*table);
auto replicate_table = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(table);
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
for (auto & command : replica_commands)
{
replicate_table->dropReplica(table_lock_holder, command.replica_name);
}
}
if (!live_view_commands.empty())
{
live_view_commands.validate(*table);
@ -273,6 +292,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_FREEZE_PARTITION, database, table);
break;
}
case ASTAlterCommand::DROP_REPLICA:
{
required_access.emplace_back(AccessType::ALTER_DELETE, alter.database, alter.table);
break;
}
case ASTAlterCommand::MODIFY_QUERY:
{
required_access.emplace_back(AccessType::ALTER_VIEW_MODIFY_QUERY, database, table);

View File

@ -60,6 +60,10 @@ ASTPtr ASTAlterCommand::clone() const
{
res->rename_to = rename_to->clone();
res->children.push_back(res->rename_to);
if (replica)
{
res->replica = replica->clone();
res->children.push_back(res->replica);
}
return res;
@ -290,6 +294,7 @@ void ASTAlterCommand::formatImpl(
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "REFRESH " << (settings.hilite ? hilite_none : "");
}
<<<<<<< HEAD
else if (type == ASTAlterCommand::RENAME_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "RENAME COLUMN " << (if_exists ? "IF EXISTS " : "")
@ -298,6 +303,11 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO ";
rename_to->formatImpl(settings, state, frame);
=======
else if (type == ASTAlterCommand::DROP_REPLICA)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DROP " << (settings.hilite ? hilite_none : "") << " " << std::quoted(replica_name, '\'');
>>>>>>> b3fa746... Add drop replica alter support
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);

View File

@ -58,6 +58,8 @@ public:
NO_TYPE,
LIVE_VIEW_REFRESH,
DROP_REPLICA,
};
Type type = NO_TYPE;
@ -102,6 +104,9 @@ public:
*/
ASTPtr partition;
// FOR DROP REPLICA queries
ASTPtr replica;
/// For DELETE/UPDATE WHERE: the predicate that filters the rows to delete/update.
ASTPtr predicate;
@ -159,6 +164,8 @@ public:
/// Target column name
ASTPtr rename_to;
/// DROP REPLICA
String replica_name;
String getID(char delim) const override { return "AlterCommand" + (delim + std::to_string(static_cast<int>(type))); }

View File

@ -43,6 +43,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_add_constraint("ADD CONSTRAINT");
ParserKeyword s_drop_constraint("DROP CONSTRAINT");
ParserKeyword s_drop_replica("DROP REPLICA");
ParserKeyword s_add("ADD");
ParserKeyword s_drop("DROP");
ParserKeyword s_suspend("SUSPEND");
@ -421,6 +423,14 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->with_name = ast_with_name->as<ASTLiteral &>().value.get<const String &>();
}
}
else if (s_drop_replica.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->replica, expected))
return false;
command->type = ASTAlterCommand::DROP_REPLICA;
command->replica_name = command->replica->as<ASTLiteral &>().value.get<const String &>();
}
else if (s_modify_column.ignore(pos, expected))
{
if (s_if_exists.ignore(pos, expected))
@ -510,6 +520,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->children.push_back(command->column);
if (command->partition)
command->children.push_back(command->partition);
if (command->replica)
command->children.push_back(command->replica);
if (command->order_by)
command->children.push_back(command->order_by);
if (command->predicate)

View File

@ -17,6 +17,7 @@ namespace DB
* [MODIFY SETTING setting_name=setting_value, ...]
* [COMMENT COLUMN [IF EXISTS] col_name string]
* [DROP|DETACH|ATTACH PARTITION|PART partition, ...]
* [DROP REPLICA replica, ...]
* [FETCH PARTITION partition FROM ...]
* [FREEZE [PARTITION] [WITH NAME name]]
* [DELETE WHERE ...]

View File

@ -0,0 +1,41 @@
#include <Parsers/ASTAlterQuery.h>
#include <Storages/ReplicaCommands.h>
#include <Storages/StorageReplicatedMergeTree.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_STORAGE;
}
std::optional<ReplicaCommand> ReplicaCommand::parse(const ASTAlterCommand * command_ast)
{
if (command_ast->type == ASTAlterCommand::DROP_REPLICA)
{
ReplicaCommand res;
res.type = DROP_REPLICA;
res.replica = command_ast->replica;
res.replica_name = command_ast->replica_name;
return res;
}
else
return {};
}
void ReplicaCommands::validate(const IStorage & table)
{
for (const ReplicaCommand & command : *this)
{
if (command.type == ReplicaCommand::DROP_REPLICA)
{
if (!empty() && !dynamic_cast<const StorageReplicatedMergeTree *>(&table))
throw Exception("Wrong storage type. Must be StorageReplicateMergeTree", DB::ErrorCodes::UNKNOWN_STORAGE);
}
}
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Core/Field.h>
#include <Core/Types.h>
#include <Parsers/IAST.h>
#include <Storages/IStorage_fwd.h>
#include <optional>
#include <vector>
namespace DB
{
class ASTAlterCommand;
struct ReplicaCommand
{
enum Type
{
DROP_REPLICA,
};
Type type;
ASTPtr replica;
String replica_name;
static std::optional<ReplicaCommand> parse(const ASTAlterCommand * command);
};
class ReplicaCommands : public std::vector<ReplicaCommand>
{
public:
void validate(const IStorage & table);
};
}

View File

@ -836,6 +836,72 @@ static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const Strin
return res;
}
void StorageReplicatedMergeTree::createReplica()
{
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Creating replica " << replica_path);
int32_t code;
do
{
Coordination::Stat replicas_stat;
String last_added_replica = zookeeper->get(zookeeper_path + "/replicas", &replicas_stat);
/// If it is not the first replica, we will mark it as "lost", to immediately repair (clone) from existing replica.
String is_lost_value = last_added_replica.empty() ? "0" : "1";
Coordination::Requests ops;
Coordination::Responses responses;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", ReplicatedMergeTreeTableMetadata(*this).toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent));
/// Check version of /replicas to see if there are any replicas created at the same moment of time.
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
else if (code == Coordination::Error::ZBADVERSION)
LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
else
zkutil::KeeperMultiException::check(code, ops, responses);
} while (code == Coordination::Error::ZBADVERSION);
}
void StorageReplicatedMergeTree::removeReplica(const String & replica)
{
auto zookeeper = tryGetZooKeeper();
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
auto to_drop_path = zookeeper_path + "/replicas/" + replica;
LOG_INFO(log, "Removing replica " << to_drop_path);
/// It may left some garbage if to_drop_path subtree are concurently modified
zookeeper->tryRemoveRecursive(to_drop_path);
if (zookeeper->exists(to_drop_path))
LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, "
<< to_drop_path << " still exists and may contain some garbage.");
/// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
Strings replicas;
if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == Coordination::ZOK && replicas.empty())
{
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->tryRemoveRecursive(zookeeper_path);
if (zookeeper->exists(zookeeper_path))
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, "
<< zookeeper_path << " still exists and may contain some garbage.");
}
}
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
@ -4038,6 +4104,30 @@ void StorageReplicatedMergeTree::checkPartitionCanBeDropped(const ASTPtr & parti
global_context.checkPartitionCanBeDropped(table_id.database_name, table_id.table_name, partition_size);
}
void StorageReplicatedMergeTree::drop()
{
{
auto zookeeper = tryGetZooKeeper();
if (is_readonly || !zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
}
shutdown();
replica_is_active_node = nullptr;
removeReplica(replica_name);
dropAllData();
}
void StorageReplicatedMergeTree::dropReplica(TableStructureWriteLockHolder & holder, const String & replica)
{
if (replica_name == replica)
{
drop(holder);
return;
}
removeReplica(replica);
}
void StorageReplicatedMergeTree::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
{

View File

@ -115,6 +115,10 @@ public:
*/
void drop() override;
/** Removes a specific replica from Zookeeper. If replica is local, it works same as `drop` method.
*/
void dropReplica(TableStructureWriteLockHolder &, const String & replica_name);
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
@ -303,6 +307,11 @@ private:
*/
void createReplica();
/** Remove replica by replica_name
*/
void removeReplica(const String & replica);
/** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running.
*/
void createNewZooKeeperNodes();