log cleanup, more comments

This commit is contained in:
Alexander Tokmakov 2022-03-14 21:43:34 +01:00
parent 645629a01b
commit 278d779a01
17 changed files with 417 additions and 152 deletions

View File

@ -1455,6 +1455,8 @@ if (ThreadFuzzer::instance().isEffective())
server.start();
LOG_INFO(log, "Listening for {}", server.getDescription());
}
global_context->setServerCompletelyStarted();
LOG_INFO(log, "Ready for connections.");
}

View File

@ -11,7 +11,6 @@ class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
class MergeTreeTransaction;
/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough
using CSN = UInt64;
using Snapshot = CSN;
using LocalTID = UInt64;
@ -21,8 +20,8 @@ namespace Tx
{
const CSN UnknownCSN = 0;
const CSN PrehistoricCSN = 1;
const CSN CommittingCSN = 2; /// TODO do we really need it?
const CSN MaxReservedCSN = 2;
const CSN CommittingCSN = 2;
const CSN MaxReservedCSN = 16;
const LocalTID PrehistoricLocalTID = 1;
const LocalTID DummyLocalTID = 1;
@ -31,10 +30,19 @@ namespace Tx
struct TransactionID
{
/// Global sequential number, the newest commit timestamp the we saw when this transaction began
CSN start_csn = 0;
/// Local sequential that is unique for each transaction started by this host within specific start_csn
LocalTID local_tid = 0;
/// UUID of host that has started this transaction
UUID host_id = UUIDHelpers::Nil;
/// NOTE Maybe we could just generate UUIDv4 for each transaction, but it would be harder to debug.
/// Partial order is defined for this TransactionID structure:
/// (tid1.start_csn <= tid2.start_csn) <==> (tid1 <= tid2)
/// (tid1.start_csn == tid2.start_csn && tid1.host_id == tid2.host_id && tid1.local_tid < tid2.local_tid) ==> (tid1 < tid2)
/// If two transaction have the same start_csn, but were started by different hosts, then order is undefined.
bool operator == (const TransactionID & rhs) const
{
return start_csn == rhs.start_csn && local_tid == rhs.local_tid && host_id == rhs.host_id;

View File

@ -1184,4 +1184,14 @@ String extractZooKeeperPath(const String & path, bool check_starts_with_slash, P
return normalizeZooKeeperPath(path, check_starts_with_slash, log);
}
String getSequentialNodeName(const String & prefix, UInt64 number)
{
/// NOTE Sequential counter in ZooKeeper is Int32.
assert(number < std::numeric_limits<Int32>::max());
constexpr size_t seq_node_digits = 10;
String num_str = std::to_string(number);
String name = prefix + String(seq_node_digits - num_str.size(), '0') + num_str;
return name;
}
}

View File

@ -386,4 +386,6 @@ String extractZooKeeperName(const String & path);
String extractZooKeeperPath(const String & path, bool check_starts_with_slash, Poco::Logger * log = nullptr);
String getSequentialNodeName(const String & prefix, UInt64 number);
}

View File

@ -277,6 +277,8 @@ struct ContextSharedPart
Context::ConfigReloadCallback config_reload_callback;
bool is_server_completely_started = false;
ContextSharedPart()
: access_control(std::make_unique<AccessControl>())
, global_overcommit_tracker(&process_list)
@ -3053,6 +3055,22 @@ MergeTreeTransactionPtr Context::getCurrentTransaction() const
return merge_tree_transaction;
}
bool Context::isServerCompletelyStarted() const
{
auto lock = getLock();
assert(getApplicationType() == ApplicationType::SERVER);
return shared->is_server_completely_started;
}
void Context::setServerCompletelyStarted()
{
auto lock = getLock();
assert(global_context.lock().get() == this);
assert(!shared->is_server_completely_started);
assert(getApplicationType() == ApplicationType::SERVER);
shared->is_server_completely_started = true;
}
PartUUIDsPtr Context::getPartUUIDs() const
{
auto lock = getLock();

View File

@ -887,6 +887,9 @@ public:
void setCurrentTransaction(MergeTreeTransactionPtr txn);
MergeTreeTransactionPtr getCurrentTransaction() const;
bool isServerCompletelyStarted() const;
void setServerCompletelyStarted();
PartUUIDsPtr getPartUUIDs() const;
PartUUIDsPtr getIgnoredPartUUIDs() const;

View File

@ -390,12 +390,7 @@ ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_conte
String DDLTaskBase::getLogEntryName(UInt32 log_entry_number)
{
/// Sequential counter in ZooKeeper is Int32.
assert(log_entry_number < std::numeric_limits<Int32>::max());
constexpr size_t seq_node_digits = 10;
String number = toString(log_entry_number);
String name = "query-" + String(seq_node_digits - number.size(), '0') + number;
return name;
return zkutil::getSequentialNodeName("query-", log_entry_number);
}
UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name)

View File

