This commit is contained in:
Alexander Tokmakov 2023-06-07 16:55:14 +02:00
parent d2aa1779ed
commit 32372967e9
4 changed files with 11 additions and 16 deletions

View File

@ -6743,14 +6743,12 @@ size_t StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
if (parts.empty())
return total_parts_to_remove;
size_t res = 0;
NOEXCEPT_SCOPE({ res = clearOldPartsAndRemoveFromZKImpl(zookeeper, std::move(parts)); });
return res;
NOEXCEPT_SCOPE({ clearOldPartsAndRemoveFromZKImpl(zookeeper, std::move(parts)); });
return total_parts_to_remove;
}
size_t StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts)
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts)
{
DataPartsVector parts_to_delete_only_from_filesystem; // Only duplicates
DataPartsVector parts_to_delete_completely; // All parts except duplicates
DataPartsVector parts_to_retry_deletion; // Parts that should be retried due to network problems
@ -6861,8 +6859,6 @@ size_t StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZKImpl(zkutil::ZooK
/// Otherwise nobody will try to remove them again (see grabOldParts).
delete_parts_from_fs_and_rollback_in_case_of_error(parts_to_remove_from_filesystem, "old");
}
return total_parts_to_remove;
}

View File

@ -344,7 +344,7 @@ private:
/// Delete old parts from disk and from ZooKeeper. Returns the number of removed parts
size_t clearOldPartsAndRemoveFromZK();
size_t clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts);
void clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts);
template<bool async_insert>
friend class ReplicatedMergeTreeSinkImpl;

View File

@ -38,11 +38,7 @@ select count(), sum(n), sum(m) from rmt;
-- New table can assign merges/mutations and can remove old parts
create table rmt2 (n int, m int, k String) engine=ReplicatedMergeTree('/test/02432/{database}', '2') order by tuple()
settings storage_policy = 's3_cache', allow_remote_fs_zero_copy_replication=1,
<<<<<<< HEAD
max_part_removal_threads=10, concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,
=======
concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1,
>>>>>>> master
concurrent_part_removal_threshold=1, cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0,
min_bytes_for_wide_part=0, min_rows_for_wide_part=0, max_replicated_merges_in_queue=1,
old_parts_lifetime=0;

View File

@ -7,11 +7,11 @@ drop table if exists rmt2;
create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '1') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0, max_parts_to_merge_at_once=4,
merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=500;
merge_selecting_sleep_ms=1000, max_merge_selecting_sleep_ms=2000;
create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02448/{database}/rmt', '2') order by tuple()
settings min_replicated_logs_to_keep=1, max_replicated_logs_to_keep=2, cleanup_delay_period=0, cleanup_delay_period_random_add=1,
cleanup_thread_preferred_points_per_iteration=0, old_parts_lifetime=0, max_parts_to_merge_at_once=4,
merge_selecting_sleep_ms=100, max_merge_selecting_sleep_ms=500;
merge_selecting_sleep_ms=1000, max_merge_selecting_sleep_ms=2000;
-- insert part only on one replica
system stop replicated sends rmt1;
@ -141,7 +141,10 @@ system sync replica rmt2;
-- merge through gap
optimize table rmt2;
-- give it a chance to cleanup log
select sleep(2) format Null; -- increases probability of reproducing the issue
select sleepEachRow(2) from url('http://localhost:8123/?param_tries={1..10}&query=' || encodeURLComponent(
'select value from system.zookeeper where path=''//test/02448/' || currentDatabase() || '/rmt/replicas/1/is_lost'' and value=''1'''
), 'LineAsString', 's String') settings max_threads=1 format Null;
-- rmt1 will mimic rmt2, but will not be able to fetch parts for a while
system stop replicated sends rmt2;