2021-03-31 17:55:04 +00:00
|
|
|
#include <Interpreters/MergeTreeTransaction.h>
|
2021-04-08 17:20:45 +00:00
|
|
|
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeData.h>
|
2021-04-09 12:53:51 +00:00
|
|
|
#include <Interpreters/TransactionLog.h>
|
2022-02-14 19:50:08 +00:00
|
|
|
#include <Interpreters/TransactionsInfoLog.h>
|
2021-03-31 17:55:04 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-12-14 20:06:34 +00:00
|
|
|
/// Error codes used by this translation unit; they are only declared here (extern).
namespace ErrorCodes
{
    /// Thrown when an operation is attempted on a transaction that was already rolled back.
    extern const int INVALID_TRANSACTION;
    /// Thrown when the transaction CSN is in a state that the code does not expect.
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
MergeTreeTransaction::MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id)
|
|
|
|
: tid({snapshot_, local_tid_, host_id})
|
|
|
|
, snapshot(snapshot_)
|
2021-04-09 12:53:51 +00:00
|
|
|
, csn(Tx::UnknownCSN)
|
2021-03-31 17:55:04 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
/// Derives the transaction state from the current value of `csn`:
///  - Tx::UnknownCSN or Tx::CommittingCSN -> RUNNING
///  - Tx::RolledBackCSN                   -> ROLLED_BACK
///  - any other value                     -> COMMITTED
MergeTreeTransaction::State MergeTreeTransaction::getState() const
{
    const CSN current_csn = csn.load();

    if (current_csn == Tx::UnknownCSN || current_csn == Tx::CommittingCSN)
        return RUNNING;

    return current_csn == Tx::RolledBackCSN ? ROLLED_BACK : COMMITTED;
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
|
|
|
|
2022-02-14 19:50:08 +00:00
|
|
|
/// Now we know actual part name and can write it to system log table.
|
|
|
|
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
|
|
|
|
|
|
|
|
new_part->assertHasVersionMetadata(txn);
|
2021-04-08 17:20:45 +00:00
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->addNewPart(storage, new_part);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, MergeTreeTransaction * txn)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
2022-01-14 14:03:00 +00:00
|
|
|
TransactionInfoContext context{storage->getStorageID(), part_to_remove->name};
|
2022-01-28 17:47:37 +00:00
|
|
|
part_to_remove->version.lockMaxTID(tid, context);
|
2021-04-08 17:20:45 +00:00
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->removeOldPart(storage, part_to_remove);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage, const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
|
|
|
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
|
|
|
|
2022-01-14 14:03:00 +00:00
|
|
|
TransactionInfoContext context{storage->getStorageID(), new_part->name};
|
2022-02-14 19:50:08 +00:00
|
|
|
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, tid, context);
|
|
|
|
new_part->assertHasVersionMetadata(txn);
|
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->addNewPart(storage, new_part);
|
2021-04-08 17:20:45 +00:00
|
|
|
|
2022-01-14 14:03:00 +00:00
|
|
|
context.covering_part = std::move(context.part_name);
|
2021-06-02 20:03:44 +00:00
|
|
|
for (const auto & covered : covered_parts)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2022-01-14 14:03:00 +00:00
|
|
|
context.part_name = covered->name;
|
2022-01-28 17:47:37 +00:00
|
|
|
covered->version.lockMaxTID(tid, context);
|
2021-04-08 17:20:45 +00:00
|
|
|
if (txn)
|
2021-06-04 09:26:47 +00:00
|
|
|
txn->removeOldPart(storage, covered);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-12-14 20:06:34 +00:00
|
|
|
CSN c = csn.load();
|
|
|
|
if (c == Tx::RolledBackCSN)
|
|
|
|
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
|
|
|
|
else if (c != Tx::UnknownCSN)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
storages.insert(storage);
|
2021-04-08 17:20:45 +00:00
|
|
|
creating_parts.push_back(new_part);
|
2022-02-17 21:26:37 +00:00
|
|
|
//new_part->storeVersionMetadata();
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove)
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-12-14 20:06:34 +00:00
|
|
|
CSN c = csn.load();
|
|
|
|
if (c == Tx::RolledBackCSN)
|
2021-12-15 21:49:06 +00:00
|
|
|
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");//FIXME
|
2021-12-14 20:06:34 +00:00
|
|
|
else if (c != Tx::UnknownCSN)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
|
|
|
|
|
2021-06-04 09:26:47 +00:00
|
|
|
storages.insert(storage);
|
2021-04-08 17:20:45 +00:00
|
|
|
removing_parts.push_back(part_to_remove);
|
2021-12-30 13:15:28 +00:00
|
|
|
part_to_remove->storeVersionMetadata();
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-12-14 20:06:34 +00:00
|
|
|
void MergeTreeTransaction::addMutation(const StoragePtr & table, const String & mutation_id)
|
|
|
|
{
|
2022-02-14 19:50:08 +00:00
|
|
|
storages.insert(table);
|
2021-12-14 20:06:34 +00:00
|
|
|
mutations.emplace_back(table, mutation_id);
|
|
|
|
}
|
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
bool MergeTreeTransaction::isReadOnly() const
|
|
|
|
{
|
2022-02-14 19:50:08 +00:00
|
|
|
assert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
|
|
|
|
return storages.empty();
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-12-14 20:06:34 +00:00
|
|
|
void MergeTreeTransaction::beforeCommit()
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-12-14 20:06:34 +00:00
|
|
|
for (const auto & table_and_mutation : mutations)
|
|
|
|
table_and_mutation.first->waitForMutation(table_and_mutation.second);
|
|
|
|
|
|
|
|
CSN expected = Tx::UnknownCSN;
|
|
|
|
bool can_commit = csn.compare_exchange_strong(expected, Tx::CommittingCSN);
|
|
|
|
if (can_commit)
|
|
|
|
return;
|
|
|
|
|
|
|
|
if (expected == Tx::RolledBackCSN)
|
|
|
|
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected);
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
/// Finalizes a commit: publishes `assigned_csn` as the transaction's CSN and propagates it
/// to every part created or removed by the transaction, appending it to each part's version metadata.
/// In debug builds it is checked that the previous CSN value was Tx::CommittingCSN.
void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
{
    [[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
    assert(prev_value == Tx::CommittingCSN);

    /// `assigned_csn` is used directly instead of re-reading the atomic `csn` for every part:
    /// it is the value that was just published, and every part is guaranteed to receive exactly that value.
    for (const auto & part : creating_parts)
    {
        part->version.creation_csn.store(assigned_csn);
        part->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::CREATION);
    }

    for (const auto & part : removing_parts)
    {
        part->version.removal_csn.store(assigned_csn);
        part->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::REMOVAL);
    }
}
|
|
|
|
|
2021-12-14 20:06:34 +00:00
|
|
|
bool MergeTreeTransaction::rollback() noexcept
|
2021-04-08 17:20:45 +00:00
|
|
|
{
|
2021-12-14 20:06:34 +00:00
|
|
|
CSN expected = Tx::UnknownCSN;
|
|
|
|
bool need_rollback = csn.compare_exchange_strong(expected, Tx::RolledBackCSN);
|
|
|
|
|
|
|
|
if (!need_rollback)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
for (const auto & table_and_mutation : mutations)
|
|
|
|
table_and_mutation.first->killMutation(table_and_mutation.second);
|
|
|
|
|
2021-04-08 17:20:45 +00:00
|
|
|
for (const auto & part : creating_parts)
|
2022-01-28 17:47:37 +00:00
|
|
|
part->version.creation_csn.store(Tx::RolledBackCSN);
|
2021-06-04 09:26:47 +00:00
|
|
|
|
2022-01-19 18:29:31 +00:00
|
|
|
for (const auto & part : removing_parts) /// TODO update metadata file
|
2022-01-28 17:47:37 +00:00
|
|
|
part->version.unlockMaxTID(tid, TransactionInfoContext{part->storage.getStorageID(), part->name});
|
2021-06-04 09:26:47 +00:00
|
|
|
|
|
|
|
/// FIXME const_cast
|
|
|
|
for (const auto & part : creating_parts)
|
2021-06-08 18:17:18 +00:00
|
|
|
const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(nullptr, {part}, true);
|
2021-06-04 09:26:47 +00:00
|
|
|
|
|
|
|
for (const auto & part : removing_parts)
|
2022-01-28 17:47:37 +00:00
|
|
|
if (part->version.getCreationTID() != tid)
|
2021-06-04 09:26:47 +00:00
|
|
|
const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);
|
|
|
|
|
2021-12-14 20:06:34 +00:00
|
|
|
return true;
|
2021-04-08 17:20:45 +00:00
|
|
|
}
|
2021-03-31 17:55:04 +00:00
|
|
|
|
2021-04-09 12:53:51 +00:00
|
|
|
void MergeTreeTransaction::onException()
|
|
|
|
{
|
|
|
|
TransactionLog::instance().rollbackTransaction(shared_from_this());
|
|
|
|
}
|
|
|
|
|
2021-05-18 17:07:29 +00:00
|
|
|
/// Builds a human-readable description of the transaction: its id, state and snapshot,
/// followed either by ", readonly" or by a per-table list of created parts, removed parts and mutations.
String MergeTreeTransaction::dumpDescription() const
{
    String description = fmt::format("{} state: {}, snapshot: {}", tid, getState(), snapshot);

    if (isReadOnly())
    {
        description += ", readonly";
        return description;
    }

    description += fmt::format(", affects {} tables:", storages.size());

    /// Per table: <names of created parts, descriptions of removed parts, ids of mutations>.
    using ChangesInTable = std::tuple<Strings, Strings, Strings>;
    std::unordered_map<const IStorage *, ChangesInTable> changes_by_storage;

    for (const auto & created_part : creating_parts)
    {
        auto & table_changes = changes_by_storage[&(created_part->storage)];
        std::get<0>(table_changes).push_back(created_part->name);
    }

    for (const auto & removed_part : removing_parts)
    {
        String info = fmt::format("{} (created by {}, {})", removed_part->name, removed_part->version.getCreationTID(), removed_part->version.creation_csn);
        auto & table_changes = changes_by_storage[&(removed_part->storage)];
        std::get<1>(table_changes).push_back(std::move(info));

        /// Debug-only check: the creation CSN of a removed part is either zero or not greater than the snapshot.
        assert(!removed_part->version.creation_csn || removed_part->version.creation_csn <= snapshot);
    }

    for (const auto & [table, mutation_id] : mutations)
    {
        auto & table_changes = changes_by_storage[table.get()];
        std::get<2>(table_changes).push_back(mutation_id);
    }

    for (const auto & [storage, changes] : changes_by_storage)
    {
        description += fmt::format("\n\t{}:", storage->getStorageID().getNameForLogs());

        const auto & [creating_info, removing_info, mutations_info] = changes;

        /// Empty sections are omitted entirely.
        if (!creating_info.empty())
            description += fmt::format("\n\t\tcreating parts:\n\t\t\t{}", fmt::join(creating_info, "\n\t\t\t"));
        if (!removing_info.empty())
            description += fmt::format("\n\t\tremoving parts:\n\t\t\t{}", fmt::join(removing_info, "\n\t\t\t"));
        if (!mutations_info.empty())
            description += fmt::format("\n\t\tmutations:\n\t\t\t{}", fmt::join(mutations_info, "\n\t\t\t"));
    }

    return description;
}
|
|
|
|
|
2021-03-31 17:55:04 +00:00
|
|
|
}
|