Merge pull request #37398 from ClickHouse/fixes_for_transactions

Fixes for transactions
This commit is contained in:
Alexander Tokmakov 2022-05-24 15:28:01 +03:00 committed by GitHub
commit 229d35408b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 419 additions and 96 deletions

View File

@ -105,6 +105,25 @@
# define ASAN_POISON_MEMORY_REGION(a, b)
#endif
#if !defined(ABORT_ON_LOGICAL_ERROR)
#if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER)
#define ABORT_ON_LOGICAL_ERROR
#endif
#endif
/// chassert(x) is similar to assert(x), but:
/// - works in builds with sanitizers, not only in debug builds
/// - tries to print failed assertion into server log
/// It can be used for all assertions except heavy ones.
/// Heavy assertions (that run loops or call complex functions) are allowed in debug builds only.
#if !defined(chassert)
#if defined(ABORT_ON_LOGICAL_ERROR)
#define chassert(x) static_cast<bool>(x) ? void(0) : abortOnFailedAssertion(#x)
#else
#define chassert(x) ((void)0)
#endif
#endif
/// A template function for suppressing warnings about unused variables or function results.
template <typename... Args>
constexpr void UNUSED(Args &&... args [[maybe_unused]])

View File

@ -627,6 +627,7 @@
M(656, MEILISEARCH_EXCEPTION) \
M(657, UNSUPPORTED_MEILISEARCH_TYPE) \
M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \
M(659, UNKNOWN_STATUS_OF_TRANSACTION) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -35,6 +35,18 @@ namespace ErrorCodes
extern const int CANNOT_MREMAP;
}
void abortOnFailedAssertion(const String & description)
{
LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", description);
/// This is to suppress -Wmissing-noreturn
volatile bool always_false = false;
if (always_false)
return;
abort();
}
/// - Aborts the process if error code is LOGICAL_ERROR.
/// - Increments error codes statistics.
void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool remote, const Exception::FramePointers & trace)
@ -44,8 +56,7 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
#ifdef ABORT_ON_LOGICAL_ERROR
if (code == ErrorCodes::LOGICAL_ERROR)
{
LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", msg);
abort();
abortOnFailedAssertion(msg);
}
#endif

View File

@ -12,16 +12,14 @@
#include <fmt/format.h>
#if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER)
#define ABORT_ON_LOGICAL_ERROR
#endif
namespace Poco { class Logger; }
namespace DB
{
void abortOnFailedAssertion(const String & description);
class Exception : public Poco::Exception
{
public:

View File

@ -592,6 +592,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \
M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
// End of COMMON_SETTINGS

View File

@ -131,6 +131,11 @@ IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS
{"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE},
{"disable", ShortCircuitFunctionEvaluation::DISABLE}})
IMPLEMENT_SETTING_ENUM(TransactionsWaitCSNMode, ErrorCodes::BAD_ARGUMENTS,
{{"async", TransactionsWaitCSNMode::ASYNC},
{"wait", TransactionsWaitCSNMode::WAIT},
{"wait_unknown", TransactionsWaitCSNMode::WAIT_UNKNOWN}})
IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES},

View File

@ -183,6 +183,15 @@ enum class ShortCircuitFunctionEvaluation
DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)
enum class TransactionsWaitCSNMode
{
ASYNC,
WAIT,
WAIT_UNKNOWN,
};
DECLARE_SETTING_ENUM(TransactionsWaitCSNMode)
DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)

View File

@ -287,7 +287,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful too debug mysterious test failures in CI
/// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
"entries={}..{}, "
"first_failed_task_name={}, current_tasks_size={}, "

View File

