improve replica recovery

This commit is contained in:
Alexander Tokmakov 2022-10-06 21:09:12 +02:00
parent 766107df0a
commit bb78bf1c70
3 changed files with 190 additions and 12 deletions

View File

@ -2407,6 +2407,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
std::vector<QueueEntryInfo> source_queue;
ActiveDataPartSet get_part_set{format_version};
ActiveDataPartSet drop_range_set{format_version};
std::unordered_set<String> exact_part_names;
{
std::vector<zkutil::ZooKeeper::FutureGet> queue_get_futures;
@ -2444,14 +2445,22 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
info.parsed_entry->znode_name = source_queue_names[i];
if (info.parsed_entry->type == LogEntry::DROP_RANGE)
{
drop_range_set.add(info.parsed_entry->new_part_name);
if (info.parsed_entry->type == LogEntry::GET_PART)
}
else if (info.parsed_entry->type == LogEntry::GET_PART)
{
String maybe_covering_drop_range = drop_range_set.getContainingPart(info.parsed_entry->new_part_name);
if (maybe_covering_drop_range.empty())
get_part_set.add(info.parsed_entry->new_part_name);
}
else
{
/// We should keep local parts if they present in the queue of source replica.
/// There's a chance that we are the only replica that has these parts.
Strings entry_virtual_parts = info.parsed_entry->getVirtualPartNames(format_version);
std::move(entry_virtual_parts.begin(), entry_virtual_parts.end(), std::inserter(exact_part_names, exact_part_names.end()));
}
}
}
@ -2471,11 +2480,17 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
for (const auto & part : local_parts_in_zk)
{
if (get_part_set.getContainingPart(part).empty())
{
parts_to_remove_from_zk.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
}
/// We look for exact match (and not for any covering part)
/// because our part might be dropped and covering part might be merged though gap.
/// (avoid resurrection of data that was removed a long time ago)
if (get_part_set.getContainingPart(part) == part)
continue;
if (exact_part_names.contains(part))
continue;
parts_to_remove_from_zk.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
}
{
@ -2497,11 +2512,14 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
for (const auto & part : local_active_parts)
{
if (get_part_set.getContainingPart(part->name).empty())
{
parts_to_remove_from_working_set.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name);
}
if (get_part_set.getContainingPart(part->name) == part->name)
continue;
if (exact_part_names.contains(part->name))
continue;
parts_to_remove_from_working_set.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name);
}
if (getSettings()->detach_old_local_parts_when_cloning_replica)

View File

@ -0,0 +1,13 @@
1 [2,3,4,5]
2 [1,2,3,4,5]
3 [1,2,3,4,5]
4 [3,4,5]
5 [1,2,3,4,5]
6 [1,2,3,4,5]
7 [1,2,3,4,5,20,30,40,50]
8 [1,2,3,4,5,10,20,30,40,50]
9 [1,2,3,4,5,10,20,30,40,50]
['all_18_23_1','all_7_17_2_13']
10 [1,2,3,4,5,10,20,30,40,50]
11 [1,2,3,4,5,10,20,30,40,50,100,300,400,500,600]
12 [1,2,3,4,5,10,20,30,40,50,100,300,400,500,600]

View File

