fix race on TID allocation

This commit is contained in:
Alexander Tokmakov 2022-05-20 12:41:44 +02:00
parent 8dcee2813d
commit 12bbb7de87
11 changed files with 148 additions and 57 deletions

View File

@ -105,6 +105,25 @@
# define ASAN_POISON_MEMORY_REGION(a, b)
#endif
#if !defined(ABORT_ON_LOGICAL_ERROR)
#if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER)
#define ABORT_ON_LOGICAL_ERROR
#endif
#endif
/// chassert(x) is similar to assert(x), but:
/// - works in builds with sanitizers, not only in debug builds
/// - tries to print failed assertion into server log
/// It can be used for all assertions except heavy ones.
/// Heavy assertions (that run loops or call complex functions) are allowed in debug builds only.
#if !defined(chassert)
#if defined(ABORT_ON_LOGICAL_ERROR)
#define chassert(x) static_cast<bool>(x) ? void(0) : abortOnFailedAssertion(#x)
#else
#define chassert(x) ((void)0)
#endif
#endif
/// A template function for suppressing warnings about unused variables or function results.
template <typename... Args>
constexpr void UNUSED(Args &&... args [[maybe_unused]])

View File

@ -35,6 +35,18 @@ namespace ErrorCodes
extern const int CANNOT_MREMAP;
}
void abortOnFailedAssertion(const String & description)
{
LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", description);
/// This is to suppress -Wmissing-noreturn
volatile bool always_false = false;
if (always_false)
return;
abort();
}
/// - Aborts the process if error code is LOGICAL_ERROR.
/// - Increments error codes statistics.
void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool remote, const Exception::FramePointers & trace)
@ -44,8 +56,7 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
#ifdef ABORT_ON_LOGICAL_ERROR
if (code == ErrorCodes::LOGICAL_ERROR)
{
LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", msg);
abort();
abortOnFailedAssertion(msg);
}
#endif

View File

