ALTER TABLE ... DROP|DETACH PART for ReplicatedMergeTree

This commit is contained in:
Nicolae Vartolomei 2020-09-04 16:48:51 +01:00
parent d8ddb011b4
commit 6060a1ab57
9 changed files with 185 additions and 17 deletions

View File

@ -166,7 +166,8 @@ void ASTAlterCommand::formatImpl(
}
else if (type == ASTAlterCommand::DROP_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << (detach ? "DETACH" : "DROP") << " PARTITION "
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
<< (detach ? "DETACH" : "DROP") << (part ? " PART " : " PARTITION ")
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}

View File

@ -51,13 +51,15 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_modify("MODIFY");
ParserKeyword s_attach_partition("ATTACH PARTITION");
ParserKeyword s_attach_part("ATTACH PART");
ParserKeyword s_detach_partition("DETACH PARTITION");
ParserKeyword s_detach_part("DETACH PART");
ParserKeyword s_drop_partition("DROP PARTITION");
ParserKeyword s_drop_part("DROP PART");
ParserKeyword s_move_partition("MOVE PARTITION");
ParserKeyword s_move_part("MOVE PART");
ParserKeyword s_drop_detached_partition("DROP DETACHED PARTITION");
ParserKeyword s_drop_detached_part("DROP DETACHED PART");
ParserKeyword s_attach_part("ATTACH PART");
ParserKeyword s_move_part("MOVE PART");
ParserKeyword s_fetch_partition("FETCH PARTITION");
ParserKeyword s_replace_partition("REPLACE PARTITION");
ParserKeyword s_freeze("FREEZE");
@ -149,6 +151,14 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->type = ASTAlterCommand::DROP_PARTITION;
}
else if (s_drop_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;
command->type = ASTAlterCommand::DROP_PARTITION;
command->part = true;
}
else if (s_drop_detached_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
@ -342,6 +352,15 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->type = ASTAlterCommand::DROP_PARTITION;
command->detach = true;
}
else if (s_detach_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;
command->type = ASTAlterCommand::DROP_PARTITION;
command->part = true;
command->detach = true;
}
else if (s_attach_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))

View File

@ -21,6 +21,7 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.type = DROP_PARTITION;
res.partition = command_ast->partition;
res.detach = command_ast->detach;
res.part = command_ast->part;
return res;
}
else if (command_ast->type == ASTAlterCommand::DROP_DETACHED_PARTITION)

View File

