fix merges of uncommitted parts

This commit is contained in:
Alexander Tokmakov 2021-05-18 20:07:29 +03:00
parent f2ab5a05c6
commit 529d1aeb19
15 changed files with 83 additions and 27 deletions

View File

@ -787,7 +787,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
/// Setup signal handlers.
/// SIGTSTP is added for debugging purposes. To output a stack trace of any running thread at anytime.
addSignalHandler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP, SIGTRAP}, signalHandler, &handled_signals);
addSignalHandler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP}, signalHandler, &handled_signals);
addSignalHandler({SIGHUP, SIGUSR1}, closeLogsSignalHandler, &handled_signals);
addSignalHandler({SIGINT, SIGQUIT, SIGTERM}, terminateRequestedSignalHandler, &handled_signals);

View File

@ -123,9 +123,12 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
return false;
/// Otherwise, part is definitely visible if:
/// - creation was committed after we took the snapshot and nobody tried to remove the part
/// - creation was committed before we took the snapshot and nobody tried to remove the part
/// - creation was committed before and removal was committed after
/// - current transaction is creating it
if (!max_lock && min && min <= snapshot_version)
if (min && min <= snapshot_version && !max_lock)
return true;
if (min && min <= snapshot_version && max && snapshot_version < max)
return true;
if (mintid == txn.tid)
return true;

View File

@ -112,4 +112,25 @@ void MergeTreeTransaction::onException()
TransactionLog::instance().rollbackTransaction(shared_from_this());
}
String MergeTreeTransaction::dumpDescription() const
{
String res = "\ncreating parts:\n";
for (const auto & part : creating_parts)
{
res += part->name;
res += "\n";
}
res += "removing parts:\n";
for (const auto & part : removing_parts)
{
res += part->name;
res += fmt::format(" (created by {}, {})\n", part->versions.getMinTID(), part->versions.mincsn);
assert(!part->versions.mincsn || part->versions.mincsn <= snapshot);
assert(!part->versions.maxcsn);
}
return res;
}
}

View File

@ -39,6 +39,8 @@ public:
void onException();
String dumpDescription() const;
private:
void beforeCommit();
void afterCommit(CSN assigned_csn) noexcept;

View File

@ -1,6 +1,7 @@
#include <Interpreters/TransactionLog.h>
#include <Common/TransactionMetadata.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
namespace DB
{
@ -17,6 +18,7 @@ TransactionLog & TransactionLog::instance()
}
TransactionLog::TransactionLog()
: log(&Poco::Logger::get("TransactionLog"))
{
latest_snapshot = 1;
csn_counter = 1;
@ -39,6 +41,7 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
}
LOG_TRACE(log, "Beginning transaction {}", txn->tid);
return txn;
}
@ -50,10 +53,12 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
/// TODO Transactions: reset local_tid_counter
if (txn->isReadOnly())
{
LOG_TRACE(log, "Closing readonly transaction {}", txn->tid);
new_csn = txn->snapshot;
}
else
{
LOG_TRACE(log, "Committing transaction {}{}", txn->tid, txn->dumpDescription());
std::lock_guard lock{commit_mutex};
new_csn = 1 + csn_counter.fetch_add(1);
bool inserted = tid_to_csn.try_emplace(txn->tid.getHash(), new_csn).second; /// Commit point
@ -62,6 +67,8 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
latest_snapshot.store(new_csn, std::memory_order_relaxed);
}
LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
txn->afterCommit(new_csn);
{
@ -75,6 +82,7 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) noexcept
{
LOG_TRACE(log, "Rolling back transaction {}", txn->tid);
txn->rollback();
{
std::lock_guard lock{running_list_mutex};

View File

@ -30,6 +30,8 @@ public:
MergeTreeTransactionPtr tryGetRunningTransaction(const TIDHash & tid);
private:
Poco::Logger * log;
std::atomic<CSN> latest_snapshot;
std::atomic<CSN> csn_counter;
std::atomic<LocalTID> local_tid_counter;

View File

@ -1084,7 +1084,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
(*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
modifyPartState(it, DataPartState::Outdated);
(*it)->versions.maxtid = Tx::PrehistoricTID;
(*it)->versions.lockMaxTID(Tx::PrehistoricTID);
(*it)->versions.maxcsn.store(Tx::PrehistoricCSN, std::memory_order_relaxed);
removePartContributionToDataVolume(*it);
};
@ -3225,10 +3225,13 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const Me
auto it = maybe_visible_parts.begin();
auto it_last = maybe_visible_parts.end() - 1;
String visible_parts_str;
while (it <= it_last)
{
if ((*it)->versions.isVisible(*txn))
{
visible_parts_str += (*it)->name;
visible_parts_str += " ";
++it;
}
else
@ -3239,6 +3242,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const Me
}
size_t new_size = it_last - maybe_visible_parts.begin() + 1;
LOG_TRACE(log, "Got {} parts visible for {}: {}", new_size, txn->tid, visible_parts_str);
maybe_visible_parts.resize(new_size);
return maybe_visible_parts;
}

View File

@ -216,10 +216,9 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const MergeTreeTransactionPtr & txn,
String * out_disable_reason)
{
MergeTreeData::DataPartsVector data_parts = data.getVisibleDataPartsVector(txn);
//FIXME get rid of sorting
std::sort(data_parts.begin(), data_parts.end(), MergeTreeData::LessDataPart());
/// NOTE It will contain uncommitted parts and future parts.
/// But It's ok since merge predicate allows to include in range visible parts only.
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
const auto data_settings = data.getSettings();
auto metadata_snapshot = data.getInMemoryMetadataPtr();
@ -265,7 +264,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
* So we have to check if this part is currently being inserted with quorum and so on and so forth.
* Obviously we have to check it manually only for the first part
* of each partition because it will be automatically checked for a pair of parts. */
if (!can_merge_callback(nullptr, part, nullptr))
if (!can_merge_callback(nullptr, part, txn.get(), nullptr))
continue;
}
@ -273,7 +272,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
{
/// If we cannot merge with previous part we had to start new parts
/// interval (in the same partition)
if (!can_merge_callback(*prev_part, part, nullptr))
if (!can_merge_callback(*prev_part, part, txn.get(), nullptr))
{
/// Starting new interval in the same partition
assert(!parts_ranges.back().empty());
@ -415,7 +414,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
while (it != parts.end())
{
/// For the case of one part, we check that it can be merged "with itself".
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, out_disable_reason))
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, nullptr, out_disable_reason))
{
return SelectPartsDecision::CANNOT_SELECT;
}

