Require explicit rename of parts in transaction

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2024-04-04 20:50:30 +02:00
parent ca2c720d0e
commit 3675c27fe9
9 changed files with 61 additions and 36 deletions

View File

@ -3894,7 +3894,7 @@ void MergeTreeData::checkPartDynamicColumns(MutableDataPartPtr & part, DataParts
}
}
void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename)
void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction)
{
part->is_temp = false;
part->setState(DataPartState::PreActive);
@ -3906,9 +3906,15 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction
return !may_be_cleaned_up || temporary_parts.contains(dir_name);
}());
if (need_rename && !rename_in_transaction)
part->renameTo(part->name, true);
LOG_TEST(log, "preparePartForCommit: inserting {} into data_parts_indexes", part->getNameWithState());
data_parts_indexes.insert(part);
out_transaction.addPart(part, need_rename);
if (rename_in_transaction)
out_transaction.addPart(part, need_rename);
else
out_transaction.addPart(part, /* need_rename= */ false);
}
bool MergeTreeData::addTempPart(
@ -3957,7 +3963,8 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
DataPartsVector * out_covered_parts,
bool rename_in_transaction)
{
LOG_TRACE(log, "Renaming temporary part {} to {} with tid {}.", part->getDataPartStorage().getPartDirectory(), part->name, out_transaction.getTID());
@ -3996,7 +4003,7 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
/// All checks are passed. Now we can rename the part on disk.
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
preparePartForCommit(part, out_transaction, /* need_rename */ true);
preparePartForCommit(part, out_transaction, /* need_rename= */ true, rename_in_transaction);
if (out_covered_parts)
{
@ -4011,29 +4018,31 @@ bool MergeTreeData::renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
bool rename_in_transaction)
{
return renameTempPartAndReplaceImpl(part, out_transaction, lock, out_covered_parts);
return renameTempPartAndReplaceImpl(part, out_transaction, lock, /*out_covered_parts=*/ nullptr, rename_in_transaction);
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction)
Transaction & out_transaction,
bool rename_in_transaction)
{
auto part_lock = lockParts();
DataPartsVector covered_parts;
renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts);
renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts, rename_in_transaction);
return covered_parts;
}
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock)
DataPartsLock & lock,
bool rename_in_transaction)
{
DataPartsVector covered_parts;
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts, rename_in_transaction))
return false;
if (!covered_parts.empty())
@ -4242,7 +4251,7 @@ MergeTreeData::PartsToRemoveFromZooKeeper MergeTreeData::removePartsInRangeFromW
auto [new_data_part, tmp_dir_holder] = createEmptyPart(empty_info, partition, empty_part_name, NO_TRANSACTION_PTR);
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndAdd(new_data_part, transaction, lock); /// All covered parts must be already removed
renameTempPartAndAdd(new_data_part, transaction, lock, /*rename_in_transaction=*/ true); /// All covered parts must be already removed
transaction.renameParts();
/// It will add the empty part to the set of Outdated parts without making it Active (exactly what we need)
@ -6683,6 +6692,9 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock
if (!isEmpty())
{
if (!precommitted_parts_need_rename.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts not renamed");
auto settings = data.getSettings();
auto parts_lock = acquired_parts_lock ? DataPartsLock() : data.lockParts();
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;

View File

@ -590,20 +590,22 @@ public:
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & transaction,
DataPartsLock & lock);
DataPartsLock & lock,
bool rename_in_transaction);
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction);
Transaction & out_transaction,
bool rename_in_transaction);
/// Unlocked version of previous one. Useful when added multiple parts with a single lock.
bool renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr);
bool rename_in_transaction);
/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
@ -1604,7 +1606,10 @@ private:
/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
/// in precommitted state and to transaction
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename);
///
/// @param need_rename - rename the part
/// @param rename_in_transaction - if set, the rename will be done as part of transaction (without holding DataPartsLock), otherwise inplace (when it does not make sense).
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction = false);
/// Low-level method for preparing parts for commit (in-memory).
/// FIXME Merge MergeTreeTransaction and Transaction
@ -1612,7 +1617,8 @@ private:
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts);
DataPartsVector * out_covered_parts,
bool rename_in_transaction);
/// RAII Wrapper for atomic work with currently moving parts
/// Acquire them in constructor and remove them in destructor

View File

@ -748,7 +748,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
"but transactions were enabled for this table");
/// Rename new part, add to the set and remove original parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction);
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction, /*rename_in_transaction=*/ true);
/// Explicitly rename part while still holding the lock for tmp folder to avoid cleanup
out_transaction.renameParts();

View File

@ -186,7 +186,8 @@ void MergeTreeSink::finishDelayedChunk()
}
}
added = storage.renameTempPartAndAdd(part, transaction, lock);
/// FIXME
added = storage.renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false);
transaction.commit(&lock);
}

View File

