better drop of empty parts

This commit is contained in:
Anton Popov 2021-04-20 05:31:08 +03:00
parent 66913c5865
commit 42a0416a2b
6 changed files with 81 additions and 58 deletions

View File

@ -1305,18 +1305,11 @@ void MergeTreeData::clearEmptyParts()
if (!getSettings()->remove_empty_parts)
return;
/// There is no need to wait for drop and hold thread in background pool.
auto context_for_drop = Context::createCopy(getContext());
context_for_drop->setSetting("replication_alter_partitions_sync", Field(0));
auto parts = getDataPartsVector();
for (const auto & part : parts)
{
if (part->rows_count == 0)
{
ASTPtr literal = std::make_shared<ASTLiteral>(part->name);
/// If another replica has already started drop, it's ok, no need to throw.
dropPartition(literal, /* detach = */ false, /*drop_part = */ true, context_for_drop, /* throw_if_noop = */ false);
}
dropPart(part->name);
}
}

View File

@ -964,7 +964,9 @@ protected:
// Partition helpers
bool canReplacePartition(const DataPartPtr & src_part) const;
virtual void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr context, bool throw_if_noop = true) = 0;
/// Tries to drop part in background without any waits or throwing exceptions in case of errors.
virtual void dropPart(const String & name) = 0;
virtual void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr context) = 0;
virtual PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr context) = 0;
virtual void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) = 0;
virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0;

View File

@ -1213,55 +1213,69 @@ MergeTreeDataPartPtr StorageMergeTree::outdatePart(const String & part_name, boo
}
}
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr local_context, bool throw_if_noop)
void StorageMergeTree::dropPart(const String & name)
{
auto part = outdatePart(name, false);
/// Nothing to do, part was removed in some different way
if (!part)
return;
dropPartsImpl({part}, false);
}
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr local_context)
{
DataPartsVector parts_to_remove;
if (drop_part)
{
MergeTreeData::DataPartsVector parts_to_remove;
auto metadata_snapshot = getInMemoryMetadataPtr();
auto part = outdatePart(partition->as<ASTLiteral &>().value.safeGet<String>(), true);
/// Nothing to do, part was removed in some different way
if (!part)
return;
if (drop_part)
{
auto part = outdatePart(partition->as<ASTLiteral &>().value.safeGet<String>(), throw_if_noop);
/// Nothing to do, part was removed in some different way
if (!part)
return;
parts_to_remove.push_back(part);
}
else
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
String partition_id = getPartitionIDFromQuery(partition, local_context);
parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
/// TODO should we throw an exception if parts_to_remove is empty?
removePartsFromWorkingSet(parts_to_remove, true);
}
if (detach)
{
/// If DETACH clone parts to detached/ directory
/// NOTE: no race with background cleanup until we hold pointers to parts
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->relative_path);
part->makeCloneInDetached("", metadata_snapshot);
}
}
if (deduplication_log)
{
for (const auto & part : parts_to_remove)
deduplication_log->dropPart(part->info);
}
if (detach)
LOG_INFO(log, "Detached {} parts.", parts_to_remove.size());
else
LOG_INFO(log, "Removed {} parts.", parts_to_remove.size());
parts_to_remove.push_back(part);
}
else
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
String partition_id = getPartitionIDFromQuery(partition, local_context);
parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
/// TODO should we throw an exception if parts_to_remove is empty?
removePartsFromWorkingSet(parts_to_remove, true);
}
dropPartsImpl(parts_to_remove, detach);
}
void StorageMergeTree::dropPartsImpl(const DataPartsVector & parts_to_remove, bool detach)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (detach)
{
/// If DETACH clone parts to detached/ directory
/// NOTE: no race with background cleanup until we hold pointers to parts
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->relative_path);
part->makeCloneInDetached("", metadata_snapshot);
}
}
if (deduplication_log)
{
for (const auto & part : parts_to_remove)
deduplication_log->dropPart(part->info);
}
if (detach)
LOG_INFO(log, "Detached {} parts.", parts_to_remove.size());
else
LOG_INFO(log, "Removed {} parts.", parts_to_remove.size());
clearOldPartsFromFilesystem();
}

View File

@ -211,7 +211,9 @@ private:
void clearOldMutations(bool truncate = false);
// Partition helpers
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr context, bool throw_if_noop) override;
void dropPart(const String & name) override;
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr context) override;
void dropPartsImpl(const DataPartsVector & parts_to_remove, bool detach);
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) override;

View File

@ -4777,8 +4777,19 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
return true;
}
void StorageReplicatedMergeTree::dropPart(const String & name)
{
assertNotReadonly();
if (!is_leader)
throw Exception("DROP PART cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr query_context, bool throw_if_noop)
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
LogEntry entry;
dropPart(zookeeper, name, entry, false, false);
}
void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr query_context)
{
assertNotReadonly();
if (!is_leader)
@ -4792,7 +4803,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
if (drop_part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
did_drop = dropPart(zookeeper, part_name, entry, detach, throw_if_noop);
did_drop = dropPart(zookeeper, part_name, entry, detach, true);
}
else
{

View File

@ -620,12 +620,13 @@ private:
/// Info about how other replicas can access this one.
ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;
void dropPart(const String & name) override;
bool dropPart(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
bool dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, ContextPtr query_context, bool detach);
// Partition helpers
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr query_context, bool throw_if_noop) override;
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, ContextPtr query_context) override;
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;