This commit is contained in:
Alexander Tokmakov 2022-03-15 14:35:48 +01:00
parent c8d6c13c2d
commit 9c3e4cdc6e
3 changed files with 23 additions and 5 deletions

View File

@ -135,7 +135,7 @@ bool MergeTreeTransaction::isReadOnly() const
return storages.empty(); return storages.empty();
} }
void MergeTreeTransaction::beforeCommit() scope_guard MergeTreeTransaction::beforeCommit()
{ {
RunningMutationsList mutations_to_wait; RunningMutationsList mutations_to_wait;
{ {
@ -162,6 +162,13 @@ void MergeTreeTransaction::beforeCommit()
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled"); throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected);
} }
/// We should set CSN back to Unknown if we will fail to commit transaction for some reason (connection loss, etc)
return [this]()
{
CSN expected_value = Tx::CommittingCSN;
csn.compare_exchange_strong(expected_value, Tx::UnknownCSN);
};
} }
void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
@ -196,10 +203,19 @@ bool MergeTreeTransaction::rollback() noexcept
/// It's not a problem if server crash at this point /// It's not a problem if server crash at this point
/// because on startup we will see that TID is not committed and will simply discard these changes. /// because on startup we will see that TID is not committed and will simply discard these changes.
/// Forcefully stop related mutations if any /// Forcefully stop related mutations if any (call killMutation with unlocked mutex)
for (const auto & table_and_mutation : mutations) RunningMutationsList mutations_to_kill;
{
std::lock_guard lock{mutex};
mutations_to_kill = mutations;
}
for (const auto & table_and_mutation : mutations_to_kill)
table_and_mutation.first->killMutation(table_and_mutation.second); table_and_mutation.first->killMutation(table_and_mutation.second);
std::lock_guard lock{mutex};
assert(mutations == mutations_to_kill);
/// Kind of optimization: cleanup thread can remove these parts immediately /// Kind of optimization: cleanup thread can remove these parts immediately
for (const auto & part : creating_parts) for (const auto & part : creating_parts)
part->version.creation_csn.store(Tx::RolledBackCSN); part->version.creation_csn.store(Tx::RolledBackCSN);

View File

@ -3,6 +3,7 @@
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <Storages/IStorage_fwd.h> #include <Storages/IStorage_fwd.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <base/scope_guard.h>
#include <list> #include <list>
#include <unordered_set> #include <unordered_set>
@ -50,7 +51,7 @@ public:
Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); } Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); }
private: private:
void beforeCommit(); scope_guard beforeCommit();
void afterCommit(CSN assigned_csn) noexcept; void afterCommit(CSN assigned_csn) noexcept;
bool rollback() noexcept; bool rollback() noexcept;
void checkIsNotCancelled() const; void checkIsNotCancelled() const;

View File

@ -340,7 +340,7 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn) CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
{ {
/// Some precommit checks, may throw /// Some precommit checks, may throw
txn->beforeCommit(); auto committing_lock = txn->beforeCommit();
CSN new_csn; CSN new_csn;
if (txn->isReadOnly()) if (txn->isReadOnly())
@ -399,6 +399,7 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no
if (!txn->rollback()) if (!txn->rollback())
{ {
/// Transaction was cancelled concurrently, it's already rolled back. /// Transaction was cancelled concurrently, it's already rolled back.
assert(txn->csn == Tx::RolledBackCSN);
return; return;
} }