This commit is contained in:
Alexander Tokmakov 2022-02-03 21:57:09 +03:00
parent ca5f951558
commit fe30e0f162
12 changed files with 74 additions and 35 deletions

View File

@ -1,4 +1,4 @@
# rebuild in #24258
# rebuild in #33610
# docker build -t clickhouse/test-base .
ARG FROM_TAG=latest
FROM clickhouse/test-util:$FROM_TAG

View File

@ -1,4 +1,4 @@
# rebuild in #24258
# rebuild in #33610
# docker build -t clickhouse/integration-test .
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG

View File

@ -1,4 +1,4 @@
# rebuild in #24258
# rebuild in #33610
# docker build -t clickhouse/test-util .
FROM ubuntu:20.04

View File

@ -3000,7 +3000,8 @@ void Context::checkTransactionsAreAllowed(bool explicit_tcl_query /* = false */)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported");
/// TransactionLog was accessed although experimental transaction support is disabled.
/// Note: adjacent string literals are concatenated verbatim, so each piece must end with a space.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Experimental support for transactions is disabled, "
"however, some query or background task tried to access TransactionLog. Probably it's a bug.");
"however, some query or background task tried to access TransactionLog. "
"If you have not enabled this feature explicitly, then it's a bug.");
}
void Context::initCurrentTransaction(MergeTreeTransactionPtr txn)

View File

@ -206,7 +206,7 @@ void TransactionLog::runUpdatingThread()
}
catch (const Coordination::Exception & e)
{
LOG_ERROR(log, getCurrentExceptionMessage(true));
tryLogCurrentException(log);
/// TODO better backoff
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (Coordination::isHardwareError(e.code))
@ -218,7 +218,7 @@ void TransactionLog::runUpdatingThread()
}
catch (...)
{
LOG_ERROR(log, getCurrentExceptionMessage(true));
tryLogCurrentException(log);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
log_updated_event->set();
}

View File

@ -711,10 +711,14 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
NameSet result = {"checksums.txt", "columns.txt"};
String default_codec_path = fs::path(getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
String txn_version_path = fs::path(getFullRelativePath()) / TXN_VERSION_METADATA_FILE_NAME;
if (volume->getDisk()->exists(default_codec_path))
result.emplace(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
if (volume->getDisk()->exists(txn_version_path))
result.emplace(TXN_VERSION_METADATA_FILE_NAME);
return result;
}

View File

@ -116,7 +116,7 @@ void MergePlainMergeTreeTask::prepare()
/// Final step of the merge task: takes the resulting part from the merge task's future,
/// hands it to the merger/mutator to be renamed in place of the source parts
/// (`future_part->parts`) under `txn`, and then writes a part log entry.
void MergePlainMergeTreeTask::finish()
{
/// `get()` on the future yields the merged part produced by `merge_task`.
new_part = merge_task->getFuture().get();
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, nullptr); //FIXME
/// The last argument (out_transaction) is not used here, hence nullptr.
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, nullptr);
/// Called with a default-constructed argument.
write_part_log({});
}

View File

