Remove transaction argument

This commit is contained in:
alesapin 2022-06-24 13:34:00 +02:00
parent 9910395823
commit af1a9d18ab
12 changed files with 31 additions and 33 deletions

View File

@ -264,7 +264,7 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
/// Task is not needed
merge_task.reset();
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, transaction_ptr.get());
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr);
try
{

View File

@ -117,7 +117,7 @@ void MergePlainMergeTreeTask::finish()
new_part = merge_task->getFuture().get();
MergeTreeData::Transaction transaction(storage, txn.get());
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, &transaction);
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
transaction.commit();
write_part_log({});

View File

@ -2786,7 +2786,6 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
MergeTreeTransaction * txn,
Transaction & out_transaction,
SimpleIncrement * increment,
MergeTreeDeduplicationLog * deduplication_log,
@ -2799,7 +2798,7 @@ bool MergeTreeData::renameTempPartAndAdd(
DataPartsVector covered_parts;
{
auto lock = lockParts();
if (!renameTempPartAndReplaceImpl(part, txn, increment, out_transaction, lock, &covered_parts, deduplication_log, deduplication_token))
if (!renameTempPartAndReplaceImpl(part, increment, out_transaction, lock, &covered_parts, deduplication_log, deduplication_token))
return false;
}
if (!covered_parts.empty())
@ -2812,7 +2811,6 @@ bool MergeTreeData::renameTempPartAndAdd(
bool MergeTreeData::renameTempPartAndReplaceImpl(
MutableDataPartPtr & part,
MergeTreeTransaction * txn,
SimpleIncrement * increment,
Transaction & out_transaction,
std::unique_lock<std::mutex> & lock,
@ -2824,9 +2822,6 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
ErrorCodes::LOGICAL_ERROR);
if (txn)
transactions_enabled.store(true);
part->assertState({DataPartState::Temporary});
MergeTreePartInfo part_info = part->info;
@ -2903,7 +2898,6 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
data_parts_indexes.insert(part);
chassert(out_transaction.txn == txn);
out_transaction.precommitted_parts.insert(part);
auto part_in_memory = asInMemoryPart(part);
@ -2924,7 +2918,6 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part,
MergeTreeTransaction * txn,
Transaction & out_transaction,
SimpleIncrement * increment,
MergeTreeDeduplicationLog * deduplication_log,
@ -2935,11 +2928,11 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
if (!lock)
{
auto part_lock = lockParts();
renameTempPartAndReplaceImpl(part, txn, increment, out_transaction, part_lock, &covered_parts, deduplication_log);
renameTempPartAndReplaceImpl(part, increment, out_transaction, part_lock, &covered_parts, deduplication_log);
}
else
{
renameTempPartAndReplaceImpl(part, txn, increment, out_transaction, *lock, &covered_parts, deduplication_log);
renameTempPartAndReplaceImpl(part, increment, out_transaction, *lock, &covered_parts, deduplication_log);
}
}
return covered_parts;
@ -4850,6 +4843,14 @@ MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
}
MergeTreeData::Transaction::Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_)
: data(data_)
, txn(txn_)
{
if (txn)
data.transactions_enabled.store(true);
}
void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
{
if (!isEmpty())

View File

@ -252,7 +252,7 @@ public:
class Transaction : private boost::noncopyable
{
public:
Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_) : data(data_), txn(txn_) {}
Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_);
DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr);
@ -556,7 +556,6 @@ public:
/// Returns true if part was added. Returns false if part is covered by bigger part.
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
MergeTreeTransaction * txn,
Transaction & transaction,
SimpleIncrement * increment = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
@ -567,7 +566,6 @@ public:
/// If out_transaction == nullptr, marks covered parts as Outdated.
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part,
MergeTreeTransaction * txn,
Transaction & out_transaction,
SimpleIncrement * increment = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
@ -1248,7 +1246,6 @@ private:
/// FIXME Transactions: remove add_to_txn flag, maybe merge MergeTreeTransaction and Transaction
bool renameTempPartAndReplaceImpl(
MutableDataPartPtr & part,
MergeTreeTransaction * txn,
SimpleIncrement * increment,
Transaction & out_transaction,
DataPartsLock & lock,

View File

@ -541,7 +541,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
MergeTreeData::MutableDataPartPtr & new_data_part,
const MergeTreeData::DataPartsVector & parts,
const MergeTreeTransactionPtr & txn,
MergeTreeData::Transaction * out_transaction)
MergeTreeData::Transaction & out_transaction)
{
/// Some of source parts was possibly created in transaction, so non-transactional merge may break isolation.
if (data.transactions_enabled.load(std::memory_order_relaxed) && !txn)
@ -549,7 +549,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
"but transactions were enabled for this table");
/// Rename new part, add to the set and remove original parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, txn.get(), *out_transaction);
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction);
/// Let's check that all original parts have been deleted and only them.
if (replaced_parts.size() != parts.size())

View File

@ -133,7 +133,7 @@ public:
MergeTreeData::MutableDataPartPtr & new_data_part,
const MergeTreeData::DataPartsVector & parts,
const MergeTreeTransactionPtr & txn,
MergeTreeData::Transaction * out_transaction = nullptr);
MergeTreeData::Transaction & out_transaction);
/// The approximate amount of disk space needed for merge or mutation. With a surplus.