@ -3894,8 +3894,13 @@ Pipe StorageReplicatedMergeTree::alterPartition(
switch (command.type)
{
case PartitionCommand::DROP_PARTITION:
checkPartitionCanBeDropped(command.partition);
dropPartition(query, command.partition, command.detach, query_context);
if (command.part)
{
/// TODO(nv) what to check here? it is possible to drop a big partition by dropping small parts...
}
else
checkPartitionCanBeDropped(command.partition);
dropPartition(query, command.partition, command.detach, command.part, query_context);
break;
case PartitionCommand::DROP_DETACHED_PARTITION:
@ -4017,18 +4022,30 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
}
void StorageReplicatedMergeTree::dropPartition(const ASTPtr &, const ASTPtr & partition, bool detach, const Context & query_context)
void StorageReplicatedMergeTree::dropPartition(
const ASTPtr &, const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context)
{
assertNotReadonly();
if (!is_leader)
throw Exception("DROP PARTITION cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
throw Exception("DROP PART|PARTITION cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
String partition_id = getPartitionIDFromQuery(partition, query_context);
LogEntry entry;
if (dropPartsInPartition(*zookeeper, partition_id, entry, detach))
bool did_drop;
if (drop_part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
did_drop = dropPart(zookeeper, part_name, entry, detach);
}
else
{
String partition_id = getPartitionIDFromQuery(partition, query_context);
did_drop = dropAllPartsInPartition(*zookeeper, partition_id, entry, detach);
}
if (did_drop)
{
/// If necessary, wait until the operation is performed on itself or on all replicas.
if (query_context.getSettingsRef().replication_alter_partitions_sync != 0)
@ -4041,7 +4058,12 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr &, const ASTPtr & pa
}
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
cleanLastPartNode(partition_id);
/// TODO(nv) how is this related to dropPart? Is it?
if (!drop_part)
{
String partition_id = getPartitionIDFromQuery(partition, query_context);
cleanLastPartNode(partition_id);
}
}
@ -4062,7 +4084,7 @@ void StorageReplicatedMergeTree::truncate(
{
LogEntry entry;
if (dropPartsInPartition(*zookeeper, partition_id, entry, false))
if (dropAllPartsInPartition(*zookeeper, partition_id, entry, false))
waitForAllReplicasToProcessLogEntry(entry);
}
}
@ -5665,9 +5687,65 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
return true;
}
/** Puts a DROP_RANGE entry that covers exactly the part `part_name` into the replication log.
  *
  * @param zookeeper  ZooKeeper session used for the merge predicate, for clearing block IDs and for the log commit.
  * @param part_name  Name of the part to drop; parsed with MergeTreePartInfo::fromPartName using `format_version`.
  * @param entry      Output: filled with the DROP_RANGE entry that was written (type, source replica, part name,
  *                   detach flag, creation time and the name of the created log znode).
  * @param detach     Stored into `entry.detach`.
  * @return Always true; every failure is reported by throwing.
  *
  * Throws if the part is not found in `data_parts_by_info` on this replica, or if the part is currently
  * assigned to a background operation (PART_IS_TEMPORARILY_LOCKED). ZooKeeper errors other than ZBADVERSION
  * are rethrown through KeeperMultiException::check.
  */
bool StorageReplicatedMergeTree::dropPart(
    zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach)
{
    LOG_TRACE(log, "Will try to insert a log entry to DROP_RANGE for part: " + part_name);

    auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);

    while (true)
    {
        /// The predicate carries the version of the /log node that is checked when committing below.
        ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);

        /// TODO(nv) It is possible that the part does not exist on the replica which executes this statement.
        ///  It is also possible for the part to not exist on any replica, e.g. when the replica which created
        ///  the log entries for the part has disappeared.
        auto part = data_parts_by_info.find(part_info);
        if (part == data_parts_by_info.end())
            throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NOT_IMPLEMENTED);

        /// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already
        /// finished the merge.
        /// This check is done before anything is modified in ZooKeeper, so that a rejected request leaves
        /// no side effects behind.
        if (partIsAssignedToBackgroundOperation(*part))
            throw Exception("Part " + part_name
                            + " is currently participating in a background operation (mutation/merge)"
                            + ", try again later.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);

        /// TODO(nv) get ops and commit together w/ log entry?
        clearBlocksInPartition(*zookeeper, part_info.partition_id, part_info.min_block, part_info.max_block);

        /// If `part_name` is result of a recent merge and source parts are still available then
        /// DROP_RANGE with detach will move this part together with source parts to `detached/` dir.
        entry.type = LogEntry::DROP_RANGE;
        entry.source_replica = replica_name;
        entry.new_part_name = part_name;
        entry.detach = detach;
        entry.create_time = time(nullptr);

        Coordination::Requests ops;
        ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", merge_pred.getVersion())); /// Make sure no new events were added to the log.
        ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
        ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version.

        Coordination::Responses responses;
        Coordination::Error rc = zookeeper->tryMulti(ops, responses);
        if (rc == Coordination::Error::ZBADVERSION)
        {
            /// The version check on /log failed: start over with a fresh merge predicate.
            LOG_TRACE(log, "A new log entry appeared while trying to commit DROP RANGE. Retry.");
            continue;
        }
        else
            zkutil::KeeperMultiException::check(rc, ops, responses);

        /// responses[1] belongs to the create request; its path ends with the sequential znode name.
        String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses[1]).path_created;
        entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);

        return true;
    }
}
bool StorageReplicatedMergeTree::dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, bool detach)
{
MergeTreePartInfo drop_range_info;
if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info))

View File

