2021-03-31 17:55:04 +00:00
|
|
|
#include <Interpreters/MergeTreeTransaction.h>
|
2021-04-08 17:20:45 +00:00
|
|
|
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2021-04-09 12:53:51 +00:00
|
|
|
#include <Interpreters/TransactionLog.h>
|
2021-03-31 17:55:04 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Constructs a transaction whose id is assembled from the snapshot, the local id and the host id.
/// The commit sequence number starts as Tx::UnknownCSN.
MergeTreeTransaction::MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id)
    : tid(TransactionID{snapshot_, local_tid_, host_id})
    , snapshot(snapshot_)
    , csn(Tx::UnknownCSN)
{
}
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
/// Maps the current value of `csn` to a State:
///   Tx::UnknownCSN    -> RUNNING
///   Tx::RolledBackCSN -> ROLLED_BACK
///   anything else     -> COMMITTED
MergeTreeTransaction::State MergeTreeTransaction::getState() const
{
    if (csn == Tx::UnknownCSN)
        return RUNNING;

    return csn == Tx::RolledBackCSN ? ROLLED_BACK : COMMITTED;
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
|
|
|
|
|
|
|
new_part->versions.setMinTID(tid);
|
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->addNewPart(storage, new_part);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, MergeTreeTransaction * txn)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
|
|
|
String error_context = fmt::format("Table: {}, part name: {}",
|
|
|
|
part_to_remove->storage.getStorageID().getNameForLogs(),
|
|
|
|
part_to_remove->name);
|
|
|
|
part_to_remove->versions.lockMaxTID(tid, error_context);
|
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->removeOldPart(storage, part_to_remove);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage, const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
|
|
|
|
|
|
|
new_part->versions.setMinTID(tid);
|
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->addNewPart(storage, new_part);
|
2021-04-08 17:20:45 +00:00
|
|
|
|
|
|
|
String error_context = fmt::format("Table: {}, covering part name: {}",
|
|
|
|
new_part->storage.getStorageID().getNameForLogs(),
|
|
|
|
new_part->name);
|
|
|
|
error_context += ", part_name: {}";
|
2021-06-02 20:03:44 +00:00
|
|
|
for (const auto & covered : covered_parts)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
covered->versions.lockMaxTID(tid, fmt::format(error_context, covered->name));
|
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->removeOldPart(storage, covered);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-05-17 11:14:09 +00:00
|
|
|
assert(csn == Tx::UnknownCSN);
|
2021-06-04 09:26:47 +00:00
|
|
|
storages.insert(storage);
|
2021-04-08 17:20:45 +00:00
|
|
|
creating_parts.push_back(new_part);
|
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-05-17 11:14:09 +00:00
|
|
|
assert(csn == Tx::UnknownCSN);
|
2021-06-04 09:26:47 +00:00
|
|
|
storages.insert(storage);
|
2021-04-08 17:20:45 +00:00
|
|
|
removing_parts.push_back(part_to_remove);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool MergeTreeTransaction::isReadOnly() const
|
|
|
|
{
|
|
|
|
return creating_parts.empty() && removing_parts.empty();
|
|
|
|
}
|
|
|
|
|
2021-06-02 20:03:44 +00:00
|
|
|
void MergeTreeTransaction::beforeCommit() const
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-04-09 12:53:51 +00:00
|
|
|
assert(csn == Tx::UnknownCSN);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
/// Post-commit hook: remembers the assigned CSN and publishes it to the parts touched
/// by this transaction — as `mincsn` of every created part and as `maxcsn` of every removed part.
void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
{
    assert(csn == Tx::UnknownCSN);
    csn = assigned_csn;

    for (const auto & created : creating_parts)
    {
        created->versions.mincsn.store(csn);
    }

    for (const auto & removed : removing_parts)
    {
        removed->versions.maxcsn.store(csn);
    }
}
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
void MergeTreeTransaction::rollback() noexcept
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-04-09 12:53:51 +00:00
|
|
|
assert(csn == Tx::UnknownCSN);
|
|
|
|
csn = Tx::RolledBackCSN;
|
2021-04-08 17:20:45 +00:00
|
|
|
for (const auto & part : creating_parts)
|
|
|
|
part->versions.mincsn.store(Tx::RolledBackCSN);
|
2021-06-04 09:26:47 +00:00
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
for (const auto & part : removing_parts)
|
|
|
|
part->versions.unlockMaxTID(tid);
|
2021-06-04 09:26:47 +00:00
|
|
|
|
|
|
|
/// FIXME const_cast
|
|
|
|
for (const auto & part : creating_parts)
|
2021-06-08 18:17:18 +00:00
|
|
|
const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(nullptr, {part}, true);
|
2021-06-04 09:26:47 +00:00
|
|
|
|
|
|
|
for (const auto & part : removing_parts)
|
|
|
|
if (part->versions.getMinTID() != tid)
|
|
|
|
const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);
|
|
|
|
|
|
|
|
/// FIXME seems like session holds shared_ptr to Transaction and transaction holds shared_ptr to parts preventing cleanup
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
2021-03-31 17:55:04 +00:00
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
void MergeTreeTransaction::onException()
|
|
|
|
{
|
|
|
|
if (csn)
|
|
|
|
return;
|
|
|
|
|
|
|
|
TransactionLog::instance().rollbackTransaction(shared_from_this());
|
|
|
|
}
|
|
|
|
|
2021-05-18 17:07:29 +00:00
|
|
|
/// Builds a multi-line, human-readable listing of the parts touched by this transaction:
/// first the names of the parts being created, then the names of the parts being removed,
/// each followed by the TID and `mincsn` of its creator.
/// In debug builds also asserts, for every part being removed, that its `mincsn` is either
/// unset or not greater than the snapshot, and that its `maxcsn` is unset.
String MergeTreeTransaction::dumpDescription() const
{
    String res = "\ncreating parts:\n";
    for (const auto & created : creating_parts)
    {
        res.append(created->name);
        res.append("\n");
    }

    res.append("removing parts:\n");
    for (const auto & removed : removing_parts)
    {
        res.append(removed->name);
        res.append(fmt::format(" (created by {}, {})\n", removed->versions.getMinTID(), removed->versions.mincsn));

        assert(!removed->versions.mincsn || removed->versions.mincsn <= snapshot);
        assert(!removed->versions.maxcsn);
    }

    return res;
}
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|