2021-03-31 17:55:04 +00:00
|
|
|
#include <Interpreters/TransactionLog.h>
|
2021-11-08 18:56:09 +00:00
|
|
|
#include <Interpreters/TransactionVersionMetadata.h>
|
2021-04-08 17:20:45 +00:00
|
|
|
#include <Common/Exception.h>
|
2021-11-08 18:56:09 +00:00
|
|
|
#include <Core/ServerUUID.h>
|
2021-11-06 18:08:36 +00:00
|
|
|
#include <base/logger_useful.h>
|
2021-03-31 17:55:04 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int LOGICAL_ERROR;
|
|
|
|
}
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
TransactionLog & TransactionLog::instance()
|
|
|
|
{
|
|
|
|
static TransactionLog inst;
|
|
|
|
return inst;
|
|
|
|
}
|
|
|
|
|
|
|
|
TransactionLog::TransactionLog()
|
2021-05-18 17:07:29 +00:00
|
|
|
: log(&Poco::Logger::get("TransactionLog"))
|
2021-03-31 17:55:04 +00:00
|
|
|
{
|
2021-11-08 18:56:09 +00:00
|
|
|
latest_snapshot = Tx::MaxReservedCSN;
|
|
|
|
csn_counter = Tx::MaxReservedCSN;
|
|
|
|
local_tid_counter = Tx::MaxReservedLocalTID;
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Snapshot TransactionLog::getLatestSnapshot() const
|
|
|
|
{
|
2021-04-08 17:20:45 +00:00
|
|
|
return latest_snapshot.load();
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
MergeTreeTransactionPtr TransactionLog::beginTransaction()
|
|
|
|
{
|
2021-06-04 09:26:47 +00:00
|
|
|
MergeTreeTransactionPtr txn;
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock{running_list_mutex};
|
2021-06-04 09:26:47 +00:00
|
|
|
Snapshot snapshot = latest_snapshot.load();
|
|
|
|
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
|
2021-11-08 18:56:09 +00:00
|
|
|
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get());
|
2021-06-04 09:26:47 +00:00
|
|
|
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
|
2021-04-08 17:20:45 +00:00
|
|
|
if (!inserted)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->snapshot_in_use_it = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
2021-06-08 18:17:18 +00:00
|
|
|
LOG_TRACE(log, "Beginning transaction {} ({})", txn->tid, txn->tid.getHash());
|
2021-04-08 17:20:45 +00:00
|
|
|
return txn;
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
|
|
|
|
{
|
2021-04-08 17:20:45 +00:00
|
|
|
txn->beforeCommit();
|
|
|
|
|
|
|
|
CSN new_csn;
|
2021-03-31 17:55:04 +00:00
|
|
|
/// TODO Transactions: reset local_tid_counter
|
2021-04-08 17:20:45 +00:00
|
|
|
if (txn->isReadOnly())
|
|
|
|
{
|
2021-05-18 17:07:29 +00:00
|
|
|
LOG_TRACE(log, "Closing readonly transaction {}", txn->tid);
|
2021-04-08 17:20:45 +00:00
|
|
|
new_csn = txn->snapshot;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-05-18 17:07:29 +00:00
|
|
|
LOG_TRACE(log, "Committing transaction {}{}", txn->tid, txn->dumpDescription());
|
2021-04-08 17:20:45 +00:00
|
|
|
std::lock_guard lock{commit_mutex};
|
|
|
|
new_csn = 1 + csn_counter.fetch_add(1);
|
|
|
|
bool inserted = tid_to_csn.try_emplace(txn->tid.getHash(), new_csn).second; /// Commit point
|
|
|
|
if (!inserted)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
|
|
|
|
latest_snapshot.store(new_csn, std::memory_order_relaxed);
|
|
|
|
}
|
|
|
|
|
2021-05-18 17:07:29 +00:00
|
|
|
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
txn->afterCommit(new_csn);
|
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock{running_list_mutex};
|
|
|
|
bool removed = running_list.erase(txn->tid.getHash());
|
|
|
|
if (!removed)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
|
2021-06-04 09:26:47 +00:00
|
|
|
snapshots_in_use.erase(txn->snapshot_in_use_it);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
2021-04-09 12:53:51 +00:00
|
|
|
return new_csn;
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept
|
2021-03-31 17:55:04 +00:00
|
|
|
{
|
2021-05-18 17:07:29 +00:00
|
|
|
LOG_TRACE(log, "Rolling back transaction {}", txn->tid);
|
2021-12-14 20:06:34 +00:00
|
|
|
if (txn->rollback())
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock{running_list_mutex};
|
|
|
|
bool removed = running_list.erase(txn->tid.getHash());
|
|
|
|
if (!removed)
|
2021-04-09 12:53:51 +00:00
|
|
|
abort();
|
2021-06-04 09:26:47 +00:00
|
|
|
snapshots_in_use.erase(txn->snapshot_in_use_it);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash & tid)
|
|
|
|
{
|
|
|
|
std::lock_guard lock{running_list_mutex};
|
|
|
|
auto it = running_list.find(tid);
|
|
|
|
if (it == running_list.end())
|
|
|
|
return nullptr;
|
|
|
|
return it->second;
|
|
|
|
}
|
|
|
|
|
|
|
|
CSN TransactionLog::getCSN(const TransactionID & tid) const
|
|
|
|
{
|
|
|
|
return getCSN(tid.getHash());
|
|
|
|
}
|
|
|
|
|
|
|
|
CSN TransactionLog::getCSN(const TIDHash & tid) const
|
|
|
|
{
|
|
|
|
assert(tid);
|
|
|
|
assert(tid != Tx::EmptyTID.getHash());
|
|
|
|
if (tid == Tx::PrehistoricTID.getHash())
|
|
|
|
return Tx::PrehistoricCSN;
|
|
|
|
|
|
|
|
std::lock_guard lock{commit_mutex};
|
|
|
|
auto it = tid_to_csn.find(tid);
|
|
|
|
if (it == tid_to_csn.end())
|
|
|
|
return Tx::UnknownCSN;
|
|
|
|
return it->second;
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
Snapshot TransactionLog::getOldestSnapshot() const
|
|
|
|
{
|
|
|
|
std::lock_guard lock{running_list_mutex};
|
|
|
|
if (snapshots_in_use.empty())
|
|
|
|
return getLatestSnapshot();
|
|
|
|
return snapshots_in_use.front();
|
|
|
|
}
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|