ClickHouse/src/Interpreters/MergeTreeTransaction.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

372 lines
14 KiB
C++
Raw Normal View History

2021-03-31 17:55:04 +00:00
#include <Interpreters/MergeTreeTransaction.h>
2021-04-08 17:20:45 +00:00
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
2021-04-09 12:53:51 +00:00
#include <Interpreters/TransactionLog.h>
2022-02-14 19:50:08 +00:00
#include <Interpreters/TransactionsInfoLog.h>
2022-06-16 17:41:32 +00:00
#include <Common/noexcept_scope.h>
2021-03-31 17:55:04 +00:00
namespace DB
{
2021-12-14 20:06:34 +00:00
namespace ErrorCodes
{
extern const int INVALID_TRANSACTION;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
2021-12-14 20:06:34 +00:00
}
static void checkNotOrdinaryDatabase(const StoragePtr & storage)
2022-03-31 11:47:38 +00:00
{
if (storage->getStorageID().uuid != UUIDHelpers::Nil)
return;
2022-03-31 11:47:38 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table {} belongs to database with Ordinary engine. "
"This engine is deprecated and is not supported in transactions.", storage->getStorageID().getNameForLogs());
2022-03-31 11:47:38 +00:00
}
2022-06-27 20:48:27 +00:00
/// Creates a transaction object.
///  - snapshot_    : initial snapshot; it is also the first component of the transaction ID
///  - local_tid_   : second component of the transaction ID
///  - host_id      : third component of the transaction ID
///  - snapshot_it_ : iterator stored as-is in snapshot_in_use_it
/// The commit sequence number starts as Tx::UnknownCSN.
MergeTreeTransaction::MergeTreeTransaction(
    CSN snapshot_,
    LocalTID local_tid_,
    UUID host_id,
    std::list<CSN>::iterator snapshot_it_)
    : tid({snapshot_, local_tid_, host_id})
    , snapshot(snapshot_)
    , snapshot_in_use_it(snapshot_it_)
    , csn(Tx::UnknownCSN)
{
}
2022-03-18 13:33:59 +00:00
/// Replaces the snapshot of this transaction with new_snapshot.
/// The value is written atomically with relaxed memory ordering.
void MergeTreeTransaction::setSnapshot(CSN new_snapshot)
{
    snapshot.store(new_snapshot, std::memory_order_relaxed);
}
2021-04-09 12:53:51 +00:00
/// Derives the transaction state from the current CSN value:
///   Tx::UnknownCSN    -> RUNNING
///   Tx::CommittingCSN -> COMMITTING
///   Tx::RolledBackCSN -> ROLLED_BACK
///   anything else     -> COMMITTED
MergeTreeTransaction::State MergeTreeTransaction::getState() const
{
    const CSN current = csn.load();

    return current == Tx::UnknownCSN ? RUNNING
        : current == Tx::CommittingCSN ? COMMITTING
        : current == Tx::RolledBackCSN ? ROLLED_BACK
        : COMMITTED;
}
2022-05-23 18:53:33 +00:00
/// Blocks while the CSN of the transaction is equal to current_state_csn.
/// Returns true once a different value has been observed,
/// or false if the transaction log reports shutdown before that happens.
bool MergeTreeTransaction::waitStateChange(CSN current_state_csn) const
{
    CSN observed = current_state_csn;
    for (;;)
    {
        if (observed != current_state_csn)
            return true;

        /// Checked before every wait, so we do not start waiting again during shutdown.
        if (TransactionLog::instance().isShuttingDown())
            return false;

        csn.wait(observed);
        observed = csn.load();
    }
}
2022-03-10 21:29:58 +00:00
void MergeTreeTransaction::checkIsNotCancelled() const
{
CSN c = csn.load();
if (c == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
else if (c != Tx::UnknownCSN)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn)
2021-04-08 17:20:45 +00:00
{
2022-03-14 20:43:34 +00:00
/// Creation TID was written to data part earlier on part creation.
/// We only need to ensure that it's written and add part to in-memory set of new parts.
2022-02-14 19:50:08 +00:00
new_part->assertHasVersionMetadata(txn);
2021-04-08 17:20:45 +00:00
if (txn)
2022-03-14 20:43:34 +00:00
{
2021-06-04 09:26:47 +00:00
txn->addNewPart(storage, new_part);
2022-03-14 20:43:34 +00:00
/// Now we know actual part name and can write it to system log table.
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, txn->tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
}
2021-04-08 17:20:45 +00:00
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, MergeTreeTransaction * txn)
2021-04-08 17:20:45 +00:00
{
2022-03-16 19:16:26 +00:00
TransactionInfoContext transaction_context{storage->getStorageID(), part_to_remove->name};
2021-04-08 17:20:45 +00:00
if (txn)
2022-03-14 20:43:34 +00:00
{
/// Lock part for removal and write current TID into version metadata file.
/// If server crash just after committing transactions
/// we will find this TID in version metadata and will finally remove part.
2022-03-16 19:16:26 +00:00
txn->removeOldPart(storage, part_to_remove, transaction_context);
2022-03-14 20:43:34 +00:00
}
2022-03-09 20:38:18 +00:00
else
2022-03-14 20:43:34 +00:00
{
2022-03-16 19:16:26 +00:00
/// Lock part for removal with special TID, so transactions will not try to remove it concurrently.
/// We lock it only in memory if part was not involved in any transactions.
2022-03-16 19:16:26 +00:00
part_to_remove->version.lockRemovalTID(Tx::PrehistoricTID, transaction_context);
if (part_to_remove->wasInvolvedInTransaction())
part_to_remove->appendRemovalTIDToVersionMetadata();
2022-03-14 20:43:34 +00:00
}
2021-04-08 17:20:45 +00:00
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage, const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn)
2021-04-08 17:20:45 +00:00
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
2022-03-16 19:16:26 +00:00
TransactionInfoContext transaction_context{storage->getStorageID(), new_part->name};
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, tid, transaction_context);
transaction_context.covering_part = std::move(transaction_context.part_name);
2022-02-14 19:50:08 +00:00
new_part->assertHasVersionMetadata(txn);
2021-04-08 17:20:45 +00:00
if (txn)
2022-03-09 20:38:18 +00:00
{
2021-06-04 09:26:47 +00:00
txn->addNewPart(storage, new_part);
2022-03-09 20:38:18 +00:00
for (const auto & covered : covered_parts)
{
2022-03-16 19:16:26 +00:00
transaction_context.part_name = covered->name;
txn->removeOldPart(storage, covered, transaction_context);
2022-03-09 20:38:18 +00:00
}
}
else
2021-04-08 17:20:45 +00:00
{
2022-03-09 20:38:18 +00:00
for (const auto & covered : covered_parts)
{
2022-03-16 19:16:26 +00:00
transaction_context.part_name = covered->name;
covered->version.lockRemovalTID(tid, transaction_context);
2022-03-09 20:38:18 +00:00
}
2021-04-08 17:20:45 +00:00
}
}
2021-06-04 09:26:47 +00:00
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part)
2021-04-08 17:20:45 +00:00
{
checkNotOrdinaryDatabase(storage);
2022-03-10 21:29:58 +00:00
std::lock_guard lock{mutex};
checkIsNotCancelled();
2021-06-04 09:26:47 +00:00
storages.insert(storage);
2021-04-08 17:20:45 +00:00
creating_parts.push_back(new_part);
}
2022-03-09 20:38:18 +00:00
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, const TransactionInfoContext & context)
2021-04-08 17:20:45 +00:00
{
checkNotOrdinaryDatabase(storage);
2022-03-31 11:47:38 +00:00
2022-03-09 20:38:18 +00:00
{
2022-03-10 21:29:58 +00:00
std::lock_guard lock{mutex};
checkIsNotCancelled();
2022-03-14 20:43:34 +00:00
part_to_remove->version.lockRemovalTID(tid, context);
NOEXCEPT_SCOPE({
storages.insert(storage);
removing_parts.push_back(part_to_remove);
});
2022-03-09 20:38:18 +00:00
}
2022-03-10 21:29:58 +00:00
2022-03-08 19:11:47 +00:00
part_to_remove->appendRemovalTIDToVersionMetadata();
2021-04-08 17:20:45 +00:00
}
2021-12-14 20:06:34 +00:00
void MergeTreeTransaction::addMutation(const StoragePtr & table, const String & mutation_id)
{
checkNotOrdinaryDatabase(table);
2022-03-10 21:29:58 +00:00
std::lock_guard lock{mutex};
checkIsNotCancelled();
2022-02-14 19:50:08 +00:00
storages.insert(table);
2021-12-14 20:06:34 +00:00
mutations.emplace_back(table, mutation_id);
}
2021-04-08 17:20:45 +00:00
bool MergeTreeTransaction::isReadOnly() const
{
2022-03-10 21:29:58 +00:00
std::lock_guard lock{mutex};
2022-05-20 10:41:44 +00:00
chassert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
2022-02-14 19:50:08 +00:00
return storages.empty();
2021-04-08 17:20:45 +00:00
}
2022-03-15 13:35:48 +00:00
/// Prepares the transaction for commit: waits for all its mutations and then switches
/// the CSN from Tx::UnknownCSN to Tx::CommittingCSN.
/// Throws INVALID_TRANSACTION if the transaction has been rolled back in the meantime
/// and LOGICAL_ERROR if the CSN has any other unexpected value.
/// The callback of the returned guard switches the CSN back from Tx::CommittingCSN to Tx::UnknownCSN
/// (and leaves any other CSN value untouched).
scope_guard MergeTreeTransaction::beforeCommit()
{
    /// Take a copy under the mutex, because waiting is done without holding it.
    RunningMutationsList pending_mutations;
    {
        std::lock_guard guard{mutex};
        pending_mutations = mutations;
    }

    /// Mutations must finish before the transaction is committed:
    /// a mutation may fail, and that must lead to rollback.
    for (const auto & [table, mutation_id] : pending_mutations)
        table->waitForMutation(mutation_id);

    /// No mutations are expected to be added while we were waiting.
    assert([&]()
    {
        std::lock_guard guard{mutex};
        return mutations == pending_mutations;
    }());

    CSN observed = Tx::UnknownCSN;
    if (!csn.compare_exchange_strong(observed, Tx::CommittingCSN))
    {
        /// Transaction was concurrently cancelled by KILL TRANSACTION or KILL MUTATION
        if (observed == Tx::RolledBackCSN)
            throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");

        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", observed);
    }

    /// If committing fails for some reason (connection loss, etc), the CSN has to be set back to Unknown.
    return [this]()
    {
        CSN committing = Tx::CommittingCSN;
        csn.compare_exchange_strong(committing, Tx::UnknownCSN);
    };
}
2021-04-09 12:53:51 +00:00
/// Finalizes a committed transaction: publishes assigned_csn as the CSN of the transaction
/// and propagates it to all created parts, removed parts and mutations of the transaction.
void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
{
    LockMemoryExceptionInThread memory_tracker_lock(VariableContext::Global);

    /// The allocated CSN is written into version metadata, so it can be found out without reading the transaction log,
    /// and old entries can be removed from the transaction log in ZK.
    /// A server crash before the CSN is written is not a problem: the TID is already in the data part and the entry is in the log.
    [[maybe_unused]] CSN previous_csn = csn.exchange(assigned_csn);
    chassert(previous_csn == Tx::CommittingCSN);

    DataPartsVector parts_created;
    DataPartsVector parts_removed;
    RunningMutationsList mutations_committed;
    {
        /// The mutex is not strictly required: the transaction object cannot be modified concurrently after commit.
        std::lock_guard guard{mutex};
        parts_created = creating_parts;
        parts_removed = removing_parts;
        mutations_committed = mutations;
    }

    for (const auto & created : parts_created)
    {
        created->version.creation_csn.store(csn);
        created->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::CREATION);
    }

    for (const auto & removed : parts_removed)
    {
        removed->version.removal_csn.store(csn);
        removed->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::REMOVAL);
    }

    for (const auto & [table, mutation_id] : mutations_committed)
        table->setMutationCSN(mutation_id, csn);
}
2021-12-14 20:06:34 +00:00
bool MergeTreeTransaction::rollback() noexcept
2021-04-08 17:20:45 +00:00
{
LockMemoryExceptionInThread memory_tracker_lock(VariableContext::Global);
2021-12-14 20:06:34 +00:00
CSN expected = Tx::UnknownCSN;
bool need_rollback = csn.compare_exchange_strong(expected, Tx::RolledBackCSN);
2022-03-14 20:43:34 +00:00
/// Check that it was not rolled back concurrently
2021-12-14 20:06:34 +00:00
if (!need_rollback)
return false;
2022-03-14 20:43:34 +00:00
/// It's not a problem if server crash at this point
/// because on startup we will see that TID is not committed and will simply discard these changes.
2022-03-15 13:35:48 +00:00
RunningMutationsList mutations_to_kill;
2022-03-15 16:51:53 +00:00
DataPartsVector parts_to_remove;
DataPartsVector parts_to_activate;
2022-03-15 13:35:48 +00:00
{
std::lock_guard lock{mutex};
mutations_to_kill = mutations;
2022-03-15 16:51:53 +00:00
parts_to_remove = creating_parts;
parts_to_activate = removing_parts;
2022-03-15 13:35:48 +00:00
}
2022-03-15 16:51:53 +00:00
/// Forcefully stop related mutations if any
2022-03-15 13:35:48 +00:00
for (const auto & table_and_mutation : mutations_to_kill)
2021-12-14 20:06:34 +00:00
table_and_mutation.first->killMutation(table_and_mutation.second);
2022-04-07 16:17:43 +00:00
/// Discard changes in active parts set
/// Remove parts that were created, restore parts that were removed (except parts that were created by this transaction too)
2022-03-14 20:43:34 +00:00
/// Kind of optimization: cleanup thread can remove these parts immediately
2022-03-15 16:51:53 +00:00
for (const auto & part : parts_to_remove)
{
part->version.creation_csn.store(Tx::RolledBackCSN);
/// Write special RolledBackCSN, so we will be able to cleanup transaction log
part->appendCSNToVersionMetadata(VersionMetadata::CREATION);
}
2021-06-04 09:26:47 +00:00
2022-04-11 14:25:59 +00:00
for (const auto & part : parts_to_remove)
{
/// NOTE It's possible that part is already removed from working set in the same transaction
/// (or, even worse, in a separate non-transactional query with PrehistoricTID),
/// but it's not a problem: removePartsFromWorkingSet(...) will do nothing in this case.
const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part}, true);
}
for (const auto & part : parts_to_activate)
if (part->version.getCreationTID() != tid)
const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);
2022-03-15 16:51:53 +00:00
for (const auto & part : parts_to_activate)
2022-03-08 19:11:47 +00:00
{
2022-03-14 20:43:34 +00:00
/// Clear removal_tid from version metadata file, so we will not need to distinguish TIDs that were not committed
/// and TIDs that were committed long time ago and were removed from the log on log cleanup.
2022-03-08 19:11:47 +00:00
part->appendRemovalTIDToVersionMetadata(/* clear */ true);
2022-03-14 20:43:34 +00:00
part->version.unlockRemovalTID(tid, TransactionInfoContext{part->storage.getStorageID(), part->name});
2022-03-08 19:11:47 +00:00
}
2021-06-04 09:26:47 +00:00
2022-03-15 16:51:53 +00:00
assert([&]()
{
std::lock_guard lock{mutex};
assert(mutations_to_kill == mutations);
assert(parts_to_remove == creating_parts);
assert(parts_to_activate == removing_parts);
return csn == Tx::RolledBackCSN;
}());
2021-12-14 20:06:34 +00:00
return true;
2021-04-08 17:20:45 +00:00
}
2021-03-31 17:55:04 +00:00
2021-04-09 12:53:51 +00:00
/// Asks the transaction log to roll this transaction back.
void MergeTreeTransaction::onException()
{
    auto && transaction_log = TransactionLog::instance();
    transaction_log.rollbackTransaction(shared_from_this());
}
2021-05-18 17:07:29 +00:00
/// Builds a human-readable description of the transaction: its TID, state and snapshot,
/// followed either by ", readonly" or by the lists of created parts, removed parts and mutations
/// grouped by table.
String MergeTreeTransaction::dumpDescription() const
{
    String description = fmt::format("{} state: {}, snapshot: {}", tid, getState(), getSnapshot());

    if (isReadOnly())
    {
        description += ", readonly";
        return description;
    }

    std::lock_guard guard{mutex};

    description += fmt::format(", affects {} tables:", storages.size());

    /// Per-table changes: names of created parts, descriptions of removed parts, mutation ids
    using ChangesInTable = std::tuple<Strings, Strings, Strings>;
    std::unordered_map<const IStorage *, ChangesInTable> changes_by_storage;

    for (const auto & created : creating_parts)
    {
        auto & table_changes = changes_by_storage[&(created->storage)];
        std::get<0>(table_changes).push_back(created->name);
    }

    for (const auto & removed : removing_parts)
    {
        String removed_info = fmt::format("{} (created by {}, {})", removed->name, removed->version.getCreationTID(), removed->version.creation_csn);
        auto & table_changes = changes_by_storage[&(removed->storage)];
        std::get<1>(table_changes).push_back(std::move(removed_info));

        /// A removed part is either not committed yet (zero creation CSN) or was created not later than our snapshot.
        chassert(!removed->version.creation_csn || removed->version.creation_csn <= getSnapshot());
    }

    for (const auto & [table, mutation_id] : mutations)
    {
        auto & table_changes = changes_by_storage[table.get()];
        std::get<2>(table_changes).push_back(mutation_id);
    }

    for (const auto & [storage_ptr, table_changes] : changes_by_storage)
    {
        description += fmt::format("\n\t{}:", storage_ptr->getStorageID().getNameForLogs());

        const auto & [creating_info, removing_info, mutations_info] = table_changes;

        if (!creating_info.empty())
            description += fmt::format("\n\t\tcreating parts:\n\t\t\t{}", fmt::join(creating_info, "\n\t\t\t"));
        if (!removing_info.empty())
            description += fmt::format("\n\t\tremoving parts:\n\t\t\t{}", fmt::join(removing_info, "\n\t\t\t"));
        if (!mutations_info.empty())
            description += fmt::format("\n\t\tmutations:\n\t\t\t{}", fmt::join(mutations_info, "\n\t\t\t"));
    }

    return description;
}
2021-03-31 17:55:04 +00:00
}