@ -13,7 +13,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
MergeTreeTransaction::MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id)
MergeTreeTransaction::MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id)
: tid({snapshot_, local_tid_, host_id})
, snapshot(snapshot_)
, csn(Tx::UnknownCSN)
@ -41,22 +41,33 @@ void MergeTreeTransaction::checkIsNotCancelled() const
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn)
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
/// Now we know actual part name and can write it to system log table.
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
/// Creation TID was written to data part earlier on part creation.
/// We only need to ensure that it's written and add part to in-memory set of new parts.
new_part->assertHasVersionMetadata(txn);
if (txn)
{
txn->addNewPart(storage, new_part);
/// Now we know actual part name and can write it to system log table.
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, txn->tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
}
}
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, MergeTreeTransaction * txn)
{
TransactionInfoContext context{storage->getStorageID(), part_to_remove->name};
if (txn)
{
/// Lock part for removal and write current TID into version metadata file.
/// If server crash just after committing transactions
/// we will find this TID in version metadata and will finally remove part.
txn->removeOldPart(storage, part_to_remove, context);
}
else
part_to_remove->version.lockMaxTID(Tx::PrehistoricTID, context);
{
/// Lock part for removal with special TID, so transactions will no try to remove it concurrently.
/// We lock it only in memory.
part_to_remove->version.lockRemovalTID(Tx::PrehistoricTID, context);
}
}
void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage, const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn)
@ -81,7 +92,7 @@ void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage
for (const auto & covered : covered_parts)
{
context.part_name = covered->name;
covered->version.lockMaxTID(tid, context);
covered->version.lockRemovalTID(tid, context);
}
}
}
@ -101,7 +112,7 @@ void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataP
checkIsNotCancelled();
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
part_to_remove->version.lockMaxTID(tid, context);
part_to_remove->version.lockRemovalTID(tid, context);
storages.insert(storage);
removing_parts.push_back(part_to_remove);
}
@ -126,27 +137,37 @@ bool MergeTreeTransaction::isReadOnly() const
void MergeTreeTransaction::beforeCommit()
{
CSN expected = Tx::UnknownCSN;
bool can_commit = csn.compare_exchange_strong(expected, Tx::CommittingCSN);
if (!can_commit)
{
if (expected == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected);
}
RunningMutationsList mutations_to_wait;
{
std::lock_guard lock{mutex};
mutations_to_wait = mutations;
}
/// We should wait for mutations to finish before committing transaction, because some mutation may fail and cause rollback.
for (const auto & table_and_mutation : mutations_to_wait)
table_and_mutation.first->waitForMutation(table_and_mutation.second);
assert([&]() {
std::lock_guard lock{mutex};
return mutations == mutations_to_wait;
}());
CSN expected = Tx::UnknownCSN;
bool can_commit = csn.compare_exchange_strong(expected, Tx::CommittingCSN);
if (!can_commit)
{
/// Transaction was concurrently cancelled by KILL TRANSACTION or KILL MUTATION
if (expected == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected);
}
}
void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
{
/// Write allocated CSN into version metadata, so we will know CSN without reading it from transaction log
/// and we will be able to remove old entries from transaction log in ZK.
/// It's not a problem if server crash before CSN is written, because we already have TID in data part and entry in the log.
[[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
assert(prev_value == Tx::CommittingCSN);
for (const auto & part : creating_parts)
@ -167,21 +188,31 @@ bool MergeTreeTransaction::rollback() noexcept
CSN expected = Tx::UnknownCSN;
bool need_rollback = csn.compare_exchange_strong(expected, Tx::RolledBackCSN);
/// Check that it was not rolled back concurrently
if (!need_rollback)
return false;
/// It's not a problem if server crash at this point
/// because on startup we will see that TID is not committed and will simply discard these changes.
/// Forcefully stop related mutations if any
for (const auto & table_and_mutation : mutations)
table_and_mutation.first->killMutation(table_and_mutation.second);
/// Kind of optimization: cleanup thread can remove these parts immediately
for (const auto & part : creating_parts)
part->version.creation_csn.store(Tx::RolledBackCSN);
for (const auto & part : removing_parts)
{
/// Clear removal_tid from version metadata file, so we will not need to distinguish TIDs that were not committed
/// and TIDs that were committed long time ago and were removed from the log on log cleanup.
part->appendRemovalTIDToVersionMetadata(/* clear */ true);
part->version.unlockMaxTID(tid, TransactionInfoContext{part->storage.getStorageID(), part->name});
part->version.unlockRemovalTID(tid, TransactionInfoContext{part->storage.getStorageID(), part->name});
}
/// Discard changes in active parts set
/// Remove parts that were created, restore parts that were removed (except parts that were created by this transaction too)
for (const auto & part : creating_parts)
const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(nullptr, {part}, true);

View File

@ -25,12 +25,12 @@ public:
ROLLED_BACK,
};
Snapshot getSnapshot() const { return snapshot; }
CSN getSnapshot() const { return snapshot; }
State getState() const;
const TransactionID tid;
MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id);
MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id);
void addNewPart(const StoragePtr & storage, const DataPartPtr & new_part);
void removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, const TransactionInfoContext & context);
@ -58,7 +58,7 @@ private:
mutable std::mutex mutex;
Stopwatch elapsed;
Snapshot snapshot;
CSN snapshot;
std::unordered_set<StoragePtr> storages;
DataPartsVector creating_parts;
@ -66,7 +66,7 @@ private:
std::atomic<CSN> csn;
std::list<Snapshot>::iterator snapshot_in_use_it;
std::list<CSN>::iterator snapshot_in_use_it;
using RunningMutationsList = std::vector<std::pair<StoragePtr, String>>;
RunningMutationsList mutations;

View File