@ -10,6 +10,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_TRANSACTION;
extern const int UNKNOWN_STATUS_OF_TRANSACTION;
}
BlockIO InterpreterTransactionControlQuery::execute()
@ -55,7 +56,42 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess
if (txn->getState() != MergeTreeTransaction::RUNNING)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state");
TransactionLog::instance().commitTransaction(txn);
TransactionsWaitCSNMode mode = query_context->getSettingsRef().wait_changes_become_visible_after_commit_mode;
CSN csn;
try
{
csn = TransactionLog::instance().commitTransaction(txn, /* throw_on_unknown_status */ mode != TransactionsWaitCSNMode::WAIT_UNKNOWN);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION)
{
/// Detach transaction from current context if connection was lost and its status is unknown
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
}
throw;
}
if (csn == Tx::CommittingCSN)
{
chassert(mode == TransactionsWaitCSNMode::WAIT_UNKNOWN);
/// Try to wait for connection to be restored and its status to be loaded.
/// It's useful for testing. It allows to enable fault injection (after commit) without breaking tests.
txn->waitStateChange(Tx::CommittingCSN);
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction {} was rolled back", txn->tid);
if (txn->getState() != MergeTreeTransaction::COMMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction {} has invalid state {}", txn->tid, txn->getState());
csn = txn->getCSN();
}
/// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes
if (mode != TransactionsWaitCSNMode::ASYNC)
TransactionLog::instance().waitForCSNLoaded(csn);
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
return {};
}
@ -67,6 +103,8 @@ BlockIO InterpreterTransactionControlQuery::executeRollback(ContextMutablePtr se
throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
if (txn->getState() == MergeTreeTransaction::COMMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTED state");
if (txn->getState() == MergeTreeTransaction::COMMITTING)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTING state");
if (txn->getState() == MergeTreeTransaction::RUNNING)
TransactionLog::instance().rollbackTransaction(txn);

View File

@ -22,7 +22,7 @@ public:
private:
BlockIO executeBegin(ContextMutablePtr session_context);
static BlockIO executeCommit(ContextMutablePtr session_context);
BlockIO executeCommit(ContextMutablePtr session_context);
static BlockIO executeRollback(ContextMutablePtr session_context);
static BlockIO executeSetSnapshot(ContextMutablePtr session_context, UInt64 snapshot);

View File

@ -38,13 +38,26 @@ void MergeTreeTransaction::setSnapshot(CSN new_snapshot)
MergeTreeTransaction::State MergeTreeTransaction::getState() const
{
CSN c = csn.load();
if (c == Tx::UnknownCSN || c == Tx::CommittingCSN)
if (c == Tx::UnknownCSN)
return RUNNING;
if (c == Tx::CommittingCSN)
return COMMITTING;
if (c == Tx::RolledBackCSN)
return ROLLED_BACK;
return COMMITTED;
}
bool MergeTreeTransaction::waitStateChange(CSN current_state_csn) const
{
CSN current_value = current_state_csn;
while (current_value == current_state_csn && !TransactionLog::instance().isShuttingDown())
{
csn.wait(current_value);
current_value = csn.load();
}
return current_value != current_state_csn;
}
void MergeTreeTransaction::checkIsNotCancelled() const
{
CSN c = csn.load();
@ -158,7 +171,7 @@ void MergeTreeTransaction::addMutation(const StoragePtr & table, const String &
bool MergeTreeTransaction::isReadOnly() const
{
std::lock_guard lock{mutex};
assert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
chassert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
return storages.empty();
}
@ -204,7 +217,7 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
/// and we will be able to remove old entries from transaction log in ZK.
/// It's not a problem if server crash before CSN is written, because we already have TID in data part and entry in the log.
[[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
assert(prev_value == Tx::CommittingCSN);
chassert(prev_value == Tx::CommittingCSN);
for (const auto & part : creating_parts)
{
part->version.creation_csn.store(csn);
@ -321,7 +334,7 @@ String MergeTreeTransaction::dumpDescription() const
{
String info = fmt::format("{} (created by {}, {})", part->name, part->version.getCreationTID(), part->version.creation_csn);
std::get<1>(storage_to_changes[&(part->storage)]).push_back(std::move(info));
assert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
chassert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
}
for (const auto & mutation : mutations)

View File

@ -26,6 +26,7 @@ public:
enum State
{
RUNNING,
COMMITTING,
COMMITTED,
ROLLED_BACK,
};
@ -55,6 +56,11 @@ public:
Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); }
/// Waits for transaction state to become not equal to the state corresponding to current_state_csn
bool waitStateChange(CSN current_state_csn) const;
CSN getCSN() const { return csn; }
private:
scope_guard beforeCommit();
void afterCommit(CSN assigned_csn) noexcept;

View File

@ -53,7 +53,7 @@ void MergeTreeTransactionHolder::onDestroy() noexcept
{
try
{
TransactionLog::instance().commitTransaction(txn);
TransactionLog::instance().commitTransaction(txn, /* throw_on_unknown_status */ false);
return;
}
catch (...)

View File

@ -21,6 +21,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_STATUS_OF_TRANSACTION;
}
static void tryWriteEventToSystemLog(Poco::Logger * log, ContextPtr context,
@ -52,6 +53,8 @@ TransactionLog::TransactionLog()
zookeeper_path = global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn");
zookeeper_path_log = zookeeper_path + "/log";
fault_probability_before_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0);
fault_probability_after_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0);
loadLogFromZooKeeper();
@ -145,24 +148,29 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
NOEXCEPT_SCOPE;
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
tid_to_csn.emplace(entry.first, entry.second);
tid_to_csn.emplace(entry.first, entry.second);
}
last_loaded_entry = last_entry;
}
{
std::lock_guard lock{running_list_mutex};
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
}
last_loaded_entry = last_entry;
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
}
void TransactionLog::loadLogFromZooKeeper()
{
assert(!zookeeper);
assert(tid_to_csn.empty());
assert(last_loaded_entry.empty());
chassert(!zookeeper);
chassert(tid_to_csn.empty());
chassert(last_loaded_entry.empty());
zookeeper = global_context->getZooKeeper();
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
@ -172,7 +180,7 @@ void TransactionLog::loadLogFromZooKeeper()
if (code != Coordination::Error::ZOK)
{
/// Log probably does not exist, create it
assert(code == Coordination::Error::ZNONODE);
chassert(code == Coordination::Error::ZNONODE);
zookeeper->createAncestors(zookeeper_path_log);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/tail_ptr", serializeCSN(Tx::MaxReservedCSN), zkutil::CreateMode::Persistent));
@ -192,11 +200,11 @@ void TransactionLog::loadLogFromZooKeeper()
/// 2. simplify log rotation
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
chassert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
loadEntries(entries_list.begin(), entries_list.end());
assert(!last_loaded_entry.empty());
assert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(!last_loaded_entry.empty());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
local_tid_counter = Tx::MaxReservedLocalTID;
tail_ptr = deserializeCSN(zookeeper->get(zookeeper_path + "/tail_ptr"));
@ -208,19 +216,31 @@ void TransactionLog::runUpdatingThread()
{
try
{
log_updated_event->wait();
/// Do not wait if we have some transactions to finalize
if (!unknown_state_list_loaded.empty())
log_updated_event->wait();
if (stop_flag.load())
return;
if (getZooKeeper()->expired())
bool connection_loss = getZooKeeper()->expired();
if (connection_loss)
{
auto new_zookeeper = global_context->getZooKeeper();
std::lock_guard lock{mutex};
zookeeper = new_zookeeper;
{
std::lock_guard lock{mutex};
zookeeper = new_zookeeper;
}
/// It's possible that we connected to different [Zoo]Keeper instance
/// so we may read a bit stale state. Run some writing request before loading log entries
/// to make that instance up-to-date.
zookeeper->set(zookeeper_path_log, "");
}
loadNewEntries();
removeOldEntries();
tryFinalizeUnknownStateTransactions();
}
catch (const Coordination::Exception &)
{
@ -241,12 +261,12 @@ void TransactionLog::runUpdatingThread()
void TransactionLog::loadNewEntries()
{
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
chassert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry);
loadEntries(it, entries_list.end());
assert(last_loaded_entry == entries_list.back());
assert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(last_loaded_entry == entries_list.back());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
latest_snapshot.notify_all();
}
@ -309,6 +329,46 @@ void TransactionLog::removeOldEntries()
tid_to_csn.erase(tid_hash);
}
void TransactionLog::tryFinalizeUnknownStateTransactions()
{
/// We just recovered connection to [Zoo]Keeper.
/// Check if transactions in unknown state were actually committed or not and finalize or rollback them.
UnknownStateList list;
{
/// We must be sure that the corresponding CSN entry is loaded from ZK.
/// Otherwise we may accidentally rollback committed transaction in case of race condition like this:
/// - runUpdatingThread: loaded some entries, ready to call tryFinalizeUnknownStateTransactions()
/// - commitTransaction: creates CSN entry in the log (txn is committed)
/// - [session expires]
/// - commitTransaction: catches Coordination::Exception (maybe due to fault injection), appends txn to unknown_state_list
/// - runUpdatingThread: calls tryFinalizeUnknownStateTransactions(), fails to find CSN for this txn, rolls it back
/// So all CSN entries that might exist at the moment of appending txn to unknown_state_list
/// must be loaded from ZK before we start finalize that txn.
/// That's why we use two lists here:
/// 1. At first we put txn into unknown_state_list
/// 2. We move it to unknown_state_list_loaded when runUpdatingThread done at least one iteration
/// 3. Then we can safely finalize txns from unknown_state_list_loaded, because all required entries are loaded
std::lock_guard lock{running_list_mutex};
std::swap(list, unknown_state_list);
std::swap(list, unknown_state_list_loaded);
}
for (auto & [txn, state_guard] : list)
{
/// CSNs must be already loaded, only need to check if the corresponding mapping exists.
if (auto csn = getCSN(txn->tid))
{
finalizeCommittedTransaction(txn, csn, state_guard);
}
else
{
assertTIDIsNotOutdated(txn->tid);
state_guard = {};
rollbackTransaction(txn->shared_from_this());
}
}
}
CSN TransactionLog::getLatestSnapshot() const
{
return latest_snapshot.load();
@ -334,58 +394,117 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
return txn;
}
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status)
{
/// Some precommit checks, may throw
auto committing_lock = txn->beforeCommit();
auto state_guard = txn->beforeCommit();
CSN new_csn;
CSN allocated_csn = Tx::UnknownCSN;
if (txn->isReadOnly())
{
/// Don't need to allocate CSN in ZK for readonly transactions, it's safe to use snapshot/start_csn as "commit" timestamp
LOG_TEST(log, "Closing readonly transaction {}", txn->tid);
new_csn = txn->snapshot;
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
}
else
{
LOG_TEST(log, "Committing transaction {}", txn->dumpDescription());
/// TODO handle connection loss
/// TODO support batching
auto current_zookeeper = getZooKeeper();
String path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential); /// Commit point
NOEXCEPT_SCOPE;
String csn_path_created;
try
{
if (unlikely(fault_probability_before_commit))
{
std::bernoulli_distribution fault(fault_probability_before_commit);
if (fault(thread_local_rng))
throw Coordination::Exception("Fault injected (before commit)", Coordination::Error::ZCONNECTIONLOSS);
}
/// Commit point
csn_path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential);
if (unlikely(fault_probability_after_commit))
{
std::bernoulli_distribution fault(fault_probability_after_commit);
if (fault(thread_local_rng))
throw Coordination::Exception("Fault injected (after commit)", Coordination::Error::ZCONNECTIONLOSS);
}
}
catch (const Coordination::Exception & e)
{
if (!Coordination::isHardwareError(e.code))
throw;
/// We don't know if transaction has been actually committed or not.
/// The only thing we can do is to postpone its finalization.
{
std::lock_guard lock{running_list_mutex};
unknown_state_list.emplace_back(txn.get(), std::move(state_guard));
}
log_updated_event->set();
if (throw_on_unknown_status)
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION,
"Connection lost on attempt to commit transaction {}, will finalize it later: {}",
txn->tid, e.message());
LOG_INFO(log, "Connection lost on attempt to commit transaction {}, will finalize it later: {}", txn->tid, e.message());
return Tx::CommittingCSN;
}
/// Do not allow exceptions between commit point and the and of transaction finalization
/// (otherwise it may stuck in COMMITTING state holding snapshot).
NOEXCEPT_SCOPE;
/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use
/// (overflow is possible in a several weeks/months of active usage)
new_csn = deserializeCSN(path_created.substr(zookeeper_path_log.size() + 1));
allocated_csn = deserializeCSN(csn_path_created.substr(zookeeper_path_log.size() + 1));
}
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
return finalizeCommittedTransaction(txn.get(), allocated_csn, state_guard);
}
/// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes
/// TODO it's optional, add a setting for this
auto current_latest_snapshot = latest_snapshot.load();
while (current_latest_snapshot < new_csn && !stop_flag)
{
latest_snapshot.wait(current_latest_snapshot);
current_latest_snapshot = latest_snapshot.load();
}
CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept
{
chassert(!allocated_csn == txn->isReadOnly());
if (allocated_csn)
{
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, allocated_csn);
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn);
}
else
{
/// Transaction was readonly
allocated_csn = txn->snapshot;
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn);
}
/// Write allocated CSN, so we will be able to cleanup log in ZK. This method is noexcept.
txn->afterCommit(new_csn);
txn->afterCommit(allocated_csn);
state_guard = {};
{
/// Finally we can remove transaction from the list and release the snapshot
std::lock_guard lock{running_list_mutex};
snapshots_in_use.erase(txn->snapshot_in_use_it);
bool removed = running_list.erase(txn->tid.getHash());
if (!removed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
snapshots_in_use.erase(txn->snapshot_in_use_it);
{
LOG_ERROR(log , "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
abort();
}
}
return new_csn;
return allocated_csn;
}
bool TransactionLog::waitForCSNLoaded(CSN csn) const
{
auto current_latest_snapshot = latest_snapshot.load();
while (current_latest_snapshot < csn && !stop_flag)
{
latest_snapshot.wait(current_latest_snapshot);
current_latest_snapshot = latest_snapshot.load();
}
return csn <= current_latest_snapshot;
}
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept
@ -395,8 +514,8 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no
if (!txn->rollback())
{
/// Transaction was cancelled concurrently, it's already rolled back.
assert(txn->csn == Tx::RolledBackCSN);
/// Transaction was cancelled or committed concurrently
chassert(txn->csn != Tx::UnknownCSN);
return;
}
@ -438,8 +557,8 @@ CSN TransactionLog::getCSN(const TIDHash & tid)
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const
{
assert(tid_hash);
assert(tid_hash != Tx::EmptyTID.getHash());
chassert(tid_hash);
chassert(tid_hash != Tx::EmptyTID.getHash());
std::lock_guard lock{mutex};
auto it = tid_to_csn.find(tid_hash);
@ -467,6 +586,8 @@ CSN TransactionLog::getOldestSnapshot() const
std::lock_guard lock{running_list_mutex};
if (snapshots_in_use.empty())
return getLatestSnapshot();
chassert(running_list.size() == snapshots_in_use.size());
chassert(snapshots_in_use.size() < 2 || snapshots_in_use.front() <= *++snapshots_in_use.begin());
return snapshots_in_use.front();
}

View File

@ -97,7 +97,8 @@ public:
/// Tries to commit transaction. Returns Commit Sequence Number.
/// Throw if transaction was concurrently killed or if some precommit check failed.
/// May throw if ZK connection is lost. Transaction status is unknown in this case.
CSN commitTransaction(const MergeTreeTransactionPtr & txn);
/// Returns CommittingCSN if throw_on_unknown_status is false and connection was lost.
CSN commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status);
/// Releases locks that that were acquired by transaction, releases snapshot, removes transaction from the list of active transactions.
/// Normally it should not throw, but if it does for some reason (global memory limit exceeded, disk failure, etc)
@ -119,6 +120,12 @@ public:
/// Returns copy of list of running transactions.
TransactionsList getTransactionsList() const;
/// Waits for provided CSN (and all previous ones) to be loaded from the log.
/// Returns false if waiting was interrupted (e.g. by shutdown)
bool waitForCSNLoaded(CSN csn) const;
bool isShuttingDown() const { return stop_flag.load(); }
private:
void loadLogFromZooKeeper();
void runUpdatingThread();
@ -127,6 +134,10 @@ private:
void loadNewEntries();
void removeOldEntries();
CSN finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept;
void tryFinalizeUnknownStateTransactions();
static UInt64 deserializeCSN(const String & csn_node_name);
static String serializeCSN(CSN csn);
static TransactionID deserializeTID(const String & csn_node_content);
@ -159,6 +170,10 @@ private:
mutable std::mutex running_list_mutex;
/// Transactions that are currently processed
TransactionsList running_list;
/// If we lost connection on attempt to create csn- node then we don't know transaction's state.
using UnknownStateList = std::vector<std::pair<MergeTreeTransaction *, scope_guard>>;
UnknownStateList unknown_state_list;
UnknownStateList unknown_state_list_loaded;
/// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup.
std::list<CSN> snapshots_in_use;
@ -175,6 +190,9 @@ private:
std::atomic_bool stop_flag = false;
ThreadFromGlobalPool updating_thread;
Float64 fault_probability_before_commit = 0;
Float64 fault_probability_after_commit = 0;
};
template <typename Derived>

