Fix drop part bug

alesapin 2021-06-23 22:24:43 +03:00
parent 3b13567b57
commit 81c74435a3
4 changed files with 47 additions and 22 deletions

@@ -2331,7 +2331,7 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(c
         if (part->info.partition_id != drop_range.partition_id)
             throw Exception("Unexpected partition_id of part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);

-        if (part->info.min_block < drop_range.min_block)    /// NOTE Always false, because drop_range.min_block == 0
+        if (part->info.min_block < drop_range.min_block)
         {
             if (drop_range.min_block <= part->info.max_block)
             {
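
Aside: the deleted NOTE dated from when a drop range always described a whole partition starting at block 0, so the left-boundary check could never fire; a DROP PART range is just one part's own block interval, which makes the check reachable. A minimal standalone sketch (toy types, not ClickHouse's) of the boundary logic:

    #include <cassert>

    struct PartInfoSketch
    {
        int min_block = 0;
        int max_block = 0;
    };

    /// True when `part` lies entirely inside `drop_range`.
    static bool coveredBy(const PartInfoSketch & part, const PartInfoSketch & drop_range)
    {
        return part.min_block >= drop_range.min_block
            && part.max_block <= drop_range.max_block;
    }

    int main()
    {
        PartInfoSketch drop_partition{0, 1000};  /// DROP PARTITION: min_block == 0
        PartInfoSketch drop_part{5, 5};          /// DROP PART: one part's exact range
        PartInfoSketch merged{4, 6};             /// merge result spanning blocks 4..6

        assert(coveredBy({5, 5}, drop_partition));  /// old case: left bound never fires
        assert(!coveredBy(merged, drop_part));      /// new case: min_block check matters
        return 0;
    }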

@@ -137,14 +137,21 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
     for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
     {
         virtual_parts.add(virtual_part_name, nullptr, log);
-        addPartToMutations(virtual_part_name);
+        /// Don't add drop range parts to mutations:
+        /// they don't produce any useful parts.
+        if (entry->type != LogEntry::DROP_RANGE)
+            addPartToMutations(virtual_part_name);
     }

     /// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
     if (entry->type != LogEntry::DROP_RANGE)
+    {
         queue.push_back(entry);
+    }
     else
+    {
         queue.push_front(entry);
+    }

     if (entry->type == LogEntry::GET_PART || entry->type == LogEntry::ATTACH_PART)
     {
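
Aside: a minimal sketch of the insertion rule above (hypothetical names, with std::list standing in for the real queue container). DROP_RANGE entries jump to the front so a replica doesn't first spend time fetching parts the range is about to delete anyway:

    #include <iostream>
    #include <list>
    #include <string>

    enum class EntryType { GET_PART, MERGE_PARTS, DROP_RANGE };

    struct Entry
    {
        EntryType type;
        std::string name;
    };

    int main()
    {
        std::list<Entry> queue;
        auto insert = [&](Entry e)
        {
            if (e.type != EntryType::DROP_RANGE)
                queue.push_back(std::move(e));   /// normal entries keep FIFO order
            else
                queue.push_front(std::move(e));  /// drop ranges are processed first
        };

        insert({EntryType::GET_PART, "all_1_1_0"});
        insert({EntryType::DROP_RANGE, "all_0_100_999999999"});

        std::cout << queue.front().name << '\n';  /// prints the DROP_RANGE entry
        return 0;
    }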

@@ -2194,23 +2194,37 @@ bool StorageReplicatedMergeTree::executeFetchShared(
 void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
 {
     auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
-    queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);
-
-    if (entry.detach)
-        LOG_DEBUG(log, "Detaching parts.");
-    else
-        LOG_DEBUG(log, "Removing parts.");
-
-    /// Delete the parts contained in the range to be deleted.
-    /// It's important that no old parts remain (after the merge), because otherwise,
-    /// after adding a new replica, this new replica downloads them, but does not delete them.
-    /// And, if you do not, the parts will come to life after the server is restarted.
-    /// Therefore, we use all data parts.
     auto metadata_snapshot = getInMemoryMetadataPtr();
+    queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);

     DataPartsVector parts_to_remove;
     {
         auto data_parts_lock = lockParts();
+
+        /// It's a DROP PART
+        if (!drop_range_info.isFakeDropRangePart())
+        {
+            auto containing_part = getActiveContainingPart(drop_range_info, MergeTreeDataPartState::Committed, data_parts_lock);
+            if (containing_part && containing_part->info != drop_range_info)
+            {
+                LOG_INFO(log, "Skipping drop range for part {} because covering part {} already exists", drop_range_info.getPartName(), containing_part->name);
+                return;
+            }
+        }
+
+        if (entry.detach)
+            LOG_DEBUG(log, "Detaching parts.");
+        else
+            LOG_DEBUG(log, "Removing parts.");
+
+        /// Delete the parts contained in the range to be deleted.
+        /// It's important that no old parts remain (after the merge), because otherwise,
+        /// after adding a new replica, this new replica downloads them, but does not delete them.
+        /// And, if you do not, the parts will come to life after the server is restarted.
+        /// Therefore, we use all data parts.
+        ///
         parts_to_remove = removePartsInRangeFromWorkingSet(drop_range_info, true, data_parts_lock);
     }
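
Aside: a hedged sketch (simplified stand-in types, not the real ClickHouse API) of the new guard above. If the active part covering the drop range is not the range itself, the dropped part has already been absorbed by a bigger merged part, so the DROP PART entry is obsolete and is skipped:

    #include <iostream>
    #include <optional>

    struct Info
    {
        int min_block = 0;
        int max_block = 0;
        bool operator==(const Info & other) const
        {
            return min_block == other.min_block && max_block == other.max_block;
        }
    };

    /// Stand-in for getActiveContainingPart(): the active part covering `range`, if any.
    static std::optional<Info> activeContaining(const Info & /*range*/)
    {
        return Info{4, 6};  /// pretend blocks 4..6 were already merged into one part
    }

    int main()
    {
        Info drop_range{5, 5};  /// DROP PART targeting the part with block 5
        auto covering = activeContaining(drop_range);
        if (covering && !(*covering == drop_range))
        {
            std::cout << "skipping: covering part already exists\n";
            return 0;  /// mirrors the early `return` in executeDropRange
        }
        /// ... otherwise parts in the range would be removed from the working set
        return 0;
    }
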
@@ -6992,15 +7006,16 @@ bool StorageReplicatedMergeTree::dropPartImpl(
     getClearBlocksInPartitionOps(ops, *zookeeper, part_info.partition_id, part_info.min_block, part_info.max_block);
     size_t clear_block_ops_size = ops.size();

-    /// Set fake level to treat this part as virtual in queue.
-    auto drop_part_info = part->info;
-    drop_part_info.level = MergeTreePartInfo::MAX_LEVEL;
-
     /// If `part_name` is result of a recent merge and source parts are still available then
     /// DROP_RANGE with detach will move this part together with source parts to `detached/` dir.
     entry.type = LogEntry::DROP_RANGE;
     entry.source_replica = replica_name;
+    /// We don't set the fake drop level (999999999) for a single-part DROP_RANGE.
+    /// First of all, we don't guarantee anything other than that the part will not be
+    /// active after DROP PART, but a covering part (without the data of the dropped part) can exist.
+    /// If we add a part with level 999999999, we can break the invariant in virtual_parts of
+    /// the queue.
-    entry.new_part_name = getPartNamePossiblyFake(format_version, drop_part_info);
+    entry.new_part_name = getPartNamePossiblyFake(format_version, part->info);
     entry.detach = detach;
     entry.create_time = time(nullptr);
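
Aside: part names encode the block range and merge level, roughly "<partition>_<min_block>_<max_block>_<level>" in the common case. A toy sketch of what the two variants of the entry name look like; the formatter below is an assumption for illustration only:

    #include <cstdio>

    /// Toy formatter for illustration; real naming lives in MergeTreePartInfo.
    static void printPartName(const char * partition, int min_block, int max_block, unsigned level)
    {
        std::printf("%s_%d_%d_%u\n", partition, min_block, max_block, level);
    }

    int main()
    {
        printPartName("all", 5, 5, 999999999);  /// old: MAX_LEVEL fake drop level
        printPartName("all", 5, 5, 1);          /// new: the part's real level
        return 0;
    }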

@@ -7,9 +7,10 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 NUM_REPLICAS=5

 for i in $(seq 1 $NUM_REPLICAS); do
-    $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS ttl_table$i"
+    $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS ttl_table$i" &
 done
+wait

 for i in $(seq 1 $NUM_REPLICAS); do
     $CLICKHOUSE_CLIENT -n --query "CREATE TABLE ttl_table$i(
@@ -18,7 +19,7 @@ for i in $(seq 1 $NUM_REPLICAS); do
         ENGINE ReplicatedMergeTree('/test/01921_concurrent_ttl_and_normal_merges/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/ttl_table', '$i')
         ORDER BY tuple()
         TTL key + INTERVAL 1 SECOND
-        SETTINGS merge_with_ttl_timeout=1, max_replicated_merges_with_ttl_in_queue=100, max_number_of_merges_with_ttl_in_pool=100;"
+        SETTINGS merge_with_ttl_timeout=1, max_replicated_merges_with_ttl_in_queue=100, max_number_of_merges_with_ttl_in_pool=100, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
 done

 function optimize_thread
@@ -66,5 +67,7 @@ $CLICKHOUSE_CLIENT --query "SELECT * FROM system.replication_queue where table l
 $CLICKHOUSE_CLIENT --query "SELECT COUNT() > 0 FROM system.part_log where table like 'ttl_table%' and database = '${CLICKHOUSE_DATABASE}'"

 for i in $(seq 1 $NUM_REPLICAS); do
-    $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS ttl_table$i"
+    $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS ttl_table$i" &
 done
+wait