@ -10,6 +10,11 @@
#include <Core/ServerUUID.h>
#include <base/logger_useful.h>
/// It's used in critical places to exit on unexpected exceptions.
/// SIGABRT is usually better that broken state in memory with unpredictable consequences.
#define NOEXCEPT_SCOPE SCOPE_EXIT({ if (std::uncaught_exceptions()) { tryLogCurrentException("NOEXCEPT_SCOPE"); abort(); } })
namespace DB
{
@ -45,7 +50,8 @@ TransactionLog::TransactionLog()
global_context = Context::getGlobalContextInstance();
global_context->checkTransactionsAreAllowed();
zookeeper_path = "/test/clickhouse/txn_log";
zookeeper_path = global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/test/clickhouse/txn");
zookeeper_path_log = zookeeper_path + "/log";
loadLogFromZooKeeper();
@ -86,6 +92,11 @@ UInt64 TransactionLog::parseCSN(const String & csn_node_name)
return res;
}
String TransactionLog::writeCSN(CSN csn)
{
return zkutil::getSequentialNodeName("csn-", csn);
}
TransactionID TransactionLog::parseTID(const String & csn_node_content)
{
TransactionID tid = Tx::EmptyTID;
@ -114,12 +125,12 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
return;
String last_entry = *std::prev(end);
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path, *beg, last_entry);
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
futures.reserve(entries_count);
for (auto it = beg; it != end; ++it)
futures.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path) / *it));
futures.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path_log) / *it));
std::vector<std::pair<TIDHash, CSN>> loaded;
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
auto it = beg;
for (size_t i = 0; i < entries_count; ++i, ++it)
@ -127,25 +138,24 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
auto res = futures[i].get();
CSN csn = parseCSN(*it);
TransactionID tid = parseTID(res.data);
loaded.emplace_back(tid.getHash(), csn);
loaded.emplace_back(tid.getHash(), CSNEntry{csn, tid});
LOG_TEST(log, "Got entry {} -> {}", tid, csn);
}
futures.clear();
/// Use noexcept here to exit on unexpected exceptions (SIGABRT is better that broken state in memory)
auto insert = [&]() noexcept
{
for (const auto & entry : loaded)
if (entry.first != Tx::EmptyTID.getHash())
tid_to_csn.emplace(entry.first, entry.second);
last_loaded_entry = last_entry;
latest_snapshot = loaded.back().second;
local_tid_counter = Tx::MaxReservedLocalTID;
};
NOEXCEPT_SCOPE;
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
std::lock_guard lock{mutex};
insert();
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
tid_to_csn.emplace(entry.first, entry.second);
}
last_loaded_entry = last_entry;
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
}
void TransactionLog::loadLogFromZooKeeper()
@ -158,15 +168,19 @@ void TransactionLog::loadLogFromZooKeeper()
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
/// Create empty entry to allocate new CSN to safely start counting from the beginning and avoid TID duplication.
/// TODO It's possible to skip this step in come cases (especially for multi-host configuration).
Coordination::Error code = zookeeper->tryCreate(zookeeper_path + "/csn-", "", zkutil::CreateMode::PersistentSequential);
Coordination::Error code = zookeeper->tryCreate(zookeeper_path_log + "/csn-", "", zkutil::CreateMode::PersistentSequential);
if (code != Coordination::Error::ZOK)
{
/// Log probably does not exist, create it
assert(code == Coordination::Error::ZNONODE);
zookeeper->createAncestors(zookeeper_path);
zookeeper->createAncestors(zookeeper_path_log);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/tail_ptr", writeCSN(Tx::MaxReservedCSN), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path_log, "", zkutil::CreateMode::Persistent));
/// Fast-forward sequential counter to skip reserved CSNs
for (size_t i = 0; i <= Tx::MaxReservedCSN; ++i)
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/csn-", "", zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path_log + "/csn-", "", zkutil::CreateMode::PersistentSequential));
Coordination::Responses res;
code = zookeeper->tryMulti(ops, res);
if (code != Coordination::Error::ZNODEEXISTS)
@ -177,13 +191,15 @@ void TransactionLog::loadLogFromZooKeeper()
/// 1. fetch it more optimal way (avoid listing all CSNs on further incremental updates)
/// 2. simplify log rotation
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
Strings entries_list = zookeeper->getChildren(zookeeper_path, nullptr, log_updated_event);
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
loadEntries(entries_list.begin(), entries_list.end());
assert(!last_loaded_entry.empty());
assert(latest_snapshot == parseCSN(last_loaded_entry));
local_tid_counter = Tx::MaxReservedLocalTID;
tail_ptr = parseCSN(zookeeper->get(zookeeper_path + "/tail_ptr"));
}
void TransactionLog::runUpdatingThread()
@ -204,6 +220,7 @@ void TransactionLog::runUpdatingThread()
}
loadNewEntries();
removeOldEntries();
}
catch (const Coordination::Exception & e)
{
@ -228,7 +245,7 @@ void TransactionLog::runUpdatingThread()
void TransactionLog::loadNewEntries()
{
Strings entries_list = zookeeper->getChildren(zookeeper_path, nullptr, log_updated_event);
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry);
@ -238,8 +255,64 @@ void TransactionLog::loadNewEntries()
latest_snapshot.notify_all();
}
void TransactionLog::removeOldEntries()
{
/// Try to update tail pointer. It's (almost) safe to set it to the oldest snapshot
/// because if a transaction released snapshot, then CSN is already written into metadata.
/// Why almost? Because on server startup we do not have the oldest snapshot (it's simply equal to the latest one),
/// but it's possible that some CSNs are not written into data parts (and we will write them during startup).
if (!global_context->isServerCompletelyStarted())
return;
Snapshot TransactionLog::getLatestSnapshot() const
/// Also similar problem is possible if some table was not attached during startup (for example, if table is detached permanently).
/// Also we write CSNs into data parts without fsync, so it's theoretically possible that we wrote CSN, finished transaction,
/// removed its entry from the log, but after that server restarts and CSN is not actually saved to metadata on disk.
/// We should store a bit more entries in ZK and keep outdated entries for a while.
/// TODO we will need a bit more complex logic for multiple hosts
Coordination::Stat stat;
CSN old_tail_ptr = parseCSN(zookeeper->get(zookeeper_path + "/tail_ptr", &stat));
CSN new_tail_ptr = getOldestSnapshot();
if (new_tail_ptr < old_tail_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected tail_ptr {}, oldest snapshot is {}, it's a bug", old_tail_ptr, new_tail_ptr);
/// (it's not supposed to fail with ZBADVERSION while there is only one host)
LOG_TRACE(log, "Updating tail_ptr from {} to {}", old_tail_ptr, new_tail_ptr);
zookeeper->set(zookeeper_path + "/tail_ptr", writeCSN(new_tail_ptr), stat.version);
tail_ptr.store(new_tail_ptr);
/// Now we can find and remove old entries
TIDMap tids;
{
std::lock_guard lock{mutex};
tids = tid_to_csn;
}
/// TODO support batching
std::vector<TIDHash> removed_entries;
CSN latest_entry_csn = latest_snapshot.load();
for (const auto & elem : tids)
{
/// Definitely not safe to remove
if (new_tail_ptr <= elem.second.tid.start_csn)
continue;
/// Keep at least one node (the latest one we fetched)
if (elem.second.csn == latest_entry_csn)
continue;
LOG_TEST(log, "Removing entry {} -> {}", elem.second.tid, elem.second.csn);
auto code = zookeeper->tryRemove(zookeeper_path_log + "/" + writeCSN(elem.second.csn));
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE)
removed_entries.push_back(elem.first);
}
std::lock_guard lock{mutex};
for (const auto & tid_hash : removed_entries)
tid_to_csn.erase(tid_hash);
}
CSN TransactionLog::getLatestSnapshot() const
{
return latest_snapshot.load();
}
@ -249,7 +322,7 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
MergeTreeTransactionPtr txn;
{
std::lock_guard lock{running_list_mutex};
Snapshot snapshot = latest_snapshot.load();
CSN snapshot = latest_snapshot.load();
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get());
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
@ -266,11 +339,13 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
{
/// Some precommit checks, may throw
txn->beforeCommit();
CSN new_csn;
if (txn->isReadOnly())
{
/// Don't need to allocate CSN in ZK for readonly transactions, it's safe to use snapshot/start_csn as "commit" timestamp
LOG_TEST(log, "Closing readonly transaction {}", txn->tid);
new_csn = txn->snapshot;
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
@ -281,13 +356,17 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
/// TODO handle connection loss
/// TODO support batching
auto current_zookeeper = getZooKeeper();
String path_created = current_zookeeper->create(zookeeper_path + "/csn-", writeTID(txn->tid), zkutil::CreateMode::PersistentSequential); /// Commit point
new_csn = parseCSN(path_created.substr(zookeeper_path.size() + 1));
String path_created = current_zookeeper->create(zookeeper_path_log + "/csn-", writeTID(txn->tid), zkutil::CreateMode::PersistentSequential); /// Commit point
NOEXCEPT_SCOPE;
/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use
/// (overflow is possible in a several weeks/months of active usage)
new_csn = parseCSN(path_created.substr(zookeeper_path_log.size() + 1));
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
/// Wait for committed changes to become actually visible, so the next transaction will see changes
/// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes
/// TODO it's optional, add a setting for this
auto current_latest_snapshot = latest_snapshot.load();
while (current_latest_snapshot < new_csn && !stop_flag)
@ -297,9 +376,11 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
}
}
/// Write allocated CSN, so we will be able to cleanup log in ZK. This method is noexcept.
txn->afterCommit(new_csn);
{
/// Finally we can remove transaction from the list and release the snapshot
std::lock_guard lock{running_list_mutex};
bool removed = running_list.erase(txn->tid.getHash());
if (!removed)
@ -316,7 +397,10 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no
std::uncaught_exceptions() ? fmt::format(" due to uncaught exception (code: {})", getCurrentExceptionCode()) : "");
if (!txn->rollback())
{
/// Transaction was cancelled concurrently, it's already rolled back.
return;
}
{
std::lock_guard lock{running_list_mutex};
@ -340,7 +424,10 @@ MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash &
CSN TransactionLog::getCSN(const TransactionID & tid)
{
return getCSN(tid.getHash());
/// Avoid creation of the instance if transactions are not actually involved
if (tid == Tx::PrehistoricTID)
return Tx::PrehistoricCSN;
return instance().getCSNImpl(tid.getHash());
}
CSN TransactionLog::getCSN(const TIDHash & tid)
@ -348,23 +435,36 @@ CSN TransactionLog::getCSN(const TIDHash & tid)
/// Avoid creation of the instance if transactions are not actually involved
if (tid == Tx::PrehistoricTID.getHash())
return Tx::PrehistoricCSN;
return instance().getCSNImpl(tid);
}
CSN TransactionLog::getCSNImpl(const TIDHash & tid) const
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const
{
assert(tid);
assert(tid != Tx::EmptyTID.getHash());
assert(tid_hash);
assert(tid_hash != Tx::EmptyTID.getHash());
std::lock_guard lock{mutex};
auto it = tid_to_csn.find(tid);
if (it == tid_to_csn.end())
return Tx::UnknownCSN;
return it->second;
auto it = tid_to_csn.find(tid_hash);
if (it != tid_to_csn.end())
return it->second.csn;
return Tx::UnknownCSN;
}
Snapshot TransactionLog::getOldestSnapshot() const
void TransactionLog::assertTIDIsNotOutdated(const TransactionID & tid)
{
if (tid == Tx::PrehistoricTID)
return;
/// Ensure that we are not trying to get CSN for TID that was already removed from
CSN tail = instance().tail_ptr.load();
if (tail <= tid.start_csn)
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get CSN for too old TID {}, current tail_ptr is {}, probably it's a bug", tid, tail);
}
CSN TransactionLog::getOldestSnapshot() const
{
std::lock_guard lock{running_list_mutex};
if (snapshots_in_use.empty())

View File

@ -50,6 +50,35 @@ class TransactionsInfoLog;
using TransactionsInfoLogPtr = std::shared_ptr<TransactionsInfoLog>;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
/// This class maintains transaction log in ZooKeeper and a list of currently running transactions in memory.
///
/// Each transaction has unique ID (TID, see details below).
/// TransactionID is allocated when transaction begins.
///
/// We use TransactionID to associate changes (created/removed data parts) with transaction that has made/is going to make these changes.
/// To commit a transaction we create sequential node "/path_to_log/log/csn-" in ZK and write TID into this node.
/// Allocated sequential number is a commit timestamp or Commit Sequence Number (CSN). It indicates a (logical) point in time
/// when transaction is committed and all its changes became visible. So we have total order of all changes.
///
/// Also CSNs are used as snapshots: all changes that were made by a transaction that was committed with a CSN less or equal than some_csn
/// are visible in some_csn snapshot.
///
/// TransactionID consists of three parts: (start_csn, local_tid, host_id)
/// - start_csn is the newest CSN that existed when the transaction was started and also it's snapshot that is visible for this transaction
/// - local_tid is local sequential number of the transaction, each server allocates local_tids independently without requests to ZK
/// - host_id is persistent UUID of host that has started the transaction, it's kind of tie-breaker that makes ID unique across all servers
///
/// To check if some transaction is committed or not we fetch "csn-xxxxxx" nodes from ZK and construct TID -> CSN mapping,
/// so for committed transactions we know commit timestamps.
/// However, if we did not find a mapping for some TID, it means one of the following cases:
/// 1. Transaction is not committed (yet)
/// 2. Transaction is rolled back (quite similar to the first case, but it will never be committed)
/// 3. Transactions was committed a long time ago and we removed its entry from the log
/// To distinguish the third case we store a "tail pointer" in "/path_to_log/tail_ptr". It's a CSN such that it's safe to remove from log
/// entries with tid.start_csn < tail_ptr, because CSNs for those TIDs are already written into data parts
/// and we will not do a CSN lookup for those TIDs anymore.
///
/// (however, transactions involving multiple hosts and/or ReplicatedMergeTree tables are currently not supported)
class TransactionLog final : public SingletonHelper<TransactionLog>
{
public:
@ -60,22 +89,37 @@ public:
void shutdown();
Snapshot getLatestSnapshot() const;
Snapshot getOldestSnapshot() const;
/// Returns the newest snapshot available for reading
CSN getLatestSnapshot() const;
/// Returns the oldest snapshot that is visible for some running transaction
CSN getOldestSnapshot() const;
/// Allocated TID, returns transaction object
/// Allocates TID, returns new transaction object
MergeTreeTransactionPtr beginTransaction();
/// Tries to commit transaction. Returns Commit Sequence Number.
/// Throw if transaction was concurrently killed or if some precommit check failed.
/// May throw if ZK connection is lost. Transaction status is unknown in this case.
CSN commitTransaction(const MergeTreeTransactionPtr & txn);
/// Releases locks that that were acquired by transaction, releases snapshot, removes transaction from the list of active transactions.
/// Normally it should not throw, but if it does for some reason (global memory limit exceeded, disk failure, etc)
/// then we should terminate server and reinitialize it to avoid corruption of data structures. That's why it's noexcept.
void rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept;
/// Returns CSN if transaction with specified ID was committed and UnknownCSN if it was not.
/// Returns PrehistoricCSN for PrehistoricTID without creating a TransactionLog instance as a special case.
static CSN getCSN(const TransactionID & tid);
static CSN getCSN(const TIDHash & tid);
/// Ensures that getCSN returned UnknownCSN because transaction is not committed and not because entry was removed from the log.
static void assertTIDIsNotOutdated(const TransactionID & tid);
/// Returns a pointer to transaction object if it's running or nullptr.
MergeTreeTransactionPtr tryGetRunningTransaction(const TIDHash & tid);
using TransactionsList = std::unordered_map<TIDHash, MergeTreeTransactionPtr>;
/// Returns copy of list of running transactions.
TransactionsList getTransactionsList() const;
private:
@ -84,31 +128,52 @@ private:
void loadEntries(Strings::const_iterator beg, Strings::const_iterator end);
void loadNewEntries();
void removeOldEntries();
static UInt64 parseCSN(const String & csn_node_name);
static String writeCSN(CSN csn);
static TransactionID parseTID(const String & csn_node_content);
static String writeTID(const TransactionID & tid);
ZooKeeperPtr getZooKeeper() const;
CSN getCSNImpl(const TIDHash & tid) const;
CSN getCSNImpl(const TIDHash & tid_hash) const;
ContextPtr global_context;
Poco::Logger * log;
/// The newest snapshot available for reading
std::atomic<CSN> latest_snapshot;
/// Local part of TransactionID number. We reset this counter for each new snapshot.
std::atomic<LocalTID> local_tid_counter;
mutable std::mutex mutex;
std::unordered_map<TIDHash, CSN> tid_to_csn;
/// Mapping from TransactionID to CSN for recently committed transactions.
/// Allows to check if some transactions is committed.
struct CSNEntry
{
CSN csn;
TransactionID tid;
};
using TIDMap = std::unordered_map<TIDHash, CSNEntry>;
TIDMap tid_to_csn;
mutable std::mutex running_list_mutex;
/// Transactions that are currently processed
TransactionsList running_list;
std::list<Snapshot> snapshots_in_use;
/// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup.
std::list<CSN> snapshots_in_use;
String zookeeper_path;
ZooKeeperPtr zookeeper;
String zookeeper_path;
String zookeeper_path_log;
/// Name of the newest entry that was loaded from log in ZK
String last_loaded_entry;
/// The oldest CSN such that we store in log entries with TransactionIDs containing this CSN.
std::atomic<CSN> tail_ptr = Tx::UnknownCSN;
zkutil::EventPtr log_updated_event = std::make_shared<Poco::Event>();
std::atomic_bool stop_flag = false;

View File

@ -20,6 +20,22 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_TEXT;
}
inline static CSN getCSNAndAssert(TIDHash tid_hash, std::atomic<CSN> & csn, const TransactionID * tid = nullptr)
{
CSN maybe_csn = TransactionLog::getCSN(tid_hash);
if (maybe_csn)
return maybe_csn;
maybe_csn = csn.load();
if (maybe_csn)
return maybe_csn;
if (tid)
TransactionLog::assertTIDIsNotOutdated(*tid);
return Tx::UnknownCSN;
}
VersionMetadata::VersionMetadata()
{
/// It would be better to make it static, but static loggers do not work for some reason (initialization order?)
@ -29,12 +45,12 @@ VersionMetadata::VersionMetadata()
/// It can be used for introspection purposes only
TransactionID VersionMetadata::getRemovalTID() const
{
TIDHash max_lock = removal_tid_lock.load();
if (max_lock)
TIDHash removal_lock = removal_tid_lock.load();
if (removal_lock)
{
if (max_lock == Tx::PrehistoricTID.getHash())
if (removal_lock == Tx::PrehistoricTID.getHash())
return Tx::PrehistoricTID;
if (auto txn = TransactionLog::instance().tryGetRunningTransaction(max_lock))
if (auto txn = TransactionLog::instance().tryGetRunningTransaction(removal_lock))
return txn->tid;
}
@ -47,11 +63,11 @@ TransactionID VersionMetadata::getRemovalTID() const
return Tx::EmptyTID;
}
void VersionMetadata::lockMaxTID(const TransactionID & tid, const TransactionInfoContext & context)
void VersionMetadata::lockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context)
{
LOG_TEST(log, "Trying to lock removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
TIDHash locked_by = 0;
if (tryLockMaxTID(tid, context, &locked_by))
if (tryLockRemovalTID(tid, context, &locked_by))
return;
String part_desc;
@ -66,16 +82,16 @@ void VersionMetadata::lockMaxTID(const TransactionID & tid, const TransactionInf
tid, part_desc, context.table.getNameForLogs(), getRemovalTID(), locked_by);
}
bool VersionMetadata::tryLockMaxTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id)
bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id)
{
assert(!tid.isEmpty());
assert(!creation_tid.isEmpty());
TIDHash max_lock_value = tid.getHash();
TIDHash expected_max_lock_value = 0;
bool locked = removal_tid_lock.compare_exchange_strong(expected_max_lock_value, max_lock_value);
TIDHash removal_lock_value = tid.getHash();
TIDHash expected_removal_lock_value = 0;
bool locked = removal_tid_lock.compare_exchange_strong(expected_removal_lock_value, removal_lock_value);
if (!locked)
{
if (tid == Tx::PrehistoricTID && expected_max_lock_value == Tx::PrehistoricTID.getHash())
if (tid == Tx::PrehistoricTID && expected_removal_lock_value == Tx::PrehistoricTID.getHash())
{
/// Don't need to lock part for queries without transaction
LOG_TEST(log, "Assuming removal_tid is locked by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
@ -83,7 +99,7 @@ bool VersionMetadata::tryLockMaxTID(const TransactionID & tid, const Transaction
}
if (locked_by_id)
*locked_by_id = expected_max_lock_value;
*locked_by_id = expected_removal_lock_value;
return false;
}
@ -92,21 +108,21 @@ bool VersionMetadata::tryLockMaxTID(const TransactionID & tid, const Transaction
return true;
}
void VersionMetadata::unlockMaxTID(const TransactionID & tid, const TransactionInfoContext & context)
void VersionMetadata::unlockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context)
{
LOG_TEST(log, "Unlocking removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
assert(!tid.isEmpty());
TIDHash max_lock_value = tid.getHash();
TIDHash removal_lock_value = tid.getHash();
TIDHash locked_by = removal_tid_lock.load();
auto throw_cannot_unlock = [&]()
{
auto locked_by_txn = TransactionLog::instance().tryGetRunningTransaction(locked_by);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unlock removal_tid, it's a bug. Current: {} {}, actual: {} {}",
max_lock_value, tid, locked_by, locked_by_txn ? locked_by_txn->tid : Tx::EmptyTID);
removal_lock_value, tid, locked_by, locked_by_txn ? locked_by_txn->tid : Tx::EmptyTID);
};
if (locked_by != max_lock_value)
if (locked_by != removal_lock_value)
throw_cannot_unlock();
removal_tid = Tx::EmptyTID;
@ -136,22 +152,20 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
return isVisible(txn.getSnapshot(), txn.tid);
}
bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current_tid)
bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
{
assert(!creation_tid.isEmpty());
CSN min = creation_csn.load(std::memory_order_relaxed);
TIDHash max_lock = removal_tid_lock.load(std::memory_order_relaxed);
CSN max = removal_csn.load(std::memory_order_relaxed);
CSN creation = creation_csn.load(std::memory_order_relaxed);
TIDHash removal_lock = removal_tid_lock.load(std::memory_order_relaxed);
CSN removal = removal_csn.load(std::memory_order_relaxed);
//LOG_TEST(log, "Checking if creation_tid {} creation_csn {} removal_tidhash {} removal_csn {} visible for {} {}", creation_tid, min, max_lock, max, snapshot_version, current_tid);
[[maybe_unused]] bool had_creation_csn = min;
[[maybe_unused]] bool had_removal_tid = max_lock;
[[maybe_unused]] bool had_removal_csn = max;
[[maybe_unused]] bool had_creation_csn = creation;
[[maybe_unused]] bool had_removal_tid = removal_lock;
[[maybe_unused]] bool had_removal_csn = removal;
assert(!had_removal_csn || had_removal_tid);
assert(!had_removal_csn || had_creation_csn);
assert(min == Tx::UnknownCSN || min == Tx::PrehistoricCSN || Tx::MaxReservedCSN < min);
assert(max == Tx::UnknownCSN || max == Tx::PrehistoricCSN || Tx::MaxReservedCSN < max);
assert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation);
assert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal);
/// Fast path:
@ -159,20 +173,20 @@ bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current
/// - creation was committed after we took the snapshot
/// - removal was committed before we took the snapshot
/// - current transaction is removing it
if (min && snapshot_version < min)
if (creation && snapshot_version < creation)
return false;
if (max && max <= snapshot_version)
if (removal && removal <= snapshot_version)
return false;
if (!current_tid.isEmpty() && max_lock && max_lock == current_tid.getHash())
if (!current_tid.isEmpty() && removal_lock && removal_lock == current_tid.getHash())
return false;
/// Otherwise, part is definitely visible if:
/// - creation was committed before we took the snapshot and nobody tried to remove the part
/// - creation was committed before and removal was committed after
/// - current transaction is creating it
if (min && min <= snapshot_version && !max_lock)
if (creation && creation <= snapshot_version && !removal_lock)
return true;
if (min && min <= snapshot_version && max && snapshot_version < max)
if (creation && creation <= snapshot_version && removal && snapshot_version < removal)
return true;
if (!current_tid.isEmpty() && creation_tid == current_tid)
return true;
@ -183,7 +197,7 @@ bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current
/// It means that some transaction is creating/removing the part right now or has done it recently
/// and we don't know if it was already committed or not.
assert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
assert(current_tid.isEmpty() || (creation_tid != current_tid && max_lock != current_tid.getHash()));
assert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash()));
/// Before doing CSN lookup, let's check some extra conditions.
/// If snapshot_version <= some_tid.start_csn, then changes of the transaction with some_tid
@ -193,28 +207,30 @@ bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current
return false;
/// Check if creation_tid/removal_tid transactions are committed and write CSNs
/// TODO Transactions: we probably need some optimizations here
/// TODO Transactions: we probably need more optimizations here
/// to avoid some CSN lookups or make the lookups cheaper.
/// NOTE: Old enough committed parts always have written CSNs,
/// so we can determine their visibility through fast path.
/// But for long-running writing transactions we will always do
/// CNS lookup and get 0 (UnknownCSN) until the transaction is committer/rolled back.
min = TransactionLog::getCSN(creation_tid);
if (!min)
creation = getCSNAndAssert(creation_tid.getHash(), creation_csn, &creation_tid);
if (!creation)
{
return false; /// Part creation is not committed yet
}
/// We don't need to check if CSNs are already written or not,
/// because once written CSN cannot be changed, so it's safe to overwrite it (with the same value).
creation_csn.store(min, std::memory_order_relaxed);
creation_csn.store(creation, std::memory_order_relaxed);
if (max_lock)
if (removal_lock)
{
max = TransactionLog::getCSN(max_lock);
if (max)
removal_csn.store(max, std::memory_order_relaxed);
removal = getCSNAndAssert(removal_lock, removal_csn);
if (removal)
removal_csn.store(removal, std::memory_order_relaxed);
}
return min <= snapshot_version && (!max || snapshot_version < max);
return creation <= snapshot_version && (!removal || snapshot_version < removal);
}
bool VersionMetadata::canBeRemoved()
@ -223,56 +239,56 @@ bool VersionMetadata::canBeRemoved()
{
/// Avoid access to Transaction log if transactions are not involved
TIDHash max_lock = removal_tid_lock.load(std::memory_order_relaxed);
if (!max_lock)
TIDHash removal_lock = removal_tid_lock.load(std::memory_order_relaxed);
if (!removal_lock)
return false;
if (max_lock == Tx::PrehistoricTID.getHash())
if (removal_lock == Tx::PrehistoricTID.getHash())
return true;
}
return canBeRemovedImpl(TransactionLog::instance().getOldestSnapshot());
}
bool VersionMetadata::canBeRemovedImpl(Snapshot oldest_snapshot_version)
bool VersionMetadata::canBeRemovedImpl(CSN oldest_snapshot_version)
{
CSN min = creation_csn.load(std::memory_order_relaxed);
CSN creation = creation_csn.load(std::memory_order_relaxed);
/// We can safely remove part if its creation was rolled back
if (min == Tx::RolledBackCSN)
if (creation == Tx::RolledBackCSN)
return true;
if (!min)
if (!creation)
{
/// Cannot remove part if its creation not committed yet
min = TransactionLog::getCSN(creation_tid);
if (min)
creation_csn.store(min, std::memory_order_relaxed);
creation = getCSNAndAssert(creation_tid.getHash(), creation_csn, &creation_tid);
if (creation)
creation_csn.store(creation, std::memory_order_relaxed);
else
return false;
}
/// Part is probably visible for some transactions (part is too new or the oldest snapshot is too old)
if (oldest_snapshot_version < min)
if (oldest_snapshot_version < creation)
return false;
TIDHash max_lock = removal_tid_lock.load(std::memory_order_relaxed);
TIDHash removal_lock = removal_tid_lock.load(std::memory_order_relaxed);
/// Part is active
if (!max_lock)
if (!removal_lock)
return false;
CSN max = removal_csn.load(std::memory_order_relaxed);
if (!max)
CSN removal = removal_csn.load(std::memory_order_relaxed);
if (!removal)
{
/// Part removal is not committed yet
max = TransactionLog::getCSN(max_lock);
if (max)
removal_csn.store(max, std::memory_order_relaxed);
removal = getCSNAndAssert(removal_lock, removal_csn);
if (removal)
removal_csn.store(removal, std::memory_order_relaxed);
else
return false;
}
/// We can safely remove part if all running transactions were started after part removal was committed
return max <= oldest_snapshot_version;
return removal <= oldest_snapshot_version;
}
#define CREATION_TID_STR "creation_tid: "
@ -285,20 +301,20 @@ void VersionMetadata::writeCSN(WriteBuffer & buf, WhichCSN which_csn, bool inter
{
if (which_csn == CREATION)
{
if (CSN min = creation_csn.load())
if (CSN creation = creation_csn.load())
{
writeCString("\n" CREATION_CSN_STR, buf);
writeText(min, buf);
writeText(creation, buf);
}
else if (!internal)
throw Exception(ErrorCodes::LOGICAL_ERROR, "writeCSN called for creation_csn = 0, it's a bug");
}
else /// if (which_csn == REMOVAL)
{
if (CSN max = removal_csn.load())
if (CSN removal = removal_csn.load())
{
writeCString("\n" REMOVAL_CSN_STR, buf);
writeText(max, buf);
writeText(removal, buf);
}
else if (!internal)
throw Exception(ErrorCodes::LOGICAL_ERROR, "writeCSN called for removal_csn = 0, it's a bug");

View File

@ -31,14 +31,14 @@ struct VersionMetadata
/// Checks if an object is visible for transaction or not.
bool isVisible(const MergeTreeTransaction & txn);
bool isVisible(Snapshot snapshot_version, TransactionID current_tid = Tx::EmptyTID);
bool isVisible(CSN snapshot_version, TransactionID current_tid = Tx::EmptyTID);
TransactionID getCreationTID() const { return creation_tid; }
TransactionID getRemovalTID() const;
bool tryLockMaxTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id = nullptr);
void lockMaxTID(const TransactionID & tid, const TransactionInfoContext & context);
void unlockMaxTID(const TransactionID & tid, const TransactionInfoContext & context);
bool tryLockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id = nullptr);
void lockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context);
void unlockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context);
bool isRemovalTIDLocked() const;
@ -47,7 +47,7 @@ struct VersionMetadata
/// Checks if it's safe to remove outdated version of an object
bool canBeRemoved();
bool canBeRemovedImpl(Snapshot oldest_snapshot_version);
bool canBeRemovedImpl(CSN oldest_snapshot_version);
void write(WriteBuffer & buf) const;
void read(ReadBuffer & buf);