View File

@ -88,8 +88,8 @@ void VersionMetadata::lockRemovalTID(const TransactionID & tid, const Transactio
bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id)
{
assert(!tid.isEmpty());
assert(!creation_tid.isEmpty());
chassert(!tid.isEmpty());
chassert(!creation_tid.isEmpty());
TIDHash removal_lock_value = tid.getHash();
TIDHash expected_removal_lock_value = 0;
bool locked = removal_tid_lock.compare_exchange_strong(expected_removal_lock_value, removal_lock_value);
@ -115,7 +115,7 @@ bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const Transac
void VersionMetadata::unlockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context)
{
LOG_TEST(log, "Unlocking removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
assert(!tid.isEmpty());
chassert(!tid.isEmpty());
TIDHash removal_lock_value = tid.getHash();
TIDHash locked_by = removal_tid_lock.load();
@ -145,7 +145,7 @@ bool VersionMetadata::isRemovalTIDLocked() const
void VersionMetadata::setCreationTID(const TransactionID & tid, TransactionInfoContext * context)
{
/// NOTE ReplicatedMergeTreeSink may add one part multiple times
assert(creation_tid.isEmpty() || creation_tid == tid);
chassert(creation_tid.isEmpty() || creation_tid == tid);
creation_tid = tid;
if (context)
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, *context);
@ -158,7 +158,7 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
{
assert(!creation_tid.isEmpty());
chassert(!creation_tid.isEmpty());
CSN creation = creation_csn.load(std::memory_order_relaxed);
TIDHash removal_lock = removal_tid_lock.load(std::memory_order_relaxed);
CSN removal = removal_csn.load(std::memory_order_relaxed);
@ -166,10 +166,10 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
[[maybe_unused]] bool had_creation_csn = creation;
[[maybe_unused]] bool had_removal_tid = removal_lock;
[[maybe_unused]] bool had_removal_csn = removal;
assert(!had_removal_csn || had_removal_tid);
assert(!had_removal_csn || had_creation_csn);
assert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation);
assert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal);
chassert(!had_removal_csn || had_removal_tid);
chassert(!had_removal_csn || had_creation_csn);
chassert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation);
chassert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal);
/// Special snapshot for introspection purposes
if (unlikely(snapshot_version == Tx::EverythingVisibleCSN))
@ -204,8 +204,8 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
/// Data part has creation_tid/removal_tid, but does not have creation_csn/removal_csn.
/// It means that some transaction is creating/removing the part right now or has done it recently
/// and we don't know if it was already committed or not.
assert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
assert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash()));
chassert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
chassert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash()));
/// Before doing CSN lookup, let's check some extra conditions.
/// If snapshot_version <= some_tid.start_csn, then changes of the transaction with some_tid
@ -347,8 +347,8 @@ void VersionMetadata::write(WriteBuffer & buf) const
if (removal_tid_lock)
{
assert(!removal_tid.isEmpty());
assert(removal_tid.getHash() == removal_tid_lock);
chassert(!removal_tid.isEmpty());
chassert(removal_tid.getHash() == removal_tid_lock);
writeRemovalTID(buf);
writeCSN(buf, REMOVAL, /* internal */ true);
}
@ -384,21 +384,23 @@ void VersionMetadata::read(ReadBuffer & buf)
if (name == CREATION_CSN_STR)
{
assert(!creation_csn);
chassert(!creation_csn);
creation_csn = read_csn();
}
else if (name == REMOVAL_TID_STR)
{
/// NOTE Metadata file may actually contain multiple creation TIDs, we need the last one.
removal_tid = TransactionID::read(buf);
if (!removal_tid.isEmpty())
if (removal_tid.isEmpty())
removal_tid_lock = 0;
else
removal_tid_lock = removal_tid.getHash();
}
else if (name == REMOVAL_CSN_STR)
{
if (removal_tid.isEmpty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Found removal_csn in metadata file, but removal_tid is {}", removal_tid);
assert(!removal_csn);
chassert(!removal_csn);
removal_csn = read_csn();
}
else

View File

@ -444,9 +444,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto txn = context->getCurrentTransaction())
{
assert(txn->getState() != MergeTreeTransaction::COMMITTED);
chassert(txn->getState() != MergeTreeTransaction::COMMITTING);
chassert(txn->getState() != MergeTreeTransaction::COMMITTED);
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK && !ast->as<ASTTransactionControl>() && !ast->as<ASTExplainQuery>())
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query: transaction is rolled back");
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query because current transaction failed. Expecting ROLLBACK statement.");
}
/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),