@ -1315,67 +1315,78 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
removePartContributionToDataVolume(*it);
};
/// All parts are in "Committed" state after loading
/// All parts are in "Active" state after loading
assert(std::find_if(data_parts_by_state_and_info.begin(), data_parts_by_state_and_info.end(),
[](const auto & part)
{
return part->getState() != DataPartState::Active;
}) == data_parts_by_state_and_info.end());
bool have_parts_with_version_metadata = false;
auto iter = data_parts_by_state_and_info.begin();
while (iter != data_parts_by_state_and_info.end() && (*iter)->getState() == DataPartState::Active)
{
const DataPartPtr & part = *iter;
part->loadVersionMetadata();
VersionMetadata & versions = part->version;
VersionMetadata & version = part->version;
if (version.creation_tid.isPrehistoric() && (version.removal_tid.isEmpty() || version.removal_tid.isPrehistoric()))
{
++iter;
continue;
}
else
{
have_parts_with_version_metadata = true;
}
/// Check if CSNs were written after committing transaction, update and write if needed.
bool versions_updated = false;
if (!versions.creation_tid.isEmpty() && !part->version.creation_csn)
bool version_updated = false;
assert(!version.creation_tid.isEmpty());
if (!part->version.creation_csn)
{
auto min = TransactionLog::getCSN(versions.creation_tid);
auto min = TransactionLog::getCSN(version.creation_tid);
if (!min)
{
/// Transaction that created this part was not committed. Remove part.
min = Tx::RolledBackCSN;
}
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has creation_tid={}, setting creation_csn={}",
part->name, versions.creation_tid, min);
versions.creation_csn = min;
versions_updated = true;
part->name, version.creation_tid, min);
version.creation_csn = min;
version_updated = true;
}
if (!versions.removal_tid.isEmpty() && !part->version.removal_csn)
if (!version.removal_tid.isEmpty() && !part->version.removal_csn)
{
auto max = TransactionLog::getCSN(versions.removal_tid);
auto max = TransactionLog::getCSN(version.removal_tid);
if (max)
{
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has removal_tid={}, setting removal_csn={}",
part->name, versions.removal_tid, max);
versions.removal_csn = max;
part->name, version.removal_tid, max);
version.removal_csn = max;
}
else
{
/// Transaction that tried to remove this part was not committed. Clear removal_tid.
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}",
part->name, versions.removal_tid);
versions.unlockMaxTID(versions.removal_tid, TransactionInfoContext{getStorageID(), part->name});
part->name, version.removal_tid);
version.unlockMaxTID(version.removal_tid, TransactionInfoContext{getStorageID(), part->name});
}
versions_updated = true;
version_updated = true;
}
/// Sanity checks
bool csn_order = !versions.removal_csn || versions.creation_csn <= versions.removal_csn;
bool min_start_csn_order = versions.creation_tid.start_csn <= versions.creation_csn;
bool max_start_csn_order = versions.removal_tid.start_csn <= versions.removal_csn;
bool creation_csn_known = versions.creation_csn;
bool csn_order = !version.removal_csn || version.creation_csn <= version.removal_csn;
bool min_start_csn_order = version.creation_tid.start_csn <= version.creation_csn;
bool max_start_csn_order = version.removal_tid.start_csn <= version.removal_csn;
bool creation_csn_known = version.creation_csn;
if (!csn_order || !min_start_csn_order || !max_start_csn_order || !creation_csn_known)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid versions metadata: {}", part->name, versions.toString());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid version metadata: {}", part->name, version.toString());
if (versions_updated)
if (version_updated)
part->storeVersionMetadata();
/// Deactivate part if creation was not committed or if removal was.
if (versions.creation_csn == Tx::RolledBackCSN || versions.removal_csn)
if (version.creation_csn == Tx::RolledBackCSN || version.removal_csn)
{
auto next_it = std::next(iter);
deactivate_part(iter);
@ -1387,6 +1398,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
}
if (have_parts_with_version_metadata)
transactions_enabled.store(true);
/// Delete from the set of current parts those parts that are covered by another part (those parts that
/// were merged), but that for some reason are still not deleted from the filesystem.
/// Deletion of files will be performed later in the clearOldParts() method.
@ -2587,6 +2601,9 @@ bool MergeTreeData::renameTempPartAndReplace(
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
ErrorCodes::LOGICAL_ERROR);
if (txn)
transactions_enabled.store(true);
part->assertState({DataPartState::Temporary});
MergeTreePartInfo part_info = part->info;
@ -2734,6 +2751,9 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & /*acquired_lock*/)
{
if (txn)
transactions_enabled.store(true);
auto remove_time = clear_without_timeout ? 0 : time(nullptr);
for (const DataPartPtr & part : remove)

View File

@ -961,6 +961,9 @@ protected:
mutable std::shared_mutex pinned_part_uuids_mutex;
PinnedPartUUIDsPtr pinned_part_uuids;
/// True if at least one part was created/removed with transaction.
mutable std::atomic_bool transactions_enabled = false;
/// Work with data parts
struct TagByInfo{};

View File

@ -52,6 +52,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ABORTED;
}
/// Do not start to merge parts, if free space is less than sum size of parts times specified coefficient.
@ -516,6 +517,11 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
const MergeTreeTransactionPtr & txn,
MergeTreeData::Transaction * out_transaction)
{
/// Some of the source parts may have been created in a transaction, so a non-transactional merge may break isolation.
/// Abort the merge if transactions are enabled for this table but no transaction was passed.
/// Note: adjacent string literals are concatenated verbatim, so the first piece must end with a space.
if (data.transactions_enabled.load(std::memory_order_relaxed) && !txn)
throw Exception(ErrorCodes::ABORTED, "Cancelling merge, because it was done without starting transaction, "
"but transactions were enabled for this table");
/// Rename new part, add to the set and remove original parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, txn.get(), nullptr, out_transaction);

View File

@ -98,7 +98,7 @@ bool MutatePlainMergeTreeTask::executeStep()
if (merge_mutate_entry->txn)
merge_mutate_entry->txn->onException();
String exception_message = getCurrentExceptionMessage(false);
LOG_ERROR(&Poco::Logger::get("MutatePlainMergeTreeTask"), exception_message);
LOG_ERROR(&Poco::Logger::get("MutatePlainMergeTreeTask"), "{}", exception_message);
storage.updateMutationEntriesErrors(future_part, false, exception_message);
write_part_log(ExecutionStatus::fromCurrentException());
return false;

View File

@ -773,7 +773,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
if (tx)
{
/// Cannot merge parts if some of them are not visible in current snapshot
/// TODO We can use simplified visibility rules (without CSN lookup) here
/// TODO Transactions: We can use simplified visibility rules (without CSN lookup) here
if (left && !left->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
return false;
if (right && !right->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
@ -1106,9 +1106,14 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
/// FIXME Transactions: do not begin transaction if we don't need it
auto txn = TransactionLog::instance().beginTransaction();
MergeTreeTransactionHolder autocommit{txn, true};
MergeTreeTransactionHolder transaction_for_merge;
MergeTreeTransactionPtr txn;
if (transactions_enabled.load(std::memory_order_relaxed))
{
/// TODO Transactions: avoid beginning transaction if there is nothing to merge.
txn = TransactionLog::instance().beginTransaction();
transaction_for_merge = MergeTreeTransactionHolder{txn, /* autocommit = */ true};
}
bool has_mutations = false;
{
@ -1134,7 +1139,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
if (merge_entry)
{
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, false, Names{}, merge_entry, share_lock, common_assignee_trigger);
task->setCurrentTransaction(std::move(autocommit), std::move(txn));
task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn));
assignee.scheduleMergeMutateTask(task);
return true;
}