@ -236,10 +236,11 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
if (data_part_storage.hasActiveTransaction())
data_part_storage.precommitTransaction();
storage.renameTempPartAndReplace(new_part, *transaction_ptr);
storage.renameTempPartAndReplace(new_part, *transaction_ptr, /*rename_in_transaction=*/ true);
try
{
transaction_ptr->renameParts();
storage.checkPartChecksumsAndCommit(*transaction_ptr, new_part, mutate_task->getHardlinkedFiles());
}
catch (const Exception & e)

View File

@ -97,7 +97,8 @@ bool MutatePlainMergeTreeTask::executeStep()
MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get());
/// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
storage.renameTempPartAndReplace(new_part, transaction);
storage.renameTempPartAndReplace(new_part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
transaction.commit();
storage.updateMutationEntriesErrors(future_part, true, "");

View File

@ -888,7 +888,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
try
{
auto lock = storage.lockParts();
storage.renameTempPartAndAdd(part, transaction, lock);
storage.renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false);
}
catch (const Exception & e)
{

View File

@ -1788,7 +1788,7 @@ void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_pa
for (auto & part: new_parts)
{
DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction);
DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true);
if (covered_parts_by_one_part.size() > 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
@ -1798,10 +1798,10 @@ void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_pa
std::move(covered_parts_by_one_part.begin(), covered_parts_by_one_part.end(), std::back_inserter(covered_parts));
}
LOG_INFO(log, "Remove {} parts by covering them with empty {} parts. With txn {}.",
covered_parts.size(), new_parts.size(), transaction.getTID());
transaction.renameParts();
transaction.commit();
/// Remove covered parts without waiting for old_parts_lifetime seconds.
@ -2064,7 +2064,7 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
{
auto lock = lockParts();
fillNewPartNameAndResetLevel(loaded_parts[i], lock);
renameTempPartAndAdd(loaded_parts[i], transaction, lock);
renameTempPartAndAdd(loaded_parts[i], transaction, lock, /*rename_in_transaction=*/ false);
transaction.commit(&lock);
}
@ -2180,8 +2180,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
for (auto part : dst_parts)
{
fillNewPartName(part, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock, /*rename_in_transaction=*/ true);
}
transaction.renameParts();
/// Populate transaction
transaction.commit(&data_parts_lock);
@ -2284,10 +2285,9 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
for (auto & part : dst_parts)
{
dest_table_storage->fillNewPartName(part, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock, /*rename_in_transaction=*/ false);
}
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, src_data_parts_lock);
transaction.commit(&src_data_parts_lock);
}
@ -2447,7 +2447,7 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
auto lock = lockParts();
fillNewPartName(part, lock);
renameTempPartAndAdd(part, transaction, lock);
renameTempPartAndAdd(part, transaction, lock, /*rename_in_transaction=*/ false);
transaction.commit(&lock);
}
}

View File

@ -2093,7 +2093,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
Transaction transaction(*this, NO_TRANSACTION_RAW);
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
renameTempPartAndReplace(part, transaction);
renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
checkPartChecksumsAndCommit(transaction, part);
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
@ -2882,11 +2883,11 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
Coordination::Requests ops;
for (PartDescriptionPtr & part_desc : final_parts)
{
renameTempPartAndReplace(part_desc->res_part, transaction);
renameTempPartAndReplace(part_desc->res_part, transaction, /*rename_in_transaction=*/ true);
getCommitPartOps(ops, part_desc->res_part);
lockSharedData(*part_desc->res_part, /* replace_existing_lock */ true, part_desc->hardlinked_files);
lockSharedData(*part_desc->res_part, /*replace_existing_lock=*/ true, part_desc->hardlinked_files);
}
transaction.renameParts();
if (!ops.empty())
@ -4958,7 +4959,8 @@ bool StorageReplicatedMergeTree::fetchPart(
if (!to_detached)
{
Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndReplace(part, transaction);
renameTempPartAndReplace(part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
chassert(!part_to_clone || !is_zero_copy_part(part));
replaced_parts = checkPartChecksumsAndCommit(transaction, part, /*hardlinked_files*/ {}, /*replace_zero_copy_lock*/ true);
@ -8202,8 +8204,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
{
auto data_parts_lock = lockParts();
for (auto & part : dst_parts)
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock, /*rename_in_transaction=*/ true);
}
transaction.renameParts();
for (const auto & dst_part : dst_parts)
lockSharedData(*dst_part, false, /*hardlinked_files*/ {});
@ -8478,7 +8481,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
auto dest_data_parts_lock = dest_table_storage->lockParts();
for (auto & part : dst_parts)
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock, /*rename_in_transaction=*/ false);
for (const auto & dst_part : dst_parts)
dest_table_storage->lockSharedData(*dst_part, false, /*hardlinked_files*/ {});
@ -10111,7 +10114,8 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
try
{
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction);
auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction, /*rename_in_transaction=*/ true);
transaction.renameParts();
if (!replaced_parts.empty())
{