View File

@ -1282,12 +1282,12 @@ void IMergeTreeDataPart::storeVersionMetadata() const
void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const
{
assert(!version.creation_tid.isEmpty());
assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_tid.isPrehistoric()));
assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0));
assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty())));
assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
assert(isStoredOnDisk());
chassert(!version.creation_tid.isEmpty());
chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_tid.isPrehistoric()));
chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0));
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty())));
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
chassert(isStoredOnDisk());
/// Small enough appends to file are usually atomic,
/// so we append new metadata instead of rewriting file to reduce number of fsyncs.
@ -1303,10 +1303,10 @@ void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN wh
void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const
{
assert(!version.creation_tid.isEmpty());
assert(version.removal_csn == 0);
assert(!version.removal_tid.isEmpty());
assert(isStoredOnDisk());
chassert(!version.creation_tid.isEmpty());
chassert(version.removal_csn == 0);
chassert(!version.removal_tid.isEmpty());
chassert(isStoredOnDisk());
if (version.creation_tid.isPrehistoric() && !clear)
{
@ -1437,7 +1437,9 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
bool valid_removal_tid = version.removal_tid == file.removal_tid || version.removal_tid == Tx::PrehistoricTID;
bool valid_creation_csn = version.creation_csn == file.creation_csn || version.creation_csn == Tx::RolledBackCSN;
bool valid_removal_csn = version.removal_csn == file.removal_csn || version.removal_csn == Tx::PrehistoricCSN;
if (!valid_creation_tid || !valid_removal_tid || !valid_creation_csn || !valid_removal_csn)
bool valid_removal_tid_lock = (version.removal_tid.isEmpty() && version.removal_tid_lock == 0)
|| (version.removal_tid_lock == version.removal_tid.getHash());
if (!valid_creation_tid || !valid_removal_tid || !valid_creation_csn || !valid_removal_csn || !valid_removal_tid_lock)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid version metadata file");
return true;
}
@ -1445,7 +1447,8 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
{
WriteBufferFromOwnString expected;
version.write(expected);
tryLogCurrentException(storage.log, fmt::format("File {} contains:\n{}\nexpected:\n{}", version_file_name, content, expected.str()));
tryLogCurrentException(storage.log, fmt::format("File {} contains:\n{}\nexpected:\n{}\nlock: {}",
version_file_name, content, expected.str(), version.removal_tid_lock));
return false;
}
}

