review fixes, write tid into mutation entry

This commit is contained in:
Alexander Tokmakov 2022-01-28 20:47:37 +03:00
parent e3b1397001
commit e0304c2a58
21 changed files with 247 additions and 191 deletions

View File

@ -1764,8 +1764,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
&& processing_stage == QueryProcessingStage::FetchColumns
&& query_analyzer->hasAggregation()
&& (query_analyzer->aggregates().size() == 1)
&& typeid_cast<const AggregateFunctionCount *>(query_analyzer->aggregates()[0].function.get())
&& !context->getCurrentTransaction();
&& typeid_cast<const AggregateFunctionCount *>(query_analyzer->aggregates()[0].function.get());
if (optimize_trivial_count)
{
@ -1773,7 +1772,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
const auto & func = desc.function;
std::optional<UInt64> num_rows{};
if (!query.prewhere() && !query.where())
if (!query.prewhere() && !query.where() && !context->getCurrentTransaction())
{
num_rows = storage->totalRows(settings);
}

View File

@ -33,7 +33,7 @@ void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPart
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
new_part->versions.setMinTID(tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
new_part->version.setCreationTID(tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
if (txn)
txn->addNewPart(storage, new_part);
}
@ -42,7 +42,7 @@ void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataP
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
TransactionInfoContext context{storage->getStorageID(), part_to_remove->name};
part_to_remove->versions.lockMaxTID(tid, context);
part_to_remove->version.lockMaxTID(tid, context);
if (txn)
txn->removeOldPart(storage, part_to_remove);
}
@ -52,7 +52,7 @@ void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
TransactionInfoContext context{storage->getStorageID(), new_part->name};
new_part->versions.setMinTID(tid, context);
new_part->version.setCreationTID(tid, context);
if (txn)
txn->addNewPart(storage, new_part);
@ -60,7 +60,7 @@ void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage
for (const auto & covered : covered_parts)
{
context.part_name = covered->name;
covered->versions.lockMaxTID(tid, context);
covered->version.lockMaxTID(tid, context);
if (txn)
txn->removeOldPart(storage, covered);
}
@ -123,13 +123,13 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
assert(prev_value == Tx::CommittingCSN);
for (const auto & part : creating_parts)
{
part->versions.mincsn.store(csn);
part->version.creation_csn.store(csn);
part->storeVersionMetadata();
}
for (const auto & part : removing_parts)
{
part->versions.maxcsn.store(csn);
part->version.removal_csn.store(csn);
part->storeVersionMetadata();
}
}
@ -146,17 +146,17 @@ bool MergeTreeTransaction::rollback() noexcept
table_and_mutation.first->killMutation(table_and_mutation.second);
for (const auto & part : creating_parts)
part->versions.mincsn.store(Tx::RolledBackCSN);
part->version.creation_csn.store(Tx::RolledBackCSN);
for (const auto & part : removing_parts) /// TODO update metadata file
part->versions.unlockMaxTID(tid, TransactionInfoContext{part->storage.getStorageID(), part->name});
part->version.unlockMaxTID(tid, TransactionInfoContext{part->storage.getStorageID(), part->name});
/// FIXME const_cast
for (const auto & part : creating_parts)
const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(nullptr, {part}, true);
for (const auto & part : removing_parts)
if (part->versions.getMinTID() != tid)
if (part->version.getCreationTID() != tid)
const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);
return true;
@ -180,9 +180,9 @@ String MergeTreeTransaction::dumpDescription() const
for (const auto & part : removing_parts)
{
res += part->name;
res += fmt::format(" (created by {}, {})\n", part->versions.getMinTID(), part->versions.mincsn);
assert(!part->versions.mincsn || part->versions.mincsn <= snapshot);
assert(!part->versions.maxcsn);
res += fmt::format(" (created by {}, {})\n", part->version.getCreationTID(), part->version.creation_csn);
assert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
assert(!part->version.removal_csn);
}
return res;

View File