View File

@ -1366,6 +1366,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!min)
{
/// Transaction that created this part was not committed. Remove part.
TransactionLog::assertTIDIsNotOutdated(version.creation_tid);
min = Tx::RolledBackCSN;
}
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has creation_tid={}, setting creation_csn={}",
@ -1384,10 +1385,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
else
{
TransactionLog::assertTIDIsNotOutdated(version.removal_tid);
/// Transaction that tried to remove this part was not committed. Clear removal_tid.
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}",
part->name, version.removal_tid);
version.unlockMaxTID(version.removal_tid, TransactionInfoContext{getStorageID(), part->name});
version.unlockRemovalTID(version.removal_tid, TransactionInfoContext{getStorageID(), part->name});
}
version_updated = true;
}
@ -4030,14 +4032,14 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const Me
return res;
}
MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(Snapshot snapshot_version, TransactionID current_tid) const
MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(CSN snapshot_version, TransactionID current_tid) const
{
auto res = getDataPartsVectorForInternalUsage({DataPartState::Active, DataPartState::Outdated});
filterVisibleDataParts(res, snapshot_version, current_tid);
return res;
}
void MergeTreeData::filterVisibleDataParts(DataPartsVector & maybe_visible_parts, Snapshot snapshot_version, TransactionID current_tid) const
void MergeTreeData::filterVisibleDataParts(DataPartsVector & maybe_visible_parts, CSN snapshot_version, TransactionID current_tid) const
{
if (maybe_visible_parts.empty())
return;
@ -4559,7 +4561,7 @@ void MergeTreeData::Transaction::rollback()
DataPartPtr covering_part;
DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, lock);
for (auto & covered : covered_parts)
covered->version.unlockMaxTID(Tx::PrehistoricTID, TransactionInfoContext{data.getStorageID(), covered->name});
covered->version.unlockRemovalTID(Tx::PrehistoricTID, TransactionInfoContext{data.getStorageID(), covered->name});
}
}