View File

@ -61,7 +61,10 @@ struct FutureMergedMutatedPart
class MergeTreeDataMergerMutator
{
public:
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &, String *)>;
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &,
const MergeTreeData::DataPartPtr &,
const MergeTreeTransaction *,
String *)>;
MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size);

View File

@ -1811,6 +1811,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
bool ReplicatedMergeTreeMergePredicate::operator()(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction *,
String * out_reason) const
{
if (left)

View File

@ -451,6 +451,7 @@ public:
/// Depending on the existence of left part checks a merge predicate for two parts or for single part.
bool operator()(const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction * txn,
String * out_reason = nullptr) const;
/// Can we assign a merge with these two parts?

View File

@ -698,8 +698,18 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
CurrentlyMergingPartsTaggerPtr merging_tagger;
MergeList::EntryPtr merge_entry;
auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, String *) -> bool
auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, const MergeTreeTransaction * tx, String *) -> bool
{
if (tx)
{
/// Cannot merge parts if some of them is not visible in current snapshot
/// TODO We can use simplified visibility rules (without CSN lookup) here
if (left && !left->versions.isVisible(*tx))
return false;
if (right && !right->versions.isVisible(*tx))
return false;
}
/// This predicate is checked for the first part of each range.
/// (left = nullptr, right = "first part of partition")
if (!left)
@ -1022,28 +1032,35 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob() //-V657
if (merger_mutator.merges_blocker.isCancelled())
return {};
/// FIXME Transactions: do not begin transaction if we don't need it
auto txn = TransactionLog::instance().beginTransaction();
MergeTreeTransactionHolder autocommit{txn, true};
auto metadata_snapshot = getInMemoryMetadataPtr();
std::shared_ptr<MergeMutateSelectedEntry> merge_entry, mutate_entry;
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, txn);
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, nullptr);
if (!merge_entry)
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock);
if (merge_entry || mutate_entry)
{
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock, holder = std::move(autocommit)] () mutable
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
merge_entry = {};
mutate_entry = {};
/// FIXME Transactions: do not begin transaction if we don't need it
auto txn = TransactionLog::instance().beginTransaction();
MergeTreeTransactionHolder autocommit{txn, true};
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, txn);
if (!merge_entry)
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock);
if (merge_entry)
return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock, holder.getTransaction());
return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock, txn);
else if (mutate_entry)
return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
__builtin_unreachable();
return true;
//__builtin_unreachable();
}, PoolType::MERGE_MUTATE};
}
else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))

View File

@ -2,8 +2,6 @@ drop table if exists txn_counters;
create table txn_counters (n Int64, mintid DEFAULT transactionID()) engine=MergeTree order by n;
system stop merges txn_counters; --FIXME
insert into txn_counters(n) values (1);
select transactionID();

View File

@ -3,8 +3,6 @@ drop table if exists mt2;
create table mt1 (n Int64) engine=MergeTree order by n;
create table mt2 (n Int64) engine=MergeTree order by n;
--system stop merges mt1; --FIXME
--system stop merges mt2; --FIXME
commit; -- { serverError 585 }
rollback; -- { serverError 585 }

View File

@ -8,7 +8,6 @@ set -e
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mt";
$CLICKHOUSE_CLIENT --query "CREATE TABLE mt (n Int8, m Int8) ENGINE=MergeTree ORDER BY n PARTITION BY 0 < n";
#$CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES mt"; #FIXME
function thread_insert_commit()
{
@ -59,4 +58,4 @@ $CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM mt;";
#$CLICKHOUSE_CLIENT --query "DROP TABLE mt";
$CLICKHOUSE_CLIENT --query "DROP TABLE mt";