try fix merge assignment

This commit is contained in:
Alexander Tokmakov 2022-03-18 20:31:44 +01:00
parent 0719d01d39
commit 418d52044a
2 changed files with 69 additions and 3 deletions

View File

@ -14,6 +14,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/MergeTask.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Processors/Transforms/TTLTransform.h>
#include <Processors/Transforms/TTLCalcTransform.h>
@ -29,6 +30,7 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/Context.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
@ -128,9 +130,67 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const MergeTreeTransactionPtr & txn,
String * out_disable_reason)
{
/// NOTE It will contain uncommitted parts and future parts.
/// But It's ok since merge predicate allows to include in range visible parts only.
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVectorForInternalUsage();
MergeTreeData::DataPartsVector data_parts;
if (txn)
{
/// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
/// So at first glance we could just get all active parts.
/// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
/// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
/// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
/// But it's wrong, because all_2_2_0 may become active again if the removing transaction rolls back.
/// That's why we must include some outdated parts into `data_parts`, more precisely, those parts whose removal is not committed yet.
MergeTreeData::DataPartsVector active_parts;
MergeTreeData::DataPartsVector outdated_parts;
{
auto lock = data.lockParts();
active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
}
ActiveDataPartSet active_parts_set{data.format_version};
for (const auto & part : active_parts)
active_parts_set.add(part->name);
for (const auto & part : outdated_parts)
{
/// We don't need rolled back parts.
/// NOTE When rolling back a transaction we first set creation_csn to RolledBackCSN
/// and only then remove the part from the working set, so there's no race condition.
if (part->version.creation_csn == Tx::RolledBackCSN)
continue;
/// We don't need parts that are finally removed.
/// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has just been committed concurrently.
/// But it's not a problem if we add such a part to `data_parts`.
if (part->version.removal_csn != Tx::UnknownCSN)
continue;
active_parts_set.add(part->name);
}
/// Restore "active" parts set from selected active and outdated parts
auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
{
return active_parts_set.getContainingPart(part->info) != part->name;
};
auto new_end_it = std::remove_if(active_parts.begin(), active_parts.end(), remove_pred);
active_parts.erase(new_end_it, active_parts.end());
new_end_it = std::remove_if(outdated_parts.begin(), outdated_parts.end(), remove_pred);
outdated_parts.erase(new_end_it, outdated_parts.end());
std::merge(active_parts.begin(), active_parts.end(),
outdated_parts.begin(), outdated_parts.end(),
std::back_inserter(data_parts), MergeTreeData::LessDataPart());
}
else
{
/// Simply get all active parts
data_parts = data.getDataPartsVectorForInternalUsage();
}
const auto data_settings = data.getSettings();
auto metadata_snapshot = data.getInMemoryMetadataPtr();

View File

@ -795,6 +795,12 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
return false;
if (right && !right->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))
return false;
/// Do not try to merge parts that are locked for removal (merge will probably fail)
if (left && left->version.isRemovalTIDLocked())
return false;
if (right && right->version.isRemovalTIDLocked())
return false;
}
/// This predicate is checked for the first part of each range.