@ -529,11 +529,12 @@ private:
/// Info about how other replicas can access this one.
ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;
bool dropPartsInPartition(zkutil::ZooKeeper & zookeeper, String & partition_id,
StorageReplicatedMergeTree::LogEntry & entry, bool detach);
/// Writes a DROP_RANGE log entry for the single part `part_name` and fills `entry` with it.
/// Throws if the part is not found locally or is assigned to a background operation.
bool dropPart(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach);
bool dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, bool detach);
// Partition helpers
void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context);
void dropPartition(const ASTPtr &, const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context);
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context);

View File

@ -0,0 +1,9 @@
0
1
2
0
2
all_1_1_0
0
1
2

View File

@ -0,0 +1,28 @@
-- Checks ALTER TABLE ... DETACH PART / ATTACH PART on a ReplicatedMergeTree table with two replicas.

-- ALTER ... PART|PARTITION queries wait until they are executed on all replicas.
SET replication_alter_partitions_sync = 2;

DROP TABLE IF EXISTS attach_01451_r1;
DROP TABLE IF EXISTS attach_01451_r2;

-- Merges are disabled so that the part all_1_1_0 created by the second INSERT is not merged away.
CREATE TABLE attach_01451_r1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01451/attach', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE attach_01451_r2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01451/attach', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;

-- One part per INSERT.
INSERT INTO attach_01451_r1 VALUES (0);
INSERT INTO attach_01451_r1 VALUES (1);
INSERT INTO attach_01451_r1 VALUES (2);

SELECT v FROM attach_01451_r1 ORDER BY v;

-- The data was inserted through r1; DETACH PART is rejected on a replica that does not have the part locally,
-- so make sure r2 has fetched everything before issuing the query there.
SYSTEM SYNC REPLICA attach_01451_r2;
ALTER TABLE attach_01451_r2 DETACH PART 'all_1_1_0';

-- The part must be gone on the other replica as well.
SELECT v FROM attach_01451_r1 ORDER BY v;

-- Restrict to the current database so that equally named tables elsewhere do not affect the result.
SELECT name FROM system.detached_parts WHERE table = 'attach_01451_r2' AND database = currentDatabase();

ALTER TABLE attach_01451_r2 ATTACH PART 'all_1_1_0';

-- The part was attached on r2; wait until r1 has fetched it before reading from r1.
SYSTEM SYNC REPLICA attach_01451_r1;
SELECT v FROM attach_01451_r1 ORDER BY v;

SELECT name FROM system.detached_parts WHERE table = 'attach_01451_r2' AND database = currentDatabase();

DROP TABLE attach_01451_r1;
DROP TABLE attach_01451_r2;

View File

@ -0,0 +1,6 @@
0
1
2
0
2
all_0_2_1

View File

@ -0,0 +1,25 @@
-- Checks ALTER TABLE ... DROP PART on a ReplicatedMergeTree table with two replicas.

-- ALTER ... PART|PARTITION queries wait until they are executed on all replicas.
SET replication_alter_partitions_sync = 2;

DROP TABLE IF EXISTS attach_r1;
DROP TABLE IF EXISTS attach_r2;

-- Merges are disabled so that the part all_1_1_0 created by the second INSERT is not merged away.
CREATE TABLE attach_r1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01452/attach', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE attach_r2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01452/attach', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;

-- One part per INSERT.
INSERT INTO attach_r1 VALUES (0);
INSERT INTO attach_r1 VALUES (1);
INSERT INTO attach_r1 VALUES (2);

SELECT v FROM attach_r1 ORDER BY v;

-- The data was inserted through r1; DROP PART is rejected on a replica that does not have the part locally,
-- so make sure r2 has fetched everything before issuing the query there.
SYSTEM SYNC REPLICA attach_r2;
ALTER TABLE attach_r2 DROP PART 'all_1_1_0';

-- The part must be gone on the other replica as well.
SELECT v FROM attach_r1 ORDER BY v;

-- Allow merges again and merge the remaining parts; the merged part name still spans the dropped block number.
ALTER TABLE attach_r1 MODIFY SETTING max_replicated_merges_in_queue = 1;
OPTIMIZE TABLE attach_r1 FINAL;

-- Restrict to the current database so that equally named tables elsewhere do not affect the result.
SELECT name FROM system.parts WHERE table = 'attach_r1' AND database = currentDatabase() AND active;

DROP TABLE attach_r1;
DROP TABLE attach_r2;