View File

@ -1364,7 +1364,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Check if CSNs were witten after committing transaction, update and write if needed.
bool version_updated = false;
assert(!version.creation_tid.isEmpty());
chassert(!version.creation_tid.isEmpty());
if (!part->version.creation_csn)
{
auto min = TransactionLog::getCSN(version.creation_tid);

View File

@ -85,6 +85,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"visible", std::make_shared<DataTypeUInt8>()},
{"creation_tid", getTransactionIDDataType()},
{"removal_tid_lock", std::make_shared<DataTypeUInt64>()},
{"removal_tid", getTransactionIDDataType()},
{"creation_csn", std::make_shared<DataTypeUInt64>()},
{"removal_csn", std::make_shared<DataTypeUInt64>()},
@ -295,6 +296,8 @@ void StorageSystemParts::processNextStorage(
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->version.creation_tid));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->version.removal_tid_lock.load(std::memory_order_relaxed));
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->version.getRemovalTID()));
if (columns_mask[src_index++])

View File

@ -15,6 +15,7 @@ static DataTypePtr getStateEnumType()
DataTypeEnum8::Values
{
{"RUNNING", static_cast<Int8>(MergeTreeTransaction::State::RUNNING)},
{"COMMITTING", static_cast<Int8>(MergeTreeTransaction::State::COMMITTING)},
{"COMMITTED", static_cast<Int8>(MergeTreeTransaction::State::COMMITTED)},
{"ROLLED_BACK", static_cast<Int8>(MergeTreeTransaction::State::ROLLED_BACK)},
});