View File

@ -135,7 +135,7 @@ void MergeTreeSink::finishDelayedChunk()
MergeTreeData::Transaction transaction(storage, context->getCurrentTransaction().get());
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (storage.renameTempPartAndAdd(part, context->getCurrentTransaction().get(), transaction, &storage.increment, storage.getDeduplicationLog(), partition.block_dedup_token))
if (storage.renameTempPartAndAdd(part, transaction, &storage.increment, storage.getDeduplicationLog(), partition.block_dedup_token))
{
transaction.commit();
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);

View File

@ -171,7 +171,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
{
new_part = mutate_task->getFuture().get();
storage.renameTempPartAndReplace(new_part, NO_TRANSACTION_RAW, *transaction_ptr);
storage.renameTempPartAndReplace(new_part, *transaction_ptr);
try
{

View File

@ -86,7 +86,7 @@ bool MutatePlainMergeTreeTask::executeStep()
MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get());
/// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
storage.renameTempPartAndReplace(new_part, merge_mutate_entry->txn.get(), transaction);
storage.renameTempPartAndReplace(new_part, transaction);
transaction.commit();
storage.updateMutationEntriesErrors(future_part, true, "");

View File

@ -476,7 +476,7 @@ void ReplicatedMergeTreeSink::commitPart(
try
{
renamed = storage.renameTempPartAndAdd(part, NO_TRANSACTION_RAW, transaction);
renamed = storage.renameTempPartAndAdd(part, transaction);
}
catch (const Exception & e)
{

View File

@ -1543,7 +1543,7 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
String old_name = renamed_parts.old_and_new_names[i].old_name;
MergeTreeData::Transaction transaction(*this, local_context->getCurrentTransaction().get());
renameTempPartAndAdd(loaded_parts[i], local_context->getCurrentTransaction().get(), transaction, &increment);
renameTempPartAndAdd(loaded_parts[i], transaction, &increment);
transaction.commit();
renamed_parts.old_and_new_names[i].old_name.clear();
@ -1619,7 +1619,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
/// Populate transaction
for (MutableDataPartPtr & part : dst_parts)
renameTempPartAndReplace(part, local_context->getCurrentTransaction().get(), transaction, &increment, nullptr, &data_parts_lock);
renameTempPartAndReplace(part, transaction, &increment, nullptr, &data_parts_lock);
transaction.commit(&data_parts_lock);
@ -1697,7 +1697,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
DataPartsLock lock(mutex);
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, local_context->getCurrentTransaction().get(), transaction, &dest_table_storage->increment, nullptr, &lock);
dest_table_storage->renameTempPartAndReplace(part, transaction, &dest_table_storage->increment, nullptr, &lock);
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, lock);
transaction.commit(&lock);
@ -1792,7 +1792,7 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
for (auto part : parts)
{
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndAdd(part, NO_TRANSACTION_RAW, transaction, &increment);
renameTempPartAndAdd(part, transaction, &increment);
transaction.commit();
}
}

View File

@ -1657,7 +1657,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
Transaction transaction(*this, NO_TRANSACTION_RAW);
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
renameTempPartAndReplace(part, NO_TRANSACTION_RAW, transaction);
renameTempPartAndReplace(part, transaction);
checkPartChecksumsAndCommit(transaction, part);
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
@ -2342,7 +2342,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
Coordination::Requests ops;
for (PartDescriptionPtr & part_desc : final_parts)
{
renameTempPartAndReplace(part_desc->res_part, NO_TRANSACTION_RAW, transaction);
renameTempPartAndReplace(part_desc->res_part, transaction);
getCommitPartOps(ops, part_desc->res_part);
lockSharedData(*part_desc->res_part, false, part_desc->hardlinked_files);
@ -4081,7 +4081,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
if (!to_detached)
{
Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndReplace(part, NO_TRANSACTION_RAW, transaction);
renameTempPartAndReplace(part, transaction);
replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files);
@ -6604,7 +6604,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
auto data_parts_lock = lockParts();
for (MutableDataPartPtr & part : dst_parts)
renameTempPartAndReplace(part, query_context->getCurrentTransaction().get(), transaction, nullptr, nullptr, &data_parts_lock);
renameTempPartAndReplace(part, transaction, nullptr, nullptr, &data_parts_lock);
}
for (size_t i = 0; i < dst_parts.size(); ++i)
@ -6841,7 +6841,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
DataPartsLock lock(mutex);
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, query_context->getCurrentTransaction().get(), transaction, nullptr, nullptr, &lock);
dest_table_storage->renameTempPartAndReplace(part, transaction, nullptr, nullptr, &lock);
for (size_t i = 0; i < dst_parts.size(); ++i)
dest_table_storage->lockSharedData(*dst_parts[i], false, hardlinked_files_for_parts[i]);
@ -8020,7 +8020,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
try
{
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
auto replaced_parts = renameTempPartAndReplace(new_data_part, NO_TRANSACTION_RAW, transaction);
auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction);
if (!replaced_parts.empty())
{