View File

@ -446,12 +446,12 @@ public:
DataParts getDataPartsForInternalUsage() const;
DataPartsVector getDataPartsVectorForInternalUsage() const;
void filterVisibleDataParts(DataPartsVector & maybe_visible_parts, Snapshot snapshot_version, TransactionID current_tid) const;
void filterVisibleDataParts(DataPartsVector & maybe_visible_parts, CSN snapshot_version, TransactionID current_tid) const;
/// Returns parts that visible with current snapshot
DataPartsVector getVisibleDataPartsVector(ContextPtr local_context) const;
DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const;
DataPartsVector getVisibleDataPartsVector(Snapshot snapshot_version, TransactionID current_tid) const;
DataPartsVector getVisibleDataPartsVector(CSN snapshot_version, TransactionID current_tid) const;
/// Returns a part in Active state with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name) const;

View File

@ -722,6 +722,7 @@ void StorageMergeTree::loadMutations()
{
if (!TransactionLog::getCSN(entry.tid))
{
TransactionLog::assertTIDIsNotOutdated(entry.tid);
LOG_DEBUG(log, "Mutation entry {} was created by transaction {}, but it was not committed. Removing mutation entry",
it->name(), entry.tid);
disk->removeFile(it->path());

View File

@ -201,6 +201,17 @@ def get_processlist(args):
else:
return clickhouse_execute_json(args, 'SHOW PROCESSLIST')
def get_transactions_list(args):
try:
if args.replicated_database:
return clickhouse_execute_json(args, """
SELECT materialize((hostName(), tcpPort())) as host, *
FROM clusterAllReplicas('test_cluster_database_replicated', system.transactions)
""")
else:
return clickhouse_execute_json(args, 'select * from system.transactions')
except Exception as e:
return f"Cannot get list of transactions: {e}"
# collect server stacktraces using gdb
def get_stacktraces_from_gdb(server_pid):
@ -1320,6 +1331,7 @@ def main(args):
if processlist:
print(colored("\nFound hung queries in processlist:", args, "red", attrs=["bold"]))
print(json.dumps(processlist, indent=4))
print(get_transactions_list(args))
print_stacktraces()
exit_code.value = 1