From 9c3e4cdc6e33a2976febed0277aa138128e2e111 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 15 Mar 2022 14:35:48 +0100 Subject: [PATCH] fix --- src/Interpreters/MergeTreeTransaction.cpp | 22 +++++++++++++++++++--- src/Interpreters/MergeTreeTransaction.h | 3 ++- src/Interpreters/TransactionLog.cpp | 3 ++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp index 263282f36d5..c03ce2f530c 100644 --- a/src/Interpreters/MergeTreeTransaction.cpp +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -135,7 +135,7 @@ bool MergeTreeTransaction::isReadOnly() const return storages.empty(); } -void MergeTreeTransaction::beforeCommit() +scope_guard MergeTreeTransaction::beforeCommit() { RunningMutationsList mutations_to_wait; { @@ -162,6 +162,13 @@ void MergeTreeTransaction::beforeCommit() throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected); } + + /// We should set CSN back to Unknown if we will fail to commit transaction for some reason (connection loss, etc) + return [this]() + { + CSN expected_value = Tx::CommittingCSN; + csn.compare_exchange_strong(expected_value, Tx::UnknownCSN); + }; } void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept @@ -196,10 +203,19 @@ bool MergeTreeTransaction::rollback() noexcept /// It's not a problem if server crash at this point /// because on startup we will see that TID is not committed and will simply discard these changes. - /// Forcefully stop related mutations if any - for (const auto & table_and_mutation : mutations) + /// Forcefully stop related mutations if any (call killMutation with unlocked mutex) + RunningMutationsList mutations_to_kill; + { + std::lock_guard lock{mutex}; + mutations_to_kill = mutations; + } + + for (const auto & table_and_mutation : mutations_to_kill) table_and_mutation.first->killMutation(table_and_mutation.second); + std::lock_guard lock{mutex}; + assert(mutations == mutations_to_kill); + /// Kind of optimization: cleanup thread can remove these parts immediately for (const auto & part : creating_parts) part->version.creation_csn.store(Tx::RolledBackCSN); diff --git a/src/Interpreters/MergeTreeTransaction.h b/src/Interpreters/MergeTreeTransaction.h index a9d338c87bd..8d28c25fc51 100644 --- a/src/Interpreters/MergeTreeTransaction.h +++ b/src/Interpreters/MergeTreeTransaction.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -50,7 +51,7 @@ public: Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); } private: - void beforeCommit(); + scope_guard beforeCommit(); void afterCommit(CSN assigned_csn) noexcept; bool rollback() noexcept; void checkIsNotCancelled() const; diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index ae453bf40c4..3315387edc5 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ -340,7 +340,7 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction() CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn) { /// Some precommit checks, may throw - txn->beforeCommit(); + auto committing_lock = txn->beforeCommit(); CSN new_csn; if (txn->isReadOnly()) @@ -399,6 +399,7 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no if (!txn->rollback()) { /// Transaction was cancelled concurrently, it's already rolled back. + assert(txn->csn == Tx::RolledBackCSN); return; }