ClickHouse/src/Interpreters/MergeTreeTransaction.cpp

186 lines
5.9 KiB
C++
Raw Normal View History

2021-03-31 17:55:04 +00:00
#include <Interpreters/MergeTreeTransaction.h>
2021-04-08 17:20:45 +00:00
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
2021-04-09 12:53:51 +00:00
#include <Interpreters/TransactionLog.h>
2021-03-31 17:55:04 +00:00
namespace DB
{
2021-12-14 20:06:34 +00:00
/// Error codes referenced in this file. They are only declared here (extern).
namespace ErrorCodes
{
/// Used below with the message "Transaction was cancelled" when the CSN equals Tx::RolledBackCSN.
extern const int INVALID_TRANSACTION;
/// Used below with the message "Unexpected CSN state: {}" for any other unexpected CSN value.
extern const int LOGICAL_ERROR;
}
2021-03-31 17:55:04 +00:00
/// Builds a transaction object.
/// The transaction id is composed of the snapshot, the local transaction id and the host id.
/// The commit sequence number starts as Tx::UnknownCSN, which getState() reports as RUNNING.
MergeTreeTransaction::MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id)
    : tid({snapshot_, local_tid_, host_id}),
      snapshot(snapshot_),
      csn(Tx::UnknownCSN)
{
}
2021-04-09 12:53:51 +00:00
/// Derives the transaction state from the current value of `csn`.
MergeTreeTransaction::State MergeTreeTransaction::getState() const
{
    const CSN current = csn.load();

    /// Either no CSN has been assigned yet or the commit is in progress.
    const bool not_finished = (current == Tx::UnknownCSN) || (current == Tx::CommittingCSN);
    if (not_finished)
        return RUNNING;

    /// Every value other than the rolled-back marker is reported as COMMITTED.
    return current == Tx::RolledBackCSN ? ROLLED_BACK : COMMITTED;
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn)
2021-04-08 17:20:45 +00:00
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
new_part->versions.setMinTID(tid);
if (txn)
2021-06-04 09:26:47 +00:00
txn->addNewPart(storage, new_part);
2021-04-08 17:20:45 +00:00
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, MergeTreeTransaction * txn)
2021-04-08 17:20:45 +00:00
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
String error_context = fmt::format("Table: {}, part name: {}",
part_to_remove->storage.getStorageID().getNameForLogs(),
part_to_remove->name);
part_to_remove->versions.lockMaxTID(tid, error_context);
if (txn)
2021-06-04 09:26:47 +00:00
txn->removeOldPart(storage, part_to_remove);
2021-04-08 17:20:45 +00:00
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage, const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn)
2021-04-08 17:20:45 +00:00
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
new_part->versions.setMinTID(tid);
if (txn)
2021-06-04 09:26:47 +00:00
txn->addNewPart(storage, new_part);
2021-04-08 17:20:45 +00:00
String error_context = fmt::format("Table: {}, covering part name: {}",
new_part->storage.getStorageID().getNameForLogs(),
new_part->name);
error_context += ", part_name: {}";
2021-06-02 20:03:44 +00:00
for (const auto & covered : covered_parts)
2021-04-08 17:20:45 +00:00
{
covered->versions.lockMaxTID(tid, fmt::format(error_context, covered->name));
if (txn)
2021-06-04 09:26:47 +00:00
txn->removeOldPart(storage, covered);
2021-04-08 17:20:45 +00:00
}
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part)
2021-04-08 17:20:45 +00:00
{
2021-12-14 20:06:34 +00:00
CSN c = csn.load();
if (c == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
else if (c != Tx::UnknownCSN)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
2021-06-04 09:26:47 +00:00
storages.insert(storage);
2021-04-08 17:20:45 +00:00
creating_parts.push_back(new_part);
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove)
2021-04-08 17:20:45 +00:00
{
2021-12-14 20:06:34 +00:00
CSN c = csn.load();
if (c == Tx::RolledBackCSN)
2021-12-15 21:49:06 +00:00
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");//FIXME
2021-12-14 20:06:34 +00:00
else if (c != Tx::UnknownCSN)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
2021-06-04 09:26:47 +00:00
storages.insert(storage);
2021-04-08 17:20:45 +00:00
removing_parts.push_back(part_to_remove);
}
2021-12-14 20:06:34 +00:00
/// Remembers a (table, mutation id) pair in `mutations`.
/// The recorded entries are waited for in beforeCommit() and killed in rollback().
void MergeTreeTransaction::addMutation(const StoragePtr & table, const String & mutation_id)
{
mutations.emplace_back(table, mutation_id);
}
2021-04-08 17:20:45 +00:00
/// A transaction is considered read-only when it has neither created nor removed any part.
bool MergeTreeTransaction::isReadOnly() const
{
    const bool creates_nothing = creating_parts.empty();
    const bool removes_nothing = removing_parts.empty();
    return creates_nothing && removes_nothing;
}
2021-12-14 20:06:34 +00:00
void MergeTreeTransaction::beforeCommit()
2021-04-08 17:20:45 +00:00
{
2021-12-14 20:06:34 +00:00
for (const auto & table_and_mutation : mutations)
table_and_mutation.first->waitForMutation(table_and_mutation.second);
CSN expected = Tx::UnknownCSN;
bool can_commit = csn.compare_exchange_strong(expected, Tx::CommittingCSN);
if (can_commit)
return;
if (expected == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected);
2021-04-08 17:20:45 +00:00
}
2021-04-09 12:53:51 +00:00
/// Finalizes a commit: publishes `assigned_csn` as the CSN of the transaction and stamps it
/// into every part the transaction created (as mincsn) or removed (as maxcsn).
/// In debug builds asserts that the previous CSN was Tx::CommittingCSN.
void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
{
    [[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
    assert(prev_value == Tx::CommittingCSN);

    /// Use the assigned value directly instead of re-reading the atomic `csn` for every part:
    /// it is loop-invariant, and each part is guaranteed to receive exactly the committed CSN.
    for (const auto & part : creating_parts)
        part->versions.mincsn.store(assigned_csn);

    for (const auto & part : removing_parts)
        part->versions.maxcsn.store(assigned_csn);
}
2021-12-14 20:06:34 +00:00
bool MergeTreeTransaction::rollback() noexcept
2021-04-08 17:20:45 +00:00
{
2021-12-14 20:06:34 +00:00
CSN expected = Tx::UnknownCSN;
bool need_rollback = csn.compare_exchange_strong(expected, Tx::RolledBackCSN);
if (!need_rollback)
return false;
for (const auto & table_and_mutation : mutations)
table_and_mutation.first->killMutation(table_and_mutation.second);
2021-04-08 17:20:45 +00:00
for (const auto & part : creating_parts)
part->versions.mincsn.store(Tx::RolledBackCSN);
2021-06-04 09:26:47 +00:00
2021-04-08 17:20:45 +00:00
for (const auto & part : removing_parts)
part->versions.unlockMaxTID(tid);
2021-06-04 09:26:47 +00:00
/// FIXME const_cast
for (const auto & part : creating_parts)
2021-06-08 18:17:18 +00:00
const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(nullptr, {part}, true);
2021-06-04 09:26:47 +00:00
for (const auto & part : removing_parts)
if (part->versions.getMinTID() != tid)
const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);
2021-12-14 20:06:34 +00:00
return true;
2021-04-08 17:20:45 +00:00
}
2021-03-31 17:55:04 +00:00
2021-04-09 12:53:51 +00:00
/// Asks the transaction log to roll this transaction back.
void MergeTreeTransaction::onException()
{
    auto && log = TransactionLog::instance();
    log.rollbackTransaction(shared_from_this());
}
2021-05-18 17:07:29 +00:00
/// Builds a multi-line textual description listing the names of the parts being created
/// and the parts being removed; for removed parts the minimal TID and mincsn are included.
/// In debug builds it also asserts, for every removed part, that its mincsn is either unset
/// or not greater than the snapshot, and that its maxcsn is unset.
String MergeTreeTransaction::dumpDescription() const
{
    String description = "\ncreating parts:\n";
    for (const auto & created : creating_parts)
    {
        description.append(created->name);
        description.append("\n");
    }

    description.append("removing parts:\n");
    for (const auto & removed : removing_parts)
    {
        description.append(removed->name);
        description.append(fmt::format(" (created by {}, {})\n", removed->versions.getMinTID(), removed->versions.mincsn));
        assert(!removed->versions.mincsn || removed->versions.mincsn <= snapshot);
        assert(!removed->versions.maxcsn);
    }

    return description;
}
2021-03-31 17:55:04 +00:00
}