View File

@ -10,4 +10,12 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</transactions_info_log>
<transaction_log>
<zookeeper_path>/test/clickhouse/txn</zookeeper_path>
<fault_probability_before_commit>0.0</fault_probability_before_commit>
<!-- Fault injection after commit should not affect tests, because default waiting mode is WAIT_UNKNOWN -->
<fault_probability_after_commit>0.01</fault_probability_after_commit>
</transaction_log>
</yandex>

View File

@ -0,0 +1,58 @@
#!/usr/bin/env bash
# Tags: long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mt";
$CLICKHOUSE_CLIENT --query "CREATE TABLE mt (n Int64) ENGINE=MergeTree ORDER BY n SETTINGS old_parts_lifetime=0";
function begin_commit_readonly()
{
$CLICKHOUSE_CLIENT --multiquery --query "
SET wait_changes_become_visible_after_commit_mode='wait';
BEGIN TRANSACTION;
COMMIT;" 2>&1| grep -Fa "Exception: " | grep -Fv UNKNOWN_STATUS_OF_TRANSACTION
}
function begin_rollback_readonly()
{
$CLICKHOUSE_CLIENT --wait_changes_become_visible_after_commit_mode=wait_unknown --multiquery --query "
BEGIN TRANSACTION;
SET TRANSACTION SNAPSHOT 42;
ROLLBACK;"
}
function begin_insert_commit()
{
$CLICKHOUSE_CLIENT --wait_changes_become_visible_after_commit_mode=async --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO mt VALUES ($RANDOM);
COMMIT;" 2>&1| grep -Fa "Exception: " | grep -Fv UNKNOWN_STATUS_OF_TRANSACTION
}
function introspection()
{
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.transactions FORMAT Null"
$CLICKHOUSE_CLIENT -q "SELECT transactionLatestSnapshot(), transactionOldestSnapshot() FORMAT Null"
}
export -f begin_commit_readonly
export -f begin_rollback_readonly
export -f begin_insert_commit
export -f introspection
TIMEOUT=20
clickhouse_client_loop_timeout $TIMEOUT begin_commit_readonly &
clickhouse_client_loop_timeout $TIMEOUT begin_rollback_readonly &
clickhouse_client_loop_timeout $TIMEOUT begin_insert_commit &
clickhouse_client_loop_timeout $TIMEOUT introspection &
wait
$CLICKHOUSE_CLIENT --query "DROP TABLE mt";

View File

@ -42,7 +42,13 @@ rollback;
system flush logs;
select indexOf((select arraySort(groupUniqArray(tid)) from system.transactions_info_log where database=currentDatabase() and table='txn_counters'), tid),
(toDecimal64(now64(6), 6) - toDecimal64(event_time, 6)) < 100, type, thread_id!=0, length(query_id)=length(queryID()), tid_hash!=0, csn=0, part
(toDecimal64(now64(6), 6) - toDecimal64(event_time, 6)) < 100,
type,
thread_id!=0,
length(query_id)=length(queryID()) or type='Commit' and query_id='', -- ignore fault injection after commit
tid_hash!=0,
csn=0,
part
from system.transactions_info_log
where tid in (select tid from system.transactions_info_log where database=currentDatabase() and table='txn_counters' and not (tid.1=1 and tid.2=1))
or (database=currentDatabase() and table='txn_counters') order by event_time;

View File

@ -485,6 +485,7 @@ CREATE TABLE system.parts
`projections` Array(String),
`visible` UInt8,
`creation_tid` Tuple(UInt64, UInt64, UUID),
`removal_tid_lock` UInt64,
`removal_tid` Tuple(UInt64, UInt64, UUID),
`creation_csn` UInt64,
`removal_csn` UInt64,