provide hint for loading uncommitted blocks in merge predicate

This commit is contained in:
Alexander Tokmakov 2022-11-25 16:41:20 +01:00
parent 41d405476a
commit 5cc99312d7
5 changed files with 63 additions and 13 deletions

View File

@ -4714,6 +4714,21 @@ std::set<String> MergeTreeData::getPartitionIdsAffectedByCommands(
return affected_partition_ids;
}
std::unordered_set<String> MergeTreeData::getAllPartitionIds() const
{
std::unordered_set<String> res;
String prev_id;
for (const auto & part : getDataPartsStateRange(DataPartState::Active))
{
if (prev_id == part->info.partition_id)
continue;
res.insert(part->info.partition_id);
prev_id = part->info.partition_id;
}
return res;
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorForInternalUsage(
const DataPartStates & affordable_states, const DataPartsLock & /*lock*/, DataPartStateVector * out_states) const

View File

@ -801,6 +801,9 @@ public:
std::unordered_set<String> getPartitionIDsFromQuery(const ASTs & asts, ContextPtr context) const;
std::set<String> getPartitionIdsAffectedByCommands(const MutationCommands & commands, ContextPtr query_context) const;
/// Returns set of partition_ids of all Active parts
std::unordered_set<String> getAllPartitionIds() const;
/// Extracts MergeTreeData of other *MergeTree* storage
/// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
/// Tables structure should be locked.

View File

@ -1772,9 +1772,9 @@ size_t ReplicatedMergeTreeQueue::countFinishedMutations() const
}
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper)
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint)
{
return ReplicatedMergeTreeMergePredicate(*this, zookeeper);
return ReplicatedMergeTreeMergePredicate(*this, zookeeper, std::move(partition_ids_hint));
}
@ -1882,8 +1882,13 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
/// We need to check committing block numbers and new parts which could be committed.
/// Actually we don't need most of predicate logic here but it all the code related to committing blocks
/// and updatading queue state is implemented there.
auto merge_pred = getMergePredicate(zookeeper);
/// and updatating queue state is implemented there.
PartitionIdsHint partition_ids_hint;
for (const auto & candidate : candidates)
for (const auto & partitions : candidate->block_numbers)
partition_ids_hint.insert(partitions.first);
auto merge_pred = getMergePredicate(zookeeper, /* partition_ids_hint */ {});
std::vector<const ReplicatedMergeTreeMutationEntry *> finished;
for (const auto & candidate : candidates)
@ -2081,8 +2086,9 @@ ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue()
}
ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper)
ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_)
: queue(queue_)
, partition_ids_hint(std::move(partition_ids_hint_))
, prev_virtual_parts(queue.format_version)
{
{
@ -2094,7 +2100,15 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
auto quorum_status_future = zookeeper->asyncTryGet(fs::path(queue.zookeeper_path) / "quorum" / "status");
/// Load current inserts
Strings partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
/// Hint avoids listing partitions that we don't really need.
/// Dropped (or cleaned up by TTL) partitions are never removed from ZK,
/// so without hint it can do a few thousands requests (if not using MultiRead).
Strings partitions;
if (partition_ids_hint.empty())
partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
else
std::copy(partition_ids_hint.begin(), partition_ids_hint.end(), std::back_inserter(partitions));
std::vector<std::string> paths;
paths.reserve(partitions.size());
for (const String & partition : partitions)
@ -2226,6 +2240,13 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
if (left_max_block + 1 < right_min_block)
{
if (!partition_ids_hint.empty() && !partition_ids_hint.contains(left->info.partition_id))
{
if (out_reason)
*out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id);
return false;
}
auto committing_blocks_in_partition = committing_blocks.find(left->info.partition_id);
if (committing_blocks_in_partition != committing_blocks.end())
{
@ -2419,6 +2440,9 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
const String & partition_id = kv.first;
Int64 block_num = kv.second;
if (!partition_ids_hint.empty() && !partition_ids_hint.contains(partition_id))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition id {} was not provided as hint, it's a bug", partition_id);
auto partition_it = committing_blocks.find(partition_id);
if (partition_it != committing_blocks.end())
{

View File

@ -25,6 +25,7 @@ class MergeTreeDataMergerMutator;
class ReplicatedMergeTreeMergePredicate;
class ReplicatedMergeTreeMergeStrategyPicker;
using PartitionIdsHint = std::unordered_set<String>;
class ReplicatedMergeTreeQueue
{
@ -382,7 +383,7 @@ public:
size_t countFinishedMutations() const;
/// Returns functor which used by MergeTreeMergerMutator to select parts for merge
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper);
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint);
/// Return the version (block number) of the last mutation that we don't need to apply to the part
/// with getDataVersion() == data_version. (Either this mutation was already applied or the part
@ -486,7 +487,7 @@ public:
class ReplicatedMergeTreeMergePredicate
{
public:
ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper);
ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_);
/// Depending on the existence of left part checks a merge predicate for two parts or for single part.
bool operator()(const MergeTreeData::DataPartPtr & left,
@ -531,6 +532,8 @@ public:
private:
const ReplicatedMergeTreeQueue & queue;
PartitionIdsHint partition_ids_hint;
/// A snapshot of active parts that would appear if the replica executes all log entries in its queue.
ActiveDataPartSet prev_virtual_parts;
/// partition ID -> block numbers of the inserts and mutations that are about to commit

View File

@ -3128,7 +3128,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
auto zookeeper = getZooKeeperAndAssertNotReadonly();
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, getAllPartitionIds());
/// If many merges is already queued, then will queue only small enough merges.
/// Otherwise merge queue could be filled with only large merges,
@ -4573,7 +4573,12 @@ bool StorageReplicatedMergeTree::optimize(
/// We must select parts for merge under merge_selecting_mutex because other threads
/// (merge_selecting_thread or OPTIMIZE queries) could assign new merges.
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper);
PartitionIdsHint partition_ids_hint;
if (partition_id.empty())
partition_ids_hint = getAllPartitionIds();
else
partition_ids_hint.insert(partition_id);
ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper, std::move(partition_ids_hint));
auto future_merged_part = std::make_shared<FutureMergedMutatedPart>();
if (storage_settings.get()->assign_part_uuids)
@ -7057,7 +7062,7 @@ void StorageReplicatedMergeTree::movePartitionToShard(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part {} does not have an uuid assigned and it can't be moved between shards", part_name);
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, {part_info.partition_id});
/// The following block is pretty much copy & paste from StorageReplicatedMergeTree::dropPart to avoid conflicts while this is WIP.
/// Extract it to a common method and re-use it before merging.
@ -7265,7 +7270,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
while (true)
{
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, {part_info.partition_id});
auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Active});
@ -8334,7 +8339,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
/// We can enqueue part for check from DataPartExchange or SelectProcessor
/// and it's hard to synchronize it with ReplicatedMergeTreeQueue and PartCheckThread...
/// But at least we can ignore parts that are definitely not needed according to virtual parts and drop ranges.
auto pred = queue.getMergePredicate(zookeeper);
auto pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{new_part_info.partition_id});
String covering_virtual = pred.getCoveringVirtualPart(lost_part_name);
if (covering_virtual.empty())
{