fix virtual parts in REPLACE_RANGE

This commit is contained in:
Alexander Tokmakov 2021-05-13 14:29:59 +03:00
parent 430f558234
commit e114c7eb8b
9 changed files with 184 additions and 68 deletions

View File

@ -1,10 +1,16 @@
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Common/Exception.h>
#include <algorithm>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names)
: format_version(format_version_)
{
@ -15,6 +21,7 @@ ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_,
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
{
/// TODO make it exception safe (out_replaced_parts->push_back(...) may throw)
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
if (getContainingPartImpl(part_info) != part_info_to_name.end())
@ -32,6 +39,8 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
--it;
if (!part_info.contains(it->first))
{
if (!part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.", name, it->first.getPartName());
++it;
break;
}
@ -47,11 +56,16 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
/// Let's go to the right.
while (it != part_info_to_name.end() && part_info.contains(it->first))
{
if (part_info == it->first)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", name);
if (out_replaced_parts)
out_replaced_parts->push_back(it->second);
part_info_to_name.erase(it++);
}
if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.", name, it->first.getPartName());
part_info_to_name.emplace(part_info, name);
return true;
}

View File

@ -2005,8 +2005,8 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
}
if (!new_part_info.isDisjoint((*prev)->info))
throw Exception("Part " + new_part_name + " intersects previous part " + (*prev)->getNameWithState() +
". It is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.",
new_part_name, (*prev)->getNameWithState());
break;
}
@ -2019,7 +2019,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
while (end != committed_parts_range.end())
{
if ((*end)->info == new_part_info)
throw Exception("Unexpected duplicate part " + (*end)->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", (*end)->getNameWithState());
if (!new_part_info.contains((*end)->info))
{
@ -2030,8 +2030,8 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
}
if (!new_part_info.isDisjoint((*end)->info))
throw Exception("Part " + new_part_name + " intersects next part " + (*end)->getNameWithState() +
". It is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.",
new_part_name, (*end)->getNameWithState());
break;
}

View File

@ -236,6 +236,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
/// Previous part only in boundaries of partition frame
const MergeTreeData::DataPartPtr * prev_part = nullptr;
String range_str;
size_t parts_selected_precondition = 0;
for (const MergeTreeData::DataPartPtr & part : data_parts)
{
@ -244,7 +245,11 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (!prev_partition_id || partition_id != *prev_partition_id)
{
if (parts_ranges.empty() || !parts_ranges.back().empty())
{
LOG_DEBUG(log, "selectPartsToMerge 1: range {}", range_str);
range_str.clear();
parts_ranges.emplace_back();
}
/// New partition frame.
prev_partition_id = &partition_id;
prev_part = nullptr;
@ -257,17 +262,26 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
* So we have to check if this part is currently being inserted with quorum and so on and so forth.
* Obviously we have to check it manually only for the first part
* of each partition because it will be automatically checked for a pair of parts. */
if (!can_merge_callback(nullptr, part, nullptr))
String reason;
bool can = can_merge_callback(nullptr, part, &reason);
LOG_DEBUG(log, "Can merge single part {}: {} {}", part->name, can, reason);
if (!can)
continue;
}
else
{
/// If we cannot merge with previous part we had to start new parts
/// interval (in the same partition)
if (!can_merge_callback(*prev_part, part, nullptr))
String reason;
bool can = can_merge_callback(*prev_part, part, &reason);
LOG_DEBUG(log, "Can merge {} and {}: {} {}", (*prev_part)->name, part->name, can, reason);
if (!can)
{
/// Starting new interval in the same partition
assert(!parts_ranges.back().empty());
LOG_DEBUG(log, "selectPartsToMerge 2: range {}", range_str);
range_str.clear();
parts_ranges.emplace_back();
/// Now we have no previous part, but it affects only logging
@ -286,6 +300,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
++parts_selected_precondition;
range_str += part->name + " ";
parts_ranges.back().emplace_back(part_info);
/// Check for consistency of data parts. If assertion is failed, it requires immediate investigation.
@ -298,6 +313,9 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
prev_part = &part;
}
LOG_DEBUG(log, "selectPartsToMerge 3: range {}", range_str);
LOG_DEBUG(log, "selectPartsToMerge: {} ranges", parts_ranges.size());
if (parts_selected_precondition == 0)
{
if (out_disable_reason)

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
@ -386,4 +387,35 @@ ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String
return res;
}
/// Returns the names of the parts that will appear after executing the entry,
/// plus the parts that are used only to block selection of merges.
/// `format_version` is needed to parse the drop range part name of REPLACE_RANGE entries.
Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const
{
    /// ALTER_METADATA doesn't produce any part.
    /// CLEAR_COLUMN and CLEAR_INDEX yield nothing as well, because selection of merges in the partition
    /// where the column is cleared should not be blocked (only execution of merges should be blocked).
    const bool has_no_virtual_parts = type == ALTER_METADATA || type == CLEAR_COLUMN || type == CLEAR_INDEX;
    if (has_no_virtual_parts)
        return {};

    /// Every remaining type except REPLACE_RANGE is described by new_part_name alone.
    /// That includes DROP_RANGE: it does not add a real part, but we must disable merges in that range.
    if (type != REPLACE_RANGE)
        return {new_part_name};

    const auto & range_entry = *replace_range_entry;

    Strings virtual_part_names = range_entry.new_part_names;

    const auto drop_range_info = MergeTreePartInfo::fromPartName(range_entry.drop_range_part_name, format_version);
    assert(drop_range_info.getBlocksCount() != 0);

    /// More than one block means it's REPLACE, not MOVE or ATTACH, so the drop range is real
    /// and has to be reported too.
    const bool drop_range_is_real = drop_range_info.getBlocksCount() > 1;
    if (drop_range_is_real)
        virtual_part_names.emplace_back(range_entry.drop_range_part_name);

    return virtual_part_names;
}
}

View File

@ -6,6 +6,7 @@
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Disks/IDisk.h>
#include <mutex>
@ -128,36 +129,13 @@ struct ReplicatedMergeTreeLogEntryData
/// Returns a set of parts that will appear after executing the entry + parts to block
/// selection of merges. These parts are added to queue.virtual_parts.
Strings getVirtualPartNames() const
{
    /// REPLACE_RANGE is the only type described by several names:
    /// all the new parts followed by the drop range itself.
    if (type == REPLACE_RANGE)
    {
        Strings names = replace_range_entry->new_part_names;
        names.emplace_back(replace_range_entry->drop_range_part_name);
        return names;
    }

    /// ALTER_METADATA doesn't produce any part.
    /// CLEAR_COLUMN / CLEAR_INDEX return {} because selection of merges in the partition
    /// where the column is cleared should not be blocked (only execution of merges should be blocked).
    if (type == ALTER_METADATA || type == CLEAR_COLUMN || type == CLEAR_INDEX)
        return {};

    /// Everything else is described by new_part_name.
    /// For DROP_RANGE it is not a real part, but we must disable merges in that range.
    return {new_part_name};
}
Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const;
/// Returns set of parts that denote the block number ranges that should be blocked during the entry execution.
/// These parts are added to future_parts.
Strings getBlockingPartNames() const
Strings getBlockingPartNames(MergeTreeDataFormatVersion format_version) const
{
Strings res = getVirtualPartNames();
Strings res = getVirtualPartNames(format_version);
if (type == CLEAR_COLUMN)
res.emplace_back(new_part_name);

View File

@ -133,7 +133,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
std::lock_guard<std::mutex> & state_lock)
{
for (const String & virtual_part_name : entry->getVirtualPartNames())
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
virtual_parts.add(virtual_part_name);
addPartToMutations(virtual_part_name);
@ -220,7 +220,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
removeCoveredPartsFromMutations(entry->actual_new_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
}
for (const String & virtual_part_name : entry->getVirtualPartNames())
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
current_parts.add(virtual_part_name);
@ -249,7 +249,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
}
else
{
for (const String & virtual_part_name : entry->getVirtualPartNames())
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
/// Because execution of the entry is unsuccessful,
/// `virtual_part_name` will never appear so we won't need to mutate
@ -752,7 +752,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
/// mutation block number that would appear as a result of executing the queue.
for (const auto & queue_entry : queue)
{
for (const String & produced_part_name : queue_entry->getVirtualPartNames())
for (const String & produced_part_name : queue_entry->getVirtualPartNames(format_version))
{
auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
@ -1033,7 +1033,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|| entry.type == LogEntry::ATTACH_PART
|| entry.type == LogEntry::MUTATE_PART)
{
for (const String & new_part_name : entry.getBlockingPartNames())
for (const String & new_part_name : entry.getBlockingPartNames(format_version))
{
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
return false;
@ -1251,7 +1251,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
++entry->num_tries;
entry->last_attempt_time = time(nullptr);
for (const String & new_part_name : entry->getBlockingPartNames())
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
{
if (!queue.future_parts.emplace(new_part_name, entry).second)
throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
@ -1288,7 +1288,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
entry->currently_executing = false;
entry->execution_complete.notify_all();
for (const String & new_part_name : entry->getBlockingPartNames())
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
{
if (!queue.future_parts.erase(new_part_name))
LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", new_part_name);
@ -1585,7 +1585,7 @@ void ReplicatedMergeTreeQueue::disableMergesInBlockRange(const String & part_nam
std::lock_guard lock(state_mutex);
virtual_parts.add(part_name);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); //FIXME
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); //FIXME
}
@ -1817,6 +1817,24 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
}
else
inprogress_quorum_part.clear();
/// Debug output: render committing block numbers as "<partition> (n1, n2, ...) " for every partition.
/// The numbers are comma-separated, otherwise e.g. blocks {1, 23} and {12, 3} would be printed identically.
String blocks_str;
for (const auto & partition : committing_blocks)
{
    blocks_str += partition.first;
    blocks_str += " (";
    bool is_first_block = true;
    for (const auto & num : partition.second)
    {
        if (!is_first_block)
            blocks_str += ", ";
        blocks_str += toString(num);
        is_first_block = false;
    }
    blocks_str += ") ";
}

/// Take a snapshot of the queue's virtual parts under state_mutex,
/// so the (potentially long) string formatting below happens without holding the lock.
ActiveDataPartSet virtual_parts(queue.format_version);
{
    std::lock_guard lock(queue.state_mutex);
    virtual_parts = queue.virtual_parts;
}

LOG_DEBUG(queue.log, "MergePredicate: ver {},\t prev_virt {},\t comm {},\t, virt {},\t iqp {}",
    merges_version, boost::algorithm::join(prev_virtual_parts.getParts(), ", "), blocks_str, boost::algorithm::join(virtual_parts.getParts(), ", "), inprogress_quorum_part);
}
bool ReplicatedMergeTreeMergePredicate::operator()(

View File

@ -225,6 +225,26 @@ static String extractZooKeeperPath(const String & path)
return normalizeZooKeeperPath(path);
}
/// Builds the placeholder drop range that is written into REPLACE_RANGE log entries
/// for MOVE PARTITION / ATTACH PARTITION FROM: a single zero block in the given partition.
static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(const String & partition_id)
{
    /// NOTE There is no dedicated log entry type for MOVE PARTITION/ATTACH PARTITION FROM,
    /// so REPLACE_RANGE is reused with a dummy range of exactly one block, which means "attach, not replace".
    /// Filling the range with zeros is safe, because the drop range of a real REPLACE PARTITION
    /// must contain at least 2 blocks, so the dummy range can be distinguished from any real or virtual part.
    /// However, a part name should never be constructed from it (not even for a virtual part),
    /// because it could be confused with the real part <partition>_0_0_0.
    /// TODO get rid of this.
    MergeTreePartInfo dummy_range;

    dummy_range.partition_id = partition_id;

    /// A one-block range: [0, 0]
    dummy_range.max_block = 0;
    dummy_range.min_block = 0;

    dummy_range.mutation = 0;
    dummy_range.level = 0;

    return dummy_range;
}
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
@ -2149,13 +2169,16 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
Stopwatch watch;
auto & entry_replace = *entry.replace_range_entry;
LOG_DEBUG(log, "Executing log entry {} to replace parts range {} with {} parts from {}.{}",
entry.znode_name, entry_replace.drop_range_part_name, entry_replace.new_part_names.size(),
entry_replace.from_database, entry_replace.from_table);
auto metadata_snapshot = getInMemoryMetadataPtr();
MergeTreePartInfo drop_range = MergeTreePartInfo::fromPartName(entry_replace.drop_range_part_name, format_version);
/// Range with only one block has special meaning ATTACH PARTITION
bool replace = drop_range.getBlocksCount() > 1;
bool replace = drop_range.getBlocksCount() > 1; //FIXME
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry);
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry); //FIXME
struct PartDescription
{
@ -2226,7 +2249,16 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
if (parts_to_add.empty() && replace)
{
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
String parts_to_remove_str;
for (const auto & part : parts_to_remove)
{
parts_to_remove_str += part->name;
parts_to_remove_str += " ";
}
LOG_TRACE(log, "Replacing {} parts {}with empty set", parts_to_remove.size(), parts_to_remove_str);
}
}
if (parts_to_add.empty())
@ -2361,8 +2393,9 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
/// Filter covered parts
PartDescriptions final_parts;
Strings final_part_names;
{
Strings final_part_names = adding_parts_active_set.getParts();
final_part_names = adding_parts_active_set.getParts();
for (const String & final_part_name : final_part_names)
{
@ -2380,7 +2413,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
if (!prev.found_new_part_info.isDisjoint(curr.found_new_part_info))
{
throw Exception("Intersected final parts detected: " + prev.found_new_part_name
+ " and " + curr.found_new_part_name + ". It should be investigated.", ErrorCodes::INCORRECT_DATA);
+ " and " + curr.found_new_part_name + ". It should be investigated.", ErrorCodes::LOGICAL_ERROR);
}
}
}
@ -2459,7 +2492,17 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
transaction.commit(&data_parts_lock);
if (replace)
{
    /// It's a real REPLACE: remove the parts covered by drop_range from the working set.
    parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);

    /// Space-separated names of the removed parts (with a trailing space), for the trace message only.
    String parts_to_remove_str;
    for (const auto & part : parts_to_remove)
    {
        parts_to_remove_str += part->name;
        parts_to_remove_str += " ";
    }

    /// The format string must have one placeholder per argument:
    /// previously the last "{}" was missing, so the list of final part names was never logged.
    LOG_TRACE(log, "Replacing {} parts {}with {} parts {}", parts_to_remove.size(), parts_to_remove_str,
        final_parts.size(), boost::algorithm::join(final_part_names, ", "));
}
}
PartLog::addNewParts(getContext(), res_parts, watch.elapsed());
@ -4731,7 +4774,7 @@ static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version,
return part_info.getPartName();
}
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition)
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_range)
{
/// Even if there is no data in the partition, you still need to mark the range for deletion.
/// - Because before executing DETACH, tasks for downloading parts to this partition can be executed.
@ -4754,17 +4797,22 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
mutation_version = queue.getCurrentMutationVersion(partition_id, right);
}
/// REPLACE PARTITION uses different max level and does not decrement max_block of DROP_RANGE for unknown (probably historical) reason.
auto max_level = std::numeric_limits<decltype(part_info.level)>::max();
if (!for_replace_partition)
{
max_level = MergeTreePartInfo::MAX_LEVEL;
/// Empty partition.
if (right == 0)
return false;
--right;
decltype(part_info.level) max_level = MergeTreePartInfo::MAX_LEVEL;
if (for_replace_range)
{
/// REPLACE/MOVE PARTITION uses different max level for unknown (probably historical) reason.
max_level = std::numeric_limits<decltype(part_info.level)>::max();
/// NOTE Undo max block number decrement for REPLACE_RANGE, because there are invariants:
/// - drop range for REPLACE PARTITION must contain at least 2 blocks (1 skipped block and at least 1 real block)
/// - drop range for MOVE PARTITION/ATTACH PARTITION FROM always contains 1 block
++right;
}
/// Artificial high level is chosen, to make this part "covering" all parts inside.
@ -6069,13 +6117,26 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
MergeTreePartInfo drop_range;
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
if (replace && partition_was_empty)
{
/// Nothing to drop, will just attach new parts
LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id);
replace = false;
}
if (!replace)
drop_range.min_block = drop_range.max_block;
{
/// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range
drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(drop_range.partition_id);
}
assert(drop_range.getBlocksCount() > 0);
assert(replace == (drop_range.getBlocksCount() > 1));
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
if (drop_range.getBlocksCount() > 1)
if (replace)
{
/// We have to prohibit merges in drop_range, since new merge log entry appeared after this REPLACE FROM entry
/// could produce new merged part instead in place of just deleted parts.
@ -6268,10 +6329,10 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
/// A range for log entry to remove parts from the source table (myself).
MergeTreePartInfo drop_range;
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
bool partition_was_not_empty = getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
if (drop_range.getBlocksCount() > 1)
if (partition_was_not_empty)
{
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInBlockRange(drop_range_fake_part_name);
@ -6318,12 +6379,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
ReplicatedMergeTreeLogEntryData entry;
{
MergeTreePartInfo drop_range_dest;
drop_range_dest.partition_id = drop_range.partition_id;
drop_range_dest.max_block = drop_range.max_block;
drop_range_dest.min_block = drop_range.max_block; //FIXME typo?
drop_range_dest.level = drop_range.level;
drop_range_dest.mutation = drop_range.mutation;
MergeTreePartInfo drop_range_dest = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(drop_range.partition_id);
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = dest_table_storage->replica_name;

View File

@ -609,7 +609,7 @@ private:
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
/// Returns false if the partition doesn't exist yet.
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false);
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_range = false);
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
mutable std::unordered_set<std::string> existing_nodes_cache;

View File

@ -67,7 +67,7 @@ function drop_partition_thread()
while true; do
REPLICA=$(($RANDOM % 16))
PARTITION=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -q "ALTER TABLE dst_$TO_REPLICA DROP PARTITION $PARTITION" 2>/dev/null
$CLICKHOUSE_CLIENT -q "ALTER TABLE dst_$REPLICA DROP PARTITION $PARTITION" 2>/dev/null
sleep 0.$RANDOM;
done
}