Remove more trash

This commit is contained in:
alesapin 2022-04-15 16:24:38 +02:00
parent 2fcc00380d
commit eb7593f786
7 changed files with 18 additions and 28 deletions

View File

@ -1514,8 +1514,6 @@ try
SyncGuardPtr sync_guard;
if (storage.getSettings()->fsync_part_directory)
sync_guard = volume->getDisk()->getDirectorySyncGuard(to);
storage.lockSharedData(*this);
}
catch (...)
{
@ -1530,14 +1528,6 @@ catch (...)
throw;
}
void IMergeTreeDataPart::cleanupOldName(const String & old_part_name) const
{
if (name == old_part_name)
return;
storage.unlockSharedData(*this, old_part_name);
}
std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
{
/// NOTE: It's needed for zero-copy replication

View File

@ -354,9 +354,6 @@ public:
/// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const;
/// Cleanup shared locks made with old name after part renaming
virtual void cleanupOldName(const String & old_part_name) const;
/// Makes clone of a part in detached/ directory via hard links
virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;

View File

@ -2663,8 +2663,6 @@ bool MergeTreeData::renameTempPartAndReplace(
MergeTreePartInfo part_info = part->info;
String part_name;
String old_part_name = part->name;
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
{
if (part->partition.value != existing_part_in_partition->partition.value)
@ -2786,9 +2784,6 @@ bool MergeTreeData::renameTempPartAndReplace(
out_covered_parts->emplace_back(std::move(covered_part));
}
/// Cleanup shared locks made with old name
part->cleanupOldName(old_part_name);
return true;
}

View File

@ -100,8 +100,6 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
if (sync)
file->sync();
}
part->storage.lockSharedData(*part);
}
MergedBlockOutputStream::Finalizer::~Finalizer()

View File

@ -481,7 +481,6 @@ void finalizeMutatedPart(
MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), part_path));
new_data_part->default_codec = codec;
new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
new_data_part->storage.lockSharedData(*new_data_part);
}
}

View File

@ -314,8 +314,6 @@ void ReplicatedMergeTreeSink::commitPart(
bool is_already_existing_part = false;
String old_part_name = part->name;
while (true)
{
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
@ -499,6 +497,8 @@ void ReplicatedMergeTreeSink::commitPart(
part->name);
}
storage.lockSharedData(*part, false);
Coordination::Responses responses;
Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
@ -553,11 +553,13 @@ void ReplicatedMergeTreeSink::commitPart(
}
else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
{
storage.unlockSharedData(*part);
transaction.rollback();
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
}
else
{
storage.unlockSharedData(*part);
/// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
transaction.rollback();
throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
@ -567,12 +569,14 @@ void ReplicatedMergeTreeSink::commitPart(
}
else if (Coordination::isHardwareError(multi_code))
{
storage.unlockSharedData(*part);
transaction.rollback();
throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
else
{
storage.unlockSharedData(*part);
transaction.rollback();
throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
@ -595,9 +599,6 @@ void ReplicatedMergeTreeSink::commitPart(
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value);
}
/// Cleanup shared locks made with old name
part->cleanupOldName(old_part_name);
}
void ReplicatedMergeTreeSink::onStart()

View File

@ -1450,11 +1450,14 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
{
auto zookeeper = getZooKeeper();
while (true)
{
Coordination::Requests ops;
NameSet absent_part_paths_on_replicas;
lockSharedData(*part, false);
/// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas);
@ -1493,7 +1496,10 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
LOG_INFO(log, "The part {} on a replica suddenly appeared, will recheck checksums", e.getPathForFirstFailedOp());
}
else
{
unlockSharedData(*part);
throw;
}
}
}
}
@ -7296,7 +7302,9 @@ void StorageReplicatedMergeTree::createTableSharedID()
void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
{
if (!disk || !disk->supportZeroCopyReplication())
auto settings = getSettings();
if (!disk || !disk->supportZeroCopyReplication() || !settings->allow_remote_fs_zero_copy_replication)
return;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
@ -7320,7 +7328,9 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock) const
{
if (!part.volume || !part.isStoredOnDisk())
auto settings = getSettings();
if (!part.volume || !part.isStoredOnDisk() || !settings->allow_remote_fs_zero_copy_replication)
return;
DiskPtr disk = part.volume->getDisk();