@ -0,0 +1,147 @@
drop table if exists rmt1;
drop table if exists rmt2;
create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '1') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=5;
create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '2') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1, old_parts_lifetime=0, max_parts_to_merge_at_once=5;
-- insert part only on one replica
system stop replicated sends rmt1;
insert into rmt1 values (1);
detach table rmt1; -- make replica inactive
system start replicated sends rmt1;
-- trigger log rotation, rmt1 will be lost
insert into rmt2 values (2);
insert into rmt2 values (3);
insert into rmt2 values (4);
insert into rmt2 values (5);
-- check that entry was not removed from the queue (part is not lost)
set receive_timeout=5;
system sync replica rmt2; -- {serverError TIMEOUT_EXCEEDED}
set receive_timeout=300;
select 1, arraySort(groupArray(n)) from rmt2;
-- rmt1 will mimic rmt2
attach table rmt1;
system sync replica rmt1;
system sync replica rmt2;
-- check that no parts are lost
select 2, arraySort(groupArray(n)) from rmt1;
select 3, arraySort(groupArray(n)) from rmt2;
truncate table rmt1;
truncate table rmt2;
-- insert parts only on one replica and merge them
system stop replicated sends rmt2;
insert into rmt2 values (1);
insert into rmt2 values (2);
system sync replica rmt2;
optimize table rmt2 final;
system sync replica rmt2;
-- give it a chance to remove source parts
select sleep(2) format Null; -- increases probability of reproducing the issue
detach table rmt2;
system start replicated sends rmt2;
-- trigger log rotation, rmt2 will be lost
insert into rmt1 values (3);
insert into rmt1 values (4);
insert into rmt1 values (5);
set receive_timeout=5;
-- check that entry was not removed from the queue (part is not lost)
system sync replica rmt1; -- {serverError TIMEOUT_EXCEEDED}
set receive_timeout=300;
select 4, arraySort(groupArray(n)) from rmt1;
-- rmt1 will mimic rmt2
system stop fetches rmt1;
attach table rmt2;
system sync replica rmt2;
-- give rmt2 a chance to remove merged part (but it should not do it)
select sleep(2) format Null; -- increases probability of reproducing the issue
system start fetches rmt1;
system sync replica rmt1;
-- check that no parts are lost
select 5, arraySort(groupArray(n)) from rmt1;
select 6, arraySort(groupArray(n)) from rmt2;
-- insert part only on one replica
system stop replicated sends rmt1;
insert into rmt1 values (123);
alter table rmt1 update n=10 where n=123 settings mutations_sync=1;
-- give it a chance to remove source part
select sleep(2) format Null; -- increases probability of reproducing the issue
detach table rmt1; -- make replica inactive
system start replicated sends rmt1;
-- trigger log rotation, rmt1 will be lost
insert into rmt2 values (20);
insert into rmt2 values (30);
insert into rmt2 values (40);
insert into rmt2 values (50);
-- check that entry was not removed from the queue (part is not lost)
set receive_timeout=5;
system sync replica rmt2; -- {serverError TIMEOUT_EXCEEDED}
set receive_timeout=300;
select 7, arraySort(groupArray(n)) from rmt2;
-- rmt1 will mimic rmt2
system stop fetches rmt2;
attach table rmt1;
system sync replica rmt1;
-- give rmt1 a chance to remove mutated part (but it should not do it)
select sleep(2) format Null; -- increases probability of reproducing the issue
system start fetches rmt2;
system sync replica rmt2;
-- check that no parts are lost
select 8, arraySort(groupArray(n)) from rmt1;
select 9, arraySort(groupArray(n)) from rmt2;
-- avoid arbitrary merges after inserting
optimize table rmt2 final;
-- insert parts (all_18_18_0, all_19_19_0) on both replicas (will be deduplicated, but it does not matter)
insert into rmt1 values (100);
insert into rmt2 values (100);
insert into rmt1 values (200);
insert into rmt2 values (200);
detach table rmt1;
-- create a gap in block numbers buy dropping part
insert into rmt2 values (300);
alter table rmt2 drop part 'all_19_19_0';
insert into rmt2 values (400);
insert into rmt2 values (500);
insert into rmt2 values (600);
system sync replica rmt2;
-- merge through gap
optimize table rmt2;
select arraySort(groupArrayDistinct(_part)) from rmt2;
-- give it a chance to cleanup log
select sleep(2) format Null; -- increases probability of reproducing the issue
-- rmt1 will mimic rmt2, but will not be able to fetch parts for a while
system stop replicated sends rmt2;
attach table rmt1;
-- rmt1 should not show the value (100) from dropped part
select 10, arraySort(groupArray(n)) from rmt1;
select 11, arraySort(groupArray(n)) from rmt2;
system start replicated sends rmt2;
system sync replica rmt1;
select 12, arraySort(groupArray(n)) from rmt1;
drop table rmt1;
drop table rmt2;