@ -47,9 +47,9 @@ VersionMetadata::VersionMetadata()
}
/// It can be used for introspection purposes only
TransactionID VersionMetadata::getMaxTID() const
TransactionID VersionMetadata::getRemovalTID() const
{
TIDHash max_lock = maxtid_lock.load();
TIDHash max_lock = removal_tid_lock.load();
if (max_lock)
{
if (auto txn = TransactionLog::instance().tryGetRunningTransaction(max_lock))
@ -58,10 +58,10 @@ TransactionID VersionMetadata::getMaxTID() const
return Tx::PrehistoricTID;
}
if (maxcsn.load(std::memory_order_relaxed))
if (removal_csn.load(std::memory_order_relaxed))
{
/// maxtid cannot be changed since we have maxcsn, so it's readonly
return maxtid;
/// removal_tid cannot be changed since we have removal_csn, so it's readonly
return removal_tid;
}
return Tx::EmptyTID;
@ -69,7 +69,7 @@ TransactionID VersionMetadata::getMaxTID() const
void VersionMetadata::lockMaxTID(const TransactionID & tid, const TransactionInfoContext & context)
{
LOG_TEST(log, "Trying to lock maxtid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
LOG_TEST(log, "Trying to lock removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
TIDHash locked_by = 0;
if (tryLockMaxTID(tid, context, &locked_by))
return;
@ -83,7 +83,7 @@ void VersionMetadata::lockMaxTID(const TransactionID & tid, const TransactionInf
"Serialization error: "
"Transaction {} tried to remove data part {} from {}, "
"but it's locked by another transaction (TID: {}, TIDH: {}) which is currently removing this part.",
tid, part_desc, context.table.getNameForLogs(), getMaxTID(), locked_by);
tid, part_desc, context.table.getNameForLogs(), getRemovalTID(), locked_by);
}
bool VersionMetadata::tryLockMaxTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id)
@ -91,14 +91,14 @@ bool VersionMetadata::tryLockMaxTID(const TransactionID & tid, const Transaction
assert(!tid.isEmpty());
TIDHash max_lock_value = tid.getHash();
TIDHash expected_max_lock_value = 0;
bool locked = maxtid_lock.compare_exchange_strong(expected_max_lock_value, max_lock_value);
bool locked = removal_tid_lock.compare_exchange_strong(expected_max_lock_value, max_lock_value);
if (!locked)
{
if (tid == Tx::PrehistoricTID && expected_max_lock_value == Tx::PrehistoricTID.getHash())
{
/// Don't need to lock part for queries without transaction
//FIXME Transactions: why is it possible?
LOG_TEST(log, "Assuming maxtid is locked by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
LOG_TEST(log, "Assuming removal_tid is locked by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
return true;
}
@ -107,30 +107,30 @@ bool VersionMetadata::tryLockMaxTID(const TransactionID & tid, const Transaction
return false;
}
maxtid = tid;
removal_tid = tid;
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::LOCK_PART, tid, context);
return true;
}
void VersionMetadata::unlockMaxTID(const TransactionID & tid, const TransactionInfoContext & context)
{
LOG_TEST(log, "Unlocking maxtid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
LOG_TEST(log, "Unlocking removal_tid by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
assert(!tid.isEmpty());
TIDHash max_lock_value = tid.getHash();
TIDHash locked_by = maxtid_lock.load();
TIDHash locked_by = removal_tid_lock.load();
auto throw_cannot_unlock = [&]()
{
auto locked_by_txn = TransactionLog::instance().tryGetRunningTransaction(locked_by);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unlock maxtid, it's a bug. Current: {} {}, actual: {} {}",
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unlock removal_tid, it's a bug. Current: {} {}, actual: {} {}",
max_lock_value, tid, locked_by, locked_by_txn ? locked_by_txn->tid : Tx::EmptyTID);
};
if (locked_by != max_lock_value)
throw_cannot_unlock();
maxtid = Tx::EmptyTID;
bool unlocked = maxtid_lock.compare_exchange_strong(locked_by, 0);
removal_tid = Tx::EmptyTID;
bool unlocked = removal_tid_lock.compare_exchange_strong(locked_by, 0);
if (!unlocked)
throw_cannot_unlock();
@ -139,15 +139,15 @@ void VersionMetadata::unlockMaxTID(const TransactionID & tid, const TransactionI
bool VersionMetadata::isMaxTIDLocked() const
{
return maxtid_lock.load() != 0;
return removal_tid_lock.load() != 0;
}
void VersionMetadata::setMinTID(const TransactionID & tid, const TransactionInfoContext & context)
void VersionMetadata::setCreationTID(const TransactionID & tid, const TransactionInfoContext & context)
{
/// TODO Transactions: initialize it in constructor on part creation and remove this method
/// FIXME ReplicatedMergeTreeBlockOutputStream may add one part multiple times
assert(mintid.isEmpty() || mintid == tid);
const_cast<TransactionID &>(mintid) = tid;
assert(creation_tid.isEmpty() || creation_tid == tid);
const_cast<TransactionID &>(creation_tid) = tid;
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, context);
}
@ -159,18 +159,18 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current_tid)
{
assert(!mintid.isEmpty());
CSN min = mincsn.load(std::memory_order_relaxed);
TIDHash max_lock = maxtid_lock.load(std::memory_order_relaxed);
CSN max = maxcsn.load(std::memory_order_relaxed);
assert(!creation_tid.isEmpty());
CSN min = creation_csn.load(std::memory_order_relaxed);
TIDHash max_lock = removal_tid_lock.load(std::memory_order_relaxed);
CSN max = removal_csn.load(std::memory_order_relaxed);
//LOG_TEST(log, "Checking if mintid {} mincsn {} maxtidhash {} maxcsn {} visible for {} {}", mintid, min, max_lock, max, snapshot_version, current_tid);
//LOG_TEST(log, "Checking if creation_tid {} creation_csn {} removal_tidhash {} removal_csn {} visible for {} {}", creation_tid, min, max_lock, max, snapshot_version, current_tid);
[[maybe_unused]] bool had_mincsn = min;
[[maybe_unused]] bool had_maxtid = max_lock;
[[maybe_unused]] bool had_maxcsn = max;
assert(!had_maxcsn || had_maxtid);
assert(!had_maxcsn || had_mincsn);
[[maybe_unused]] bool had_creation_csn = min;
[[maybe_unused]] bool had_removal_tid = max_lock;
[[maybe_unused]] bool had_removal_csn = max;
assert(!had_removal_csn || had_removal_tid);
assert(!had_removal_csn || had_creation_csn);
assert(min == Tx::UnknownCSN || min == Tx::PrehistoricCSN || Tx::MaxReservedCSN < min);
assert(max == Tx::UnknownCSN || max == Tx::PrehistoricCSN || Tx::MaxReservedCSN < max);
@ -195,44 +195,44 @@ bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current
return true;
if (min && min <= snapshot_version && max && snapshot_version < max)
return true;
if (!current_tid.isEmpty() && mintid == current_tid)
if (!current_tid.isEmpty() && creation_tid == current_tid)
return true;
/// End of fast path.
/// Data part has mintid/maxtid, but does not have mincsn/maxcsn.
/// Data part has creation_tid/removal_tid, but does not have creation_csn/removal_csn.
/// It means that some transaction is creating/removing the part right now or has done it recently
/// and we don't know if it was already committed or not.
assert(!had_mincsn || (had_maxtid && !had_maxcsn));
assert(current_tid.isEmpty() || (mintid != current_tid && max_lock != current_tid.getHash()));
assert(!had_creation_csn || (had_removal_tid && !had_removal_csn));
assert(current_tid.isEmpty() || (creation_tid != current_tid && max_lock != current_tid.getHash()));
/// Before doing CSN lookup, let's check some extra conditions.
/// If snapshot_version <= some_tid.start_csn, then changes of the transaction with some_tid
/// are definitely not visible for us (because the transaction can be committed with greater CSN only),
/// so we don't need to check if it was committed.
if (snapshot_version <= mintid.start_csn)
if (snapshot_version <= creation_tid.start_csn)
return false;
/// Check if mintid/maxtid transactions are committed and write CSNs
/// Check if creation_tid/removal_tid transactions are committed and write CSNs
/// TODO Transactions: we probably need some optimizations here
/// to avoid some CSN lookups or make the lookups cheaper.
/// NOTE: Old enough committed parts always have written CSNs,
/// so we can determine their visibility through fast path.
/// But for long-running writing transactions we will always do
/// CNS lookup and get 0 (UnknownCSN) until the transaction is committer/rolled back.
min = TransactionLog::instance().getCSN(mintid);
min = TransactionLog::instance().getCSN(creation_tid);
if (!min)
return false; /// Part creation is not committed yet
/// We don't need to check if CSNs are already written or not,
/// because once written CSN cannot be changed, so it's safe to overwrite it (with the same value).
mincsn.store(min, std::memory_order_relaxed);
creation_csn.store(min, std::memory_order_relaxed);
if (max_lock)
{
max = TransactionLog::instance().getCSN(max_lock);
if (max)
maxcsn.store(max, std::memory_order_relaxed);
removal_csn.store(max, std::memory_order_relaxed);
}
return min <= snapshot_version && (!max || snapshot_version < max);
@ -240,7 +240,7 @@ bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current
bool VersionMetadata::canBeRemoved(Snapshot oldest_snapshot_version)
{
CSN min = mincsn.load(std::memory_order_relaxed);
CSN min = creation_csn.load(std::memory_order_relaxed);
/// We can safely remove part if its creation was rolled back
if (min == Tx::RolledBackCSN)
return true;
@ -248,9 +248,9 @@ bool VersionMetadata::canBeRemoved(Snapshot oldest_snapshot_version)
if (!min)
{
/// Cannot remove part if its creation not committed yet
min = TransactionLog::instance().getCSN(mintid);
min = TransactionLog::instance().getCSN(creation_tid);
if (min)
mincsn.store(min, std::memory_order_relaxed);
creation_csn.store(min, std::memory_order_relaxed);
else
return false;
}
@ -259,18 +259,18 @@ bool VersionMetadata::canBeRemoved(Snapshot oldest_snapshot_version)
if (oldest_snapshot_version < min)
return false;
TIDHash max_lock = maxtid_lock.load(std::memory_order_relaxed);
TIDHash max_lock = removal_tid_lock.load(std::memory_order_relaxed);
/// Part is active
if (!max_lock)
return false;
CSN max = maxcsn.load(std::memory_order_relaxed);
CSN max = removal_csn.load(std::memory_order_relaxed);
if (!max)
{
/// Part removal is not committed yet
max = TransactionLog::instance().getCSN(max_lock);
if (max)
maxcsn.store(max, std::memory_order_relaxed);
removal_csn.store(max, std::memory_order_relaxed);
else
return false;
}
@ -279,26 +279,31 @@ bool VersionMetadata::canBeRemoved(Snapshot oldest_snapshot_version)
return max <= oldest_snapshot_version;
}
#define CREATION_TID_STR "creation_tid: "
#define CREATION_CSN_STR "creation_csn: "
#define REMOVAL_TID_STR "removal_tid: "
#define REMOVAL_CSN_STR "removal_csn: "
void VersionMetadata::write(WriteBuffer & buf) const
{
writeCString("version: 1", buf);
writeCString("\nmintid: ", buf);
TransactionID::write(mintid, buf);
if (CSN min = mincsn.load())
writeCString("\n" CREATION_TID_STR, buf);
TransactionID::write(creation_tid, buf);
if (CSN min = creation_csn.load())
{
writeCString("\nmincsn: ", buf);
writeCString("\n" CREATION_CSN_STR, buf);
writeText(min, buf);
}
if (maxtid_lock)
if (removal_tid_lock)
{
assert(!maxtid.isEmpty());
assert(maxtid.getHash() == maxtid_lock);
writeCString("\nmaxtid: ", buf);
TransactionID::write(maxtid, buf);
if (CSN max = maxcsn.load())
assert(!removal_tid.isEmpty());
assert(removal_tid.getHash() == removal_tid_lock);
writeCString("\n" REMOVAL_TID_STR, buf);
TransactionID::write(removal_tid, buf);
if (CSN max = removal_csn.load())
{
writeCString("\nmaxcsn: ", buf);
writeCString("\n" REMOVAL_CSN_STR, buf);
writeText(max, buf);
}
}
@ -306,23 +311,27 @@ void VersionMetadata::write(WriteBuffer & buf) const
void VersionMetadata::read(ReadBuffer & buf)
{
constexpr size_t size = sizeof(CREATION_TID_STR) - 1;
static_assert(sizeof(CREATION_CSN_STR) - 1 == size);
static_assert(sizeof(REMOVAL_TID_STR) - 1 == size);
static_assert(sizeof(REMOVAL_CSN_STR) - 1 == size);
assertString("version: 1", buf);
assertString("\nmintid: ", buf);
mintid = TransactionID::read(buf);
assertString("\n" CREATION_TID_STR, buf);
creation_tid = TransactionID::read(buf);
if (buf.eof())
return;
String name;
constexpr size_t size = 8;
name.resize(size);
assertChar('\n', buf);
buf.readStrict(name.data(), size);
if (name == "mincsn: ")
if (name == CREATION_CSN_STR)
{
UInt64 min;
readText(min, buf);
mincsn = min;
creation_csn = min;
if (buf.eof())
return;
@ -330,10 +339,10 @@ void VersionMetadata::read(ReadBuffer & buf)
buf.readStrict(name.data(), size);
}
if (name == "maxtid: ")
if (name == REMOVAL_TID_STR)
{
maxtid = TransactionID::read(buf);
maxtid_lock = maxtid.getHash();
removal_tid = TransactionID::read(buf);
removal_tid_lock = removal_tid.getHash();
if (buf.eof())
return;
@ -341,13 +350,13 @@ void VersionMetadata::read(ReadBuffer & buf)
buf.readStrict(name.data(), size);
}
if (name == "maxcsn: ")
if (name == REMOVAL_CSN_STR)
{
if (maxtid.isEmpty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Found maxcsn in metadata file, but maxtid is {}", maxtid);
if (removal_tid.isEmpty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Found removal_csn in metadata file, but removal_tid is {}", removal_tid);
UInt64 max;
readText(max, buf);
maxcsn = max;
removal_csn = max;
}
assertEOF(buf);

View File

@ -21,19 +21,19 @@ struct TransactionInfoContext
struct VersionMetadata
{
TransactionID mintid = Tx::EmptyTID;
TransactionID maxtid = Tx::EmptyTID;
TransactionID creation_tid = Tx::EmptyTID;
TransactionID removal_tid = Tx::EmptyTID;
std::atomic<TIDHash> maxtid_lock = 0;
std::atomic<TIDHash> removal_tid_lock = 0;
std::atomic<CSN> mincsn = Tx::UnknownCSN;
std::atomic<CSN> maxcsn = Tx::UnknownCSN;
std::atomic<CSN> creation_csn = Tx::UnknownCSN;
std::atomic<CSN> removal_csn = Tx::UnknownCSN;
bool isVisible(const MergeTreeTransaction & txn);
bool isVisible(Snapshot snapshot_version, TransactionID current_tid = Tx::EmptyTID);
TransactionID getMinTID() const { return mintid; }
TransactionID getMaxTID() const;
TransactionID getCreationTID() const { return creation_tid; }
TransactionID getRemovalTID() const;
bool tryLockMaxTID(const TransactionID & tid, const TransactionInfoContext & context, TIDHash * locked_by_id = nullptr);
void lockMaxTID(const TransactionID & tid, const TransactionInfoContext & context);
@ -42,7 +42,7 @@ struct VersionMetadata
bool isMaxTIDLocked() const;
/// It can be called only from MergeTreeTransaction or on server startup
void setMinTID(const TransactionID & tid, const TransactionInfoContext & context);
void setCreationTID(const TransactionID & tid, const TransactionInfoContext & context);
bool canBeRemoved(Snapshot oldest_snapshot_version);

View File

@ -1104,8 +1104,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
void IMergeTreeDataPart::storeVersionMetadata() const
{
assert(!versions.mintid.isEmpty());
if (versions.mintid.isPrehistoric() && (versions.maxtid.isEmpty() || versions.maxtid.isPrehistoric()))
assert(!version.creation_tid.isEmpty());
if (version.creation_tid.isPrehistoric() && (version.removal_tid.isEmpty() || version.removal_tid.isPrehistoric()))
return;
String version_file_name = fs::path(getFullRelativePath()) / TXN_VERSION_METADATA_FILE_NAME;
@ -1113,7 +1113,7 @@ void IMergeTreeDataPart::storeVersionMetadata() const
DiskPtr disk = volume->getDisk();
{
auto out = volume->getDisk()->writeFile(tmp_version_file_name, 4096, WriteMode::Rewrite);
versions.write(*out);
version.write(*out);
out->finalize();
out->sync();
}
@ -1145,7 +1145,7 @@ try
if (disk->exists(version_file_name))
{
auto buf = openForReading(disk, version_file_name);
versions.read(*buf);
version.read(*buf);
if (disk->exists(tmp_version_file_name))
remove_tmp_file();
return;
@ -1168,16 +1168,16 @@ try
/// We do not have version metadata and transactions history for old parts,
/// so let's consider that such parts were created by some ancient transaction
/// and were committed with some prehistoric CSN.
versions.setMinTID(Tx::PrehistoricTID, txn_context);
versions.mincsn = Tx::PrehistoricCSN;
version.setCreationTID(Tx::PrehistoricTID, txn_context);
version.creation_csn = Tx::PrehistoricCSN;
return;
}
/// Case 2.
/// Content of *.tmp file may be broken, just use fake TID.
/// Transaction was not committed if *.tmp file was not renamed, so we should complete rollback by removing part.
versions.setMinTID(Tx::DummyTID, txn_context);
versions.mincsn = Tx::RolledBackCSN;
version.setCreationTID(Tx::DummyTID, txn_context);
version.creation_csn = Tx::RolledBackCSN;
remove_tmp_file();
}
catch (Exception & e)

View File

@ -317,7 +317,7 @@ public:
CompressionCodecPtr default_codec;
mutable VersionMetadata versions;
mutable VersionMetadata version;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;

View File

@ -1314,55 +1314,55 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
const DataPartPtr & part = *iter;
part->loadVersionMetadata();
VersionMetadata & versions = part->versions;
VersionMetadata & versions = part->version;
/// Check if CSNs were witten after committing transaction, update and write if needed.
bool versions_updated = false;
if (!versions.mintid.isEmpty() && !part->versions.mincsn)
if (!versions.creation_tid.isEmpty() && !part->version.creation_csn)
{
auto min = TransactionLog::instance().getCSN(versions.mintid);
auto min = TransactionLog::instance().getCSN(versions.creation_tid);
if (!min)
{
/// Transaction that created this part was not committed. Remove part.
min = Tx::RolledBackCSN;
}
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has mintid={}, setting mincsn={}",
part->name, versions.mintid, min);
versions.mincsn = min;
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has creation_tid={}, setting creation_csn={}",
part->name, versions.creation_tid, min);
versions.creation_csn = min;
versions_updated = true;
}
if (!versions.maxtid.isEmpty() && !part->versions.maxcsn)
if (!versions.removal_tid.isEmpty() && !part->version.removal_csn)
{
auto max = TransactionLog::instance().getCSN(versions.maxtid);
auto max = TransactionLog::instance().getCSN(versions.removal_tid);
if (max)
{
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has maxtid={}, setting maxcsn={}",
part->name, versions.maxtid, max);
versions.maxcsn = max;
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has removal_tid={}, setting removal_csn={}",
part->name, versions.removal_tid, max);
versions.removal_csn = max;
}
else
{
/// Transaction that tried to remove this part was not committed. Clear maxtid.
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing maxtid={}",
part->name, versions.maxtid);
versions.unlockMaxTID(versions.maxtid, TransactionInfoContext{getStorageID(), part->name});
/// Transaction that tried to remove this part was not committed. Clear removal_tid.
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}",
part->name, versions.removal_tid);
versions.unlockMaxTID(versions.removal_tid, TransactionInfoContext{getStorageID(), part->name});
}
versions_updated = true;
}
/// Sanity checks
bool csn_order = !versions.maxcsn || versions.mincsn <= versions.maxcsn;
bool min_start_csn_order = versions.mintid.start_csn <= versions.mincsn;
bool max_start_csn_order = versions.maxtid.start_csn <= versions.maxcsn;
bool mincsn_known = versions.mincsn;
if (!csn_order || !min_start_csn_order || !max_start_csn_order || !mincsn_known)
bool csn_order = !versions.removal_csn || versions.creation_csn <= versions.removal_csn;
bool min_start_csn_order = versions.creation_tid.start_csn <= versions.creation_csn;
bool max_start_csn_order = versions.removal_tid.start_csn <= versions.removal_csn;
bool creation_csn_known = versions.creation_csn;
if (!csn_order || !min_start_csn_order || !max_start_csn_order || !creation_csn_known)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid versions metadata: {}", part->name, versions.toString());
if (versions_updated)
part->storeVersionMetadata();
/// Deactivate part if creation was not committed or if removal was.
if (versions.mincsn == Tx::RolledBackCSN || versions.maxcsn)
if (versions.creation_csn == Tx::RolledBackCSN || versions.removal_csn)
{
auto next_it = std::next(iter);
deactivate_part(iter);
@ -1527,15 +1527,19 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
const DataPartPtr & part = *it;
/// Do not remove outdated part if it may be visible for some transaction
if (!part->versions.canBeRemoved(TransactionLog::instance().getOldestSnapshot()))
if (!part->version.canBeRemoved(TransactionLog::instance().getOldestSnapshot()))
continue;
auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example).
((part_remove_time < now &&
now - part_remove_time > getSettings()->old_parts_lifetime.totalSeconds()) || force
|| isInMemoryPart(part))) /// Remove in-memory parts immediately to not store excessive data in RAM
/// Grab only parts that are not used by anyone (SELECTs for example).
if (!part.unique())
continue;
if ((part_remove_time < now && now - part_remove_time > getSettings()->old_parts_lifetime.totalSeconds())
|| force
|| isInMemoryPart(part) /// Remove in-memory parts immediately to not store excessive data in RAM
|| (part->version.creation_csn == Tx::RolledBackCSN && getSettings()->remove_rolled_back_parts_immediately))
{
parts_to_delete.emplace_back(it);
}
@ -1626,7 +1630,7 @@ void MergeTreeData::flushAllInMemoryPartsIfNeeded()
return;
auto metadata_snapshot = getInMemoryMetadataPtr();
DataPartsVector parts = getDataPartsVector();
DataPartsVector parts = getDataPartsVectorForInternalUsage();
for (const auto & part : parts)
{
if (auto part_in_memory = asInMemoryPart(part))
@ -1693,7 +1697,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
size_t MergeTreeData::clearOldWriteAheadLogs()
{
DataPartsVector parts = getDataPartsVector();
DataPartsVector parts = getDataPartsVectorForInternalUsage();
std::vector<std::pair<Int64, Int64>> all_block_numbers_on_disk;
std::vector<std::pair<Int64, Int64>> block_numbers_on_disk;
@ -1756,14 +1760,14 @@ size_t MergeTreeData::clearEmptyParts()
return 0;
size_t cleared_count = 0;
auto parts = getDataPartsVector();
auto parts = getDataPartsVectorForInternalUsage();
for (const auto & part : parts)
{
if (part->rows_count != 0)
continue;
/// Do not try to drop empty part if it's locked by some transaction and do not try to drop uncommitted parts.
if (part->versions.maxtid_lock.load() || !part->versions.isVisible(TransactionLog::instance().getLatestSnapshot()))
/// Do not try to drop uncommitted parts.
if (!part->version.isVisible(TransactionLog::instance().getLatestSnapshot()))
continue;
LOG_TRACE(log, "Will drop empty part {}", part->name);
@ -2241,7 +2245,7 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
}
}
for (const auto & part : getDataPartsVector())
for (const auto & part : getDataPartsVectorForInternalUsage())
{
bool at_least_one_column_rest = false;
for (const auto & column : part->getColumns())
@ -2630,8 +2634,8 @@ bool MergeTreeData::renameTempPartAndReplace(
part->renameTo(part_name, true);
auto part_it = data_parts_indexes.insert(part).first;
/// FIXME Transactions: it's not the best place for checking and setting maxtid,
/// because it's too optimistic. We should lock maxtid of covered parts at the beginning of operation.
/// FIXME Transactions: it's not the best place for checking and setting removal_tid,
/// because it's too optimistic. We should lock removal_tid of covered parts at the beginning of operation.
MergeTreeTransaction::addNewPartAndRemoveCovered(shared_from_this(), part, covered_parts, txn);
if (out_transaction)
@ -2704,7 +2708,7 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const
for (const DataPartPtr & part : remove)
{
if (part->versions.mincsn != Tx::RolledBackCSN)
if (part->version.creation_csn != Tx::RolledBackCSN)
MergeTreeTransaction::removeOldPart(shared_from_this(), part, txn);
if (part->getState() == IMergeTreeDataPart::State::Active)
@ -2829,7 +2833,7 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(
/// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
if (!part->versions.isVisible(tid.start_csn, tid))
if (!part->version.isVisible(tid.start_csn, tid))
continue;
parts_to_remove.emplace_back(part);
@ -3700,7 +3704,7 @@ BackupEntries MergeTreeData::backup(const ASTs & partitions, ContextPtr local_co
{
DataPartsVector data_parts;
if (partitions.empty())
data_parts = getDataPartsVector();
data_parts = getVisibleDataPartsVector(local_context);
else
data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(partitions, local_context));
return backupDataParts(data_parts);
@ -3911,12 +3915,12 @@ DataPartsVector MergeTreeData::getVisibleDataPartsVector(ContextPtr local_contex
DataPartsVector res;
if (const auto * txn = local_context->getCurrentTransaction().get())
{
res = getDataPartsVector({DataPartState::Active, DataPartState::Outdated});
res = getDataPartsVectorForInternalUsage({DataPartState::Active, DataPartState::Outdated});
filterVisibleDataParts(res, txn->getSnapshot(), txn->tid);
}
else
{
res = getDataPartsVector();
res = getDataPartsVectorForInternalUsage();
}
return res;
}
@ -3926,19 +3930,19 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const Me
DataPartsVector res;
if (txn)
{
res = getDataPartsVector({DataPartState::Active, DataPartState::Outdated});
res = getDataPartsVectorForInternalUsage({DataPartState::Active, DataPartState::Outdated});
filterVisibleDataParts(res, txn->getSnapshot(), txn->tid);
}
else
{
res = getDataPartsVector();
res = getDataPartsVectorForInternalUsage();
}
return res;
}
MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(Snapshot snapshot_version, TransactionID current_tid) const
{
auto res = getDataPartsVector({DataPartState::Active, DataPartState::Outdated});
auto res = getDataPartsVectorForInternalUsage({DataPartState::Active, DataPartState::Outdated});
filterVisibleDataParts(res, snapshot_version, current_tid);
return res;
}
@ -3953,7 +3957,7 @@ void MergeTreeData::filterVisibleDataParts(DataPartsVector & maybe_visible_parts
String visible_parts_str;
while (it <= it_last)
{
if ((*it)->versions.isVisible(snapshot_version, current_tid))
if ((*it)->version.isVisible(snapshot_version, current_tid))
{
visible_parts_str += (*it)->name;
visible_parts_str += " ";
@ -4002,7 +4006,7 @@ std::set<String> MergeTreeData::getPartitionIdsAffectedByCommands(
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorForInternalUsage(
const DataPartStates & affordable_states, DataPartStateVector * out_states, bool require_projection_parts) const
{
DataPartsVector res;
@ -4410,9 +4414,9 @@ MergeTreeData::DataParts MergeTreeData::getDataPartsForInternalUsage() const
return getDataParts({DataPartState::Active});
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorForInternalUsage() const
{
return getDataPartsVector({DataPartState::Active});
return getDataPartsVectorForInternalUsage({DataPartState::Active});
}
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
@ -4464,7 +4468,7 @@ void MergeTreeData::Transaction::rollback()
DataPartPtr covering_part;
DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, lock);
for (auto & covered : covered_parts)
covered->versions.unlockMaxTID(Tx::PrehistoricTID, TransactionInfoContext{data.getStorageID(), covered->name});
covered->version.unlockMaxTID(Tx::PrehistoricTID, TransactionInfoContext{data.getStorageID(), covered->name});
}
}

View File

@ -433,7 +433,7 @@ public:
/// Returns sorted list of the parts with specified states
/// out_states will contain snapshot of each part state
DataPartsVector getDataPartsVector(
DataPartsVector getDataPartsVectorForInternalUsage(
const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr, bool require_projection_parts = false) const;
/// Returns absolutely all parts (and snapshot of their states)
@ -441,8 +441,7 @@ public:
/// Returns parts in Active state
DataParts getDataPartsForInternalUsage() const;
DataPartsVector getDataPartsVector() const;
DataPartsVector getDataPartsVectorForInternalUsage() const;
void filterVisibleDataParts(DataPartsVector & maybe_visible_parts, Snapshot snapshot_version, TransactionID current_tid) const;

View File

@ -129,7 +129,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
{
/// NOTE It will contain uncommitted parts and future parts.
/// But It's ok since merge predicate allows to include in range visible parts only.
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVectorForInternalUsage();
const auto data_settings = data.getSettings();
auto metadata_snapshot = data.getInMemoryMetadataPtr();

View File

@ -61,6 +61,12 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
auto out = disk->writeFile(path_prefix + file_name);
*out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time) << "\n";
if (!tid.isPrehistoric())
{
*out << "tid: ";
TransactionID::write(tid, *out);
*out << "\n";
}
*out << "commands: ";
commands.writeText(*out);
*out << "\n";
@ -112,6 +118,14 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat
create_time_dt.year(), create_time_dt.month(), create_time_dt.day(),
create_time_dt.hour(), create_time_dt.minute(), create_time_dt.second());
assertNotEOF(*buf);
if (*(buf->position()) == 't')
{
*buf >> "tid: ";
tid = TransactionID::read(*buf);
*buf >> "\n";
}
*buf >> "commands: ";
commands.readText(*buf);
*buf >> "\n";

View File

@ -94,7 +94,7 @@ bool MergeTreePartsMover::selectPartsForMove(
unsigned parts_to_move_by_ttl_rules = 0;
double parts_to_move_total_size_bytes = 0.0;
MergeTreeData::DataPartsVector data_parts = data->getDataPartsVector();
MergeTreeData::DataPartsVector data_parts = data->getDataPartsVectorForInternalUsage();
if (data_parts.empty())
return false;

View File

@ -61,6 +61,7 @@ struct Settings;
M(UInt64, merge_selecting_sleep_ms, 5000, "Sleep time for merge selecting when no part selected, a lower setting will trigger selecting tasks in background_schedule_pool frequently which result in large amount of requests to zookeeper in large-scale clusters", 0) \
M(UInt64, merge_tree_clear_old_temporary_directories_interval_seconds, 60, "The period of executing the clear old temporary directories operation in background.", 0) \
M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \
M(UInt64, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \

View File

@ -249,7 +249,7 @@ std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr local_context) const
{
auto parts = getDataPartsVector({DataPartState::Active});
auto parts = getVisibleDataPartsVector(local_context);
return totalRowsByPartitionPredicateImpl(query_info, local_context, parts);
}
@ -587,7 +587,7 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
else if (txn)
{
/// Part is locked by concurrent transaction, most likely it will never be mutated
TIDHash part_locked = data_part->versions.maxtid_lock.load();
TIDHash part_locked = data_part->version.removal_tid_lock.load();
if (part_locked && part_locked != mutation_entry.tid.getHash())
{
result.latest_failed_part = data_part->name;
@ -712,6 +712,19 @@ void StorageMergeTree::loadMutations()
MergeTreeMutationEntry entry(disk, relative_data_path, it->name());
UInt64 block_number = entry.block_number;
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size());
if (!entry.tid.isPrehistoric())
{
if (!TransactionLog::instance().getCSN(entry.tid))
{
LOG_DEBUG(log, "Mutation entry {} was created by transaction {}, but it was not committed. Removing mutation entry",
it->name(), entry.tid);
disk->removeFile(it->path());
continue;
}
/// Transaction is committed => mutation is finished, but let's load it anyway (so it will be shown in system.mutations)
}
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
@ -756,9 +769,9 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
{
/// Cannot merge parts if some of them is not visible in current snapshot
/// TODO We can use simplified visibility rules (without CSN lookup) here
if (left && !left->versions.isVisible(tx->getSnapshot(), Tx::EmptyTID))
if (left && !left->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
return false;
if (right && !right->versions.isVisible(tx->getSnapshot(), Tx::EmptyTID))
if (right && !right->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
return false;
}
@ -939,7 +952,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
}
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVector())
for (const auto & part : getDataPartsVectorForInternalUsage())
{
if (currently_merging_mutating_parts.count(part))
continue;
@ -961,8 +974,6 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
MergeTreeTransactionPtr txn = tryGetTransactionForMutation(mutations_begin_it->second, log);
/// FIXME Transactions: we should kill mutations, but cannot do it here while holding currently_processing_in_background_mutex
/// TIDs are not persistent, so it cannot happen for now
assert(txn || first_mutation_tid.isPrehistoric());
if (txn)
@ -970,7 +981,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
/// Mutate visible parts only
/// NOTE Do not mutate visible parts in Outdated state, because it does not make sense:
/// mutation will fail anyway due to serialization error.
if (!part->versions.isVisible(*txn))
if (!part->version.isVisible(*txn))
continue;
}
@ -1257,7 +1268,7 @@ std::vector<StorageMergeTree::PartVersionWithName> StorageMergeTree::getSortedPa
std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock) const
{
std::vector<PartVersionWithName> part_versions_with_names;
auto data_parts = getDataPartsVector();
auto data_parts = getDataPartsVectorForInternalUsage();
part_versions_with_names.reserve(data_parts.size());
for (const auto & part : data_parts)
part_versions_with_names.emplace_back(PartVersionWithName{
@ -1290,7 +1301,7 @@ bool StorageMergeTree::optimize(
String disable_reason;
if (!partition && final)
{
DataPartsVector data_parts = getDataPartsVector();
DataPartsVector data_parts = getVisibleDataPartsVector(local_context);
std::unordered_set<String> partition_ids;
for (const DataPartPtr & part : data_parts)
@ -1681,7 +1692,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
}
else
data_parts = getDataPartsVector();
data_parts = getVisibleDataPartsVector(local_context);
for (auto & part : data_parts)
{

View File

@ -3034,7 +3034,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
&& merges_and_mutations_queued.mutations < storage_settings_ptr->max_replicated_mutations_in_queue)
{
/// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVector();
DataPartsVector data_parts = getDataPartsVectorForInternalUsage();
for (const auto & part : data_parts)
{
if (part->getBytesOnDisk() > max_source_part_size_for_mutation)
@ -4418,7 +4418,7 @@ bool StorageReplicatedMergeTree::optimize(
bool assigned = false;
if (!partition && final)
{
DataPartsVector data_parts = getDataPartsVector();
DataPartsVector data_parts = getVisibleDataPartsVector(query_context);
std::unordered_set<String> partition_ids;
for (const DataPartPtr & part : data_parts)
@ -7002,7 +7002,7 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, Context
data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
}
else
data_parts = getDataPartsVector();
data_parts = getVisibleDataPartsVector(local_context);
for (auto & part : data_parts)
{

View File

@ -84,10 +84,10 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"projections", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"visible", std::make_shared<DataTypeUInt8>()},
{"mintid", getTransactionIDDataType()},
{"maxtid", getTransactionIDDataType()},
{"mincsn", std::make_shared<DataTypeUInt64>()},
{"maxcsn", std::make_shared<DataTypeUInt64>()},
{"creation_tid", getTransactionIDDataType()},
{"removal_tid", getTransactionIDDataType()},
{"creation_csn", std::make_shared<DataTypeUInt64>()},
{"removal_csn", std::make_shared<DataTypeUInt64>()},
}
)
{
@ -283,7 +283,7 @@ void StorageSystemParts::processNextStorage(
{
auto txn = context->getCurrentTransaction();
if (txn)
columns[res_index++]->insert(part->versions.isVisible(*txn));
columns[res_index++]->insert(part->version.isVisible(*txn));
else
columns[res_index++]->insert(part_state == State::Active);
}
@ -294,13 +294,13 @@ void StorageSystemParts::processNextStorage(
};
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->versions.mintid));
columns[res_index++]->insert(get_tid_as_field(part->version.creation_tid));
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->versions.getMaxTID()));
columns[res_index++]->insert(get_tid_as_field(part->version.getRemovalTID()));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->versions.mincsn.load(std::memory_order_relaxed));
columns[res_index++]->insert(part->version.creation_csn.load(std::memory_order_relaxed));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->versions.maxcsn.load(std::memory_order_relaxed));
columns[res_index++]->insert(part->version.removal_csn.load(std::memory_order_relaxed));
/// _state column should be the latest.
/// Do not use part->getState*, it can be changed from different thread

View File

@ -57,12 +57,12 @@ StoragesInfo::getParts(MergeTreeData::DataPartStateVector & state, bool has_stat
{
/// If has_state_column is requested, return all states.
if (!has_state_column)
return data->getDataPartsVector({State::Active, State::Outdated}, &state, require_projection_parts);
return data->getDataPartsVectorForInternalUsage({State::Active, State::Outdated}, &state, require_projection_parts);
return data->getAllDataPartsVector(&state, require_projection_parts);
}
return data->getDataPartsVector({State::Active}, &state, require_projection_parts);
return data->getDataPartsVectorForInternalUsage({State::Active}, &state, require_projection_parts);
}
StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, ContextPtr context)

View File

@ -1,6 +1,9 @@
<yandex>
<_enable_experimental_mvcc_prototype_test_helper_dev>42</_enable_experimental_mvcc_prototype_test_helper_dev>
<merge_tree>
</merge_tree>
<transactions_info_log>
<database>system</database>
<table>transactions_info_log</table>

View File

@ -3,6 +3,7 @@
<merge_tree>
<old_parts_lifetime>100500</old_parts_lifetime>
<remove_rolled_back_parts_immediately>0</remove_rolled_back_parts_immediately>
</merge_tree>
<transactions_info_log>

View File

@ -27,6 +27,8 @@ def test_rollback_unfinished_on_restart(start_cluster):
# it will hold a snapshot and avoid parts cleanup
tx(0, 'begin transaction')
tx(4, 'begin transaction')
tx(1, 'begin transaction')
tid1 = tx(1, 'select transactionID()').strip()
tx(1, "alter table mt drop partition id '1'")
@ -41,38 +43,51 @@ def test_rollback_unfinished_on_restart(start_cluster):
csn1 = node.query("select csn from system.transactions_info_log where type='Commit' and tid={}".format(tid1)).strip()
csn2 = node.query("select csn from system.transactions_info_log where type='Commit' and tid={}".format(tid2)).strip()
# insert a part before starting mutation and check that it will not be mutated
tx(4, 'insert into mt values (9, 90)')
# check that uncommitted mutation will be rolled back on restart
tx(1, 'begin transaction')
tid3 = tx(1, 'select transactionID()').strip()
tx(1, 'insert into mt values (5, 50)')
tx(1, "alter table mt update m = m+n in partition id '1' where 1")
# check that uncommitted merge will be rolled back on restart
tx(2, 'begin transaction')
tid4 = tx(2, 'select transactionID()').strip()
tx(2, "optimize table mt partition id '0' final settings optimize_throw_if_noop = 1")
# check that uncommitted insert will be rolled back on restart
tx(3, 'begin transaction')
tid5 = tx(3, 'select transactionID()').strip()
tx(3, 'insert into mt values (6, 70)')
tid6 = tx(4, 'select transactionID()').strip()
tx(4, 'commit')
node.query('system flush logs')
csn6 = node.query("select csn from system.transactions_info_log where type='Commit' and tid={}".format(tid6)).strip()
node.restart_clickhouse(kill=True)
assert node.query('select *, _part from mt order by n') == '2\t20\t0_2_2_0\n3\t30\t1_3_3_0\n4\t40\t0_4_4_0\n'
res = node.query("select name, active, mintid, 'csn' || toString(mincsn), maxtid, 'csn' || toString(maxcsn) from system.parts where table='mt' order by name")
assert node.query('select *, _part from mt order by n') == '2\t20\t0_2_2_0\n3\t30\t1_3_3_0\n4\t40\t0_4_4_0\n9\t90\t1_5_5_0\n'
res = node.query("select name, active, creation_tid, 'csn' || toString(creation_csn), removal_tid, 'csn' || toString(removal_csn) from system.parts where table='mt' order by name")
res = res.replace(tid0, 'tid0')
res = res.replace(tid1, 'tid1').replace('csn' + csn1, 'csn_1')
res = res.replace(tid2, 'tid2').replace('csn' + csn2, 'csn_2')
res = res.replace(tid3, 'tid3')
res = res.replace(tid4, 'tid4')
res = res.replace(tid5, 'tid5')
res = res.replace(tid6, 'tid6').replace('csn' + csn6, 'csn_6')
assert res == "0_2_2_0\t1\ttid0\tcsn1\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"0_2_4_1\t0\ttid4\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"0_4_4_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"0_7_7_0\t0\ttid5\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"0_8_8_0\t0\ttid5\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"1_1_1_0\t0\ttid0\tcsn1\ttid1\tcsn_1\n" \
"1_3_3_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"1_3_3_0_6\t0\ttid3\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"1_5_5_0\t0\ttid3\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"1_5_5_0_6\t0\ttid3\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n"
"1_3_3_0_7\t0\ttid3\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"1_5_5_0\t1\ttid6\tcsn_6\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"1_6_6_0\t0\ttid3\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n" \
"1_6_6_0_7\t0\ttid3\tcsn18446744073709551615\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0\n"

View File

@ -1,6 +1,6 @@
drop table if exists txn_counters;
create table txn_counters (n Int64, mintid DEFAULT transactionID()) engine=MergeTree order by n;
create table txn_counters (n Int64, creation_tid DEFAULT transactionID()) engine=MergeTree order by n;
insert into txn_counters(n) values (1);
select transactionID();
@ -10,14 +10,14 @@ system stop merges txn_counters;
begin transaction;
insert into txn_counters(n) values (2);
select 1, system.parts.name, txn_counters.mintid = system.parts.mintid from txn_counters join system.parts on txn_counters._part = system.parts.name where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 2, name, mincsn, maxtid, maxcsn from system.parts where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 1, system.parts.name, txn_counters.creation_tid = system.parts.creation_tid from txn_counters join system.parts on txn_counters._part = system.parts.name where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 2, name, creation_csn, removal_tid, removal_csn from system.parts where database=currentDatabase() and table='txn_counters' order by system.parts.name;
rollback;
begin transaction;
insert into txn_counters(n) values (3);
select 3, system.parts.name, txn_counters.mintid = system.parts.mintid from txn_counters join system.parts on txn_counters._part = system.parts.name where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 4, name, mincsn, maxtid, maxcsn from system.parts where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 3, system.parts.name, txn_counters.creation_tid = system.parts.creation_tid from txn_counters join system.parts on txn_counters._part = system.parts.name where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 4, name, creation_csn, removal_tid, removal_csn from system.parts where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 5, transactionID().3 == serverUUID();
commit;
@ -26,8 +26,8 @@ attach table txn_counters;
begin transaction;
insert into txn_counters(n) values (4);
select 6, system.parts.name, txn_counters.mintid = system.parts.mintid from txn_counters join system.parts on txn_counters._part = system.parts.name where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 7, name, maxtid, maxcsn from system.parts where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 6, system.parts.name, txn_counters.creation_tid = system.parts.creation_tid from txn_counters join system.parts on txn_counters._part = system.parts.name where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 7, name, removal_tid, removal_csn from system.parts where database=currentDatabase() and table='txn_counters' order by system.parts.name;
select 8, transactionID().3 == serverUUID();
commit;
@ -43,4 +43,4 @@ from system.transactions_info_log
where tid in (select tid from system.transactions_info_log where database=currentDatabase() and table='txn_counters' and not (tid.1=1 and tid.2=1))
or (database=currentDatabase() and table='txn_counters') order by event_time;
drop table txn_counters;
--drop table txn_counters;

File diff suppressed because one or more lines are too long