2021-03-31 17:55:04 +00:00
|
|
|
#include <Interpreters/TransactionLog.h>
|
2021-11-08 18:56:09 +00:00
|
|
|
#include <Interpreters/TransactionVersionMetadata.h>
|
2021-12-28 11:23:35 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2022-01-14 14:03:00 +00:00
|
|
|
#include <Interpreters/TransactionsInfoLog.h>
|
2021-12-28 11:23:35 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <IO/ReadBufferFromString.h>
|
2021-04-08 17:20:45 +00:00
|
|
|
#include <Common/Exception.h>
|
2021-12-28 11:23:35 +00:00
|
|
|
#include <Common/ZooKeeper/KeeperException.h>
|
2021-11-08 18:56:09 +00:00
|
|
|
#include <Core/ServerUUID.h>
|
2021-11-06 18:08:36 +00:00
|
|
|
#include <base/logger_useful.h>
|
2021-03-31 17:55:04 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
/// Error codes referenced in this file. They are only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
|
|
|
/// Best-effort helper: adds one event about the transaction `tid` to the transactions info log of `context`.
/// Does nothing if the context has no such log. Exceptions never leave this function:
/// they are written to `log` with tryLogCurrentException.
static void tryWriteEventToSystemLog(Poco::Logger * log, ContextPtr context,
                                     TransactionsInfoLogElement::Type type, const TransactionID & tid, CSN csn = Tx::UnknownCSN)
{
    try
    {
        auto transactions_info_log = context->getTransactionsInfoLog();
        if (transactions_info_log)
        {
            TransactionsInfoLogElement event;
            event.type = type;
            event.tid = tid;
            event.csn = csn;
            /// No additional context is passed for the common fields.
            event.fillCommonFields(nullptr);
            transactions_info_log->add(event);
        }
    }
    catch (...)
    {
        tryLogCurrentException(log);
    }
}
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
|
|
|
|
/// Takes the global context, checks that transactions are allowed, loads the whole log from ZooKeeper
/// and after that starts the thread that keeps the in-memory state up to date.
TransactionLog::TransactionLog()
    : log(&Poco::Logger::get("TransactionLog"))
{
    /// NOTE The location of the log in ZooKeeper is hard-coded.
    zookeeper_path = "/test/clickhouse/txn_log";

    global_context = Context::getGlobalContextInstance();
    global_context->checkTransactionsAreAllowed();

    /// Load the current state first, start the updating thread second.
    loadLogFromZooKeeper();

    updating_thread = ThreadFromGlobalPool([this] { runUpdatingThread(); });
}
|
|
|
|
|
|
|
|
/// The destructor only delegates to shutdown(), which returns immediately if it has already been called.
TransactionLog::~TransactionLog()
{
    this->shutdown();
}
|
|
|
|
|
|
|
|
void TransactionLog::shutdown()
|
|
|
|
{
|
|
|
|
if (stop_flag.exchange(true))
|
|
|
|
return;
|
2021-12-28 11:23:35 +00:00
|
|
|
log_updated_event->set();
|
2022-01-19 18:29:31 +00:00
|
|
|
latest_snapshot.notify_all();
|
2021-12-28 11:23:35 +00:00
|
|
|
updating_thread.join();
|
2022-01-20 18:15:23 +00:00
|
|
|
|
|
|
|
std::lock_guard lock{mutex};
|
|
|
|
/// This is required to... you'll never guess - avoid race condition inside Poco::Logger (Coordination::ZooKeeper::log)
|
|
|
|
zookeeper.reset();
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a copy of the current ZooKeeper client pointer, read under `mutex`.
/// NOTE The result may be empty: the updating thread and shutdown() reset the pointer.
ZooKeeperPtr TransactionLog::getZooKeeper() const
{
    ZooKeeperPtr current;
    {
        std::lock_guard lock{mutex};
        current = zookeeper;
    }
    return current;
}
|
|
|
|
|
|
|
|
/// Extracts the number from a log entry name of the form "csn-<number>".
/// The whole string must match: a missing prefix or trailing characters make the read helpers fail.
UInt64 TransactionLog::parseCSN(const String & csn_node_name)
{
    ReadBufferFromString in{csn_node_name};

    assertString("csn-", in);

    UInt64 csn = 0;
    readText(csn, in);

    assertEOF(in);
    return csn;
}
|
|
|
|
|
|
|
|
/// Parses the content of a log entry into a TransactionID.
/// Empty content yields Tx::EmptyTID; otherwise the content must consist of exactly one serialized TID.
TransactionID TransactionLog::parseTID(const String & csn_node_content)
{
    if (csn_node_content.empty())
        return Tx::EmptyTID;

    ReadBufferFromString in{csn_node_content};
    TransactionID parsed = TransactionID::read(in);
    assertEOF(in);
    return parsed;
}
|
|
|
|
|
|
|
|
/// Serializes `tid` with TransactionID::write and returns the result as a string.
String TransactionLog::writeTID(const TransactionID & tid)
{
    WriteBufferFromOwnString out;
    TransactionID::write(tid, out);
    return out.str();
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Fetches the log entries named by [beg, end) from ZooKeeper and adds them to the in-memory state.
/// The last name of the range becomes last_loaded_entry and the CSN of the last entry becomes latest_snapshot,
/// so callers pass a sorted range. An empty range is a no-op.
void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_iterator end)
{
    const size_t entries_count = std::distance(beg, end);
    if (entries_count == 0)
        return;

    const String last_entry = *std::prev(end);
    LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path, *beg, last_entry);

    /// Send all requests first and only then wait for the responses (in the same order).
    std::vector<std::future<Coordination::GetResponse>> responses;
    responses.reserve(entries_count);
    for (auto name_it = beg; name_it != end; ++name_it)
        responses.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path) / *name_it));

    std::vector<std::pair<TIDHash, CSN>> parsed_entries;
    parsed_entries.reserve(entries_count);
    auto name_it = beg;
    for (auto & response : responses)
    {
        auto node = response.get();
        CSN csn = parseCSN(*name_it);
        TransactionID tid = parseTID(node.data);
        parsed_entries.emplace_back(tid.getHash(), csn);
        LOG_TEST(log, "Got entry {} -> {}", tid, csn);
        ++name_it;
    }
    responses.clear();

    /// The lambda is noexcept on purpose: exit on unexpected exceptions (SIGABRT is better than broken state in memory)
    auto apply_loaded = [&]() noexcept
    {
        for (const auto & [tid_hash, csn] : parsed_entries)
        {
            /// Entries with empty TID are not added to the map
            if (tid_hash == Tx::EmptyTID.getHash())
                continue;
            tid_to_csn.emplace(tid_hash, csn);
        }
        last_loaded_entry = last_entry;
        latest_snapshot = parsed_entries.back().second;
    };

    LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
    std::lock_guard lock{mutex};
    apply_loaded();
}
|
|
|
|
|
|
|
|
void TransactionLog::loadLogFromZooKeeper()
|
|
|
|
{
|
|
|
|
assert(!zookeeper);
|
|
|
|
assert(tid_to_csn.empty());
|
|
|
|
assert(last_loaded_entry.empty());
|
|
|
|
zookeeper = global_context->getZooKeeper();
|
|
|
|
|
|
|
|
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
|
|
|
|
/// Create empty entry to allocate new CSN to safely start counting from the beginning and avoid TID duplication.
|
|
|
|
/// TODO It's possible to skip this step in come cases (especially for multi-host configuration).
|
|
|
|
Coordination::Error code = zookeeper->tryCreate(zookeeper_path + "/csn-", "", zkutil::CreateMode::PersistentSequential);
|
|
|
|
if (code != Coordination::Error::ZOK)
|
|
|
|
{
|
|
|
|
assert(code == Coordination::Error::ZNONODE);
|
|
|
|
zookeeper->createAncestors(zookeeper_path);
|
|
|
|
Coordination::Requests ops;
|
|
|
|
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
|
|
|
|
for (size_t i = 0; i <= Tx::MaxReservedCSN; ++i)
|
|
|
|
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/csn-", "", zkutil::CreateMode::PersistentSequential));
|
|
|
|
Coordination::Responses res;
|
|
|
|
code = zookeeper->tryMulti(ops, res);
|
|
|
|
if (code != Coordination::Error::ZNODEEXISTS)
|
|
|
|
zkutil::KeeperMultiException::check(code, ops, res);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// TODO Split log into "subdirectories" to:
|
|
|
|
/// 1. fetch it more optimal way (avoid listing all CSNs on further incremental updates)
|
|
|
|
/// 2. simplify log rotation
|
|
|
|
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
|
|
|
|
Strings entries_list = zookeeper->getChildren(zookeeper_path, nullptr, log_updated_event);
|
|
|
|
assert(!entries_list.empty());
|
|
|
|
std::sort(entries_list.begin(), entries_list.end());
|
|
|
|
loadEntries(entries_list.begin(), entries_list.end());
|
|
|
|
assert(!last_loaded_entry.empty());
|
|
|
|
assert(latest_snapshot == parseCSN(last_loaded_entry));
|
2021-11-08 18:56:09 +00:00
|
|
|
local_tid_counter = Tx::MaxReservedLocalTID;
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|
|
|
|
|
2021-12-28 11:23:35 +00:00
|
|
|
void TransactionLog::runUpdatingThread()
|
|
|
|
{
|
|
|
|
while (true)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
log_updated_event->wait();
|
|
|
|
if (stop_flag.load())
|
|
|
|
return;
|
|
|
|
|
|
|
|
if (!zookeeper)
|
2022-01-20 18:15:23 +00:00
|
|
|
{
|
|
|
|
auto new_zookeeper = global_context->getZooKeeper();
|
|
|
|
std::lock_guard lock{mutex};
|
|
|
|
zookeeper = new_zookeeper;
|
|
|
|
}
|
2021-12-28 11:23:35 +00:00
|
|
|
|
|
|
|
loadNewEntries();
|
|
|
|
}
|
|
|
|
catch (const Coordination::Exception & e)
|
|
|
|
{
|
2022-02-03 18:57:09 +00:00
|
|
|
tryLogCurrentException(log);
|
2021-12-28 11:23:35 +00:00
|
|
|
/// TODO better backoff
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
|
|
if (Coordination::isHardwareError(e.code))
|
2022-01-20 18:15:23 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock{mutex};
|
|
|
|
zookeeper.reset();
|
|
|
|
}
|
2021-12-28 11:23:35 +00:00
|
|
|
log_updated_event->set();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2022-02-03 18:57:09 +00:00
|
|
|
tryLogCurrentException(log);
|
2021-12-28 11:23:35 +00:00
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
|
|
log_updated_event->set();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void TransactionLog::loadNewEntries()
|
|
|
|
{
|
|
|
|
Strings entries_list = zookeeper->getChildren(zookeeper_path, nullptr, log_updated_event);
|
|
|
|
assert(!entries_list.empty());
|
|
|
|
std::sort(entries_list.begin(), entries_list.end());
|
|
|
|
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry);
|
|
|
|
loadEntries(it, entries_list.end());
|
|
|
|
assert(last_loaded_entry == entries_list.back());
|
|
|
|
assert(latest_snapshot == parseCSN(last_loaded_entry));
|
|
|
|
latest_snapshot.notify_all();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
/// Returns the current value of latest_snapshot (an atomic load).
Snapshot TransactionLog::getLatestSnapshot() const
{
    const Snapshot current = latest_snapshot.load();
    return current;
}
|
|
|
|
|
|
|
|
/// Creates a new transaction that uses the latest snapshot and the next local TID,
/// registers it in running_list and snapshots_in_use and writes a BEGIN event to the system log.
/// Throws LOGICAL_ERROR if a transaction with the same TID hash is already registered.
MergeTreeTransactionPtr TransactionLog::beginTransaction()
{
    MergeTreeTransactionPtr new_txn;
    {
        std::lock_guard lock{running_list_mutex};

        const Snapshot snapshot = latest_snapshot.load();
        const LocalTID local_tid = ++local_tid_counter;
        new_txn = std::make_shared<MergeTreeTransaction>(snapshot, local_tid, ServerUUID::get());

        if (!running_list.try_emplace(new_txn->tid.getHash(), new_txn).second)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", new_txn->tid.getHash(), new_txn->tid);

        /// Remember the position in the list: it is used to remove the snapshot on commit or rollback.
        new_txn->snapshot_in_use_it = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
    }

    /// Logging is done outside of the critical section.
    LOG_TEST(log, "Beginning transaction {} ({})", new_txn->tid, new_txn->tid.getHash());
    tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::BEGIN, new_txn->tid);

    return new_txn;
}
|
|
|
|
|
|
|
|
/// Commits the transaction and returns its CSN.
///  - A read-only transaction is just closed: nothing is written to ZooKeeper and its snapshot is returned as the CSN.
///  - Otherwise a sequential "csn-" node containing the TID is created in ZooKeeper (this is the commit point),
///    the CSN is parsed from the name of the created node, and the method waits until latest_snapshot
///    reaches that CSN (or until shutdown).
/// After that the transaction is removed from running_list and snapshots_in_use.
/// Throws Coordination::Exception if there is no ZooKeeper client at the moment or if the node cannot be created,
/// and LOGICAL_ERROR if the transaction is not in running_list.
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
{
    txn->beforeCommit();

    CSN new_csn;
    /// TODO Transactions: reset local_tid_counter
    if (txn->isReadOnly())
    {
        LOG_TEST(log, "Closing readonly transaction {}", txn->tid);
        new_csn = txn->snapshot;
        tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
    }
    else
    {
        LOG_TEST(log, "Committing transaction {}{}", txn->tid, txn->dumpDescription());
        /// TODO handle connection loss
        /// TODO support batching
        auto current_zookeeper = getZooKeeper();

        /// The updating thread resets the client after a hardware error and restores it only on a later iteration,
        /// so the pointer may be empty here. Nothing has been sent to ZooKeeper yet, so the transaction
        /// is definitely not committed and it is safe to fail instead of dereferencing a null pointer.
        if (!current_zookeeper)
            throw Coordination::Exception(
                fmt::format("Cannot commit transaction {}: no ZooKeeper session", txn->tid),
                Coordination::Error::ZSESSIONEXPIRED);

        String path_created = current_zookeeper->create(zookeeper_path + "/csn-", writeTID(txn->tid), zkutil::CreateMode::PersistentSequential); /// Commit point

        /// Strip "<zookeeper_path>/" to get the node name
        new_csn = parseCSN(path_created.substr(zookeeper_path.size() + 1));

        LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
        tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);

        /// Wait for committed changes to become actually visible, so the next transaction will see changes
        /// TODO it's optional, add a setting for this
        auto current_latest_snapshot = latest_snapshot.load();
        while (current_latest_snapshot < new_csn && !stop_flag)
        {
            /// Blocks until latest_snapshot differs from the value we have seen (or until a notification on shutdown)
            latest_snapshot.wait(current_latest_snapshot);
            current_latest_snapshot = latest_snapshot.load();
        }
    }

    txn->afterCommit(new_csn);

    {
        std::lock_guard lock{running_list_mutex};
        bool removed = running_list.erase(txn->tid.getHash());
        if (!removed)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
        snapshots_in_use.erase(txn->snapshot_in_use_it);
    }

    return new_csn;
}
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept
|
2021-03-31 17:55:04 +00:00
|
|
|
{
|
2022-02-04 18:18:20 +00:00
|
|
|
LOG_TRACE(log, "Rolling back transaction {}{}", txn->tid,
|
|
|
|
std::uncaught_exceptions() ? fmt::format(" due to uncaught exception (code: {})", getCurrentExceptionCode()) : "");
|
|
|
|
|
2022-01-14 14:03:00 +00:00
|
|
|
if (!txn->rollback())
|
|
|
|
return;
|
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock{running_list_mutex};
|
|
|
|
bool removed = running_list.erase(txn->tid.getHash());
|
|
|
|
if (!removed)
|
2021-04-09 12:53:51 +00:00
|
|
|
abort();
|
2021-06-04 09:26:47 +00:00
|
|
|
snapshots_in_use.erase(txn->snapshot_in_use_it);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
2022-01-14 14:03:00 +00:00
|
|
|
|
|
|
|
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::ROLLBACK, txn->tid);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Looks up a transaction in running_list by the hash of its TID.
/// Returns nullptr if there is no such transaction.
MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash & tid)
{
    std::lock_guard lock{running_list_mutex};
    if (auto found = running_list.find(tid); found != running_list.end())
        return found->second;
    return nullptr;
}
|
|
|
|
|
2022-01-31 22:27:55 +00:00
|
|
|
/// Convenience overload: resolves the CSN by the hash of `tid`.
CSN TransactionLog::getCSN(const TransactionID & tid)
{
    const TIDHash tid_hash = tid.getHash();
    return getCSN(tid_hash);
}
|
|
|
|
|
2022-01-31 22:27:55 +00:00
|
|
|
/// Returns the CSN for the TID hash.
/// The prehistoric TID is answered right here, without touching instance():
/// this avoids creation of the instance if transactions are not actually involved.
CSN TransactionLog::getCSN(const TIDHash & tid)
{
    const bool is_prehistoric = tid == Tx::PrehistoricTID.getHash();
    if (!is_prehistoric)
        return instance().getCSNImpl(tid);

    return Tx::PrehistoricCSN;
}
|
|
|
|
|
|
|
|
/// Looks up the CSN in tid_to_csn under `mutex`.
/// Returns Tx::UnknownCSN if the hash is not in the map.
/// The hash must be non-zero and must not be the hash of Tx::EmptyTID (checked with asserts).
CSN TransactionLog::getCSNImpl(const TIDHash & tid) const
{
    assert(tid);
    assert(tid != Tx::EmptyTID.getHash());

    std::lock_guard lock{mutex};
    if (auto found = tid_to_csn.find(tid); found != tid_to_csn.end())
        return found->second;
    return Tx::UnknownCSN;
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
/// Returns the first element of snapshots_in_use (new snapshots are appended to the end of that list),
/// or the latest snapshot if no snapshots are in use.
Snapshot TransactionLog::getOldestSnapshot() const
{
    std::lock_guard lock{running_list_mutex};
    if (!snapshots_in_use.empty())
        return snapshots_in_use.front();
    return getLatestSnapshot();
}
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|