@ -12,16 +12,14 @@
#include <fmt/format.h>
#if !defined(NDEBUG) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER) || defined(UNDEFINED_BEHAVIOR_SANITIZER)
#define ABORT_ON_LOGICAL_ERROR
#endif
namespace Poco { class Logger; }
namespace DB
{
void abortOnFailedAssertion(const String & description);
class Exception : public Poco::Exception
{
public:

View File

@ -287,7 +287,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful too debug mysterious test failures in CI
/// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
"entries={}..{}, "
"first_failed_task_name={}, current_tasks_size={}, "

View File

@ -158,7 +158,7 @@ void MergeTreeTransaction::addMutation(const StoragePtr & table, const String &
bool MergeTreeTransaction::isReadOnly() const
{
std::lock_guard lock{mutex};
assert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
chassert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
return storages.empty();
}
@ -204,7 +204,7 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
/// and we will be able to remove old entries from transaction log in ZK.
/// It's not a problem if server crash before CSN is written, because we already have TID in data part and entry in the log.
[[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
assert(prev_value == Tx::CommittingCSN);
chassert(prev_value == Tx::CommittingCSN);
for (const auto & part : creating_parts)
{
part->version.creation_csn.store(csn);
@ -321,7 +321,7 @@ String MergeTreeTransaction::dumpDescription() const
{
String info = fmt::format("{} (created by {}, {})", part->name, part->version.getCreationTID(), part->version.creation_csn);
std::get<1>(storage_to_changes[&(part->storage)]).push_back(std::move(info));
assert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
chassert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
}
for (const auto & mutation : mutations)

View File

@ -145,24 +145,29 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
NOEXCEPT_SCOPE;
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
{
if (entry.first == Tx::EmptyTID.getHash())
continue;
tid_to_csn.emplace(entry.first, entry.second);
tid_to_csn.emplace(entry.first, entry.second);
}
last_loaded_entry = last_entry;
}
{
std::lock_guard lock{running_list_mutex};
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
}
last_loaded_entry = last_entry;
latest_snapshot = loaded.back().second.csn;
local_tid_counter = Tx::MaxReservedLocalTID;
}
void TransactionLog::loadLogFromZooKeeper()
{
assert(!zookeeper);
assert(tid_to_csn.empty());
assert(last_loaded_entry.empty());
chassert(!zookeeper);
chassert(tid_to_csn.empty());
chassert(last_loaded_entry.empty());
zookeeper = global_context->getZooKeeper();
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
@ -172,7 +177,7 @@ void TransactionLog::loadLogFromZooKeeper()
if (code != Coordination::Error::ZOK)
{
/// Log probably does not exist, create it
assert(code == Coordination::Error::ZNONODE);
chassert(code == Coordination::Error::ZNONODE);
zookeeper->createAncestors(zookeeper_path_log);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/tail_ptr", serializeCSN(Tx::MaxReservedCSN), zkutil::CreateMode::Persistent));
@ -192,11 +197,11 @@ void TransactionLog::loadLogFromZooKeeper()
/// 2. simplify log rotation
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
chassert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
loadEntries(entries_list.begin(), entries_list.end());
assert(!last_loaded_entry.empty());
assert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(!last_loaded_entry.empty());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
local_tid_counter = Tx::MaxReservedLocalTID;
tail_ptr = deserializeCSN(zookeeper->get(zookeeper_path + "/tail_ptr"));
@ -241,12 +246,12 @@ void TransactionLog::runUpdatingThread()
void TransactionLog::loadNewEntries()
{
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
assert(!entries_list.empty());
chassert(!entries_list.empty());
std::sort(entries_list.begin(), entries_list.end());
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry);
loadEntries(it, entries_list.end());
assert(last_loaded_entry == entries_list.back());
assert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(last_loaded_entry == entries_list.back());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
latest_snapshot.notify_all();
}
@ -396,7 +401,7 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no
if (!txn->rollback())
{
/// Transaction was cancelled concurrently, it's already rolled back.
assert(txn->csn == Tx::RolledBackCSN);
chassert(txn->csn == Tx::RolledBackCSN);
return;
}
@ -438,8 +443,8 @@ CSN TransactionLog::getCSN(const TIDHash & tid)
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const
{
assert(tid_hash);
assert(tid_hash != Tx::EmptyTID.getHash());
chassert(tid_hash);
chassert(tid_hash != Tx::EmptyTID.getHash());
std::lock_guard lock{mutex};
auto it = tid_to_csn.find(tid_hash);
@ -467,6 +472,8 @@ CSN TransactionLog::getOldestSnapshot() const
std::lock_guard lock{running_list_mutex};
if (snapshots_in_use.empty())
return getLatestSnapshot();
chassert(running_list.size() == snapshots_in_use.size());
chassert(snapshots_in_use.size() < 2 || snapshots_in_use.front() <= *++snapshots_in_use.begin());
return snapshots_in_use.front();
}

View File

@ -88,8 +88,8 @@ void VersionMetadata::lockRemovalTID(const TransactionID & tid, const Transactio
bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id)
{
assert(!tid.isEmpty());
assert(!creation_tid.isEmpty());
chassert(!tid.isEmpty());
chassert(!creation_tid.isEmpty());
TIDHash removal_lock_value = tid.getHash();
TIDHash expected_removal_lock_value = 0;
bool locked = removal_tid_lock.compare_exchange_strong(expected_removal_lock_value, removal_lock_value);
@ -115,7 +115,7 @@ bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const Transac
void VersionMetadata::unlockRemovalTID(const TransactionID & tid, const TransactionInfoContext & context)
{
LOG_TEST(log, "Unlocking removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
assert(!tid.isEmpty());
chassert(!tid.isEmpty());
TIDHash removal_lock_value = tid.getHash();
TIDHash locked_by = removal_tid_lock.load();
@ -145,7 +145,7 @@ bool VersionMetadata::isRemovalTIDLocked() const
void VersionMetadata::setCreationTID(const TransactionID & tid, TransactionInfoContext * context)
{
/// NOTE ReplicatedMergeTreeSink may add one part multiple times
assert(creation_tid.isEmpty() || creation_tid == tid);
chassert(creation_tid.isEmpty() || creation_tid == tid);
creation_tid = tid;
if (context)
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, *context);
@ -158,7 +158,7 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
{
assert(!creation_tid.isEmpty());
chassert(!creation_tid.isEmpty());
CSN creation = creation_csn.load(std::memory_order_relaxed);
TIDHash removal_lock = removal_tid_lock.load(std::memory_order_relaxed);
CSN removal = removal_csn.load(std::memory_order_relaxed);
@ -166,10 +166,10 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
[[maybe_unused]] bool had_creation_csn = creation;
[[maybe_unused]] bool had_removal_tid = removal_lock;
[[maybe_unused]] bool had_removal_csn = removal;
assert(!had_removal_csn || had_removal_tid);
assert(!had_removal_csn || had_creation_csn);
assert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation);
assert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal);
chassert(!had_removal_csn || had_removal_tid);
chassert(!had_removal_csn || had_creation_csn);
chassert(creation == Tx::UnknownCSN || creation == Tx::PrehistoricCSN || Tx::MaxReservedCSN < creation);
chassert(removal == Tx::UnknownCSN || removal == Tx::PrehistoricCSN || Tx::MaxReservedCSN < removal);
/// Special snapshot for introspection purposes
if (unlikely(snapshot_version == Tx::EverythingVisibleCSN))
@ -204,8 +204,8 @@ bool VersionMetadata::isVisible(CSN snapshot_version, TransactionID current_tid)
/// Data part has creation_tid/removal_tid, but does not have creation_csn/removal_csn.
/// It means that some transaction is creating/removing the part right now or has done it recently
/// and we don't know if it was already committed or not.
assert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
assert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash()));
chassert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
chassert(current_tid.isEmpty() || (creation_tid != current_tid && removal_lock != current_tid.getHash()));
/// Before doing CSN lookup, let's check some extra conditions.
/// If snapshot_version <= some_tid.start_csn, then changes of the transaction with some_tid
@ -347,8 +347,8 @@ void VersionMetadata::write(WriteBuffer & buf) const
if (removal_tid_lock)
{
assert(!removal_tid.isEmpty());
assert(removal_tid.getHash() == removal_tid_lock);
chassert(!removal_tid.isEmpty());
chassert(removal_tid.getHash() == removal_tid_lock);
writeRemovalTID(buf);
writeCSN(buf, REMOVAL, /* internal */ true);
}
@ -384,7 +384,7 @@ void VersionMetadata::read(ReadBuffer & buf)
if (name == CREATION_CSN_STR)
{
assert(!creation_csn);
chassert(!creation_csn);
creation_csn = read_csn();
}
else if (name == REMOVAL_TID_STR)
@ -398,7 +398,7 @@ void VersionMetadata::read(ReadBuffer & buf)
{
if (removal_tid.isEmpty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Found removal_csn in metadata file, but removal_tid is {}", removal_tid);
assert(!removal_csn);
chassert(!removal_csn);
removal_csn = read_csn();
}
else

View File

@ -1282,12 +1282,12 @@ void IMergeTreeDataPart::storeVersionMetadata() const
void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const
{
assert(!version.creation_tid.isEmpty());
assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_tid.isPrehistoric()));
assert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0));
assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty())));
assert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
assert(isStoredOnDisk());
chassert(!version.creation_tid.isEmpty());
chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_tid.isPrehistoric()));
chassert(!(which_csn == VersionMetadata::WhichCSN::CREATION && version.creation_csn == 0));
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && (version.removal_tid.isPrehistoric() || version.removal_tid.isEmpty())));
chassert(!(which_csn == VersionMetadata::WhichCSN::REMOVAL && version.removal_csn == 0));
chassert(isStoredOnDisk());
/// Small enough appends to file are usually atomic,
/// so we append new metadata instead of rewriting file to reduce number of fsyncs.
@ -1303,10 +1303,10 @@ void IMergeTreeDataPart::appendCSNToVersionMetadata(VersionMetadata::WhichCSN wh
void IMergeTreeDataPart::appendRemovalTIDToVersionMetadata(bool clear) const
{
assert(!version.creation_tid.isEmpty());
assert(version.removal_csn == 0);
assert(!version.removal_tid.isEmpty());
assert(isStoredOnDisk());
chassert(!version.creation_tid.isEmpty());
chassert(version.removal_csn == 0);
chassert(!version.removal_tid.isEmpty());
chassert(isStoredOnDisk());
if (version.creation_tid.isPrehistoric() && !clear)
{

View File

@ -1364,7 +1364,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Check if CSNs were witten after committing transaction, update and write if needed.
bool version_updated = false;
assert(!version.creation_tid.isEmpty());
chassert(!version.creation_tid.isEmpty());
if (!part->version.creation_csn)
{
auto min = TransactionLog::getCSN(version.creation_tid);

View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
# Tags: long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mt";
$CLICKHOUSE_CLIENT --query "CREATE TABLE mt (n Int64) ENGINE=MergeTree ORDER BY n SETTINGS old_parts_lifetime=0";
function begin_commit_readonly()
{
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
COMMIT;";
}
function begin_rollback_readonly()
{
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
ROLLBACK;";
}
function begin_insert_commit()
{
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO mt VALUES ($RANDOM);
COMMIT;";
}
function introspection()
{
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.transactions FORMAT Null"
$CLICKHOUSE_CLIENT -q "SELECT transactionLatestSnapshot(), transactionOldestSnapshot() FORMAT Null"
}
export -f begin_commit_readonly
export -f begin_rollback_readonly
export -f begin_insert_commit
export -f introspection
TIMEOUT=20
clickhouse_client_loop_timeout $TIMEOUT begin_commit_readonly &
clickhouse_client_loop_timeout $TIMEOUT begin_rollback_readonly &
clickhouse_client_loop_timeout $TIMEOUT begin_insert_commit &
clickhouse_client_loop_timeout $TIMEOUT introspection &
wait
$CLICKHOUSE_CLIENT --query "DROP TABLE mt";