diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index ef7ebead966..45e16e81208 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -715,6 +715,16 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C for (const String & produced_part_name : queue_entry->getVirtualPartNames()) { auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version); + + /// Oddly enough, getVirtualPartNames() may return _virtual_ part name. + /// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list. + /// Fortunately, it's easy to distinguish virtual parts from normal parts by part level. + /// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...) + auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION + auto another_max_level = std::numeric_limits::max(); /// REPLACE/MOVE PARTITION + if (part_info.level == max_level || part_info.level == another_max_level) + continue; + auto it = entry->block_numbers.find(part_info.partition_id); if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion()) mutation.parts_to_do.add(produced_part_name); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 0c0d7ea6407..900b9d9c8ee 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4053,6 +4053,7 @@ Pipe StorageReplicatedMergeTree::alterPartition( /// If new version returns ordinary name, else returns part name containing the first and last month of the month +/// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...) static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info) { if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) @@ -4068,7 +4069,7 @@ static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, return part_info.getPartName(); } -bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info) +bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition) { /// Even if there is no data in the partition, you still need to mark the range for deletion. /// - Because before executing DETACH, tasks for downloading parts to this partition can be executed. @@ -4091,14 +4092,21 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St mutation_version = queue.getCurrentMutationVersion(partition_id, right); } - /// Empty partition. - if (right == 0) - return false; + /// REPLACE PARTITION uses different max level and does not decrement max_block of DROP_RANGE for unknown (probably historical) reason. + auto max_level = std::numeric_limits::max(); + if (!for_replace_partition) + { + max_level = MergeTreePartInfo::MAX_LEVEL; - --right; + /// Empty partition. + if (right == 0) + return false; + + --right; + } /// Artificial high level is chosen, to make this part "covering" all parts inside. - part_info = MergeTreePartInfo(partition_id, left, right, MergeTreePartInfo::MAX_LEVEL, mutation_version); + part_info = MergeTreePartInfo(partition_id, left, right, max_level, mutation_version); return true; } @@ -5305,11 +5313,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// Firstly, generate last block number and compute drop_range /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block. /// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop. + /// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true? MergeTreePartInfo drop_range; - drop_range.partition_id = partition_id; - drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber(); - drop_range.min_block = replace ? 0 : drop_range.max_block; - drop_range.level = std::numeric_limits::max(); + getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true); + if (!replace) + drop_range.min_block = drop_range.max_block; String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); @@ -5388,6 +5396,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( } /// We are almost ready to commit changes, remove fetches and merges from drop range + /// FIXME it's unsafe to remove queue entries before we actually commit REPLACE_RANGE to replication log queue.removePartProducingOpsInRange(zookeeper, drop_range, entry); /// Remove deduplication block_ids of replacing parts @@ -5502,11 +5511,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta /// A range for log entry to remove parts from the source table (myself). MergeTreePartInfo drop_range; - drop_range.partition_id = partition_id; - drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber(); - drop_range.min_block = 0; - drop_range.level = std::numeric_limits::max(); - + getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true); String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); if (drop_range.getBlocksCount() > 1) @@ -5561,6 +5566,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta drop_range_dest.max_block = drop_range.max_block; drop_range_dest.min_block = drop_range.max_block; drop_range_dest.level = drop_range.level; + drop_range_dest.mutation = drop_range.mutation; entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; entry.source_replica = dest_table_storage->replica_name; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index d851082d5c2..2126a4d2fec 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -526,7 +526,7 @@ private: /// Produce an imaginary part info covering all parts in the specified partition (at the call moment). /// Returns false if the partition doesn't exist yet. - bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info); + bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false); /// Check for a node in ZK. If it is, remember this information, and then immediately answer true. std::unordered_set existing_nodes_cache; diff --git a/tests/queries/0_stateless/01149_zookeeper_mutation_stuck_after_replace_partition.reference b/tests/queries/0_stateless/01149_zookeeper_mutation_stuck_after_replace_partition.reference new file mode 100644 index 00000000000..edf8a7e391c --- /dev/null +++ b/tests/queries/0_stateless/01149_zookeeper_mutation_stuck_after_replace_partition.reference @@ -0,0 +1,14 @@ +1 1 +2 2 +3 3 +4 4 +mt 0 0_1_1_0 2 +rmt 0 0_0_0_0 2 +1 1 +2 2 +mt 0 0_1_1_0 2 +rmt 0 0_3_3_0 2 +0000000000 UPDATE s = concat(\'s\', toString(n)) WHERE 1 [] 0 1 +0000000001 DROP COLUMN s [] 0 1 +3 +4 diff --git a/tests/queries/0_stateless/01149_zookeeper_mutation_stuck_after_replace_partition.sql b/tests/queries/0_stateless/01149_zookeeper_mutation_stuck_after_replace_partition.sql new file mode 100644 index 00000000000..178f9b81ead --- /dev/null +++ b/tests/queries/0_stateless/01149_zookeeper_mutation_stuck_after_replace_partition.sql @@ -0,0 +1,39 @@ +drop table if exists mt; +drop table if exists rmt sync; + +create table mt (n UInt64, s String) engine = MergeTree partition by intDiv(n, 10) order by n; +insert into mt values (3, '3'), (4, '4'); + +create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n; +insert into rmt values (1,'1'), (2, '2'); + +select * from rmt; +select * from mt; +select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name; + +alter table rmt update s = 's'||toString(n) where 1; + +select * from rmt; +alter table rmt replace partition '0' from mt; + +select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name; + +alter table rmt drop column s; + +select mutation_id, command, parts_to_do_names, parts_to_do, is_done from system.mutations where database=currentDatabase() and table='rmt'; +select * from rmt; + +drop table rmt sync; + +set replication_alter_partitions_sync=0; +create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n; +insert into rmt values (1,'1'), (2, '2'); + +alter table rmt update s = 's'||toString(n) where 1; +alter table rmt drop partition '0'; + +set replication_alter_partitions_sync=1; +alter table rmt drop column s; + +drop table mt; +drop table rmt sync;