Merge branch 'less_zookeeper_requests' into rmt_better_background_tasks_scheduling

This commit is contained in:
Alexander Tokmakov 2023-05-22 23:51:36 +02:00
commit ec6f07aa78
13 changed files with 487 additions and 152 deletions

View File

@ -489,7 +489,7 @@ size_t KafkaConsumer::filterMessageErrors()
{
assert(current == messages.begin());
auto new_end = std::remove_if(messages.begin(), messages.end(), [this](auto & message)
size_t skipped = std::erase_if(messages, [this](auto & message)
{
if (auto error = message.get_error())
{
@ -500,12 +500,8 @@ size_t KafkaConsumer::filterMessageErrors()
return false;
});
size_t skipped = std::distance(new_end, messages.end());
if (skipped)
{
LOG_ERROR(log, "There were {} messages with an error", skipped);
messages.erase(new_end, messages.end());
}
return skipped;
}

View File

@ -11,6 +11,8 @@
namespace DB
{
class IMergeTreeDataPart;
/** Interface of algorithm to select data parts to merge
* (merge is also known as "compaction").
* Following properties depend on it:
@ -26,6 +28,7 @@ namespace DB
*/
class IMergeSelector
{
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
public:
/// Information about data part relevant to merge selecting strategy.
struct Part
@ -50,6 +53,11 @@ public:
ASTPtr compression_codec_desc;
bool shall_participate_in_merges = true;
const DataPartPtr & getDataPartPtr() const
{
return *static_cast<const DataPartPtr *>(data);
}
};
/// Parts are belong to partitions. Only parts within same partition could be merged.

View File

@ -5844,11 +5844,10 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
ActiveDataPartSet active_parts(format_version);
auto detached_parts = getDetachedParts();
auto new_end_it = std::remove_if(detached_parts.begin(), detached_parts.end(), [&partition_id](const DetachedPartInfo & part_info)
std::erase_if(detached_parts, [&partition_id](const DetachedPartInfo & part_info)
{
return !part_info.valid_name || !part_info.prefix.empty() || part_info.partition_id != partition_id;
});
detached_parts.resize(std::distance(detached_parts.begin(), new_end_it));
for (const auto & part_info : detached_parts)
{

View File

@ -136,68 +136,11 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const AllowedMergingPredicate & can_merge_callback,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason)
String * out_disable_reason,
const PartitionIdsHint * partitions_hint)
{
MergeTreeData::DataPartsVector data_parts;
if (txn)
{
/// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
/// So at the first glance we could just get all active parts.
/// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
/// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
/// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
/// But it's wrong, because all_2_2_0 may become active again if transaction will roll back.
/// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed.
MergeTreeData::DataPartsVector active_parts;
MergeTreeData::DataPartsVector outdated_parts;
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn, partitions_hint);
{
auto lock = data.lockParts();
active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
}
ActiveDataPartSet active_parts_set{data.format_version};
for (const auto & part : active_parts)
active_parts_set.add(part->name);
for (const auto & part : outdated_parts)
{
/// We don't need rolled back parts.
/// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first
/// and then remove part from working set, so there's no race condition
if (part->version.creation_csn == Tx::RolledBackCSN)
continue;
/// We don't need parts that are finally removed.
/// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently.
/// But it's not a problem if we will add such part to `data_parts`.
if (part->version.removal_csn != Tx::UnknownCSN)
continue;
active_parts_set.add(part->name);
}
/// Restore "active" parts set from selected active and outdated parts
auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
{
return active_parts_set.getContainingPart(part->info) != part->name;
};
std::erase_if(active_parts, remove_pred);
std::erase_if(outdated_parts, remove_pred);
std::merge(active_parts.begin(), active_parts.end(),
outdated_parts.begin(), outdated_parts.end(),
std::back_inserter(data_parts), MergeTreeData::LessDataPart());
}
else
{
/// Simply get all active parts
data_parts = data.getDataPartsVectorForInternalUsage();
}
const auto data_settings = data.getSettings();
auto metadata_snapshot = data.getInMemoryMetadataPtr();
if (data_parts.empty())
@ -207,9 +150,193 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
return SelectPartsDecision::CANNOT_SELECT;
}
time_t current_time = std::time(nullptr);
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn, out_disable_reason);
IMergeSelector::PartsRanges parts_ranges;
if (info.parts_selected_precondition == 0)
{
if (out_disable_reason)
*out_disable_reason = "No parts satisfy preconditions for merge";
return SelectPartsDecision::CANNOT_SELECT;
}
auto res = selectPartsToMergeFromRanges(future_part, aggressive, max_total_size_to_merge, merge_with_ttl_allowed,
metadata_snapshot, info.parts_ranges, info.current_time, out_disable_reason);
if (res == SelectPartsDecision::SELECTED)
return res;
String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info);
if (!best_partition_id_to_optimize.empty())
{
return selectAllPartsToMergeWithinPartition(
future_part,
can_merge_callback,
best_partition_id_to_optimize,
/*final=*/true,
metadata_snapshot,
txn,
out_disable_reason,
/*optimize_skip_merged_partitions=*/true);
}
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return SelectPartsDecision::CANNOT_SELECT;
}
MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPartitionsThatMayBeMerged(
size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge_callback,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn) const
{
PartitionIdsHint res;
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn);
if (data_parts.empty())
return res;
auto metadata_snapshot = data.getInMemoryMetadataPtr();
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn);
if (info.parts_selected_precondition == 0)
return res;
Strings all_partition_ids;
std::vector<IMergeSelector::PartsRanges> ranges_per_partition;
String curr_partition;
for (auto & range : info.parts_ranges)
{
if (range.empty())
continue;
const String & partition_id = range.front().getDataPartPtr()->info.partition_id;
if (partition_id != curr_partition)
{
curr_partition = partition_id;
all_partition_ids.push_back(curr_partition);
ranges_per_partition.emplace_back();
}
ranges_per_partition.back().emplace_back(std::move(range));
}
for (size_t i = 0; i < all_partition_ids.size(); ++i)
{
auto future_part = std::make_shared<FutureMergedMutatedPart>();
String out_disable_reason;
/// This method should have been const, but something went wrong... it's const with dry_run = true
auto status = const_cast<MergeTreeDataMergerMutator *>(this)->selectPartsToMergeFromRanges(
future_part, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed,
metadata_snapshot, ranges_per_partition[i], info.current_time, &out_disable_reason,
/* dry_run */ true);
if (status == SelectPartsDecision::SELECTED)
res.insert(all_partition_ids[i]);
else
LOG_TEST(log, "Nothing to merge in partition {}: {}", all_partition_ids[i], out_disable_reason);
}
String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info);
if (!best_partition_id_to_optimize.empty())
res.emplace(std::move(best_partition_id_to_optimize));
LOG_TRACE(log, "Checked {} partitions, found {} partitions with parts that may be merged: {}",
all_partition_ids.size(), res.size(), fmt::join(res, ", "));
return res;
}
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(
const MergeTreeTransactionPtr & txn, const PartitionIdsHint * partitions_hint) const
{
auto res = getDataPartsToSelectMergeFrom(txn);
if (!partitions_hint)
return res;
std::erase_if(res, [partitions_hint](const auto & part)
{
return !partitions_hint->contains(part->info.partition_id);
});
return res;
}
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const
{
MergeTreeData::DataPartsVector res;
if (!txn)
{
/// Simply get all active parts
return data.getDataPartsVectorForInternalUsage();
}
/// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
/// So at the first glance we could just get all active parts.
/// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
/// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
/// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
/// But it's wrong, because all_2_2_0 may become active again if transaction will roll back.
/// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed.
MergeTreeData::DataPartsVector active_parts;
MergeTreeData::DataPartsVector outdated_parts;
{
auto lock = data.lockParts();
active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
}
ActiveDataPartSet active_parts_set{data.format_version};
for (const auto & part : active_parts)
active_parts_set.add(part->name);
for (const auto & part : outdated_parts)
{
/// We don't need rolled back parts.
/// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first
/// and then remove part from working set, so there's no race condition
if (part->version.creation_csn == Tx::RolledBackCSN)
continue;
/// We don't need parts that are finally removed.
/// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently.
/// But it's not a problem if we will add such part to `data_parts`.
if (part->version.removal_csn != Tx::UnknownCSN)
continue;
active_parts_set.add(part->name);
}
/// Restore "active" parts set from selected active and outdated parts
auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
{
return active_parts_set.getContainingPart(part->info) != part->name;
};
std::erase_if(active_parts, remove_pred);
std::erase_if(outdated_parts, remove_pred);
MergeTreeData::DataPartsVector data_parts;
std::merge(
active_parts.begin(),
active_parts.end(),
outdated_parts.begin(),
outdated_parts.end(),
std::back_inserter(data_parts),
MergeTreeData::LessDataPart());
return data_parts;
}
MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPossibleMergeRanges(
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason) const
{
MergeSelectingInfo res;
res.current_time = std::time(nullptr);
IMergeSelector::PartsRanges & parts_ranges = res.parts_ranges;
StoragePolicyPtr storage_policy = data.getStoragePolicy();
/// Volumes with stopped merges are extremely rare situation.
@ -221,14 +348,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const MergeTreeData::DataPartPtr * prev_part = nullptr;
/// collect min_age for each partition while iterating parts
struct PartitionInfo
{
time_t min_age{std::numeric_limits<time_t>::max()};
};
PartitionsInfo & partitions_info = res.partitions_info;
std::unordered_map<std::string, PartitionInfo> partitions_info;
size_t parts_selected_precondition = 0;
for (const MergeTreeData::DataPartPtr & part : data_parts)
{
const String & partition_id = part->info.partition_id;
@ -284,7 +405,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
IMergeSelector::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.age = current_time - part->modification_time;
part_info.age = res.current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
part_info.ttl_infos = &part->ttl_infos;
@ -294,7 +415,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
auto & partition_info = partitions_info[partition_id];
partition_info.min_age = std::min(partition_info.min_age, part_info.age);
++parts_selected_precondition;
++res.parts_selected_precondition;
parts_ranges.back().emplace_back(part_info);
@ -311,13 +432,21 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
prev_part = &part;
}
if (parts_selected_precondition == 0)
{
if (out_disable_reason)
*out_disable_reason = "No parts satisfy preconditions for merge";
return SelectPartsDecision::CANNOT_SELECT;
}
return res;
}
SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
FutureMergedMutatedPartPtr future_part,
bool aggressive,
size_t max_total_size_to_merge,
bool merge_with_ttl_allowed,
const StorageMetadataPtr & metadata_snapshot,
const IMergeSelector::PartsRanges & parts_ranges,
const time_t & current_time,
String * out_disable_reason,
bool dry_run)
{
const auto data_settings = data.getSettings();
IMergeSelector::PartsRange parts_to_merge;
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
@ -327,7 +456,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
true);
/*only_drop_parts*/ true,
dry_run);
/// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space
parts_to_merge = drop_ttl_selector.select(parts_ranges, data_settings->max_bytes_to_merge_at_max_space_in_pool);
@ -341,7 +471,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
false);
/*only_drop_parts*/ false,
dry_run);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
@ -354,7 +485,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
next_recompress_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_recompression_ttl_timeout,
metadata_snapshot->getRecompressionTTLs());
metadata_snapshot->getRecompressionTTLs(),
dry_run);
parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
@ -382,29 +514,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (parts_to_merge.empty())
{
if (data_settings->min_age_to_force_merge_on_partition_only && data_settings->min_age_to_force_merge_seconds)
{
auto best_partition_it = std::max_element(
partitions_info.begin(),
partitions_info.end(),
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
assert(best_partition_it != partitions_info.end());
if (static_cast<size_t>(best_partition_it->second.min_age) >= data_settings->min_age_to_force_merge_seconds)
return selectAllPartsToMergeWithinPartition(
future_part,
can_merge_callback,
best_partition_it->first,
/*final=*/true,
metadata_snapshot,
txn,
out_disable_reason,
/*optimize_skip_merged_partitions=*/true);
}
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
*out_disable_reason = "Did not find any parts to merge (with usual merge selectors)";
return SelectPartsDecision::CANNOT_SELECT;
}
}
@ -413,7 +524,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
parts.reserve(parts_to_merge.size());
for (IMergeSelector::Part & part_info : parts_to_merge)
{
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr();
parts.push_back(part);
}
@ -422,6 +533,28 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
return SelectPartsDecision::SELECTED;
}
String MergeTreeDataMergerMutator::getBestPartitionToOptimizeEntire(
const PartitionsInfo & partitions_info) const
{
const auto data_settings = data.getSettings();
if (!data_settings->min_age_to_force_merge_on_partition_only)
return {};
if (!data_settings->min_age_to_force_merge_seconds)
return {};
auto best_partition_it = std::max_element(
partitions_info.begin(),
partitions_info.end(),
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
assert(best_partition_it != partitions_info.end());
if (static_cast<size_t>(best_partition_it->second.min_age) < data_settings->min_age_to_force_merge_seconds)
return {};
return best_partition_it->first;
}
SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
FutureMergedMutatedPartPtr future_part,
const AllowedMergingPredicate & can_merge,

View File

@ -62,6 +62,59 @@ public:
*/
UInt64 getMaxSourcePartSizeForMutation() const;
struct PartitionInfo
{
time_t min_age{std::numeric_limits<time_t>::max()};
};
using PartitionsInfo = std::unordered_map<std::string, PartitionInfo>;
using PartitionIdsHint = std::unordered_set<String>;
/// The first step of selecting parts to merge: returns a list of all active/visible parts
MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const;
/// Same as above, but filters partitions according to partitions_hint
MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom(
const MergeTreeTransactionPtr & txn,
const PartitionIdsHint * partitions_hint) const;
struct MergeSelectingInfo
{
time_t current_time;
PartitionsInfo partitions_info;
IMergeSelector::PartsRanges parts_ranges;
size_t parts_selected_precondition = 0;
};
/// The second step of selecting parts to merge: splits parts list into a set of ranges according to can_merge_callback.
/// All parts within a range can be merged without violating some invariants.
MergeSelectingInfo getPossibleMergeRanges(
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr) const;
/// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge
SelectPartsDecision selectPartsToMergeFromRanges(
FutureMergedMutatedPartPtr future_part,
bool aggressive,
size_t max_total_size_to_merge,
bool merge_with_ttl_allowed,
const StorageMetadataPtr & metadata_snapshot,
const IMergeSelector::PartsRanges & parts_ranges,
const time_t & current_time,
String * out_disable_reason = nullptr,
bool dry_run = false);
String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const;
/// Useful to quickly get a list of partitions that contain parts that we may want to merge
PartitionIdsHint getPartitionsThatMayBeMerged(
size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge_callback,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn) const;
/** Selects which parts to merge. Uses a lot of heuristics.
*
* can_merge - a function that determines if it is possible to merge a pair of adjacent parts.
@ -76,7 +129,8 @@ public:
const AllowedMergingPredicate & can_merge,
bool merge_with_ttl_allowed,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr);
String * out_disable_reason = nullptr,
const PartitionIdsHint * partitions_hint = nullptr);
/** Select all the parts in the specified partition for merge, if possible.
* final - choose to merge even a single part - that is, allow to merge one part "with itself",

View File

@ -41,6 +41,9 @@ struct ReplicatedMergeTreeMutationEntry
using BlockNumbersType = std::map<String, Int64>;
BlockNumbersType block_numbers;
/// List of partitions that do not have relevant uncommitted blocks to mutate
mutable std::unordered_set<String> checked_partitions_cache;
/// Mutation commands which will give to MUTATE_PART entries
MutationCommands commands;

View File

@ -160,14 +160,13 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
Strings children = zookeeper->getChildren(queue_path);
auto to_remove_it = std::remove_if(
children.begin(), children.end(), [&](const String & path)
size_t removed_entries = std::erase_if(children,
[&](const String & path)
{
return already_loaded_paths.count(path);
});
LOG_DEBUG(log, "Having {} queue entries to load, {} entries already loaded.", (to_remove_it - children.begin()), (children.end() - to_remove_it));
children.erase(to_remove_it, children.end());
LOG_DEBUG(log, "Having {} queue entries to load, {} entries already loaded.", children.size(), removed_entries);
::sort(children.begin(), children.end());
@ -808,13 +807,15 @@ QueueRepresentation getQueueRepresentation(const std::list<ReplicatedMergeTreeLo
///
/// From the first glance it can sound that these two sets should be enough to understand which parts we have to mutate
/// to finish mutation but it's not true:
/// 1) Obviously we cannot rely on current_parts because we can have stale state (some parts are absent, some merges not finished). We also have to account parts which we will
/// get after queue execution.
/// 2) But we cannot rely on virtual_parts for this, because they contain parts which we will get after we have executed our queue. So if we need to execute mutation 0000000001 for part all_0_0_0
/// and we have already pulled entry to mutate this part into own queue our virtual parts will contain part all_0_0_0_1, not part all_0_0_0.
/// 1) Obviously we cannot rely on current_parts because we can have stale state (some parts are absent, some merges not finished).
/// We also have to account parts which we will get after queue execution.
/// 2) But we cannot rely on virtual_parts for this, because they contain parts which we will get after we have executed our queue.
/// So if we need to execute mutation 0000000001 for part all_0_0_0 and we have already pulled entry
/// to mutate this part into own queue our virtual parts will contain part all_0_0_0_1, not part all_0_0_0.
///
/// To avoid such issues we simply traverse all entries in queue in order and applying diff (add parts/remove parts) to current parts if they could be affected by mutation. Such approach is expensive
/// but we do it only once since we get the mutation. After that we just update parts_to_do for each mutation when pulling entries into our queue (addPartToMutations, removePartFromMutations).
/// To avoid such issues we simply traverse all entries in queue in order and applying diff (add parts/remove parts) to current parts
/// if they could be affected by mutation. Such approach is expensive but we do it only once since we get the mutation.
/// After that we just update parts_to_do for each mutation when pulling entries into our queue (addPartToMutations, removePartFromMutations).
ActiveDataPartSet getPartNamesToMutate(
const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & current_parts,
const QueueRepresentation & queue_representation, MergeTreeDataFormatVersion format_version)
@ -1759,7 +1760,8 @@ size_t ReplicatedMergeTreeQueue::countUnfinishedMutations() const
}
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint)
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper,
std::optional<PartitionIdsHint> && partition_ids_hint)
{
return ReplicatedMergeTreeMergePredicate(*this, zookeeper, std::move(partition_ids_hint));
}
@ -1874,7 +1876,9 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
if (mutation.parts_to_do.size() != 0)
{
LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number.{}", znode, " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number. "
"It's OK, tasks for rest parts will be skipped, but probably a lot of mutations "
"were executed concurrently on different replicas.", znode);
mutation.parts_to_do.clear();
}
}
@ -1900,14 +1904,15 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
PartitionIdsHint partition_ids_hint;
for (const auto & candidate : candidates)
for (const auto & partitions : candidate->block_numbers)
partition_ids_hint.insert(partitions.first);
if (!candidate->checked_partitions_cache.contains(partitions.first))
partition_ids_hint.insert(partitions.first);
auto merge_pred = getMergePredicate(zookeeper, std::move(partition_ids_hint));
std::vector<const ReplicatedMergeTreeMutationEntry *> finished;
for (const auto & candidate : candidates)
{
if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers))
if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers, candidate->checked_partitions_cache))
finished.push_back(candidate.get());
}
@ -2098,9 +2103,15 @@ ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue()
return QueueLocks(state_mutex, pull_logs_to_queue_mutex, update_mutations_mutex);
}
ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_)
LocalMergePredicate::LocalMergePredicate(ReplicatedMergeTreeQueue & queue_)
: queue(queue_)
{
}
ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, std::optional<PartitionIdsHint> && partition_ids_hint_)
: nested_pred(queue_)
, queue(queue_)
, partition_ids_hint(std::move(partition_ids_hint_))
, prev_virtual_parts(queue.format_version)
{
@ -2117,21 +2128,33 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
/// Dropped (or cleaned up by TTL) partitions are never removed from ZK,
/// so without hint it can do a few thousands requests (if not using MultiRead).
Strings partitions;
if (partition_ids_hint.empty())
if (!partition_ids_hint)
partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
else
std::copy(partition_ids_hint.begin(), partition_ids_hint.end(), std::back_inserter(partitions));
std::copy(partition_ids_hint->begin(), partition_ids_hint->end(), std::back_inserter(partitions));
std::vector<std::string> paths;
paths.reserve(partitions.size());
for (const String & partition : partitions)
paths.push_back(fs::path(queue.zookeeper_path) / "block_numbers" / partition);
auto locks_children = zookeeper->getChildren(paths);
auto locks_children = zookeeper->tryGetChildren(paths);
for (size_t i = 0; i < partitions.size(); ++i)
{
Strings partition_block_numbers = locks_children[i].names;
auto & response = locks_children[i];
if (response.error != Coordination::Error::ZOK && !partition_ids_hint)
throw Coordination::Exception(response.error, paths[i]);
if (response.error != Coordination::Error::ZOK)
{
/// Probably a wrong hint was provided (it's ok if a user passed non-existing partition to OPTIMIZE)
LOG_WARNING(queue.log, "Partition id '{}' was provided as a hint, but there's not such partition in ZooKeeper", partitions[i]);
partition_ids_hint->erase(partitions[i]);
continue;
}
Strings partition_block_numbers = response.names;
for (const String & entry : partition_block_numbers)
{
if (!startsWith(entry, "block-"))
@ -2166,6 +2189,18 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
inprogress_quorum_part.clear();
}
bool LocalMergePredicate::operator()(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction *,
String * out_reason) const
{
if (left)
return canMergeTwoParts(left, right, out_reason);
else
return canMergeSinglePart(right, out_reason);
}
bool ReplicatedMergeTreeMergePredicate::operator()(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
@ -2253,7 +2288,7 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
if (left_max_block + 1 < right_min_block)
{
if (!partition_ids_hint.empty() && !partition_ids_hint.contains(left->info.partition_id))
if (partition_ids_hint && !partition_ids_hint->contains(left->info.partition_id))
{
if (out_reason)
*out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
@ -2277,6 +2312,17 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
}
}
return nested_pred.canMergeTwoParts(left, right, out_reason);
}
bool LocalMergePredicate::canMergeTwoParts(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
String * out_reason) const
{
Int64 left_max_block = left->info.max_block;
Int64 right_min_block = right->info.min_block;
std::lock_guard lock(queue.state_mutex);
for (const MergeTreeData::DataPartPtr & part : {left, right})
@ -2354,6 +2400,11 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart(
return false;
}
return nested_pred.canMergeSinglePart(part, out_reason);
}
bool LocalMergePredicate::canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const
{
std::lock_guard lock(queue.state_mutex);
/// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer
@ -2476,7 +2527,8 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
}
bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers) const
bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
std::unordered_set<String> & checked_partitions_cache) const
{
/// Check committing block numbers, maybe some affected inserts
/// still not written to disk and committed to ZK.
@ -2485,7 +2537,11 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
const String & partition_id = kv.first;
Int64 block_num = kv.second;
if (!partition_ids_hint.empty() && !partition_ids_hint.contains(partition_id))
/// Maybe we already know that there are no relevant uncommitted blocks
if (checked_partitions_cache.contains(partition_id))
continue;
if (partition_ids_hint && !partition_ids_hint->contains(partition_id))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition id {} was not provided as hint, it's a bug", partition_id);
auto partition_it = committing_blocks.find(partition_id);
@ -2499,6 +2555,10 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
return false;
}
}
/// There are no committing blocks less than block_num in that partition and there's no way they can appear
/// TODO Why not to get committing blocks when pulling a mutation? We could get rid of finalization task or simplify it
checked_partitions_cache.insert(partition_id);
}
std::lock_guard lock(queue.state_mutex);

View File

@ -32,6 +32,7 @@ class ReplicatedMergeTreeQueue
{
private:
friend class CurrentlyExecuting;
friend class LocalMergePredicate;
friend class ReplicatedMergeTreeMergePredicate;
friend class MergeFromLogEntryTask;
friend class ReplicatedMergeMutateTaskBase;
@ -390,7 +391,8 @@ public:
size_t countUnfinishedMutations() const;
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper,
std::optional<PartitionIdsHint> && partition_ids_hint);
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version,
Strings & mutation_ids) const;
@ -489,10 +491,33 @@ public:
void createLogEntriesToFetchBrokenParts();
};
/// Lightweight version of ReplicatedMergeTreeMergePredicate that do not make any ZooKeeper requests,
/// but may return false-positive results. Checks only a subset of required conditions.
class LocalMergePredicate
{
public:
LocalMergePredicate(ReplicatedMergeTreeQueue & queue_);
bool operator()(const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
const MergeTreeTransaction * txn,
String * out_reason = nullptr) const;
bool canMergeTwoParts(const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
String * out_reason = nullptr) const;
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
private:
const ReplicatedMergeTreeQueue & queue;
};
class ReplicatedMergeTreeMergePredicate
{
public:
ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_);
ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper,
std::optional<PartitionIdsHint> && partition_ids_hint_);
/// Depending on the existence of left part checks a merge predicate for two parts or for single part.
bool operator()(const MergeTreeData::DataPartPtr & left,
@ -523,7 +548,8 @@ public:
/// don't glue them together. Alter is rare operation, so it shouldn't affect performance.
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers) const;
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
std::unordered_set<String> & checked_partitions_cache) const;
/// The version of "log" node that is used to check that no new merges have appeared.
int32_t getVersion() const { return merges_version; }
@ -535,9 +561,11 @@ public:
String getCoveringVirtualPart(const String & part_name) const;
private:
LocalMergePredicate nested_pred;
const ReplicatedMergeTreeQueue & queue;
PartitionIdsHint partition_ids_hint;
std::optional<PartitionIdsHint> partition_ids_hint;
/// A snapshot of active parts that would appear if the replica executes all log entries in its queue.
ActiveDataPartSet prev_virtual_parts;

View File

@ -11,7 +11,7 @@ namespace DB
const String & getPartitionIdForPart(const ITTLMergeSelector::Part & part_info)
{
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr();
return part->info.partition_id;
}
@ -90,8 +90,11 @@ IMergeSelector::PartsRange ITTLMergeSelector::select(
++best_end;
}
const auto & best_partition_id = getPartitionIdForPart(best_partition.front());
merge_due_times[best_partition_id] = current_time + merge_cooldown_time;
if (!dry_run)
{
const auto & best_partition_id = getPartitionIdForPart(best_partition.front());
merge_due_times[best_partition_id] = current_time + merge_cooldown_time;
}
return PartsRange(best_begin, best_end);
}

View File

@ -21,10 +21,11 @@ class ITTLMergeSelector : public IMergeSelector
public:
using PartitionIdToTTLs = std::map<String, time_t>;
ITTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_)
ITTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool dry_run_)
: current_time(current_time_)
, merge_due_times(merge_due_times_)
, merge_cooldown_time(merge_cooldown_time_)
, dry_run(dry_run_)
{
}
@ -46,6 +47,7 @@ protected:
private:
PartitionIdToTTLs & merge_due_times;
Int64 merge_cooldown_time;
bool dry_run;
};
@ -56,8 +58,9 @@ class TTLDeleteMergeSelector : public ITTLMergeSelector
public:
using PartitionIdToTTLs = std::map<String, time_t>;
TTLDeleteMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool only_drop_parts_)
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_)
TTLDeleteMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_,
bool only_drop_parts_, bool dry_run_)
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_)
, only_drop_parts(only_drop_parts_) {}
time_t getTTLForPart(const IMergeSelector::Part & part) const override;
@ -75,8 +78,9 @@ private:
class TTLRecompressMergeSelector : public ITTLMergeSelector
{
public:
TTLRecompressMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, const TTLDescriptions & recompression_ttls_)
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_)
TTLRecompressMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_,
const TTLDescriptions & recompression_ttls_, bool dry_run_)
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_)
, recompression_ttls(recompression_ttls_)
{}

View File

@ -3245,7 +3245,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
auto zookeeper = getZooKeeperAndAssertNotReadonly();
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, getAllPartitionIds());
std::optional<ReplicatedMergeTreeMergePredicate> merge_pred;
/// If many merges is already queued, then will queue only small enough merges.
/// Otherwise merge queue could be filled with only large merges,
@ -3282,8 +3282,22 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (storage_settings.get()->assign_part_uuids)
future_merged_part->uuid = UUIDHelpers::generateV4();
if (max_source_parts_size_for_merge > 0 &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr) == SelectPartsDecision::SELECTED)
bool can_assign_merge = max_source_parts_size_for_merge > 0;
PartitionIdsHint partitions_to_merge_in;
if (can_assign_merge)
{
auto lightweight_merge_pred = LocalMergePredicate(queue);
partitions_to_merge_in = merger_mutator.getPartitionsThatMayBeMerged(
max_source_parts_size_for_merge, lightweight_merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR);
if (partitions_to_merge_in.empty())
can_assign_merge = false;
else
merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in));
}
if (can_assign_merge &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred,
merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
{
create_result = createLogEntryToMergeParts(
zookeeper,
@ -3295,13 +3309,17 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
deduplicate_by_columns,
cleanup,
nullptr,
merge_pred.getVersion(),
merge_pred->getVersion(),
future_merged_part->merge_type);
}
/// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
&& merges_and_mutations_queued.mutations < storage_settings_ptr->max_replicated_mutations_in_queue)
{
/// We don't need the list of committing blocks to choose a part to mutate
if (!merge_pred)
merge_pred.emplace(queue.getMergePredicate(zookeeper, PartitionIdsHint{}));
/// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVectorForInternalUsage();
for (const auto & part : data_parts)
@ -3309,7 +3327,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (part->getBytesOnDisk() > max_source_part_size_for_mutation)
continue;
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred->getDesiredMutationVersion(part);
if (!desired_mutation_version)
continue;
@ -3318,7 +3336,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
future_merged_part->uuid,
desired_mutation_version->first,
desired_mutation_version->second,
merge_pred.getVersion());
merge_pred->getVersion());
if (create_result == CreateMergeEntryResult::Ok ||
create_result == CreateMergeEntryResult::LogUpdated)

View File

@ -0,0 +1 @@
/test/02439/s1/default/block_numbers/123

View File

@ -0,0 +1,28 @@
drop table if exists rmt;
create table rmt (n int, m int) engine=ReplicatedMergeTree('/test/02439/{shard}/{database}', '{replica}') partition by n order by n;
insert into rmt select number, number from numbers(50);
insert into rmt values (1, 2);
insert into rmt values (1, 3);
insert into rmt values (1, 4);
insert into rmt values (1, 5);
insert into rmt values (1, 6);
insert into rmt values (1, 7);
insert into rmt values (1, 8);
insert into rmt values (1, 9);
-- there's nothing to merge in all partitions but '1'
optimize table rmt partition tuple(123);
set optimize_throw_if_noop=1;
optimize table rmt partition tuple(123); -- { serverError CANNOT_ASSIGN_OPTIMIZE }
select sleepEachRow(3) as higher_probability_of_reproducing_the_issue format Null;
system flush logs;
-- it should not list unneeded partitions where we cannot merge anything
select distinct path from system.zookeeper_log where path like '/test/02439/s1/' || currentDatabase() || '/block_numbers/%'
and op_num in ('List', 'SimpleList', 'FilteredList') and path not like '%/block_numbers/1';
drop table rmt;