mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge pull request #49637 from ClickHouse/less_zookeeper_requests
Provide better partitions hint for merge selecting task
This commit is contained in:
commit
141a72d694
@ -489,7 +489,7 @@ size_t KafkaConsumer::filterMessageErrors()
|
||||
{
|
||||
assert(current == messages.begin());
|
||||
|
||||
auto new_end = std::remove_if(messages.begin(), messages.end(), [this](auto & message)
|
||||
size_t skipped = std::erase_if(messages, [this](auto & message)
|
||||
{
|
||||
if (auto error = message.get_error())
|
||||
{
|
||||
@ -500,12 +500,8 @@ size_t KafkaConsumer::filterMessageErrors()
|
||||
return false;
|
||||
});
|
||||
|
||||
size_t skipped = std::distance(new_end, messages.end());
|
||||
if (skipped)
|
||||
{
|
||||
LOG_ERROR(log, "There were {} messages with an error", skipped);
|
||||
messages.erase(new_end, messages.end());
|
||||
}
|
||||
|
||||
return skipped;
|
||||
}
|
||||
|
@ -11,6 +11,8 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IMergeTreeDataPart;
|
||||
|
||||
/** Interface of algorithm to select data parts to merge
|
||||
* (merge is also known as "compaction").
|
||||
* Following properties depend on it:
|
||||
@ -26,6 +28,7 @@ namespace DB
|
||||
*/
|
||||
class IMergeSelector
|
||||
{
|
||||
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
|
||||
public:
|
||||
/// Information about data part relevant to merge selecting strategy.
|
||||
struct Part
|
||||
@ -50,6 +53,11 @@ public:
|
||||
ASTPtr compression_codec_desc;
|
||||
|
||||
bool shall_participate_in_merges = true;
|
||||
|
||||
const DataPartPtr & getDataPartPtr() const
|
||||
{
|
||||
return *static_cast<const DataPartPtr *>(data);
|
||||
}
|
||||
};
|
||||
|
||||
/// Parts are belong to partitions. Only parts within same partition could be merged.
|
||||
|
@ -5820,11 +5820,10 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
|
||||
ActiveDataPartSet active_parts(format_version);
|
||||
|
||||
auto detached_parts = getDetachedParts();
|
||||
auto new_end_it = std::remove_if(detached_parts.begin(), detached_parts.end(), [&partition_id](const DetachedPartInfo & part_info)
|
||||
std::erase_if(detached_parts, [&partition_id](const DetachedPartInfo & part_info)
|
||||
{
|
||||
return !part_info.valid_name || !part_info.prefix.empty() || part_info.partition_id != partition_id;
|
||||
});
|
||||
detached_parts.resize(std::distance(detached_parts.begin(), new_end_it));
|
||||
|
||||
for (const auto & part_info : detached_parts)
|
||||
{
|
||||
|
@ -136,68 +136,11 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
const AllowedMergingPredicate & can_merge_callback,
|
||||
bool merge_with_ttl_allowed,
|
||||
const MergeTreeTransactionPtr & txn,
|
||||
String * out_disable_reason)
|
||||
String * out_disable_reason,
|
||||
const PartitionIdsHint * partitions_hint)
|
||||
{
|
||||
MergeTreeData::DataPartsVector data_parts;
|
||||
if (txn)
|
||||
{
|
||||
/// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
|
||||
/// So at the first glance we could just get all active parts.
|
||||
/// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
|
||||
/// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
|
||||
/// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
|
||||
/// But it's wrong, because all_2_2_0 may become active again if transaction will roll back.
|
||||
/// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed.
|
||||
MergeTreeData::DataPartsVector active_parts;
|
||||
MergeTreeData::DataPartsVector outdated_parts;
|
||||
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn, partitions_hint);
|
||||
|
||||
{
|
||||
auto lock = data.lockParts();
|
||||
active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
|
||||
outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
|
||||
}
|
||||
|
||||
ActiveDataPartSet active_parts_set{data.format_version};
|
||||
for (const auto & part : active_parts)
|
||||
active_parts_set.add(part->name);
|
||||
|
||||
for (const auto & part : outdated_parts)
|
||||
{
|
||||
/// We don't need rolled back parts.
|
||||
/// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first
|
||||
/// and then remove part from working set, so there's no race condition
|
||||
if (part->version.creation_csn == Tx::RolledBackCSN)
|
||||
continue;
|
||||
|
||||
/// We don't need parts that are finally removed.
|
||||
/// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently.
|
||||
/// But it's not a problem if we will add such part to `data_parts`.
|
||||
if (part->version.removal_csn != Tx::UnknownCSN)
|
||||
continue;
|
||||
|
||||
active_parts_set.add(part->name);
|
||||
}
|
||||
|
||||
/// Restore "active" parts set from selected active and outdated parts
|
||||
auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
|
||||
{
|
||||
return active_parts_set.getContainingPart(part->info) != part->name;
|
||||
};
|
||||
|
||||
std::erase_if(active_parts, remove_pred);
|
||||
|
||||
std::erase_if(outdated_parts, remove_pred);
|
||||
|
||||
std::merge(active_parts.begin(), active_parts.end(),
|
||||
outdated_parts.begin(), outdated_parts.end(),
|
||||
std::back_inserter(data_parts), MergeTreeData::LessDataPart());
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Simply get all active parts
|
||||
data_parts = data.getDataPartsVectorForInternalUsage();
|
||||
}
|
||||
const auto data_settings = data.getSettings();
|
||||
auto metadata_snapshot = data.getInMemoryMetadataPtr();
|
||||
|
||||
if (data_parts.empty())
|
||||
@ -207,9 +150,193 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
}
|
||||
|
||||
time_t current_time = std::time(nullptr);
|
||||
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn, out_disable_reason);
|
||||
|
||||
IMergeSelector::PartsRanges parts_ranges;
|
||||
if (info.parts_selected_precondition == 0)
|
||||
{
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "No parts satisfy preconditions for merge";
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
}
|
||||
|
||||
auto res = selectPartsToMergeFromRanges(future_part, aggressive, max_total_size_to_merge, merge_with_ttl_allowed,
|
||||
metadata_snapshot, info.parts_ranges, info.current_time, out_disable_reason);
|
||||
|
||||
if (res == SelectPartsDecision::SELECTED)
|
||||
return res;
|
||||
|
||||
String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info);
|
||||
if (!best_partition_id_to_optimize.empty())
|
||||
{
|
||||
return selectAllPartsToMergeWithinPartition(
|
||||
future_part,
|
||||
can_merge_callback,
|
||||
best_partition_id_to_optimize,
|
||||
/*final=*/true,
|
||||
metadata_snapshot,
|
||||
txn,
|
||||
out_disable_reason,
|
||||
/*optimize_skip_merged_partitions=*/true);
|
||||
}
|
||||
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
}
|
||||
|
||||
MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPartitionsThatMayBeMerged(
|
||||
size_t max_total_size_to_merge,
|
||||
const AllowedMergingPredicate & can_merge_callback,
|
||||
bool merge_with_ttl_allowed,
|
||||
const MergeTreeTransactionPtr & txn) const
|
||||
{
|
||||
PartitionIdsHint res;
|
||||
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn);
|
||||
if (data_parts.empty())
|
||||
return res;
|
||||
|
||||
auto metadata_snapshot = data.getInMemoryMetadataPtr();
|
||||
|
||||
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn);
|
||||
|
||||
if (info.parts_selected_precondition == 0)
|
||||
return res;
|
||||
|
||||
Strings all_partition_ids;
|
||||
std::vector<IMergeSelector::PartsRanges> ranges_per_partition;
|
||||
|
||||
String curr_partition;
|
||||
for (auto & range : info.parts_ranges)
|
||||
{
|
||||
if (range.empty())
|
||||
continue;
|
||||
const String & partition_id = range.front().getDataPartPtr()->info.partition_id;
|
||||
if (partition_id != curr_partition)
|
||||
{
|
||||
curr_partition = partition_id;
|
||||
all_partition_ids.push_back(curr_partition);
|
||||
ranges_per_partition.emplace_back();
|
||||
}
|
||||
ranges_per_partition.back().emplace_back(std::move(range));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < all_partition_ids.size(); ++i)
|
||||
{
|
||||
auto future_part = std::make_shared<FutureMergedMutatedPart>();
|
||||
String out_disable_reason;
|
||||
/// This method should have been const, but something went wrong... it's const with dry_run = true
|
||||
auto status = const_cast<MergeTreeDataMergerMutator *>(this)->selectPartsToMergeFromRanges(
|
||||
future_part, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed,
|
||||
metadata_snapshot, ranges_per_partition[i], info.current_time, &out_disable_reason,
|
||||
/* dry_run */ true);
|
||||
if (status == SelectPartsDecision::SELECTED)
|
||||
res.insert(all_partition_ids[i]);
|
||||
else
|
||||
LOG_TEST(log, "Nothing to merge in partition {}: {}", all_partition_ids[i], out_disable_reason);
|
||||
}
|
||||
|
||||
String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info);
|
||||
if (!best_partition_id_to_optimize.empty())
|
||||
res.emplace(std::move(best_partition_id_to_optimize));
|
||||
|
||||
LOG_TRACE(log, "Checked {} partitions, found {} partitions with parts that may be merged: {}",
|
||||
all_partition_ids.size(), res.size(), fmt::join(res, ", "));
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(
|
||||
const MergeTreeTransactionPtr & txn, const PartitionIdsHint * partitions_hint) const
|
||||
{
|
||||
auto res = getDataPartsToSelectMergeFrom(txn);
|
||||
if (!partitions_hint)
|
||||
return res;
|
||||
|
||||
std::erase_if(res, [partitions_hint](const auto & part)
|
||||
{
|
||||
return !partitions_hint->contains(part->info.partition_id);
|
||||
});
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const
|
||||
{
|
||||
MergeTreeData::DataPartsVector res;
|
||||
if (!txn)
|
||||
{
|
||||
/// Simply get all active parts
|
||||
return data.getDataPartsVectorForInternalUsage();
|
||||
}
|
||||
|
||||
/// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
|
||||
/// So at the first glance we could just get all active parts.
|
||||
/// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
|
||||
/// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
|
||||
/// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
|
||||
/// But it's wrong, because all_2_2_0 may become active again if transaction will roll back.
|
||||
/// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed.
|
||||
MergeTreeData::DataPartsVector active_parts;
|
||||
MergeTreeData::DataPartsVector outdated_parts;
|
||||
|
||||
{
|
||||
auto lock = data.lockParts();
|
||||
active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
|
||||
outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
|
||||
}
|
||||
|
||||
ActiveDataPartSet active_parts_set{data.format_version};
|
||||
for (const auto & part : active_parts)
|
||||
active_parts_set.add(part->name);
|
||||
|
||||
for (const auto & part : outdated_parts)
|
||||
{
|
||||
/// We don't need rolled back parts.
|
||||
/// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first
|
||||
/// and then remove part from working set, so there's no race condition
|
||||
if (part->version.creation_csn == Tx::RolledBackCSN)
|
||||
continue;
|
||||
|
||||
/// We don't need parts that are finally removed.
|
||||
/// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently.
|
||||
/// But it's not a problem if we will add such part to `data_parts`.
|
||||
if (part->version.removal_csn != Tx::UnknownCSN)
|
||||
continue;
|
||||
|
||||
active_parts_set.add(part->name);
|
||||
}
|
||||
|
||||
/// Restore "active" parts set from selected active and outdated parts
|
||||
auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
|
||||
{
|
||||
return active_parts_set.getContainingPart(part->info) != part->name;
|
||||
};
|
||||
|
||||
std::erase_if(active_parts, remove_pred);
|
||||
|
||||
std::erase_if(outdated_parts, remove_pred);
|
||||
|
||||
MergeTreeData::DataPartsVector data_parts;
|
||||
std::merge(
|
||||
active_parts.begin(),
|
||||
active_parts.end(),
|
||||
outdated_parts.begin(),
|
||||
outdated_parts.end(),
|
||||
std::back_inserter(data_parts),
|
||||
MergeTreeData::LessDataPart());
|
||||
|
||||
return data_parts;
|
||||
}
|
||||
|
||||
MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPossibleMergeRanges(
|
||||
const MergeTreeData::DataPartsVector & data_parts,
|
||||
const AllowedMergingPredicate & can_merge_callback,
|
||||
const MergeTreeTransactionPtr & txn,
|
||||
String * out_disable_reason) const
|
||||
{
|
||||
MergeSelectingInfo res;
|
||||
|
||||
res.current_time = std::time(nullptr);
|
||||
|
||||
IMergeSelector::PartsRanges & parts_ranges = res.parts_ranges;
|
||||
|
||||
StoragePolicyPtr storage_policy = data.getStoragePolicy();
|
||||
/// Volumes with stopped merges are extremely rare situation.
|
||||
@ -221,14 +348,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
const MergeTreeData::DataPartPtr * prev_part = nullptr;
|
||||
|
||||
/// collect min_age for each partition while iterating parts
|
||||
struct PartitionInfo
|
||||
{
|
||||
time_t min_age{std::numeric_limits<time_t>::max()};
|
||||
};
|
||||
PartitionsInfo & partitions_info = res.partitions_info;
|
||||
|
||||
std::unordered_map<std::string, PartitionInfo> partitions_info;
|
||||
|
||||
size_t parts_selected_precondition = 0;
|
||||
for (const MergeTreeData::DataPartPtr & part : data_parts)
|
||||
{
|
||||
const String & partition_id = part->info.partition_id;
|
||||
@ -284,7 +405,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
|
||||
IMergeSelector::Part part_info;
|
||||
part_info.size = part->getBytesOnDisk();
|
||||
part_info.age = current_time - part->modification_time;
|
||||
part_info.age = res.current_time - part->modification_time;
|
||||
part_info.level = part->info.level;
|
||||
part_info.data = ∂
|
||||
part_info.ttl_infos = &part->ttl_infos;
|
||||
@ -294,7 +415,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
auto & partition_info = partitions_info[partition_id];
|
||||
partition_info.min_age = std::min(partition_info.min_age, part_info.age);
|
||||
|
||||
++parts_selected_precondition;
|
||||
++res.parts_selected_precondition;
|
||||
|
||||
parts_ranges.back().emplace_back(part_info);
|
||||
|
||||
@ -311,13 +432,21 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
prev_part = ∂
|
||||
}
|
||||
|
||||
if (parts_selected_precondition == 0)
|
||||
{
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "No parts satisfy preconditions for merge";
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
|
||||
FutureMergedMutatedPartPtr future_part,
|
||||
bool aggressive,
|
||||
size_t max_total_size_to_merge,
|
||||
bool merge_with_ttl_allowed,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IMergeSelector::PartsRanges & parts_ranges,
|
||||
const time_t & current_time,
|
||||
String * out_disable_reason,
|
||||
bool dry_run)
|
||||
{
|
||||
const auto data_settings = data.getSettings();
|
||||
IMergeSelector::PartsRange parts_to_merge;
|
||||
|
||||
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
|
||||
@ -327,7 +456,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
next_delete_ttl_merge_times_by_partition,
|
||||
current_time,
|
||||
data_settings->merge_with_ttl_timeout,
|
||||
true);
|
||||
/*only_drop_parts*/ true,
|
||||
dry_run);
|
||||
|
||||
/// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space
|
||||
parts_to_merge = drop_ttl_selector.select(parts_ranges, data_settings->max_bytes_to_merge_at_max_space_in_pool);
|
||||
@ -341,7 +471,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
next_delete_ttl_merge_times_by_partition,
|
||||
current_time,
|
||||
data_settings->merge_with_ttl_timeout,
|
||||
false);
|
||||
/*only_drop_parts*/ false,
|
||||
dry_run);
|
||||
|
||||
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
|
||||
if (!parts_to_merge.empty())
|
||||
@ -354,7 +485,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
next_recompress_ttl_merge_times_by_partition,
|
||||
current_time,
|
||||
data_settings->merge_with_recompression_ttl_timeout,
|
||||
metadata_snapshot->getRecompressionTTLs());
|
||||
metadata_snapshot->getRecompressionTTLs(),
|
||||
dry_run);
|
||||
|
||||
parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge);
|
||||
if (!parts_to_merge.empty())
|
||||
@ -382,29 +514,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
|
||||
if (parts_to_merge.empty())
|
||||
{
|
||||
if (data_settings->min_age_to_force_merge_on_partition_only && data_settings->min_age_to_force_merge_seconds)
|
||||
{
|
||||
auto best_partition_it = std::max_element(
|
||||
partitions_info.begin(),
|
||||
partitions_info.end(),
|
||||
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
|
||||
|
||||
assert(best_partition_it != partitions_info.end());
|
||||
|
||||
if (static_cast<size_t>(best_partition_it->second.min_age) >= data_settings->min_age_to_force_merge_seconds)
|
||||
return selectAllPartsToMergeWithinPartition(
|
||||
future_part,
|
||||
can_merge_callback,
|
||||
best_partition_it->first,
|
||||
/*final=*/true,
|
||||
metadata_snapshot,
|
||||
txn,
|
||||
out_disable_reason,
|
||||
/*optimize_skip_merged_partitions=*/true);
|
||||
}
|
||||
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
|
||||
*out_disable_reason = "Did not find any parts to merge (with usual merge selectors)";
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
}
|
||||
}
|
||||
@ -413,7 +524,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
parts.reserve(parts_to_merge.size());
|
||||
for (IMergeSelector::Part & part_info : parts_to_merge)
|
||||
{
|
||||
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
|
||||
const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr();
|
||||
parts.push_back(part);
|
||||
}
|
||||
|
||||
@ -422,6 +533,28 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
return SelectPartsDecision::SELECTED;
|
||||
}
|
||||
|
||||
String MergeTreeDataMergerMutator::getBestPartitionToOptimizeEntire(
|
||||
const PartitionsInfo & partitions_info) const
|
||||
{
|
||||
const auto data_settings = data.getSettings();
|
||||
if (!data_settings->min_age_to_force_merge_on_partition_only)
|
||||
return {};
|
||||
if (!data_settings->min_age_to_force_merge_seconds)
|
||||
return {};
|
||||
|
||||
auto best_partition_it = std::max_element(
|
||||
partitions_info.begin(),
|
||||
partitions_info.end(),
|
||||
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
|
||||
|
||||
assert(best_partition_it != partitions_info.end());
|
||||
|
||||
if (static_cast<size_t>(best_partition_it->second.min_age) < data_settings->min_age_to_force_merge_seconds)
|
||||
return {};
|
||||
|
||||
return best_partition_it->first;
|
||||
}
|
||||
|
||||
SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
|
||||
FutureMergedMutatedPartPtr future_part,
|
||||
const AllowedMergingPredicate & can_merge,
|
||||
|
@ -62,6 +62,59 @@ public:
|
||||
*/
|
||||
UInt64 getMaxSourcePartSizeForMutation() const;
|
||||
|
||||
struct PartitionInfo
|
||||
{
|
||||
time_t min_age{std::numeric_limits<time_t>::max()};
|
||||
};
|
||||
using PartitionsInfo = std::unordered_map<std::string, PartitionInfo>;
|
||||
|
||||
using PartitionIdsHint = std::unordered_set<String>;
|
||||
|
||||
/// The first step of selecting parts to merge: returns a list of all active/visible parts
|
||||
MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const;
|
||||
|
||||
/// Same as above, but filters partitions according to partitions_hint
|
||||
MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom(
|
||||
const MergeTreeTransactionPtr & txn,
|
||||
const PartitionIdsHint * partitions_hint) const;
|
||||
|
||||
struct MergeSelectingInfo
|
||||
{
|
||||
time_t current_time;
|
||||
PartitionsInfo partitions_info;
|
||||
IMergeSelector::PartsRanges parts_ranges;
|
||||
size_t parts_selected_precondition = 0;
|
||||
};
|
||||
|
||||
/// The second step of selecting parts to merge: splits parts list into a set of ranges according to can_merge_callback.
|
||||
/// All parts within a range can be merged without violating some invariants.
|
||||
MergeSelectingInfo getPossibleMergeRanges(
|
||||
const MergeTreeData::DataPartsVector & data_parts,
|
||||
const AllowedMergingPredicate & can_merge_callback,
|
||||
const MergeTreeTransactionPtr & txn,
|
||||
String * out_disable_reason = nullptr) const;
|
||||
|
||||
/// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge
|
||||
SelectPartsDecision selectPartsToMergeFromRanges(
|
||||
FutureMergedMutatedPartPtr future_part,
|
||||
bool aggressive,
|
||||
size_t max_total_size_to_merge,
|
||||
bool merge_with_ttl_allowed,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IMergeSelector::PartsRanges & parts_ranges,
|
||||
const time_t & current_time,
|
||||
String * out_disable_reason = nullptr,
|
||||
bool dry_run = false);
|
||||
|
||||
String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const;
|
||||
|
||||
/// Useful to quickly get a list of partitions that contain parts that we may want to merge
|
||||
PartitionIdsHint getPartitionsThatMayBeMerged(
|
||||
size_t max_total_size_to_merge,
|
||||
const AllowedMergingPredicate & can_merge_callback,
|
||||
bool merge_with_ttl_allowed,
|
||||
const MergeTreeTransactionPtr & txn) const;
|
||||
|
||||
/** Selects which parts to merge. Uses a lot of heuristics.
|
||||
*
|
||||
* can_merge - a function that determines if it is possible to merge a pair of adjacent parts.
|
||||
@ -76,7 +129,8 @@ public:
|
||||
const AllowedMergingPredicate & can_merge,
|
||||
bool merge_with_ttl_allowed,
|
||||
const MergeTreeTransactionPtr & txn,
|
||||
String * out_disable_reason = nullptr);
|
||||
String * out_disable_reason = nullptr,
|
||||
const PartitionIdsHint * partitions_hint = nullptr);
|
||||
|
||||
/** Select all the parts in the specified partition for merge, if possible.
|
||||
* final - choose to merge even a single part - that is, allow to merge one part "with itself",
|
||||
|
@ -41,6 +41,9 @@ struct ReplicatedMergeTreeMutationEntry
|
||||
using BlockNumbersType = std::map<String, Int64>;
|
||||
BlockNumbersType block_numbers;
|
||||
|
||||
/// List of partitions that do not have relevant uncommitted blocks to mutate
|
||||
mutable std::unordered_set<String> checked_partitions_cache;
|
||||
|
||||
/// Mutation commands which will give to MUTATE_PART entries
|
||||
MutationCommands commands;
|
||||
|
||||
|
@ -160,14 +160,13 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
|
||||
|
||||
Strings children = zookeeper->getChildren(queue_path);
|
||||
|
||||
auto to_remove_it = std::remove_if(
|
||||
children.begin(), children.end(), [&](const String & path)
|
||||
size_t removed_entries = std::erase_if(children,
|
||||
[&](const String & path)
|
||||
{
|
||||
return already_loaded_paths.count(path);
|
||||
});
|
||||
|
||||
LOG_DEBUG(log, "Having {} queue entries to load, {} entries already loaded.", (to_remove_it - children.begin()), (children.end() - to_remove_it));
|
||||
children.erase(to_remove_it, children.end());
|
||||
LOG_DEBUG(log, "Having {} queue entries to load, {} entries already loaded.", children.size(), removed_entries);
|
||||
|
||||
::sort(children.begin(), children.end());
|
||||
|
||||
@ -808,13 +807,15 @@ QueueRepresentation getQueueRepresentation(const std::list<ReplicatedMergeTreeLo
|
||||
///
|
||||
/// From the first glance it can sound that these two sets should be enough to understand which parts we have to mutate
|
||||
/// to finish mutation but it's not true:
|
||||
/// 1) Obviously we cannot rely on current_parts because we can have stale state (some parts are absent, some merges not finished). We also have to account parts which we will
|
||||
/// get after queue execution.
|
||||
/// 2) But we cannot rely on virtual_parts for this, because they contain parts which we will get after we have executed our queue. So if we need to execute mutation 0000000001 for part all_0_0_0
|
||||
/// and we have already pulled entry to mutate this part into own queue our virtual parts will contain part all_0_0_0_1, not part all_0_0_0.
|
||||
/// 1) Obviously we cannot rely on current_parts because we can have stale state (some parts are absent, some merges not finished).
|
||||
/// We also have to account parts which we will get after queue execution.
|
||||
/// 2) But we cannot rely on virtual_parts for this, because they contain parts which we will get after we have executed our queue.
|
||||
/// So if we need to execute mutation 0000000001 for part all_0_0_0 and we have already pulled entry
|
||||
/// to mutate this part into own queue our virtual parts will contain part all_0_0_0_1, not part all_0_0_0.
|
||||
///
|
||||
/// To avoid such issues we simply traverse all entries in queue in order and applying diff (add parts/remove parts) to current parts if they could be affected by mutation. Such approach is expensive
|
||||
/// but we do it only once since we get the mutation. After that we just update parts_to_do for each mutation when pulling entries into our queue (addPartToMutations, removePartFromMutations).
|
||||
/// To avoid such issues we simply traverse all entries in queue in order and applying diff (add parts/remove parts) to current parts
|
||||
/// if they could be affected by mutation. Such approach is expensive but we do it only once since we get the mutation.
|
||||
/// After that we just update parts_to_do for each mutation when pulling entries into our queue (addPartToMutations, removePartFromMutations).
|
||||
ActiveDataPartSet getPartNamesToMutate(
|
||||
const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & current_parts,
|
||||
const QueueRepresentation & queue_representation, MergeTreeDataFormatVersion format_version)
|
||||
@ -1759,7 +1760,8 @@ size_t ReplicatedMergeTreeQueue::countUnfinishedMutations() const
|
||||
}
|
||||
|
||||
|
||||
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint)
|
||||
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper,
|
||||
std::optional<PartitionIdsHint> && partition_ids_hint)
|
||||
{
|
||||
return ReplicatedMergeTreeMergePredicate(*this, zookeeper, std::move(partition_ids_hint));
|
||||
}
|
||||
@ -1874,7 +1876,9 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
||||
alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
|
||||
if (mutation.parts_to_do.size() != 0)
|
||||
{
|
||||
LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number.{}", znode, " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
|
||||
LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number. "
|
||||
"It's OK, tasks for rest parts will be skipped, but probably a lot of mutations "
|
||||
"were executed concurrently on different replicas.", znode);
|
||||
mutation.parts_to_do.clear();
|
||||
}
|
||||
}
|
||||
@ -1900,14 +1904,15 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
|
||||
PartitionIdsHint partition_ids_hint;
|
||||
for (const auto & candidate : candidates)
|
||||
for (const auto & partitions : candidate->block_numbers)
|
||||
partition_ids_hint.insert(partitions.first);
|
||||
if (!candidate->checked_partitions_cache.contains(partitions.first))
|
||||
partition_ids_hint.insert(partitions.first);
|
||||
|
||||
auto merge_pred = getMergePredicate(zookeeper, std::move(partition_ids_hint));
|
||||
|
||||
std::vector<const ReplicatedMergeTreeMutationEntry *> finished;
|
||||
for (const auto & candidate : candidates)
|
||||
{
|
||||
if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers))
|
||||
if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers, candidate->checked_partitions_cache))
|
||||
finished.push_back(candidate.get());
|
||||
}
|
||||
|
||||
@ -2098,9 +2103,15 @@ ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue()
|
||||
return QueueLocks(state_mutex, pull_logs_to_queue_mutex, update_mutations_mutex);
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
|
||||
ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_)
|
||||
LocalMergePredicate::LocalMergePredicate(ReplicatedMergeTreeQueue & queue_)
|
||||
: queue(queue_)
|
||||
{
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
|
||||
ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, std::optional<PartitionIdsHint> && partition_ids_hint_)
|
||||
: nested_pred(queue_)
|
||||
, queue(queue_)
|
||||
, partition_ids_hint(std::move(partition_ids_hint_))
|
||||
, prev_virtual_parts(queue.format_version)
|
||||
{
|
||||
@ -2117,21 +2128,33 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
|
||||
/// Dropped (or cleaned up by TTL) partitions are never removed from ZK,
|
||||
/// so without hint it can do a few thousands requests (if not using MultiRead).
|
||||
Strings partitions;
|
||||
if (partition_ids_hint.empty())
|
||||
if (!partition_ids_hint)
|
||||
partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
|
||||
else
|
||||
std::copy(partition_ids_hint.begin(), partition_ids_hint.end(), std::back_inserter(partitions));
|
||||
std::copy(partition_ids_hint->begin(), partition_ids_hint->end(), std::back_inserter(partitions));
|
||||
|
||||
std::vector<std::string> paths;
|
||||
paths.reserve(partitions.size());
|
||||
for (const String & partition : partitions)
|
||||
paths.push_back(fs::path(queue.zookeeper_path) / "block_numbers" / partition);
|
||||
|
||||
auto locks_children = zookeeper->getChildren(paths);
|
||||
auto locks_children = zookeeper->tryGetChildren(paths);
|
||||
|
||||
for (size_t i = 0; i < partitions.size(); ++i)
|
||||
{
|
||||
Strings partition_block_numbers = locks_children[i].names;
|
||||
auto & response = locks_children[i];
|
||||
if (response.error != Coordination::Error::ZOK && !partition_ids_hint)
|
||||
throw Coordination::Exception(response.error, paths[i]);
|
||||
|
||||
if (response.error != Coordination::Error::ZOK)
|
||||
{
|
||||
/// Probably a wrong hint was provided (it's ok if a user passed non-existing partition to OPTIMIZE)
|
||||
LOG_WARNING(queue.log, "Partition id '{}' was provided as a hint, but there's not such partition in ZooKeeper", partitions[i]);
|
||||
partition_ids_hint->erase(partitions[i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
Strings partition_block_numbers = response.names;
|
||||
for (const String & entry : partition_block_numbers)
|
||||
{
|
||||
if (!startsWith(entry, "block-"))
|
||||
@ -2166,6 +2189,18 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
|
||||
inprogress_quorum_part.clear();
|
||||
}
|
||||
|
||||
bool LocalMergePredicate::operator()(
|
||||
const MergeTreeData::DataPartPtr & left,
|
||||
const MergeTreeData::DataPartPtr & right,
|
||||
const MergeTreeTransaction *,
|
||||
String * out_reason) const
|
||||
{
|
||||
if (left)
|
||||
return canMergeTwoParts(left, right, out_reason);
|
||||
else
|
||||
return canMergeSinglePart(right, out_reason);
|
||||
}
|
||||
|
||||
bool ReplicatedMergeTreeMergePredicate::operator()(
|
||||
const MergeTreeData::DataPartPtr & left,
|
||||
const MergeTreeData::DataPartPtr & right,
|
||||
@ -2253,7 +2288,7 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
|
||||
|
||||
if (left_max_block + 1 < right_min_block)
|
||||
{
|
||||
if (!partition_ids_hint.empty() && !partition_ids_hint.contains(left->info.partition_id))
|
||||
if (partition_ids_hint && !partition_ids_hint->contains(left->info.partition_id))
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
|
||||
@ -2277,6 +2312,17 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
|
||||
}
|
||||
}
|
||||
|
||||
return nested_pred.canMergeTwoParts(left, right, out_reason);
|
||||
}
|
||||
|
||||
bool LocalMergePredicate::canMergeTwoParts(
|
||||
const MergeTreeData::DataPartPtr & left,
|
||||
const MergeTreeData::DataPartPtr & right,
|
||||
String * out_reason) const
|
||||
{
|
||||
Int64 left_max_block = left->info.max_block;
|
||||
Int64 right_min_block = right->info.min_block;
|
||||
|
||||
std::lock_guard lock(queue.state_mutex);
|
||||
|
||||
for (const MergeTreeData::DataPartPtr & part : {left, right})
|
||||
@ -2354,6 +2400,11 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart(
|
||||
return false;
|
||||
}
|
||||
|
||||
return nested_pred.canMergeSinglePart(part, out_reason);
|
||||
}
|
||||
|
||||
bool LocalMergePredicate::canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const
|
||||
{
|
||||
std::lock_guard lock(queue.state_mutex);
|
||||
|
||||
/// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer
|
||||
@ -2476,7 +2527,8 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
|
||||
}
|
||||
|
||||
|
||||
bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers) const
|
||||
bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
|
||||
std::unordered_set<String> & checked_partitions_cache) const
|
||||
{
|
||||
/// Check committing block numbers, maybe some affected inserts
|
||||
/// still not written to disk and committed to ZK.
|
||||
@ -2485,7 +2537,11 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
|
||||
const String & partition_id = kv.first;
|
||||
Int64 block_num = kv.second;
|
||||
|
||||
if (!partition_ids_hint.empty() && !partition_ids_hint.contains(partition_id))
|
||||
/// Maybe we already know that there are no relevant uncommitted blocks
|
||||
if (checked_partitions_cache.contains(partition_id))
|
||||
continue;
|
||||
|
||||
if (partition_ids_hint && !partition_ids_hint->contains(partition_id))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition id {} was not provided as hint, it's a bug", partition_id);
|
||||
|
||||
auto partition_it = committing_blocks.find(partition_id);
|
||||
@ -2499,6 +2555,10 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// There are no committing blocks less than block_num in that partition and there's no way they can appear
|
||||
/// TODO Why not to get committing blocks when pulling a mutation? We could get rid of finalization task or simplify it
|
||||
checked_partitions_cache.insert(partition_id);
|
||||
}
|
||||
|
||||
std::lock_guard lock(queue.state_mutex);
|
||||
|
@ -32,6 +32,7 @@ class ReplicatedMergeTreeQueue
|
||||
{
|
||||
private:
|
||||
friend class CurrentlyExecuting;
|
||||
friend class LocalMergePredicate;
|
||||
friend class ReplicatedMergeTreeMergePredicate;
|
||||
friend class MergeFromLogEntryTask;
|
||||
friend class ReplicatedMergeMutateTaskBase;
|
||||
@ -390,7 +391,8 @@ public:
|
||||
size_t countUnfinishedMutations() const;
|
||||
|
||||
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge
|
||||
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);
|
||||
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper,
|
||||
std::optional<PartitionIdsHint> && partition_ids_hint);
|
||||
|
||||
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version,
|
||||
Strings & mutation_ids) const;
|
||||
@ -489,10 +491,33 @@ public:
|
||||
void createLogEntriesToFetchBrokenParts();
|
||||
};
|
||||
|
||||
/// Lightweight version of ReplicatedMergeTreeMergePredicate that do not make any ZooKeeper requests,
|
||||
/// but may return false-positive results. Checks only a subset of required conditions.
|
||||
class LocalMergePredicate
|
||||
{
|
||||
public:
|
||||
LocalMergePredicate(ReplicatedMergeTreeQueue & queue_);
|
||||
|
||||
bool operator()(const MergeTreeData::DataPartPtr & left,
|
||||
const MergeTreeData::DataPartPtr & right,
|
||||
const MergeTreeTransaction * txn,
|
||||
String * out_reason = nullptr) const;
|
||||
|
||||
bool canMergeTwoParts(const MergeTreeData::DataPartPtr & left,
|
||||
const MergeTreeData::DataPartPtr & right,
|
||||
String * out_reason = nullptr) const;
|
||||
|
||||
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
|
||||
|
||||
private:
|
||||
const ReplicatedMergeTreeQueue & queue;
|
||||
};
|
||||
|
||||
class ReplicatedMergeTreeMergePredicate
|
||||
{
|
||||
public:
|
||||
ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_);
|
||||
ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper,
|
||||
std::optional<PartitionIdsHint> && partition_ids_hint_);
|
||||
|
||||
/// Depending on the existence of left part checks a merge predicate for two parts or for single part.
|
||||
bool operator()(const MergeTreeData::DataPartPtr & left,
|
||||
@ -523,7 +548,8 @@ public:
|
||||
/// don't glue them together. Alter is rare operation, so it shouldn't affect performance.
|
||||
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
|
||||
|
||||
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers) const;
|
||||
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
|
||||
std::unordered_set<String> & checked_partitions_cache) const;
|
||||
|
||||
/// The version of "log" node that is used to check that no new merges have appeared.
|
||||
int32_t getVersion() const { return merges_version; }
|
||||
@ -535,9 +561,11 @@ public:
|
||||
String getCoveringVirtualPart(const String & part_name) const;
|
||||
|
||||
private:
|
||||
LocalMergePredicate nested_pred;
|
||||
|
||||
const ReplicatedMergeTreeQueue & queue;
|
||||
|
||||
PartitionIdsHint partition_ids_hint;
|
||||
std::optional<PartitionIdsHint> partition_ids_hint;
|
||||
|
||||
/// A snapshot of active parts that would appear if the replica executes all log entries in its queue.
|
||||
ActiveDataPartSet prev_virtual_parts;
|
||||
|
@ -11,7 +11,7 @@ namespace DB
|
||||
|
||||
const String & getPartitionIdForPart(const ITTLMergeSelector::Part & part_info)
|
||||
{
|
||||
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
|
||||
const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr();
|
||||
return part->info.partition_id;
|
||||
}
|
||||
|
||||
@ -90,8 +90,11 @@ IMergeSelector::PartsRange ITTLMergeSelector::select(
|
||||
++best_end;
|
||||
}
|
||||
|
||||
const auto & best_partition_id = getPartitionIdForPart(best_partition.front());
|
||||
merge_due_times[best_partition_id] = current_time + merge_cooldown_time;
|
||||
if (!dry_run)
|
||||
{
|
||||
const auto & best_partition_id = getPartitionIdForPart(best_partition.front());
|
||||
merge_due_times[best_partition_id] = current_time + merge_cooldown_time;
|
||||
}
|
||||
|
||||
return PartsRange(best_begin, best_end);
|
||||
}
|
||||
|
@ -21,10 +21,11 @@ class ITTLMergeSelector : public IMergeSelector
|
||||
public:
|
||||
using PartitionIdToTTLs = std::map<String, time_t>;
|
||||
|
||||
ITTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_)
|
||||
ITTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool dry_run_)
|
||||
: current_time(current_time_)
|
||||
, merge_due_times(merge_due_times_)
|
||||
, merge_cooldown_time(merge_cooldown_time_)
|
||||
, dry_run(dry_run_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -46,6 +47,7 @@ protected:
|
||||
private:
|
||||
PartitionIdToTTLs & merge_due_times;
|
||||
Int64 merge_cooldown_time;
|
||||
bool dry_run;
|
||||
};
|
||||
|
||||
|
||||
@ -56,8 +58,9 @@ class TTLDeleteMergeSelector : public ITTLMergeSelector
|
||||
public:
|
||||
using PartitionIdToTTLs = std::map<String, time_t>;
|
||||
|
||||
TTLDeleteMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool only_drop_parts_)
|
||||
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_)
|
||||
TTLDeleteMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_,
|
||||
bool only_drop_parts_, bool dry_run_)
|
||||
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_)
|
||||
, only_drop_parts(only_drop_parts_) {}
|
||||
|
||||
time_t getTTLForPart(const IMergeSelector::Part & part) const override;
|
||||
@ -75,8 +78,9 @@ private:
|
||||
class TTLRecompressMergeSelector : public ITTLMergeSelector
|
||||
{
|
||||
public:
|
||||
TTLRecompressMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, const TTLDescriptions & recompression_ttls_)
|
||||
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_)
|
||||
TTLRecompressMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_,
|
||||
const TTLDescriptions & recompression_ttls_, bool dry_run_)
|
||||
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_)
|
||||
, recompression_ttls(recompression_ttls_)
|
||||
{}
|
||||
|
||||
|
@ -3243,7 +3243,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
|
||||
auto zookeeper = getZooKeeperAndAssertNotReadonly();
|
||||
|
||||
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, getAllPartitionIds());
|
||||
std::optional<ReplicatedMergeTreeMergePredicate> merge_pred;
|
||||
|
||||
/// If many merges is already queued, then will queue only small enough merges.
|
||||
/// Otherwise merge queue could be filled with only large merges,
|
||||
@ -3280,8 +3280,22 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
if (storage_settings.get()->assign_part_uuids)
|
||||
future_merged_part->uuid = UUIDHelpers::generateV4();
|
||||
|
||||
if (max_source_parts_size_for_merge > 0 &&
|
||||
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr) == SelectPartsDecision::SELECTED)
|
||||
bool can_assign_merge = max_source_parts_size_for_merge > 0;
|
||||
PartitionIdsHint partitions_to_merge_in;
|
||||
if (can_assign_merge)
|
||||
{
|
||||
auto lightweight_merge_pred = LocalMergePredicate(queue);
|
||||
partitions_to_merge_in = merger_mutator.getPartitionsThatMayBeMerged(
|
||||
max_source_parts_size_for_merge, lightweight_merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR);
|
||||
if (partitions_to_merge_in.empty())
|
||||
can_assign_merge = false;
|
||||
else
|
||||
merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in));
|
||||
}
|
||||
|
||||
if (can_assign_merge &&
|
||||
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred,
|
||||
merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
|
||||
{
|
||||
create_result = createLogEntryToMergeParts(
|
||||
zookeeper,
|
||||
@ -3293,13 +3307,17 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
deduplicate_by_columns,
|
||||
cleanup,
|
||||
nullptr,
|
||||
merge_pred.getVersion(),
|
||||
merge_pred->getVersion(),
|
||||
future_merged_part->merge_type);
|
||||
}
|
||||
/// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts
|
||||
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
|
||||
&& merges_and_mutations_queued.mutations < storage_settings_ptr->max_replicated_mutations_in_queue)
|
||||
{
|
||||
/// We don't need the list of committing blocks to choose a part to mutate
|
||||
if (!merge_pred)
|
||||
merge_pred.emplace(queue.getMergePredicate(zookeeper, PartitionIdsHint{}));
|
||||
|
||||
/// Choose a part to mutate.
|
||||
DataPartsVector data_parts = getDataPartsVectorForInternalUsage();
|
||||
for (const auto & part : data_parts)
|
||||
@ -3307,7 +3325,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
if (part->getBytesOnDisk() > max_source_part_size_for_mutation)
|
||||
continue;
|
||||
|
||||
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
|
||||
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred->getDesiredMutationVersion(part);
|
||||
if (!desired_mutation_version)
|
||||
continue;
|
||||
|
||||
@ -3316,7 +3334,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
future_merged_part->uuid,
|
||||
desired_mutation_version->first,
|
||||
desired_mutation_version->second,
|
||||
merge_pred.getVersion());
|
||||
merge_pred->getVersion());
|
||||
|
||||
if (create_result == CreateMergeEntryResult::Ok ||
|
||||
create_result == CreateMergeEntryResult::LogUpdated)
|
||||
|
@ -0,0 +1 @@
|
||||
/test/02439/s1/default/block_numbers/123
|
@ -0,0 +1,28 @@
|
||||
|
||||
drop table if exists rmt;
|
||||
|
||||
create table rmt (n int, m int) engine=ReplicatedMergeTree('/test/02439/{shard}/{database}', '{replica}') partition by n order by n;
|
||||
insert into rmt select number, number from numbers(50);
|
||||
insert into rmt values (1, 2);
|
||||
insert into rmt values (1, 3);
|
||||
insert into rmt values (1, 4);
|
||||
insert into rmt values (1, 5);
|
||||
insert into rmt values (1, 6);
|
||||
insert into rmt values (1, 7);
|
||||
insert into rmt values (1, 8);
|
||||
insert into rmt values (1, 9);
|
||||
-- there's nothing to merge in all partitions but '1'
|
||||
|
||||
optimize table rmt partition tuple(123);
|
||||
|
||||
set optimize_throw_if_noop=1;
|
||||
optimize table rmt partition tuple(123); -- { serverError CANNOT_ASSIGN_OPTIMIZE }
|
||||
|
||||
select sleepEachRow(3) as higher_probability_of_reproducing_the_issue format Null;
|
||||
system flush logs;
|
||||
|
||||
-- it should not list unneeded partitions where we cannot merge anything
|
||||
select distinct path from system.zookeeper_log where path like '/test/02439/s1/' || currentDatabase() || '/block_numbers/%'
|
||||
and op_num in ('List', 'SimpleList', 'FilteredList') and path not like '%/block_numbers/1';
|
||||
|
||||
drop table rmt;
|
Loading…
Reference in New Issue
Block a user