better code near dropPartition

This commit is contained in:
Anton Popov 2021-05-25 20:25:00 +03:00
parent e27acc26be
commit 7d1431f6b6
3 changed files with 31 additions and 31 deletions

View File

@ -1242,30 +1242,33 @@ MergeTreeDataPartPtr StorageMergeTree::outdatePart(const String & part_name, boo
void StorageMergeTree::dropPartNoWaitNoThrow(const String & part_name)
{
if (auto part = outdatePart(part_name, false))
dropPartsImpl({part}, false);
if (auto part = outdatePart(part_name, /*force=*/ false))
dropPartsImpl({part}, /*detach=*/ false);
/// Else nothing to do, part was removed in some different way
}
void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPtr /*query_context*/)
{
if (auto part = outdatePart(part_name, true))
if (auto part = outdatePart(part_name, /*force=*/ true))
dropPartsImpl({part}, detach);
/// Else nothing to do, part was removed in some different way
}
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr local_context)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
String partition_id = getPartitionIDFromQuery(partition, local_context);
auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
DataPartsVector parts_to_remove;
/// New scope controls lifetime of merge_blocker.
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
String partition_id = getPartitionIDFromQuery(partition, local_context);
parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
/// TODO should we throw an exception if parts_to_remove is empty?
removePartsFromWorkingSet(parts_to_remove, true);
}
/// TODO should we throw an exception if parts_to_remove is empty?
removePartsFromWorkingSet(parts_to_remove, true);
dropPartsImpl(std::move(parts_to_remove), detach);
}

View File

@ -4936,7 +4936,7 @@ void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name)
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
LogEntry entry;
dropPartImpl(zookeeper, part_name, entry, false, false);
dropPartImpl(zookeeper, part_name, entry, /*detach=*/ false, /*throw_if_noop=*/ false);
}
void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach, ContextPtr query_context)
@ -4948,16 +4948,13 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach,
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
LogEntry entry;
bool did_drop = dropPartImpl(zookeeper, part_name, entry, detach, true);
dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true);
if (did_drop)
{
/// If necessary, wait until the operation is performed on itself or on all replicas.
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry);
}
/// If necessary, wait until the operation is performed on itself or on all replicas.
if (query_context->getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry);
}
void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context)
@ -4979,9 +4976,9 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
waitForReplicaToProcessLogEntry(replica_name, entry);
else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2)
waitForAllReplicasToProcessLogEntry(entry);
}
cleanLastPartNode(partition_id);
cleanLastPartNode(partition_id);
}
}
@ -6716,7 +6713,7 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
return true;
}
bool StorageReplicatedMergeTree::dropPartImpl(
void StorageReplicatedMergeTree::dropPartImpl(
zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop)
{
LOG_TRACE(log, "Will try to insert a log entry to DROP_RANGE for part: " + part_name);
@ -6733,7 +6730,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
{
if (throw_if_noop)
throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NO_SUCH_DATA_PART);
return false;
return;
}
/// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already
@ -6744,7 +6741,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
throw Exception("Part " + part_name
+ " is currently participating in a background operation (mutation/merge)"
+ ", try again later", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
return false;
return;
}
if (partIsLastQuorumPart(part->info))
@ -6752,7 +6749,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
if (throw_if_noop)
throw Exception("Part " + part_name + " is last inserted part with quorum in partition. Cannot drop",
ErrorCodes::NOT_IMPLEMENTED);
return false;
return;
}
if (partIsInsertingWithParallelQuorum(part->info))
@ -6760,7 +6757,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
if (throw_if_noop)
throw Exception("Part " + part_name + " is inserting with parallel quorum. Cannot drop",
ErrorCodes::NOT_IMPLEMENTED);
return false;
return;
}
Coordination::Requests ops;
@ -6802,7 +6799,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses[clear_block_ops_size + 1]).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
return true;
return;
}
}

View File

@ -648,7 +648,7 @@ private:
bool fetch_part,
ContextPtr query_context) override;
bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
void dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed