From 12bbb7de87e005f9507863a056d692a6dcaf5a76 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 May 2022 12:41:44 +0200 Subject: [PATCH 1/7] fix race on TID allocation --- base/base/defines.h | 19 +++++++ src/Common/Exception.cpp | 15 ++++- src/Common/Exception.h | 6 +- src/Interpreters/DDLWorker.cpp | 2 +- src/Interpreters/MergeTreeTransaction.cpp | 6 +- src/Interpreters/TransactionLog.cpp | 49 +++++++++------- .../TransactionVersionMetadata.cpp | 30 +++++----- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 20 +++---- src/Storages/MergeTree/MergeTreeData.cpp | 2 +- .../01133_begin_commit_race.reference | 0 .../0_stateless/01133_begin_commit_race.sh | 56 +++++++++++++++++++ 11 files changed, 148 insertions(+), 57 deletions(-) create mode 100644 tests/queries/0_stateless/01133_begin_commit_race.reference create mode 100755 tests/queries/0_stateless/01133_begin_commit_race.sh diff --git a/base/base/defines.h b/base/base/defines.h index bd98e99f5b9..084e710abf6 100644 --- a/base/base/defines.h +++ b/base/base/defines.h @@ -105,6 +105,25 @@ # define ASAN_POISON_MEMORY_REGION(a, b) #endif +#if !defined(ABORT_ON_LOGICAL_ERROR) + #if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER) + #define ABORT_ON_LOGICAL_ERROR + #endif +#endif + +/// chassert(x) is similar to assert(x), but: +/// - works in builds with sanitizers, not only in debug builds +/// - tries to print failed assertion into server log +/// It can be used for all assertions except heavy ones. +/// Heavy assertions (that run loops or call complex functions) are allowed in debug builds only. +#if !defined(chassert) + #if defined(ABORT_ON_LOGICAL_ERROR) + #define chassert(x) static_cast(x) ? void(0) : abortOnFailedAssertion(#x) + #else + #define chassert(x) ((void)0) + #endif +#endif + /// A template function for suppressing warnings about unused variables or function results. 
template constexpr void UNUSED(Args &&... args [[maybe_unused]]) diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index 21f605ad353..d0f7af2da6b 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -35,6 +35,18 @@ namespace ErrorCodes extern const int CANNOT_MREMAP; } +void abortOnFailedAssertion(const String & description) +{ + LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", description); + + /// This is to suppress -Wmissing-noreturn + volatile bool always_false = false; + if (always_false) + return; + + abort(); +} + /// - Aborts the process if error code is LOGICAL_ERROR. /// - Increments error codes statistics. void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool remote, const Exception::FramePointers & trace) @@ -44,8 +56,7 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool #ifdef ABORT_ON_LOGICAL_ERROR if (code == ErrorCodes::LOGICAL_ERROR) { - LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", msg); - abort(); + abortOnFailedAssertion(msg); } #endif diff --git a/src/Common/Exception.h b/src/Common/Exception.h index b2fc369237e..7a96aee555c 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -12,16 +12,14 @@ #include -#if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER) -#define ABORT_ON_LOGICAL_ERROR -#endif - namespace Poco { class Logger; } namespace DB { +void abortOnFailedAssertion(const String & description); + class Exception : public Poco::Exception { public: diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 0e12c5e9e5a..2b2de84c314 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -287,7 +287,7 @@ void DDLWorker::scheduleTasks(bool reinitialized) Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event); size_t 
size_before_filtering = queue_nodes.size(); filterAndSortQueueNodes(queue_nodes); - /// The following message is too verbose, but it can be useful too debug mysterious test failures in CI + /// The following message is too verbose, but it can be useful to debug mysterious test failures in CI LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, " "entries={}..{}, " "first_failed_task_name={}, current_tasks_size={}, " diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp index c0d3cdfeb62..d3f523aafc9 100644 --- a/src/Interpreters/MergeTreeTransaction.cpp +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -158,7 +158,7 @@ void MergeTreeTransaction::addMutation(const StoragePtr & table, const String & bool MergeTreeTransaction::isReadOnly() const { std::lock_guard lock{mutex}; - assert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty()); + chassert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty()); return storages.empty(); } @@ -204,7 +204,7 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept /// and we will be able to remove old entries from transaction log in ZK. /// It's not a problem if server crash before CSN is written, because we already have TID in data part and entry in the log. 
[[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn); - assert(prev_value == Tx::CommittingCSN); + chassert(prev_value == Tx::CommittingCSN); for (const auto & part : creating_parts) { part->version.creation_csn.store(csn); @@ -321,7 +321,7 @@ String MergeTreeTransaction::dumpDescription() const { String info = fmt::format("{} (created by {}, {})", part->name, part->version.getCreationTID(), part->version.creation_csn); std::get<1>(storage_to_changes[&(part->storage)]).push_back(std::move(info)); - assert(!part->version.creation_csn || part->version.creation_csn <= snapshot); + chassert(!part->version.creation_csn || part->version.creation_csn <= snapshot); } for (const auto & mutation : mutations) diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index e65630d907b..3fe0920427e 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ -145,24 +145,29 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite NOEXCEPT_SCOPE; LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); - std::lock_guard lock{mutex}; - for (const auto & entry : loaded) { - if (entry.first == Tx::EmptyTID.getHash()) - continue; + std::lock_guard lock{mutex}; + for (const auto & entry : loaded) + { + if (entry.first == Tx::EmptyTID.getHash()) + continue; - tid_to_csn.emplace(entry.first, entry.second); + tid_to_csn.emplace(entry.first, entry.second); + } + last_loaded_entry = last_entry; + } + { + std::lock_guard lock{running_list_mutex}; + latest_snapshot = loaded.back().second.csn; + local_tid_counter = Tx::MaxReservedLocalTID; } - last_loaded_entry = last_entry; - latest_snapshot = loaded.back().second.csn; - local_tid_counter = Tx::MaxReservedLocalTID; } void TransactionLog::loadLogFromZooKeeper() { - assert(!zookeeper); - assert(tid_to_csn.empty()); - assert(last_loaded_entry.empty()); + chassert(!zookeeper); + chassert(tid_to_csn.empty()); + 
chassert(last_loaded_entry.empty()); zookeeper = global_context->getZooKeeper(); /// We do not write local_tid_counter to disk or zk and maintain it only in memory. @@ -172,7 +177,7 @@ void TransactionLog::loadLogFromZooKeeper() if (code != Coordination::Error::ZOK) { /// Log probably does not exist, create it - assert(code == Coordination::Error::ZNONODE); + chassert(code == Coordination::Error::ZNONODE); zookeeper->createAncestors(zookeeper_path_log); Coordination::Requests ops; ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/tail_ptr", serializeCSN(Tx::MaxReservedCSN), zkutil::CreateMode::Persistent)); @@ -192,11 +197,11 @@ void TransactionLog::loadLogFromZooKeeper() /// 2. simplify log rotation /// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers) Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event); - assert(!entries_list.empty()); + chassert(!entries_list.empty()); std::sort(entries_list.begin(), entries_list.end()); loadEntries(entries_list.begin(), entries_list.end()); - assert(!last_loaded_entry.empty()); - assert(latest_snapshot == deserializeCSN(last_loaded_entry)); + chassert(!last_loaded_entry.empty()); + chassert(latest_snapshot == deserializeCSN(last_loaded_entry)); local_tid_counter = Tx::MaxReservedLocalTID; tail_ptr = deserializeCSN(zookeeper->get(zookeeper_path + "/tail_ptr")); @@ -241,12 +246,12 @@ void TransactionLog::runUpdatingThread() void TransactionLog::loadNewEntries() { Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event); - assert(!entries_list.empty()); + chassert(!entries_list.empty()); std::sort(entries_list.begin(), entries_list.end()); auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry); loadEntries(it, entries_list.end()); - assert(last_loaded_entry == entries_list.back()); - assert(latest_snapshot == deserializeCSN(last_loaded_entry)); + 
chassert(last_loaded_entry == entries_list.back()); + chassert(latest_snapshot == deserializeCSN(last_loaded_entry)); latest_snapshot.notify_all(); } @@ -396,7 +401,7 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no if (!txn->rollback()) { /// Transaction was cancelled concurrently, it's already rolled back. - assert(txn->csn == Tx::RolledBackCSN); + chassert(txn->csn == Tx::RolledBackCSN); return; } @@ -438,8 +443,8 @@ CSN TransactionLog::getCSN(const TIDHash & tid) CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const { - assert(tid_hash); - assert(tid_hash != Tx::EmptyTID.getHash()); + chassert(tid_hash); + chassert(tid_hash != Tx::EmptyTID.getHash()); std::lock_guard lock{mutex}; auto it = tid_to_csn.find(tid_hash); @@ -467,6 +472,8 @@ CSN TransactionLog::getOldestSnapshot() const std::lock_guard lock{running_list_mutex}; if (snapshots_in_use.empty()) return getLatestSnapshot(); + chassert(running_list.size() == snapshots_in_use.size()); + chassert(snapshots_in_use.size() < 2 || snapshots_in_use.front() <= *++snapshots_in_use.begin()); return snapshots_in_use.front(); } diff --git a/src/Interpreters/TransactionVersionMetadata.cpp b/src/Interpreters/TransactionVersionMetadata.cpp index b965ade8d10..fd75faaf206 100644 --- a/src/Interpreters/TransactionVersionMetadata.cpp +++ b/src/Interpreters/TransactionVersionMetadata.cpp @@ -88,8 +88,8 @@ void VersionMetadata::lockRemovalTID(const TransactionID & tid, const Transactio bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id) { - assert(!tid.isEmpty()); - assert(!creation_tid.isEmpty()); + chassert(!tid.isEmpty()); + chassert(!creation_tid.isEmpty()); TIDHash removal_lock_value = tid.getHash(); TIDHash expected_removal_lock_value = 0; bool locked = removal_tid_lock.compare_exchange_strong(expected_removal_lock_value, removal_lock_value); @@ -115,7 +115,7 @@ bool 
VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const Transac void VersionMetadata::unlockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context) { LOG_TEST(log, "Unlocking removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name); - assert(!tid.isEmpty()); + chassert(!tid.isEmpty()); TIDHash removal_lock_value = tid.getHash(); TIDHash locked_by = removal_tid_lock.load(); @@ -145,7 +145,7 @@ bool VersionMetadata::isRemovalTIDLocked() const void VersionMetadata::setCreationTID(const TransactionID & tid, TransactionInfoContext * context) { /// NOTE ReplicatedMergeTreeSink may add one part multiple times - assert(creation_tid.isEmpty() || creation_tid == tid); + chassert(creation_tid.isEmpty() || creation_tid == tid); creation_tid = tid; if (context) tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, *context); @@ -158,7 +158,7 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn) bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid) { - assert(!creation_tid.isEmpty()); + chassert(!creation_tid.isEmpty()); CSN creation = creation_csn.load(std::memory_order_relaxed); TIDHash removal_lock = removal_tid_lock.load(std::memory_order_relaxed); CSN removal = removal_csn.load(std::memory_order_relaxed); @@ -166,10 +166,10 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid) [[maybe_unused]] bool had_creation_csn = creation; [[maybe_unused]] bool had_removal_tid = removal_lock; [[maybe_unused]] bool had_removal_csn = removal; - assert(!had_removal_csn || had_removal_tid); - assert(!had_removal_csn || had_creation_csn); - assert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation); - assert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal); + chassert(!had_removal_csn || had_removal_tid); + chassert(!had_removal_csn || 
had_creation_csn); + chassert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation); + chassert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal); /// Special snapshot for introspection purposes if (unlikely(snapshot_version == Tx::EverythingVisibleCSN)) @@ -204,8 +204,8 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid) /// Data part has creation_tid/removal_tid, but does not have creation_csn/removal_csn. /// It means that some transaction is creating/removing the part right now or has done it recently /// and we don't know if it was already committed or not. - assert(!had_creation_csn || (had_removal_tid && !had_removal_csn)); - assert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash())); + chassert(!had_creation_csn || (had_removal_tid && !had_removal_csn)); + chassert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash())); /// Before doing CSN lookup, let's check some extra conditions. 
/// If snapshot_version <= some_tid.start_csn, then changes of the transaction with some_tid @@ -347,8 +347,8 @@ void VersionMetadata::write(WriteBuffer & buf) const if (removal_tid_lock) { - assert(!removal_tid.isEmpty()); - assert(removal_tid.getHash() == removal_tid_lock); + chassert(!removal_tid.isEmpty()); + chassert(removal_tid.getHash() == removal_tid_lock); writeRemovalTID(buf); writeCSN(buf, REMOVAL, /* internal */ true); } @@ -384,7 +384,7 @@ void VersionMetadata::read(ReadBuffer & buf) if (name == CREATION_CSN_STR) { - assert(!creation_csn); + chassert(!creation_csn); creation_csn = read_csn(); } else if (name == REMOVAL_TID_STR) @@ -398,7 +398,7 @@ void VersionMetadata::read(ReadBuffer & buf) { if (removal_tid.isEmpty()) throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Found removal_csn in metadata file, but removal_tid is {}", removal_tid); - assert(!removal_csn); + chassert(!removal_csn); removal_csn = read_csn(); } else diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 2c9dd2b4934..2ddca8dce26 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1282,12 +1282,12 @@ void IMergeTreeDataPart::storeVersionMetadata() const void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const { - assert(!version.creation_tid.isEmpty()); - assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_tid.isPrehistoric())); - assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0)); - assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty()))); - assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0)); - assert(isStoredOnDisk()); + chassert(!version.creation_tid.isEmpty()); + chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && 
version.creation_tid.isPrehistoric())); + chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0)); + chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty()))); + chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0)); + chassert(isStoredOnDisk()); /// Small enough appends to file are usually atomic, /// so we append new metadata instead of rewriting file to reduce number of fsyncs. @@ -1303,10 +1303,10 @@ void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN wh void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const { - assert(!version.creation_tid.isEmpty()); - assert(version.removal_csn == 0); - assert(!version.removal_tid.isEmpty()); - assert(isStoredOnDisk()); + chassert(!version.creation_tid.isEmpty()); + chassert(version.removal_csn == 0); + chassert(!version.removal_tid.isEmpty()); + chassert(isStoredOnDisk()); if (version.creation_tid.isPrehistoric() && !clear) { diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 50811daa4ab..d2c757f6750 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1364,7 +1364,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) /// Check if CSNs were witten after committing transaction, update and write if needed. 
bool version_updated = false; - assert(!version.creation_tid.isEmpty()); + chassert(!version.creation_tid.isEmpty()); if (!part->version.creation_csn) { auto min = TransactionLog::getCSN(version.creation_tid); diff --git a/tests/queries/0_stateless/01133_begin_commit_race.reference b/tests/queries/0_stateless/01133_begin_commit_race.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01133_begin_commit_race.sh b/tests/queries/0_stateless/01133_begin_commit_race.sh new file mode 100755 index 00000000000..29e7ef423a1 --- /dev/null +++ b/tests/queries/0_stateless/01133_begin_commit_race.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# Tags: long + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -e + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mt"; +$CLICKHOUSE_CLIENT --query "CREATE TABLE mt (n Int64) ENGINE=MergeTree ORDER BY n SETTINGS old_parts_lifetime=0"; + + +function begin_commit_readonly() +{ + $CLICKHOUSE_CLIENT --multiquery --query " + BEGIN TRANSACTION; + COMMIT;"; +} + +function begin_rollback_readonly() +{ + $CLICKHOUSE_CLIENT --multiquery --query " + BEGIN TRANSACTION; + ROLLBACK;"; +} + +function begin_insert_commit() +{ + $CLICKHOUSE_CLIENT --multiquery --query " + BEGIN TRANSACTION; + INSERT INTO mt VALUES ($RANDOM); + COMMIT;"; +} + +function introspection() +{ + $CLICKHOUSE_CLIENT -q "SELECT * FROM system.transactions FORMAT Null" + $CLICKHOUSE_CLIENT -q "SELECT transactionLatestSnapshot(), transactionOldestSnapshot() FORMAT Null" +} + +export -f begin_commit_readonly +export -f begin_rollback_readonly +export -f begin_insert_commit +export -f introspection + +TIMEOUT=20 + +clickhouse_client_loop_timeout $TIMEOUT begin_commit_readonly & +clickhouse_client_loop_timeout $TIMEOUT begin_rollback_readonly & +clickhouse_client_loop_timeout $TIMEOUT begin_insert_commit & +clickhouse_client_loop_timeout $TIMEOUT 
introspection & + +wait + +$CLICKHOUSE_CLIENT --query "DROP TABLE mt"; From 10277985177ea9066109d3c04c18d54847e138b3 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 May 2022 17:35:29 +0200 Subject: [PATCH 2/7] handle connection loss on commit --- src/Common/ErrorCodes.cpp | 1 + .../InterpreterTransactionControlQuery.cpp | 15 ++- src/Interpreters/MergeTreeTransaction.cpp | 4 +- src/Interpreters/MergeTreeTransaction.h | 1 + src/Interpreters/TransactionLog.cpp | 102 ++++++++++++++---- src/Interpreters/TransactionLog.h | 7 ++ src/Interpreters/executeQuery.cpp | 5 +- .../System/StorageSystemTransactions.cpp | 1 + 8 files changed, 114 insertions(+), 22 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 5f78c79f606..5d5ce052aaa 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -627,6 +627,7 @@ M(656, MEILISEARCH_EXCEPTION) \ M(657, UNSUPPORTED_MEILISEARCH_TYPE) \ M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \ + M(659, UNKNOWN_STATUS_OF_TRANSACTION) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Interpreters/InterpreterTransactionControlQuery.cpp b/src/Interpreters/InterpreterTransactionControlQuery.cpp index 61b2a4e865f..148c6e93919 100644 --- a/src/Interpreters/InterpreterTransactionControlQuery.cpp +++ b/src/Interpreters/InterpreterTransactionControlQuery.cpp @@ -10,6 +10,7 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int INVALID_TRANSACTION; + extern const int UNKNOWN_STATUS_OF_TRANSACTION; } BlockIO InterpreterTransactionControlQuery::execute() @@ -55,7 +56,17 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess if (txn->getState() != MergeTreeTransaction::RUNNING) throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state"); - TransactionLog::instance().commitTransaction(txn); + try + { + TransactionLog::instance().commitTransaction(txn); + } + catch (const Exception & e) + { + /// Detach 
transaction from current context if connection was lost and its status is unknown + if (e.code() == ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION) + session_context->setCurrentTransaction(NO_TRANSACTION_PTR); + throw; + } session_context->setCurrentTransaction(NO_TRANSACTION_PTR); return {}; } @@ -67,6 +78,8 @@ BlockIO InterpreterTransactionControlQuery::executeRollback(ContextMutablePtr se throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction"); if (txn->getState() == MergeTreeTransaction::COMMITTED) throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTED state"); + if (txn->getState() == MergeTreeTransaction::COMMITTING) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTING state"); if (txn->getState() == MergeTreeTransaction::RUNNING) TransactionLog::instance().rollbackTransaction(txn); diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp index d3f523aafc9..0607279ac68 100644 --- a/src/Interpreters/MergeTreeTransaction.cpp +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -38,8 +38,10 @@ void MergeTreeTransaction::setSnapshot(CSN new_snapshot) MergeTreeTransaction::State MergeTreeTransaction::getState() const { CSN c = csn.load(); - if (c == Tx::UnknownCSN || c == Tx::CommittingCSN) + if (c == Tx::UnknownCSN) return RUNNING; + if (c == Tx::CommittingCSN) + return COMMITTING; if (c == Tx::RolledBackCSN) return ROLLED_BACK; return COMMITTED; diff --git a/src/Interpreters/MergeTreeTransaction.h b/src/Interpreters/MergeTreeTransaction.h index 7ebea450dd0..6c34e8a8388 100644 --- a/src/Interpreters/MergeTreeTransaction.h +++ b/src/Interpreters/MergeTreeTransaction.h @@ -26,6 +26,7 @@ public: enum State { RUNNING, + COMMITTING, COMMITTED, ROLLED_BACK, }; diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index 3fe0920427e..c1ecc9e0f21 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ 
-21,6 +21,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int UNKNOWN_STATUS_OF_TRANSACTION; } static void tryWriteEventToSystemLog(Poco::Logger * log, ContextPtr context, @@ -217,7 +218,8 @@ void TransactionLog::runUpdatingThread() if (stop_flag.load()) return; - if (getZooKeeper()->expired()) + bool connection_loss = getZooKeeper()->expired(); + if (connection_loss) { auto new_zookeeper = global_context->getZooKeeper(); std::lock_guard lock{mutex}; @@ -226,6 +228,9 @@ void TransactionLog::runUpdatingThread() loadNewEntries(); removeOldEntries(); + + if (connection_loss) + tryFinalizeUnknownStateTransactions(); } catch (const Coordination::Exception &) { @@ -314,6 +319,32 @@ void TransactionLog::removeOldEntries() tid_to_csn.erase(tid_hash); } +void TransactionLog::tryFinalizeUnknownStateTransactions() +{ + /// We just recovered connection to [Zoo]Keeper. + /// Check if transactions in unknown state were actually committed or not and finalize or rollback them. + UnknownStateList list; + { + std::lock_guard lock{running_list_mutex}; + std::swap(list, unknown_state_list); + } + + for (auto & [txn, state_guard] : list) + { + /// CSNs must be already loaded, only need to check if the corresponding mapping exists. 
+ if (auto csn = getCSN(txn->tid)) + { + finalizeCommittedTransaction(txn, csn); + } + else + { + assertTIDIsNotOutdated(txn->tid); + state_guard = {}; + rollbackTransaction(txn->shared_from_this()); + } + } +} + CSN TransactionLog::getLatestSnapshot() const { return latest_snapshot.load(); @@ -342,55 +373,90 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction() CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn) { /// Some precommit checks, may throw - auto committing_lock = txn->beforeCommit(); + auto state_guard = txn->beforeCommit(); - CSN new_csn; + CSN allocated_csn = Tx::UnknownCSN; if (txn->isReadOnly()) { /// Don't need to allocate CSN in ZK for readonly transactions, it's safe to use snapshot/start_csn as "commit" timestamp LOG_TEST(log, "Closing readonly transaction {}", txn->tid); - new_csn = txn->snapshot; - tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn); } else { LOG_TEST(log, "Committing transaction {}", txn->dumpDescription()); - /// TODO handle connection loss /// TODO support batching auto current_zookeeper = getZooKeeper(); - String path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential); /// Commit point - NOEXCEPT_SCOPE; + String csn_path_created; + try + { + /// Commit point + csn_path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential); + } + catch (const Coordination::Exception & e) + { + if (!Coordination::isHardwareError(e.code)) + throw; + /// We don't know if transaction has been actually committed or not. + /// The only thing we can do is to postpone its finalization. 
+ { + std::lock_guard lock{running_list_mutex}; + unknown_state_list.emplace_back(txn.get(), std::move(state_guard)); + } + log_updated_event->set(); + throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION, "Connection lost on attempt to commit transaction {}, will finalize it later: {}", txn->tid, e.message()); + } + + NOEXCEPT_SCOPE; /// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use /// (overflow is possible in a several weeks/months of active usage) - new_csn = deserializeCSN(path_created.substr(zookeeper_path_log.size() + 1)); + allocated_csn = deserializeCSN(csn_path_created.substr(zookeeper_path_log.size() + 1)); + } - LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn); - tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn); + return finalizeCommittedTransaction(txn.get(), allocated_csn); +} + +CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn) noexcept +{ + chassert(!allocated_csn == txn->isReadOnly()); + if (allocated_csn) + { + LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, allocated_csn); + tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn); /// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes /// TODO it's optional, add a setting for this auto current_latest_snapshot = latest_snapshot.load(); - while (current_latest_snapshot < new_csn && !stop_flag) + while (current_latest_snapshot < allocated_csn && !stop_flag) { latest_snapshot.wait(current_latest_snapshot); current_latest_snapshot = latest_snapshot.load(); } } + else + { + /// Transaction was readonly + allocated_csn = txn->snapshot; + tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn); + } /// Write allocated CSN, so we will 
be able to cleanup log in ZK. This method is noexcept. - txn->afterCommit(new_csn); + txn->afterCommit(allocated_csn); { /// Finally we can remove transaction from the list and release the snapshot + MergeTreeTransactionPtr txn_ptr; std::lock_guard lock{running_list_mutex}; + snapshots_in_use.erase(txn->snapshot_in_use_it); bool removed = running_list.erase(txn->tid.getHash()); if (!removed) - throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid); - snapshots_in_use.erase(txn->snapshot_in_use_it); + { + LOG_ERROR(log , "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid); + abort(); + } } - return new_csn; + return allocated_csn; } void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept @@ -400,8 +466,8 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no if (!txn->rollback()) { - /// Transaction was cancelled concurrently, it's already rolled back. - chassert(txn->csn == Tx::RolledBackCSN); + /// Transaction was cancelled or committed concurrently + chassert(txn->csn != Tx::UnknownCSN); return; } diff --git a/src/Interpreters/TransactionLog.h b/src/Interpreters/TransactionLog.h index 86584a74c68..69a9c9c7b75 100644 --- a/src/Interpreters/TransactionLog.h +++ b/src/Interpreters/TransactionLog.h @@ -127,6 +127,10 @@ private: void loadNewEntries(); void removeOldEntries(); + CSN finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn) noexcept; + + void tryFinalizeUnknownStateTransactions(); + static UInt64 deserializeCSN(const String & csn_node_name); static String serializeCSN(CSN csn); static TransactionID deserializeTID(const String & csn_node_content); @@ -159,6 +163,9 @@ private: mutable std::mutex running_list_mutex; /// Transactions that are currently processed TransactionsList running_list; + /// If we lost connection on attempt to create csn- node then we don't know transaction's state. 
+ using UnknownStateList = std::vector>; + UnknownStateList unknown_state_list; /// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup. std::list snapshots_in_use; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 3c03bea3dd1..186c8c30cfa 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -444,9 +444,10 @@ static std::tuple executeQueryImpl( if (auto txn = context->getCurrentTransaction()) { - assert(txn->getState() != MergeTreeTransaction::COMMITTED); + chassert(txn->getState() != MergeTreeTransaction::COMMITTING); + chassert(txn->getState() != MergeTreeTransaction::COMMITTED); if (txn->getState() == MergeTreeTransaction::ROLLED_BACK && !ast->as() && !ast->as()) - throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query: transaction is rolled back"); + throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query because current transaction failed. 
Expecting ROLLBACK statement."); } /// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter), diff --git a/src/Storages/System/StorageSystemTransactions.cpp b/src/Storages/System/StorageSystemTransactions.cpp index 396fc875f74..21fa72ea12a 100644 --- a/src/Storages/System/StorageSystemTransactions.cpp +++ b/src/Storages/System/StorageSystemTransactions.cpp @@ -15,6 +15,7 @@ static DataTypePtr getStateEnumType() DataTypeEnum8::Values { {"RUNNING", static_cast(MergeTreeTransaction::State::RUNNING)}, + {"COMMITTING", static_cast(MergeTreeTransaction::State::COMMITTING)}, {"COMMITTED", static_cast(MergeTreeTransaction::State::COMMITTED)}, {"ROLLED_BACK", static_cast(MergeTreeTransaction::State::ROLLED_BACK)}, }); From 44f2d4529a1ef19b5f692ab3d72d397eae89dda7 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 May 2022 22:08:46 +0200 Subject: [PATCH 3/7] better waiting, add fault injection --- src/Core/Settings.h | 1 + src/Core/SettingsEnums.cpp | 5 ++ src/Core/SettingsEnums.h | 9 +++ .../InterpreterTransactionControlQuery.cpp | 27 +++++++- .../InterpreterTransactionControlQuery.h | 2 +- src/Interpreters/MergeTreeTransaction.cpp | 11 +++ src/Interpreters/MergeTreeTransaction.h | 2 + .../MergeTreeTransactionHolder.cpp | 2 +- src/Interpreters/TransactionLog.cpp | 67 +++++++++++++++---- src/Interpreters/TransactionLog.h | 13 +++- .../TransactionVersionMetadata.cpp | 4 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 7 +- src/Storages/System/StorageSystemParts.cpp | 3 + tests/config/config.d/transactions.xml | 8 +++ .../0_stateless/01133_begin_commit_race.sh | 12 ++-- .../01172_transaction_counters.sql | 8 ++- 16 files changed, 154 insertions(+), 27 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 29427c673ac..6a166f53bf9 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -590,6 +590,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(String, 
insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \ M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \ M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \ + M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \ M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \ M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \ // End of COMMON_SETTINGS diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index a37c1e9be86..bff1971bad9 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -131,6 +131,11 @@ IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS {"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE}, {"disable", ShortCircuitFunctionEvaluation::DISABLE}}) +IMPLEMENT_SETTING_ENUM(TransactionsWaitCSNMode, ErrorCodes::BAD_ARGUMENTS, + {{"async", TransactionsWaitCSNMode::ASYNC}, + {"wait", TransactionsWaitCSNMode::WAIT}, + {"wait_unknown", TransactionsWaitCSNMode::WAIT_UNKNOWN}}) + IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS, {{"by_names", FormatSettings::EnumComparingMode::BY_NAMES}, {"by_values", FormatSettings::EnumComparingMode::BY_VALUES}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 08091da6d6c..83a65f2a320 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -183,6 +183,15 @@ enum class ShortCircuitFunctionEvaluation DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation) +enum class TransactionsWaitCSNMode +{ + 
ASYNC, + WAIT, + WAIT_UNKNOWN, +}; + +DECLARE_SETTING_ENUM(TransactionsWaitCSNMode) + DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode) DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule) diff --git a/src/Interpreters/InterpreterTransactionControlQuery.cpp b/src/Interpreters/InterpreterTransactionControlQuery.cpp index 148c6e93919..bdcc351c32b 100644 --- a/src/Interpreters/InterpreterTransactionControlQuery.cpp +++ b/src/Interpreters/InterpreterTransactionControlQuery.cpp @@ -56,17 +56,40 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess if (txn->getState() != MergeTreeTransaction::RUNNING) throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state"); + TransactionsWaitCSNMode mode = query_context->getSettingsRef().wait_changes_become_visible_after_commit_mode; + CSN csn; try { - TransactionLog::instance().commitTransaction(txn); + csn = TransactionLog::instance().commitTransaction(txn, /* throw_on_unknown_status */ mode != TransactionsWaitCSNMode::WAIT_UNKNOWN); } catch (const Exception & e) { - /// Detach transaction from current context if connection was lost and its status is unknown if (e.code() == ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION) + { + /// Detach transaction from current context if connection was lost and its status is unknown session_context->setCurrentTransaction(NO_TRANSACTION_PTR); + } throw; } + + if (csn == Tx::CommittingCSN) + { + chassert(mode == TransactionsWaitCSNMode::WAIT_UNKNOWN); + + /// Try to wait for connection to be restored and its status to be loaded. + /// It's useful for testing. It allows to enable fault injection (after commit) without breaking tests. 
+ txn->waitStateChange(Tx::CommittingCSN); + + if (txn->getState() == MergeTreeTransaction::ROLLED_BACK) + throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction {} was rolled back", txn->tid); + if (txn->getState() != MergeTreeTransaction::COMMITTED) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction {} has invalid state {}", txn->tid, txn->getState()); + } + + /// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes + if (mode != TransactionsWaitCSNMode::ASYNC) + TransactionLog::instance().waitForCSNLoaded(csn); + session_context->setCurrentTransaction(NO_TRANSACTION_PTR); return {}; } diff --git a/src/Interpreters/InterpreterTransactionControlQuery.h b/src/Interpreters/InterpreterTransactionControlQuery.h index 05d3068e095..bf2dc7891a7 100644 --- a/src/Interpreters/InterpreterTransactionControlQuery.h +++ b/src/Interpreters/InterpreterTransactionControlQuery.h @@ -22,7 +22,7 @@ public: private: BlockIO executeBegin(ContextMutablePtr session_context); - static BlockIO executeCommit(ContextMutablePtr session_context); + BlockIO executeCommit(ContextMutablePtr session_context); static BlockIO executeRollback(ContextMutablePtr session_context); static BlockIO executeSetSnapshot(ContextMutablePtr session_context, UInt64 snapshot); diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp index 0607279ac68..11287f5de97 100644 --- a/src/Interpreters/MergeTreeTransaction.cpp +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -47,6 +47,17 @@ MergeTreeTransaction::State MergeTreeTransaction::getState() const return COMMITTED; } +bool MergeTreeTransaction::waitStateChange(CSN expected_state_csn) const +{ + CSN current_value = expected_state_csn; + while (current_value == expected_state_csn && !TransactionLog::instance().isShuttingDown()) + { + csn.wait(current_value); + current_value = csn.load(); + } + return current_value != expected_state_csn; +} + void 
MergeTreeTransaction::checkIsNotCancelled() const { CSN c = csn.load(); diff --git a/src/Interpreters/MergeTreeTransaction.h b/src/Interpreters/MergeTreeTransaction.h index 6c34e8a8388..f466262cb2e 100644 --- a/src/Interpreters/MergeTreeTransaction.h +++ b/src/Interpreters/MergeTreeTransaction.h @@ -56,6 +56,8 @@ public: Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); } + bool waitStateChange(CSN expected_state_csn) const; + private: scope_guard beforeCommit(); void afterCommit(CSN assigned_csn) noexcept; diff --git a/src/Interpreters/MergeTreeTransactionHolder.cpp b/src/Interpreters/MergeTreeTransactionHolder.cpp index bf63a471282..2944fb78b76 100644 --- a/src/Interpreters/MergeTreeTransactionHolder.cpp +++ b/src/Interpreters/MergeTreeTransactionHolder.cpp @@ -53,7 +53,7 @@ void MergeTreeTransactionHolder::onDestroy() noexcept { try { - TransactionLog::instance().commitTransaction(txn); + TransactionLog::instance().commitTransaction(txn, /* throw_on_unknown_status */ false); return; } catch (...) 
diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index c1ecc9e0f21..6fe89a2b80e 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ -53,6 +53,8 @@ TransactionLog::TransactionLog() zookeeper_path = global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn"); zookeeper_path_log = zookeeper_path + "/log"; + fault_probability_before_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0); + fault_probability_after_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0); loadLogFromZooKeeper(); @@ -214,7 +216,10 @@ void TransactionLog::runUpdatingThread() { try { - log_updated_event->wait(); + /// Do not wait if we have some transactions to finalize + if (!unknown_state_list_loaded.empty()) + log_updated_event->wait(); + if (stop_flag.load()) return; @@ -229,7 +234,7 @@ void TransactionLog::runUpdatingThread() loadNewEntries(); removeOldEntries(); - if (connection_loss) + if (connection_loss || fault_probability_before_commit || fault_probability_after_commit) tryFinalizeUnknownStateTransactions(); } catch (const Coordination::Exception &) @@ -325,8 +330,22 @@ void TransactionLog::tryFinalizeUnknownStateTransactions() /// Check if transactions in unknown state were actually committed or not and finalize or rollback them. UnknownStateList list; { + /// We must be sure that the corresponding CSN entry is loaded from ZK. 
+ /// Otherwise we may accidentally roll back a committed transaction in case of a race condition like this: + /// - runUpdatingThread: loaded some entries, ready to call tryFinalizeUnknownStateTransactions() + /// - commitTransaction: creates CSN entry in the log (txn is committed) + /// - [session expires] + /// - commitTransaction: catches Coordination::Exception (maybe due to fault injection), appends txn to unknown_state_list + /// - runUpdatingThread: calls tryFinalizeUnknownStateTransactions(), fails to find CSN for this txn, rolls it back + /// So all CSN entries that might exist at the moment of appending txn to unknown_state_list + /// must be loaded from ZK before we start finalizing that txn. + /// That's why we use two lists here: + /// 1. At first we put txn into unknown_state_list + /// 2. We move it to unknown_state_list_loaded when runUpdatingThread has done at least one iteration + /// 3. Then we can safely finalize txns from unknown_state_list_loaded, because all required entries are loaded std::lock_guard lock{running_list_mutex}; std::swap(list, unknown_state_list); + std::swap(list, unknown_state_list_loaded); } for (auto & [txn, state_guard] : list) @@ -370,7 +389,7 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction() return txn; } -CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn) +CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status) { /// Some precommit checks, may throw auto state_guard = txn->beforeCommit(); @@ -389,8 +408,22 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn) String csn_path_created; try { + if (unlikely(fault_probability_before_commit)) + { + std::bernoulli_distribution fault(fault_probability_before_commit); + if (fault(thread_local_rng)) + throw Coordination::Exception("Fault injected (before commit)", Coordination::Error::ZCONNECTIONLOSS); + } + /// Commit point csn_path_created =
current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential); + + if (unlikely(fault_probability_after_commit)) + { + std::bernoulli_distribution fault(fault_probability_after_commit); + if (fault(thread_local_rng)) + throw Coordination::Exception("Fault injected (after commit)", Coordination::Error::ZCONNECTIONLOSS); + } } catch (const Coordination::Exception & e) { @@ -404,7 +437,13 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn) unknown_state_list.emplace_back(txn.get(), std::move(state_guard)); } log_updated_event->set(); - throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION, "Connection lost on attempt to commit transaction {}, will finalize it later: {}", txn->tid, e.message()); + if (throw_on_unknown_status) + throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION, + "Connection lost on attempt to commit transaction {}, will finalize it later: {}", + txn->tid, e.message()); + + LOG_INFO(log, "Connection lost on attempt to commit transaction {}, will finalize it later: {}", txn->tid, e.message()); + return Tx::CommittingCSN; } NOEXCEPT_SCOPE; @@ -423,15 +462,6 @@ CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN { LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, allocated_csn); tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn); - - /// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes - /// TODO it's optional, add a setting for this - auto current_latest_snapshot = latest_snapshot.load(); - while (current_latest_snapshot < allocated_csn && !stop_flag) - { - latest_snapshot.wait(current_latest_snapshot); - current_latest_snapshot = latest_snapshot.load(); - } } else { @@ -459,6 +489,17 @@ CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN return allocated_csn; } 
+bool TransactionLog::waitForCSNLoaded(CSN csn) const +{ + auto current_latest_snapshot = latest_snapshot.load(); + while (current_latest_snapshot < csn && !stop_flag) + { + latest_snapshot.wait(current_latest_snapshot); + current_latest_snapshot = latest_snapshot.load(); + } + return csn <= current_latest_snapshot; +} + void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept { LOG_TRACE(log, "Rolling back transaction {}{}", txn->tid, diff --git a/src/Interpreters/TransactionLog.h b/src/Interpreters/TransactionLog.h index 69a9c9c7b75..25892f77bd7 100644 --- a/src/Interpreters/TransactionLog.h +++ b/src/Interpreters/TransactionLog.h @@ -97,7 +97,8 @@ public: /// Tries to commit transaction. Returns Commit Sequence Number. /// Throw if transaction was concurrently killed or if some precommit check failed. /// May throw if ZK connection is lost. Transaction status is unknown in this case. - CSN commitTransaction(const MergeTreeTransactionPtr & txn); + /// Returns CommittingCSN if throw_on_unknown_status is false and connection was lost. + CSN commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status); /// Releases locks that that were acquired by transaction, releases snapshot, removes transaction from the list of active transactions. /// Normally it should not throw, but if it does for some reason (global memory limit exceeded, disk failure, etc) @@ -119,6 +120,12 @@ public: /// Returns copy of list of running transactions. TransactionsList getTransactionsList() const; + /// Waits for provided CSN (and all previous ones) to be loaded from the log. + /// Returns false if waiting was interrupted (e.g. by shutdown) + bool waitForCSNLoaded(CSN csn) const; + + bool isShuttingDown() const { return stop_flag.load(); } + private: void loadLogFromZooKeeper(); void runUpdatingThread(); @@ -166,6 +173,7 @@ private: /// If we lost connection on attempt to create csn- node then we don't know transaction's state. 
using UnknownStateList = std::vector>; UnknownStateList unknown_state_list; + UnknownStateList unknown_state_list_loaded; /// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup. std::list snapshots_in_use; @@ -182,6 +190,9 @@ private: std::atomic_bool stop_flag = false; ThreadFromGlobalPool updating_thread; + + Float64 fault_probability_before_commit = 0; + Float64 fault_probability_after_commit = 0; }; template diff --git a/src/Interpreters/TransactionVersionMetadata.cpp b/src/Interpreters/TransactionVersionMetadata.cpp index fd75faaf206..36a4fb9cc5b 100644 --- a/src/Interpreters/TransactionVersionMetadata.cpp +++ b/src/Interpreters/TransactionVersionMetadata.cpp @@ -391,7 +391,9 @@ void VersionMetadata::read(ReadBuffer & buf) { /// NOTE Metadata file may actually contain multiple creation TIDs, we need the last one. removal_tid = TransactionID::read(buf); - if (!removal_tid.isEmpty()) + if (removal_tid.isEmpty()) + removal_tid_lock = 0; + else removal_tid_lock = removal_tid.getHash(); } else if (name == REMOVAL_CSN_STR) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 2ddca8dce26..40fba34cd03 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1437,7 +1437,9 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const bool valid_removal_tid = version.removal_tid == file.removal_tid || version.removal_tid == Tx::PrehistoricTID; bool valid_creation_csn = version.creation_csn == file.creation_csn || version.creation_csn == Tx::RolledBackCSN; bool valid_removal_csn = version.removal_csn == file.removal_csn || version.removal_csn == Tx::PrehistoricCSN; - if (!valid_creation_tid || !valid_removal_tid || !valid_creation_csn || !valid_removal_csn) + bool valid_removal_tid_lock = (version.removal_tid.isEmpty() && version.removal_tid_lock == 0) + || (version.removal_tid_lock == 
version.removal_tid.getHash()); + if (!valid_creation_tid || !valid_removal_tid || !valid_creation_csn || !valid_removal_csn || !valid_removal_tid_lock) throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid version metadata file"); return true; } @@ -1445,7 +1447,8 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const { WriteBufferFromOwnString expected; version.write(expected); - tryLogCurrentException(storage.log, fmt::format("File {} contains:\n{}\nexpected:\n{}", version_file_name, content, expected.str())); + tryLogCurrentException(storage.log, fmt::format("File {} contains:\n{}\nexpected:\n{}\nlock: {}", + version_file_name, content, expected.str(), version.removal_tid_lock)); return false; } } diff --git a/src/Storages/System/StorageSystemParts.cpp b/src/Storages/System/StorageSystemParts.cpp index 6674de06c07..a8edb8dd78b 100644 --- a/src/Storages/System/StorageSystemParts.cpp +++ b/src/Storages/System/StorageSystemParts.cpp @@ -85,6 +85,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_) {"visible", std::make_shared()}, {"creation_tid", getTransactionIDDataType()}, + {"removal_tid_lock", std::make_shared()}, {"removal_tid", getTransactionIDDataType()}, {"creation_csn", std::make_shared()}, {"removal_csn", std::make_shared()}, @@ -295,6 +296,8 @@ void StorageSystemParts::processNextStorage( if (columns_mask[src_index++]) columns[res_index++]->insert(get_tid_as_field(part->version.creation_tid)); + if (columns_mask[src_index++]) + columns[res_index++]->insert(part->version.removal_tid_lock.load(std::memory_order_relaxed)); if (columns_mask[src_index++]) columns[res_index++]->insert(get_tid_as_field(part->version.getRemovalTID())); if (columns_mask[src_index++]) diff --git a/tests/config/config.d/transactions.xml b/tests/config/config.d/transactions.xml index 19810986ea1..9948b1f1865 100644 --- a/tests/config/config.d/transactions.xml +++ b/tests/config/config.d/transactions.xml @@ -10,4 +10,12 @@ 7500 + + /test/clickhouse/txn 
+ + 0.0 + + + 0.01 + diff --git a/tests/queries/0_stateless/01133_begin_commit_race.sh b/tests/queries/0_stateless/01133_begin_commit_race.sh index 29e7ef423a1..f64570950c7 100755 --- a/tests/queries/0_stateless/01133_begin_commit_race.sh +++ b/tests/queries/0_stateless/01133_begin_commit_race.sh @@ -14,23 +14,25 @@ $CLICKHOUSE_CLIENT --query "CREATE TABLE mt (n Int64) ENGINE=MergeTree ORDER BY function begin_commit_readonly() { $CLICKHOUSE_CLIENT --multiquery --query " + SET wait_changes_become_visible_after_commit_mode='wait'; BEGIN TRANSACTION; - COMMIT;"; + COMMIT;" 2>&1| grep -Fa "Exception: " | grep -Fv UNKNOWN_STATUS_OF_TRANSACTION } function begin_rollback_readonly() { - $CLICKHOUSE_CLIENT --multiquery --query " + $CLICKHOUSE_CLIENT --wait_changes_become_visible_after_commit_mode=wait_unknown --multiquery --query " BEGIN TRANSACTION; - ROLLBACK;"; + SET TRANSACTION SNAPSHOT 42; + ROLLBACK;" } function begin_insert_commit() { - $CLICKHOUSE_CLIENT --multiquery --query " + $CLICKHOUSE_CLIENT --wait_changes_become_visible_after_commit_mode=async --multiquery --query " BEGIN TRANSACTION; INSERT INTO mt VALUES ($RANDOM); - COMMIT;"; + COMMIT;" 2>&1| grep -Fa "Exception: " | grep -Fv UNKNOWN_STATUS_OF_TRANSACTION } function introspection() diff --git a/tests/queries/0_stateless/01172_transaction_counters.sql b/tests/queries/0_stateless/01172_transaction_counters.sql index 5431673fd62..b84a7b25c47 100644 --- a/tests/queries/0_stateless/01172_transaction_counters.sql +++ b/tests/queries/0_stateless/01172_transaction_counters.sql @@ -42,7 +42,13 @@ rollback; system flush logs; select indexOf((select arraySort(groupUniqArray(tid)) from system.transactions_info_log where database=currentDatabase() and table='txn_counters'), tid), - (toDecimal64(now64(6), 6) - toDecimal64(event_time, 6)) < 100, type, thread_id!=0, length(query_id)=length(queryID()), tid_hash!=0, csn=0, part + (toDecimal64(now64(6), 6) - toDecimal64(event_time, 6)) < 100, + type, + thread_id!=0, + 
length(query_id)=length(queryID()) or type='Commit' and query_id='', -- ignore fault injection after commit + tid_hash!=0, + csn=0, + part from system.transactions_info_log where tid in (select tid from system.transactions_info_log where database=currentDatabase() and table='txn_counters' and not (tid.1=1 and tid.2=1)) or (database=currentDatabase() and table='txn_counters') order by event_time; From d4974ddaf0dcb44dcef23e585010e46e60ef8637 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 May 2022 22:59:55 +0200 Subject: [PATCH 4/7] fix test --- .../queries/0_stateless/02117_show_create_table_system.reference | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index ad18e38adcc..d4ada9ba5c8 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -485,6 +485,7 @@ CREATE TABLE system.parts `projections` Array(String), `visible` UInt8, `creation_tid` Tuple(UInt64, UInt64, UUID), + `removal_tid_lock` UInt64, `removal_tid` Tuple(UInt64, UInt64, UUID), `creation_csn` UInt64, `removal_csn` UInt64, From 1b453f517d2263c4b431865fcbf0d11313d61324 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Sat, 21 May 2022 02:32:35 +0200 Subject: [PATCH 5/7] fix --- src/Interpreters/TransactionLog.cpp | 8 ++++---- src/Interpreters/TransactionLog.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index 6fe89a2b80e..699190e2d6f 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ -353,7 +353,7 @@ void TransactionLog::tryFinalizeUnknownStateTransactions() /// CSNs must be already loaded, only need to check if the corresponding mapping exists. 
if (auto csn = getCSN(txn->tid)) { - finalizeCommittedTransaction(txn, csn); + finalizeCommittedTransaction(txn, csn, state_guard); } else { @@ -452,10 +452,10 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool allocated_csn = deserializeCSN(csn_path_created.substr(zookeeper_path_log.size() + 1)); } - return finalizeCommittedTransaction(txn.get(), allocated_csn); + return finalizeCommittedTransaction(txn.get(), allocated_csn, state_guard); } -CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn) noexcept +CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept { chassert(!allocated_csn == txn->isReadOnly()); if (allocated_csn) @@ -472,10 +472,10 @@ CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN /// Write allocated CSN, so we will be able to cleanup log in ZK. This method is noexcept. txn->afterCommit(allocated_csn); + state_guard = {}; { /// Finally we can remove transaction from the list and release the snapshot - MergeTreeTransactionPtr txn_ptr; std::lock_guard lock{running_list_mutex}; snapshots_in_use.erase(txn->snapshot_in_use_it); bool removed = running_list.erase(txn->tid.getHash()); diff --git a/src/Interpreters/TransactionLog.h b/src/Interpreters/TransactionLog.h index 25892f77bd7..a0268ce9b88 100644 --- a/src/Interpreters/TransactionLog.h +++ b/src/Interpreters/TransactionLog.h @@ -134,7 +134,7 @@ private: void loadNewEntries(); void removeOldEntries(); - CSN finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn) noexcept; + CSN finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept; void tryFinalizeUnknownStateTransactions(); From a262492cc1007128c71b47d7d7b6d7275ba49846 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 23 May 2022 20:53:33 +0200 Subject: [PATCH 6/7] slightly better --- 
.../InterpreterTransactionControlQuery.cpp | 2 ++ src/Interpreters/MergeTreeTransaction.cpp | 8 ++++---- src/Interpreters/MergeTreeTransaction.h | 5 ++++- src/Interpreters/TransactionLog.cpp | 13 +++++++++++-- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/Interpreters/InterpreterTransactionControlQuery.cpp b/src/Interpreters/InterpreterTransactionControlQuery.cpp index bdcc351c32b..1e4868788ba 100644 --- a/src/Interpreters/InterpreterTransactionControlQuery.cpp +++ b/src/Interpreters/InterpreterTransactionControlQuery.cpp @@ -84,6 +84,8 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction {} was rolled back", txn->tid); if (txn->getState() != MergeTreeTransaction::COMMITTED) throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction {} has invalid state {}", txn->tid, txn->getState()); + + csn = txn->getCSN(); } /// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp index 11287f5de97..cab40f3c6db 100644 --- a/src/Interpreters/MergeTreeTransaction.cpp +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -47,15 +47,15 @@ MergeTreeTransaction::State MergeTreeTransaction::getState() const return COMMITTED; } -bool MergeTreeTransaction::waitStateChange(CSN expected_state_csn) const +bool MergeTreeTransaction::waitStateChange(CSN current_state_csn) const { - CSN current_value = expected_state_csn; - while (current_value == expected_state_csn && !TransactionLog::instance().isShuttingDown()) + CSN current_value = current_state_csn; + while (current_value == current_state_csn && !TransactionLog::instance().isShuttingDown()) { csn.wait(current_value); current_value = csn.load(); } - return current_value != expected_state_csn; + return current_value != current_state_csn; } void 
MergeTreeTransaction::checkIsNotCancelled() const diff --git a/src/Interpreters/MergeTreeTransaction.h b/src/Interpreters/MergeTreeTransaction.h index f466262cb2e..309b8e3eeff 100644 --- a/src/Interpreters/MergeTreeTransaction.h +++ b/src/Interpreters/MergeTreeTransaction.h @@ -56,7 +56,10 @@ public: Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); } - bool waitStateChange(CSN expected_state_csn) const; + /// Waits for transaction state to become not equal to the state corresponding to current_state_csn + bool waitStateChange(CSN current_state_csn) const; + + CSN getCSN() const { return csn; } private: scope_guard beforeCommit(); diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index 699190e2d6f..7b141ef4219 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ -227,8 +227,15 @@ void TransactionLog::runUpdatingThread() if (connection_loss) { auto new_zookeeper = global_context->getZooKeeper(); - std::lock_guard lock{mutex}; - zookeeper = new_zookeeper; + { + std::lock_guard lock{mutex}; + zookeeper = new_zookeeper; + } + + /// It's possible that we connected to different [Zoo]Keeper instance + /// so we may read a bit stale state. Run some writing request before loading log entries + /// to make that instance up-to-date. + zookeeper->set(zookeeper_path_log, ""); } loadNewEntries(); @@ -446,6 +453,8 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool return Tx::CommittingCSN; } + /// Do not allow exceptions between commit point and the end of transaction finalization + /// (otherwise it may get stuck in COMMITTING state holding snapshot).
NOEXCEPT_SCOPE; /// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use /// (overflow is possible in a several weeks/months of active usage) From fa21121f77ce2250b568441177a0608c488b1d2e Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 23 May 2022 21:17:52 +0200 Subject: [PATCH 7/7] fix --- src/Interpreters/TransactionLog.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index 7b141ef4219..4f0e79297b8 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ -240,9 +240,7 @@ void TransactionLog::runUpdatingThread() loadNewEntries(); removeOldEntries(); - - if (connection_loss || fault_probability_before_commit || fault_probability_after_commit) - tryFinalizeUnknownStateTransactions(); + tryFinalizeUnknownStateTransactions(); } catch (const Coordination::Exception &) {