diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index c12e9d0270a..a7706d440fb 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2407,6 +2407,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo std::vector source_queue; ActiveDataPartSet get_part_set{format_version}; ActiveDataPartSet drop_range_set{format_version}; + std::unordered_set exact_part_names; { std::vector queue_get_futures; @@ -2444,14 +2445,22 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo info.parsed_entry->znode_name = source_queue_names[i]; if (info.parsed_entry->type == LogEntry::DROP_RANGE) + { drop_range_set.add(info.parsed_entry->new_part_name); - - if (info.parsed_entry->type == LogEntry::GET_PART) + } + else if (info.parsed_entry->type == LogEntry::GET_PART) { String maybe_covering_drop_range = drop_range_set.getContainingPart(info.parsed_entry->new_part_name); if (maybe_covering_drop_range.empty()) get_part_set.add(info.parsed_entry->new_part_name); } + else + { + /// We should keep local parts if they present in the queue of source replica. + /// There's a chance that we are the only replica that has these parts. + Strings entry_virtual_parts = info.parsed_entry->getVirtualPartNames(format_version); + std::move(entry_virtual_parts.begin(), entry_virtual_parts.end(), std::inserter(exact_part_names, exact_part_names.end())); + } } } @@ -2471,11 +2480,17 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo for (const auto & part : local_parts_in_zk) { - if (get_part_set.getContainingPart(part).empty()) - { - parts_to_remove_from_zk.emplace_back(part); - LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part); - } + /// We look for exact match (and not for any covering part) + /// because our part might be dropped and covering part might be merged though gap. + /// (avoid resurrection of data that was removed a long time ago) + if (get_part_set.getContainingPart(part) == part) + continue; + + if (exact_part_names.contains(part)) + continue; + + parts_to_remove_from_zk.emplace_back(part); + LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part); } { @@ -2497,11 +2512,14 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo for (const auto & part : local_active_parts) { - if (get_part_set.getContainingPart(part->name).empty()) - { - parts_to_remove_from_working_set.emplace_back(part); - LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name); - } + if (get_part_set.getContainingPart(part->name) == part->name) + continue; + + if (exact_part_names.contains(part->name)) + continue; + + parts_to_remove_from_working_set.emplace_back(part); + LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name); } if (getSettings()->detach_old_local_parts_when_cloning_replica) diff --git a/tests/queries/0_stateless/02448_clone_replica_lost_part.reference b/tests/queries/0_stateless/02448_clone_replica_lost_part.reference new file mode 100644 index 00000000000..af82f72c49e --- /dev/null +++ b/tests/queries/0_stateless/02448_clone_replica_lost_part.reference @@ -0,0 +1,13 @@ +1 [2,3,4,5] +2 [1,2,3,4,5] +3 [1,2,3,4,5] +4 [3,4,5] +5 [1,2,3,4,5] +6 [1,2,3,4,5] +7 [1,2,3,4,5,20,30,40,50] +8 [1,2,3,4,5,10,20,30,40,50] +9 [1,2,3,4,5,10,20,30,40,50] +['all_18_23_1','all_7_17_2_13'] +10 [1,2,3,4,5,10,20,30,40,50] +11 [1,2,3,4,5,10,20,30,40,50,100,300,400,500,600] +12 [1,2,3,4,5,10,20,30,40,50,100,300,400,500,600] diff --git a/tests/queries/0_stateless/02448_clone_replica_lost_part.sql b/tests/queries/0_stateless/02448_clone_replica_lost_part.sql new file mode 100644 index 00000000000..d395caf41db --- /dev/null +++ b/tests/queries/0_stateless/02448_clone_replica_lost_part.sql @@ -0,0 +1,147 @@ + +drop table if exists rmt1; +drop table if exists rmt2; +create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '1') order by tuple() + settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=5; +create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '2') order by tuple() + settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=5; + +-- insert part only on one replica +system stop replicated sends rmt1; +insert into rmt1 values (1); +detach table rmt1; -- make replica inactive +system start replicated sends rmt1; + +-- trigger log rotation, rmt1 will be lost +insert into rmt2 values (2); +insert into rmt2 values (3); +insert into rmt2 values (4); +insert into rmt2 values (5); +-- check that entry was not removed from the queue (part is not lost) +set receive_timeout=5; +system sync replica rmt2; -- {serverError TIMEOUT_EXCEEDED} +set receive_timeout=300; + +select 1, arraySort(groupArray(n)) from rmt2; + +-- rmt1 will mimic rmt2 +attach table rmt1; +system sync replica rmt1; +system sync replica rmt2; + +-- check that no parts are lost +select 2, arraySort(groupArray(n)) from rmt1; +select 3, arraySort(groupArray(n)) from rmt2; + + +truncate table rmt1; +truncate table rmt2; + + +-- insert parts only on one replica and merge them +system stop replicated sends rmt2; +insert into rmt2 values (1); +insert into rmt2 values (2); +system sync replica rmt2; +optimize table rmt2 final; +system sync replica rmt2; +-- give it a chance to remove source parts +select sleep(2) format Null; -- increases probability of reproducing the issue +detach table rmt2; +system start replicated sends rmt2; + + +-- trigger log rotation, rmt2 will be lost +insert into rmt1 values (3); +insert into rmt1 values (4); +insert into rmt1 values (5); +set receive_timeout=5; +-- check that entry was not removed from the queue (part is not lost) +system sync replica rmt1; -- {serverError TIMEOUT_EXCEEDED} +set receive_timeout=300; + +select 4, arraySort(groupArray(n)) from rmt1; + +-- rmt1 will mimic rmt2 +system stop fetches rmt1; +attach table rmt2; +system sync replica rmt2; +-- give rmt2 a chance to remove merged part (but it should not do it) +select sleep(2) format Null; -- increases probability of reproducing the issue +system start fetches rmt1; +system sync replica rmt1; + +-- check that no parts are lost +select 5, arraySort(groupArray(n)) from rmt1; +select 6, arraySort(groupArray(n)) from rmt2; + + +-- insert part only on one replica +system stop replicated sends rmt1; +insert into rmt1 values (123); +alter table rmt1 update n=10 where n=123 settings mutations_sync=1; +-- give it a chance to remove source part +select sleep(2) format Null; -- increases probability of reproducing the issue +detach table rmt1; -- make replica inactive +system start replicated sends rmt1; + +-- trigger log rotation, rmt1 will be lost +insert into rmt2 values (20); +insert into rmt2 values (30); +insert into rmt2 values (40); +insert into rmt2 values (50); +-- check that entry was not removed from the queue (part is not lost) +set receive_timeout=5; +system sync replica rmt2; -- {serverError TIMEOUT_EXCEEDED} +set receive_timeout=300; + +select 7, arraySort(groupArray(n)) from rmt2; + +-- rmt1 will mimic rmt2 +system stop fetches rmt2; +attach table rmt1; +system sync replica rmt1; +-- give rmt1 a chance to remove mutated part (but it should not do it) +select sleep(2) format Null; -- increases probability of reproducing the issue +system start fetches rmt2; +system sync replica rmt2; + +-- check that no parts are lost +select 8, arraySort(groupArray(n)) from rmt1; +select 9, arraySort(groupArray(n)) from rmt2; + +-- avoid arbitrary merges after inserting +optimize table rmt2 final; +-- insert parts (all_18_18_0, all_19_19_0) on both replicas (will be deduplicated, but it does not matter) +insert into rmt1 values (100); +insert into rmt2 values (100); +insert into rmt1 values (200); +insert into rmt2 values (200); +detach table rmt1; + +-- create a gap in block numbers buy dropping part +insert into rmt2 values (300); +alter table rmt2 drop part 'all_19_19_0'; +insert into rmt2 values (400); +insert into rmt2 values (500); +insert into rmt2 values (600); +system sync replica rmt2; +-- merge through gap +optimize table rmt2; +select arraySort(groupArrayDistinct(_part)) from rmt2; +-- give it a chance to cleanup log +select sleep(2) format Null; -- increases probability of reproducing the issue + +-- rmt1 will mimic rmt2, but will not be able to fetch parts for a while +system stop replicated sends rmt2; +attach table rmt1; +-- rmt1 should not show the value (100) from dropped part +select 10, arraySort(groupArray(n)) from rmt1; +select 11, arraySort(groupArray(n)) from rmt2; + +system start replicated sends rmt2; +system sync replica rmt1; +select 12, arraySort(groupArray(n)) from rmt1; + +drop table rmt1; +drop table rmt2;