ClickHouse/src/Interpreters/TransactionLog.cpp

136 lines
3.9 KiB
C++
Raw Normal View History

2021-03-31 17:55:04 +00:00
#include <Interpreters/TransactionLog.h>
2021-04-08 17:20:45 +00:00
#include <Common/TransactionMetadata.h>
#include <Common/Exception.h>
2021-05-18 17:07:29 +00:00
#include <common/logger_useful.h>
2021-03-31 17:55:04 +00:00
namespace DB
{
2021-04-08 17:20:45 +00:00
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
2021-03-31 17:55:04 +00:00
TransactionLog & TransactionLog::instance()
{
static TransactionLog inst;
return inst;
}
TransactionLog::TransactionLog()
2021-05-18 17:07:29 +00:00
: log(&Poco::Logger::get("TransactionLog"))
2021-03-31 17:55:04 +00:00
{
2021-04-08 17:20:45 +00:00
latest_snapshot = 1;
2021-03-31 17:55:04 +00:00
csn_counter = 1;
local_tid_counter = 1;
}
Snapshot TransactionLog::getLatestSnapshot() const
{
2021-04-08 17:20:45 +00:00
return latest_snapshot.load();
2021-03-31 17:55:04 +00:00
}
MergeTreeTransactionPtr TransactionLog::beginTransaction()
{
2021-06-04 09:26:47 +00:00
MergeTreeTransactionPtr txn;
2021-04-08 17:20:45 +00:00
{
std::lock_guard lock{running_list_mutex};
2021-06-04 09:26:47 +00:00
Snapshot snapshot = latest_snapshot.load();
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, UUIDHelpers::Nil);
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
2021-04-08 17:20:45 +00:00
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
2021-06-04 09:26:47 +00:00
txn->snapshot_in_use_it = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
2021-04-08 17:20:45 +00:00
}
2021-06-08 18:17:18 +00:00
LOG_TRACE(log, "Beginning transaction {} ({})", txn->tid, txn->tid.getHash());
2021-04-08 17:20:45 +00:00
return txn;
2021-03-31 17:55:04 +00:00
}
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
{
2021-04-08 17:20:45 +00:00
txn->beforeCommit();
CSN new_csn;
2021-03-31 17:55:04 +00:00
/// TODO Transactions: reset local_tid_counter
2021-04-08 17:20:45 +00:00
if (txn->isReadOnly())
{
2021-05-18 17:07:29 +00:00
LOG_TRACE(log, "Closing readonly transaction {}", txn->tid);
2021-04-08 17:20:45 +00:00
new_csn = txn->snapshot;
}
else
{
2021-05-18 17:07:29 +00:00
LOG_TRACE(log, "Committing transaction {}{}", txn->tid, txn->dumpDescription());
2021-04-08 17:20:45 +00:00
std::lock_guard lock{commit_mutex};
new_csn = 1 + csn_counter.fetch_add(1);
bool inserted = tid_to_csn.try_emplace(txn->tid.getHash(), new_csn).second; /// Commit point
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
latest_snapshot.store(new_csn, std::memory_order_relaxed);
}
2021-05-18 17:07:29 +00:00
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
2021-04-09 12:53:51 +00:00
txn->afterCommit(new_csn);
2021-04-08 17:20:45 +00:00
{
std::lock_guard lock{running_list_mutex};
bool removed = running_list.erase(txn->tid.getHash());
if (!removed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
2021-06-04 09:26:47 +00:00
snapshots_in_use.erase(txn->snapshot_in_use_it);
2021-04-08 17:20:45 +00:00
}
2021-04-09 12:53:51 +00:00
return new_csn;
2021-03-31 17:55:04 +00:00
}
2021-04-09 12:53:51 +00:00
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept
2021-03-31 17:55:04 +00:00
{
2021-05-18 17:07:29 +00:00
LOG_TRACE(log, "Rolling back transaction {}", txn->tid);
2021-04-09 12:53:51 +00:00
txn->rollback();
2021-04-08 17:20:45 +00:00
{
std::lock_guard lock{running_list_mutex};
bool removed = running_list.erase(txn->tid.getHash());
if (!removed)
2021-04-09 12:53:51 +00:00
abort();
2021-06-04 09:26:47 +00:00
snapshots_in_use.erase(txn->snapshot_in_use_it);
2021-04-08 17:20:45 +00:00
}
}
MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash & tid)
{
std::lock_guard lock{running_list_mutex};
auto it = running_list.find(tid);
if (it == running_list.end())
return nullptr;
return it->second;
}
CSN TransactionLog::getCSN(const TransactionID & tid) const
{
return getCSN(tid.getHash());
}
CSN TransactionLog::getCSN(const TIDHash & tid) const
{
assert(tid);
assert(tid != Tx::EmptyTID.getHash());
if (tid == Tx::PrehistoricTID.getHash())
return Tx::PrehistoricCSN;
std::lock_guard lock{commit_mutex};
auto it = tid_to_csn.find(tid);
if (it == tid_to_csn.end())
return Tx::UnknownCSN;
return it->second;
2021-03-31 17:55:04 +00:00
}
2021-06-04 09:26:47 +00:00
Snapshot TransactionLog::getOldestSnapshot() const
{
std::lock_guard lock{running_list_mutex};
if (snapshots_in_use.empty())
return getLatestSnapshot();
return snapshots_in_use.front();
}
2021-03-31 17:55:04 +00:00
}