ClickHouse/src/Interpreters/TransactionLog.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

606 lines
22 KiB
C++
Raw Normal View History

2021-03-31 17:55:04 +00:00
#include <Interpreters/TransactionLog.h>
2021-11-08 18:56:09 +00:00
#include <Interpreters/TransactionVersionMetadata.h>
2021-12-28 11:23:35 +00:00
#include <Interpreters/Context.h>
2022-01-14 14:03:00 +00:00
#include <Interpreters/TransactionsInfoLog.h>
2021-12-28 11:23:35 +00:00
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
2021-04-08 17:20:45 +00:00
#include <Common/Exception.h>
2021-12-28 11:23:35 +00:00
#include <Common/ZooKeeper/KeeperException.h>
2021-11-08 18:56:09 +00:00
#include <Core/ServerUUID.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2022-06-16 17:41:32 +00:00
#include <Common/noexcept_scope.h>
2021-03-31 17:55:04 +00:00
2022-03-14 20:43:34 +00:00
2021-03-31 17:55:04 +00:00
namespace DB
{
2021-04-08 17:20:45 +00:00
namespace ErrorCodes
{
2022-01-14 14:03:00 +00:00
extern const int LOGICAL_ERROR;
2022-05-20 15:35:29 +00:00
extern const int UNKNOWN_STATUS_OF_TRANSACTION;
2022-01-14 14:03:00 +00:00
}
static void tryWriteEventToSystemLog(Poco::Logger * log, ContextPtr context,
TransactionsInfoLogElement::Type type, const TransactionID & tid, CSN csn = Tx::UnknownCSN)
try
{
auto system_log = context->getTransactionsInfoLog();
if (!system_log)
return;
TransactionsInfoLogElement elem;
elem.type = type;
elem.tid = tid;
elem.csn = csn;
elem.fillCommonFields(nullptr);
system_log->add(elem);
}
catch (...)
{
tryLogCurrentException(log);
2021-04-08 17:20:45 +00:00
}
2021-03-31 17:55:04 +00:00
TransactionLog::TransactionLog()
2022-06-27 20:48:27 +00:00
: global_context(Context::getGlobalContextInstance())
, log(&Poco::Logger::get("TransactionLog"))
, zookeeper_path(global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn"))
, zookeeper_path_log(zookeeper_path + "/log")
, fault_probability_before_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0))
, fault_probability_after_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0))
2021-03-31 17:55:04 +00:00
{
2021-12-28 11:23:35 +00:00
loadLogFromZooKeeper();
updating_thread = ThreadFromGlobalPool(&TransactionLog::runUpdatingThread, this);
}
TransactionLog::~TransactionLog()
{
2022-01-19 18:29:31 +00:00
shutdown();
}
void TransactionLog::shutdown()
{
if (stop_flag.exchange(true))
return;
2021-12-28 11:23:35 +00:00
log_updated_event->set();
2022-01-19 18:29:31 +00:00
latest_snapshot.notify_all();
2021-12-28 11:23:35 +00:00
updating_thread.join();
2022-01-20 18:15:23 +00:00
std::lock_guard lock{mutex};
/// This is required to... you'll never guess - avoid race condition inside Poco::Logger (Coordination::ZooKeeper::log)
zookeeper.reset();
}
ZooKeeperPtr TransactionLog::getZooKeeper() const
{
std::lock_guard lock{mutex};
return zookeeper;
2021-12-28 11:23:35 +00:00
}
2022-03-16 19:16:26 +00:00
UInt64 TransactionLog::deserializeCSN(const String & csn_node_name)
2021-12-28 11:23:35 +00:00
{
ReadBufferFromString buf{csn_node_name};
assertString("csn-", buf);
UInt64 res;
readText(res, buf);
assertEOF(buf);
return res;
}
2022-03-16 19:16:26 +00:00
String TransactionLog::serializeCSN(CSN csn)
2022-03-14 20:43:34 +00:00
{
return zkutil::getSequentialNodeName("csn-", csn);
}
2022-03-16 19:16:26 +00:00
TransactionID TransactionLog::deserializeTID(const String & csn_node_content)
2021-12-28 11:23:35 +00:00
{
TransactionID tid = Tx::EmptyTID;
if (csn_node_content.empty())
return tid;
ReadBufferFromString buf{csn_node_content};
2021-12-30 13:15:28 +00:00
tid = TransactionID::read(buf);
2021-12-28 11:23:35 +00:00
assertEOF(buf);
return tid;
}
2022-03-16 19:16:26 +00:00
String TransactionLog::serializeTID(const TransactionID & tid)
2021-12-28 11:23:35 +00:00
{
WriteBufferFromOwnString buf;
2021-12-30 13:15:28 +00:00
TransactionID::write(tid, buf);
2021-12-28 11:23:35 +00:00
return buf.str();
}
2022-09-26 09:31:27 +00:00
2022-09-29 09:19:47 +00:00
void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_iterator end)
{
2022-09-26 09:31:27 +00:00
size_t entries_count = std::distance(beg, end);
2022-09-29 09:19:47 +00:00
if (!entries_count)
return;
2022-09-26 09:07:34 +00:00
2022-09-29 09:19:47 +00:00
String last_entry = *std::prev(end);
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
std::vector<std::string> entry_paths;
entry_paths.reserve(entries_count);
for (auto it = beg; it != end; ++it)
entry_paths.emplace_back(fs::path(zookeeper_path_log) / *it);
2022-09-26 09:07:34 +00:00
2022-09-29 09:19:47 +00:00
auto entries = TSA_READ_ONE_THREAD(zookeeper)->get(entry_paths);
2022-09-26 09:31:27 +00:00
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
2022-09-29 09:19:47 +00:00
auto it = beg;
2022-09-26 09:31:27 +00:00
for (size_t i = 0; i < entries_count; ++i, ++it)
2022-09-26 09:07:34 +00:00
{
2022-09-29 09:19:47 +00:00
auto res = entries[i];
2022-09-26 09:31:27 +00:00
CSN csn = deserializeCSN(*it);
TransactionID tid = deserializeTID(res.data);
loaded.emplace_back(tid.getHash(), CSNEntry{csn, tid});
LOG_TEST(log, "Got entry {} -> {}", tid, csn);
2022-09-26 09:07:34 +00:00
}
NOEXCEPT_SCOPE_STRICT({
2022-05-20 10:41:44 +00:00
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
2022-03-14 20:43:34 +00:00
2022-05-20 10:41:44 +00:00
tid_to_csn.emplace(entry.first, entry.second);
}
last_loaded_entry = last_entry;
});
2022-05-20 10:41:44 +00:00
{
std::lock_guard lock{running_list_mutex};
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
2022-03-14 20:43:34 +00:00
}
2021-12-28 11:23:35 +00:00
}
void TransactionLog::loadLogFromZooKeeper()
{
2022-05-20 10:41:44 +00:00
chassert(!zookeeper);
chassert(tid_to_csn.empty());
chassert(last_loaded_entry.empty());
2021-12-28 11:23:35 +00:00
zookeeper = global_context->getZooKeeper();
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
/// Create empty entry to allocate new CSN to safely start counting from the beginning and avoid TID duplication.
/// TODO It's possible to skip this step in come cases (especially for multi-host configuration).
2022-03-14 20:43:34 +00:00
Coordination::Error code = zookeeper->tryCreate(zookeeper_path_log + "/csn-", "", zkutil::CreateMode::PersistentSequential);
2021-12-28 11:23:35 +00:00
if (code != Coordination::Error::ZOK)
{
2022-03-14 20:43:34 +00:00
/// Log probably does not exist, create it
2022-05-20 10:41:44 +00:00
chassert(code == Coordination::Error::ZNONODE);
2022-03-14 20:43:34 +00:00
zookeeper->createAncestors(zookeeper_path_log);
2021-12-28 11:23:35 +00:00
Coordination::Requests ops;
2022-03-16 19:16:26 +00:00
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/tail_ptr", serializeCSN(Tx::MaxReservedCSN), zkutil::CreateMode::Persistent));
2022-03-14 20:43:34 +00:00
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path_log, "", zkutil::CreateMode::Persistent));
/// Fast-forward sequential counter to skip reserved CSNs
2021-12-28 11:23:35 +00:00
for (size_t i = 0; i <= Tx::MaxReservedCSN; ++i)
2022-03-14 20:43:34 +00:00
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path_log + "/csn-", "", zkutil::CreateMode::PersistentSequential));
2021-12-28 11:23:35 +00:00
Coordination::Responses res;
code = zookeeper->tryMulti(ops, res);
if (code != Coordination::Error::ZNODEEXISTS)
zkutil::KeeperMultiException::check(code, ops, res);
}
/// TODO Split log into "subdirectories" to:
/// 1. fetch it more optimal way (avoid listing all CSNs on further incremental updates)
/// 2. simplify log rotation
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
2022-03-14 20:43:34 +00:00
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
2022-05-20 10:41:44 +00:00
chassert(!entries_list.empty());
2022-06-13 13:31:08 +00:00
::sort(entries_list.begin(), entries_list.end());
2021-12-28 11:23:35 +00:00
loadEntries(entries_list.begin(), entries_list.end());
2022-05-20 10:41:44 +00:00
chassert(!last_loaded_entry.empty());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
2021-11-08 18:56:09 +00:00
local_tid_counter = Tx::MaxReservedLocalTID;
2022-03-14 20:43:34 +00:00
2022-03-16 19:16:26 +00:00
tail_ptr = deserializeCSN(zookeeper->get(zookeeper_path + "/tail_ptr"));
2021-03-31 17:55:04 +00:00
}
2021-12-28 11:23:35 +00:00
void TransactionLog::runUpdatingThread()
{
while (true)
{
try
{
2022-05-20 20:08:46 +00:00
/// Do not wait if we have some transactions to finalize
2022-06-28 14:25:29 +00:00
if (TSA_READ_ONE_THREAD(unknown_state_list_loaded).empty())
2022-05-20 20:08:46 +00:00
log_updated_event->wait();
2021-12-28 11:23:35 +00:00
if (stop_flag.load())
return;
2022-05-20 15:35:29 +00:00
bool connection_loss = getZooKeeper()->expired();
if (connection_loss)
2022-01-20 18:15:23 +00:00
{
auto new_zookeeper = global_context->getZooKeeper();
2022-05-23 18:53:33 +00:00
{
std::lock_guard lock{mutex};
zookeeper = new_zookeeper;
}
/// It's possible that we connected to different [Zoo]Keeper instance
/// so we may read a bit stale state.
2022-06-28 14:25:29 +00:00
TSA_READ_ONE_THREAD(zookeeper)->sync(zookeeper_path_log);
2022-01-20 18:15:23 +00:00
}
2021-12-28 11:23:35 +00:00
loadNewEntries();
2022-03-14 20:43:34 +00:00
removeOldEntries();
2022-05-23 19:17:52 +00:00
tryFinalizeUnknownStateTransactions();
2021-12-28 11:23:35 +00:00
}
2022-04-12 09:39:21 +00:00
catch (const Coordination::Exception &)
2021-12-28 11:23:35 +00:00
{
2022-02-03 18:57:09 +00:00
tryLogCurrentException(log);
2021-12-28 11:23:35 +00:00
/// TODO better backoff
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
log_updated_event->set();
}
catch (...)
{
2022-02-03 18:57:09 +00:00
tryLogCurrentException(log);
2021-12-28 11:23:35 +00:00
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
log_updated_event->set();
}
}
}
void TransactionLog::loadNewEntries()
{
2022-06-28 14:25:29 +00:00
Strings entries_list = TSA_READ_ONE_THREAD(zookeeper)->getChildren(zookeeper_path_log, nullptr, log_updated_event);
2022-05-20 10:41:44 +00:00
chassert(!entries_list.empty());
2022-06-13 13:31:08 +00:00
::sort(entries_list.begin(), entries_list.end());
2022-06-28 14:25:29 +00:00
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), TSA_READ_ONE_THREAD(last_loaded_entry));
2021-12-28 11:23:35 +00:00
loadEntries(it, entries_list.end());
2022-06-28 14:25:29 +00:00
chassert(TSA_READ_ONE_THREAD(last_loaded_entry) == entries_list.back());
chassert(latest_snapshot == deserializeCSN(TSA_READ_ONE_THREAD(last_loaded_entry)));
2021-12-28 11:23:35 +00:00
latest_snapshot.notify_all();
}
2022-03-14 20:43:34 +00:00
void TransactionLog::removeOldEntries()
{
/// Try to update tail pointer. It's (almost) safe to set it to the oldest snapshot
/// because if a transaction released snapshot, then CSN is already written into metadata.
/// Why almost? Because on server startup we do not have the oldest snapshot (it's simply equal to the latest one),
/// but it's possible that some CSNs are not written into data parts (and we will write them during startup).
if (!global_context->isServerCompletelyStarted())
return;
/// Also similar problem is possible if some table was not attached during startup (for example, if table is detached permanently).
/// Also we write CSNs into data parts without fsync, so it's theoretically possible that we wrote CSN, finished transaction,
/// removed its entry from the log, but after that server restarts and CSN is not actually saved to metadata on disk.
/// We should store a bit more entries in ZK and keep outdated entries for a while.
/// TODO we will need a bit more complex logic for multiple hosts
Coordination::Stat stat;
2022-06-28 14:25:29 +00:00
CSN old_tail_ptr = deserializeCSN(TSA_READ_ONE_THREAD(zookeeper)->get(zookeeper_path + "/tail_ptr", &stat));
2022-03-14 20:43:34 +00:00
CSN new_tail_ptr = getOldestSnapshot();
if (new_tail_ptr < old_tail_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected tail_ptr {}, oldest snapshot is {}, it's a bug", old_tail_ptr, new_tail_ptr);
2022-03-18 13:33:59 +00:00
else if (new_tail_ptr == old_tail_ptr)
return;
2022-03-14 20:43:34 +00:00
/// (it's not supposed to fail with ZBADVERSION while there is only one host)
LOG_TRACE(log, "Updating tail_ptr from {} to {}", old_tail_ptr, new_tail_ptr);
2022-06-28 14:25:29 +00:00
TSA_READ_ONE_THREAD(zookeeper)->set(zookeeper_path + "/tail_ptr", serializeCSN(new_tail_ptr), stat.version);
2022-03-14 20:43:34 +00:00
tail_ptr.store(new_tail_ptr);
/// Now we can find and remove old entries
TIDMap tids;
{
std::lock_guard lock{mutex};
tids = tid_to_csn;
}
2021-12-28 11:23:35 +00:00
2022-03-14 20:43:34 +00:00
/// TODO support batching
std::vector<TIDHash> removed_entries;
CSN latest_entry_csn = latest_snapshot.load();
for (const auto & elem : tids)
{
/// Definitely not safe to remove
if (new_tail_ptr <= elem.second.tid.start_csn)
continue;
/// Keep at least one node (the latest one we fetched)
if (elem.second.csn == latest_entry_csn)
continue;
LOG_TEST(log, "Removing entry {} -> {}", elem.second.tid, elem.second.csn);
2022-06-28 14:25:29 +00:00
auto code = TSA_READ_ONE_THREAD(zookeeper)->tryRemove(zookeeper_path_log + "/" + serializeCSN(elem.second.csn));
2022-03-14 20:43:34 +00:00
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE)
removed_entries.push_back(elem.first);
}
std::lock_guard lock{mutex};
for (const auto & tid_hash : removed_entries)
tid_to_csn.erase(tid_hash);
}
2022-05-20 15:35:29 +00:00
void TransactionLog::tryFinalizeUnknownStateTransactions()
{
/// We just recovered connection to [Zoo]Keeper.
/// Check if transactions in unknown state were actually committed or not and finalize or rollback them.
UnknownStateList list;
{
2022-05-20 20:08:46 +00:00
/// We must be sure that the corresponding CSN entry is loaded from ZK.
/// Otherwise we may accidentally rollback committed transaction in case of race condition like this:
/// - runUpdatingThread: loaded some entries, ready to call tryFinalizeUnknownStateTransactions()
/// - commitTransaction: creates CSN entry in the log (txn is committed)
/// - [session expires]
/// - commitTransaction: catches Coordination::Exception (maybe due to fault injection), appends txn to unknown_state_list
/// - runUpdatingThread: calls tryFinalizeUnknownStateTransactions(), fails to find CSN for this txn, rolls it back
/// So all CSN entries that might exist at the moment of appending txn to unknown_state_list
/// must be loaded from ZK before we start finalize that txn.
/// That's why we use two lists here:
/// 1. At first we put txn into unknown_state_list
/// 2. We move it to unknown_state_list_loaded when runUpdatingThread done at least one iteration
/// 3. Then we can safely finalize txns from unknown_state_list_loaded, because all required entries are loaded
2022-05-20 15:35:29 +00:00
std::lock_guard lock{running_list_mutex};
std::swap(list, unknown_state_list);
2022-05-20 20:08:46 +00:00
std::swap(list, unknown_state_list_loaded);
2022-05-20 15:35:29 +00:00
}
for (auto & [txn, state_guard] : list)
{
/// CSNs must be already loaded, only need to check if the corresponding mapping exists.
if (auto csn = getCSN(txn->tid))
{
2022-05-21 00:32:35 +00:00
finalizeCommittedTransaction(txn, csn, state_guard);
2022-05-20 15:35:29 +00:00
}
else
{
assertTIDIsNotOutdated(txn->tid);
state_guard = {};
rollbackTransaction(txn->shared_from_this());
}
}
}
2022-03-14 20:43:34 +00:00
CSN TransactionLog::getLatestSnapshot() const
2021-03-31 17:55:04 +00:00
{
2021-04-08 17:20:45 +00:00
return latest_snapshot.load();
2021-03-31 17:55:04 +00:00
}
MergeTreeTransactionPtr TransactionLog::beginTransaction()
{
2021-06-04 09:26:47 +00:00
MergeTreeTransactionPtr txn;
2021-04-08 17:20:45 +00:00
{
std::lock_guard lock{running_list_mutex};
2022-03-14 20:43:34 +00:00
CSN snapshot = latest_snapshot.load();
2021-06-04 09:26:47 +00:00
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
2022-06-27 20:48:27 +00:00
auto snapshot_lock = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get(), snapshot_lock);
2021-06-04 09:26:47 +00:00
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
2021-04-08 17:20:45 +00:00
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
}
2022-01-14 14:03:00 +00:00
LOG_TEST(log, "Beginning transaction {} ({})", txn->tid, txn->tid.getHash());
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::BEGIN, txn->tid);
2021-04-08 17:20:45 +00:00
return txn;
2021-03-31 17:55:04 +00:00
}
2022-05-20 20:08:46 +00:00
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status)
2021-03-31 17:55:04 +00:00
{
2022-03-14 20:43:34 +00:00
/// Some precommit checks, may throw
2022-05-20 15:35:29 +00:00
auto state_guard = txn->beforeCommit();
2021-04-08 17:20:45 +00:00
2022-05-20 15:35:29 +00:00
CSN allocated_csn = Tx::UnknownCSN;
2021-04-08 17:20:45 +00:00
if (txn->isReadOnly())
{
2022-03-14 20:43:34 +00:00
/// Don't need to allocate CSN in ZK for readonly transactions, it's safe to use snapshot/start_csn as "commit" timestamp
2022-01-14 14:03:00 +00:00
LOG_TEST(log, "Closing readonly transaction {}", txn->tid);
2021-04-08 17:20:45 +00:00
}
else
{
2022-02-14 19:47:17 +00:00
LOG_TEST(log, "Committing transaction {}", txn->dumpDescription());
2021-12-28 11:23:35 +00:00
/// TODO support batching
2022-01-20 18:15:23 +00:00
auto current_zookeeper = getZooKeeper();
2022-05-20 15:35:29 +00:00
String csn_path_created;
try
{
2022-09-11 01:21:34 +00:00
if (unlikely(fault_probability_before_commit > 0.0))
2022-05-20 20:08:46 +00:00
{
std::bernoulli_distribution fault(fault_probability_before_commit);
if (fault(thread_local_rng))
throw Coordination::Exception("Fault injected (before commit)", Coordination::Error::ZCONNECTIONLOSS);
}
2022-05-20 15:35:29 +00:00
/// Commit point
csn_path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential);
2022-05-20 20:08:46 +00:00
2022-09-11 01:21:34 +00:00
if (unlikely(fault_probability_after_commit > 0.0))
2022-05-20 20:08:46 +00:00
{
std::bernoulli_distribution fault(fault_probability_after_commit);
if (fault(thread_local_rng))
throw Coordination::Exception("Fault injected (after commit)", Coordination::Error::ZCONNECTIONLOSS);
}
2022-05-20 15:35:29 +00:00
}
catch (const Coordination::Exception & e)
{
if (!Coordination::isHardwareError(e.code))
throw;
2022-03-14 20:43:34 +00:00
2022-05-20 15:35:29 +00:00
/// We don't know if transaction has been actually committed or not.
/// The only thing we can do is to postpone its finalization.
{
std::lock_guard lock{running_list_mutex};
unknown_state_list.emplace_back(txn.get(), std::move(state_guard));
}
log_updated_event->set();
2022-05-20 20:08:46 +00:00
if (throw_on_unknown_status)
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION,
"Connection lost on attempt to commit transaction {}, will finalize it later: {}",
txn->tid, e.message());
LOG_INFO(log, "Connection lost on attempt to commit transaction {}, will finalize it later: {}", txn->tid, e.message());
return Tx::CommittingCSN;
2022-05-20 15:35:29 +00:00
}
2022-05-23 18:53:33 +00:00
/// Do not allow exceptions between commit point and the and of transaction finalization
/// (otherwise it may stuck in COMMITTING state holding snapshot).
NOEXCEPT_SCOPE_STRICT({
/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use
/// (overflow is possible in a several weeks/months of active usage)
allocated_csn = deserializeCSN(csn_path_created.substr(zookeeper_path_log.size() + 1));
});
2022-05-20 15:35:29 +00:00
}
2022-01-14 14:03:00 +00:00
2022-05-21 00:32:35 +00:00
return finalizeCommittedTransaction(txn.get(), allocated_csn, state_guard);
2022-05-20 15:35:29 +00:00
}
2022-05-21 00:32:35 +00:00
CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept
2022-05-20 15:35:29 +00:00
{
LockMemoryExceptionInThread memory_tracker_lock(VariableContext::Global);
2022-05-20 15:35:29 +00:00
chassert(!allocated_csn == txn->isReadOnly());
if (allocated_csn)
{
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, allocated_csn);
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn);
2021-12-28 11:23:35 +00:00
}
2022-05-20 15:35:29 +00:00
else
{
/// Transaction was readonly
allocated_csn = txn->snapshot;
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn);
}
2021-05-18 17:07:29 +00:00
2022-03-14 20:43:34 +00:00
/// Write allocated CSN, so we will be able to cleanup log in ZK. This method is noexcept.
2022-05-20 15:35:29 +00:00
txn->afterCommit(allocated_csn);
2022-05-21 00:32:35 +00:00
state_guard = {};
2021-04-09 12:53:51 +00:00
2021-04-08 17:20:45 +00:00
{
2022-03-14 20:43:34 +00:00
/// Finally we can remove transaction from the list and release the snapshot
2021-04-08 17:20:45 +00:00
std::lock_guard lock{running_list_mutex};
2022-05-20 15:35:29 +00:00
snapshots_in_use.erase(txn->snapshot_in_use_it);
2021-04-08 17:20:45 +00:00
bool removed = running_list.erase(txn->tid.getHash());
if (!removed)
2022-05-20 15:35:29 +00:00
{
LOG_ERROR(log , "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
abort();
}
2021-04-08 17:20:45 +00:00
}
2022-01-14 14:03:00 +00:00
2022-05-20 15:35:29 +00:00
return allocated_csn;
2021-03-31 17:55:04 +00:00
}
2022-05-20 20:08:46 +00:00
bool TransactionLog::waitForCSNLoaded(CSN csn) const
{
auto current_latest_snapshot = latest_snapshot.load();
while (current_latest_snapshot < csn && !stop_flag)
{
latest_snapshot.wait(current_latest_snapshot);
current_latest_snapshot = latest_snapshot.load();
}
return csn <= current_latest_snapshot;
}
2021-04-09 12:53:51 +00:00
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept
2021-03-31 17:55:04 +00:00
{
LockMemoryExceptionInThread memory_tracker_lock(VariableContext::Global);
2022-02-04 18:18:20 +00:00
LOG_TRACE(log, "Rolling back transaction {}{}", txn->tid,
std::uncaught_exceptions() ? fmt::format(" due to uncaught exception (code: {})", getCurrentExceptionCode()) : "");
2022-01-14 14:03:00 +00:00
if (!txn->rollback())
2022-03-14 20:43:34 +00:00
{
2022-05-20 15:35:29 +00:00
/// Transaction was cancelled or committed concurrently
chassert(txn->csn != Tx::UnknownCSN);
2022-01-14 14:03:00 +00:00
return;
2022-03-14 20:43:34 +00:00
}
2022-01-14 14:03:00 +00:00
2021-04-08 17:20:45 +00:00
{
std::lock_guard lock{running_list_mutex};
bool removed = running_list.erase(txn->tid.getHash());
if (!removed)
2021-04-09 12:53:51 +00:00
abort();
2021-06-04 09:26:47 +00:00
snapshots_in_use.erase(txn->snapshot_in_use_it);
2021-04-08 17:20:45 +00:00
}
2022-01-14 14:03:00 +00:00
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::ROLLBACK, txn->tid);
2021-04-08 17:20:45 +00:00
}
MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash & tid)
{
std::lock_guard lock{running_list_mutex};
auto it = running_list.find(tid);
if (it == running_list.end())
2022-03-16 19:16:26 +00:00
return NO_TRANSACTION_PTR;
2021-04-08 17:20:45 +00:00
return it->second;
}
CSN TransactionLog::getCSN(const TransactionID & tid)
2021-04-08 17:20:45 +00:00
{
2022-03-14 20:43:34 +00:00
/// Avoid creation of the instance if transactions are not actually involved
if (tid == Tx::PrehistoricTID)
return Tx::PrehistoricCSN;
return instance().getCSNImpl(tid.getHash());
2021-04-08 17:20:45 +00:00
}
CSN TransactionLog::getCSN(const TIDHash & tid)
2021-04-08 17:20:45 +00:00
{
/// Avoid creation of the instance if transactions are not actually involved
2021-04-08 17:20:45 +00:00
if (tid == Tx::PrehistoricTID.getHash())
return Tx::PrehistoricCSN;
return instance().getCSNImpl(tid);
}
2022-03-14 20:43:34 +00:00
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const
{
2022-05-20 10:41:44 +00:00
chassert(tid_hash);
chassert(tid_hash != Tx::EmptyTID.getHash());
2022-01-20 18:15:23 +00:00
std::lock_guard lock{mutex};
2022-03-14 20:43:34 +00:00
auto it = tid_to_csn.find(tid_hash);
if (it != tid_to_csn.end())
return it->second.csn;
return Tx::UnknownCSN;
}
void TransactionLog::assertTIDIsNotOutdated(const TransactionID & tid)
{
if (tid == Tx::PrehistoricTID)
return;
2022-03-18 11:01:26 +00:00
/// Ensure that we are not trying to get CSN for TID that was already removed from the log
2022-03-14 20:43:34 +00:00
CSN tail = instance().tail_ptr.load();
if (tail <= tid.start_csn)
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get CSN for too old TID {}, current tail_ptr is {}, probably it's a bug", tid, tail);
2021-03-31 17:55:04 +00:00
}
2022-03-14 20:43:34 +00:00
CSN TransactionLog::getOldestSnapshot() const
2021-06-04 09:26:47 +00:00
{
std::lock_guard lock{running_list_mutex};
if (snapshots_in_use.empty())
return getLatestSnapshot();
2022-05-20 10:41:44 +00:00
chassert(running_list.size() == snapshots_in_use.size());
chassert(snapshots_in_use.size() < 2 || snapshots_in_use.front() <= *++snapshots_in_use.begin());
2021-06-04 09:26:47 +00:00
return snapshots_in_use.front();
}
2022-03-10 21:29:58 +00:00
TransactionLog::TransactionsList TransactionLog::getTransactionsList() const
{
std::lock_guard lock{running_list_mutex};
return running_list;
}
2022-05-25 20:20:13 +00:00
void TransactionLog::sync() const
{
2022-06-27 20:48:27 +00:00
Strings entries_list = getZooKeeper()->getChildren(zookeeper_path_log);
2022-05-25 20:20:13 +00:00
chassert(!entries_list.empty());
2022-06-13 13:31:08 +00:00
::sort(entries_list.begin(), entries_list.end());
2022-05-25 20:20:13 +00:00
CSN newest_csn = deserializeCSN(entries_list.back());
waitForCSNLoaded(newest_csn);
}
2021-03-31 17:55:04 +00:00
}