diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 315f471fd5c..1c667b1c867 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -75,8 +75,8 @@ void ReplicatedMergeTreeCleanupThread::iterate() { clearOldLogs(); auto storage_settings = storage.getSettings(); - clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds, storage_settings->replicated_deduplication_window); - clearOldBlocks("async_blocks", storage_settings->replicated_deduplication_window_seconds_for_async_inserts, storage_settings->replicated_deduplication_window_for_async_inserts); + clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds, storage_settings->replicated_deduplication_window, cached_block_stats_for_sync_inserts); + clearOldBlocks("async_blocks", storage_settings->replicated_deduplication_window_seconds_for_async_inserts, storage_settings->replicated_deduplication_window_for_async_inserts, cached_block_stats_for_async_inserts); clearOldMutations(); storage.clearEmptyParts(); } @@ -323,12 +323,12 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat } }; -void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size) +void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats) { auto zookeeper = storage.getZooKeeper(); std::vector timed_blocks; - getBlocksSortedByTime(*zookeeper, timed_blocks); + getBlocksSortedByTime(blocks_dir_name, *zookeeper, timed_blocks, cached_block_stats); if (timed_blocks.empty()) return; @@ -391,14 +391,14 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_ } -void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks) +void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats) { timed_blocks.clear(); Strings blocks; Coordination::Stat stat; - if (Coordination::Error::ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat)) - throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); + if (Coordination::Error::ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/" + blocks_dir_name, blocks, &stat)) + throw Exception(ErrorCodes::NOT_FOUND_NODE, "{}/{} doesn't exist", storage.zookeeper_path, blocks_dir_name); /// Seems like this code is obsolete, because we delete blocks from cache /// when they are deleted from zookeeper. But we don't know about all (maybe future) places in code @@ -417,7 +417,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & auto not_cached_blocks = stat.numChildren - cached_block_stats.size(); if (not_cached_blocks) { - LOG_TRACE(log, "Checking {} blocks ({} are not cached){}", stat.numChildren, not_cached_blocks, " to clear old ones from ZooKeeper."); + LOG_TRACE(log, "Checking {} {} ({} are not cached){}, path is {}", stat.numChildren, blocks_dir_name, not_cached_blocks, " to clear old ones from ZooKeeper.", storage.zookeeper_path + "/" + blocks_dir_name); } std::vector exists_paths; @@ -427,7 +427,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & if (it == cached_block_stats.end()) { /// New block. Fetch its stat asynchronously. - exists_paths.emplace_back(storage.zookeeper_path + "/blocks/" + block); + exists_paths.emplace_back(storage.zookeeper_path + "/" + blocks_dir_name + "/" + block); } else { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index f8731ca0f43..35838625bbe 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -52,18 +52,19 @@ private: const std::unordered_map & log_pointers_candidate_lost_replicas, size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper); + using NodeCTimeAndVersionCache = std::map>; /// Remove old block hashes from ZooKeeper. This is done by the leader replica. - void clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size); + void clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats); /// Remove old mutations that are done from ZooKeeper. This is done by the leader replica. void clearOldMutations(); - using NodeCTimeAndVersionCache = std::map>; - NodeCTimeAndVersionCache cached_block_stats; + NodeCTimeAndVersionCache cached_block_stats_for_sync_inserts; + NodeCTimeAndVersionCache cached_block_stats_for_async_inserts; struct NodeWithStat; /// Returns list of blocks (with their stat) sorted by ctime in descending order. - void getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks); + void getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats); /// TODO Removing old quorum/failed_parts }; diff --git a/tests/queries/0_stateless/02515_cleanup_async_insert_block_ids.reference b/tests/queries/0_stateless/02515_cleanup_async_insert_block_ids.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02515_cleanup_async_insert_block_ids.sh b/tests/queries/0_stateless/02515_cleanup_async_insert_block_ids.sh new file mode 100755 index 00000000000..9e22089d5e1 --- /dev/null +++ b/tests/queries/0_stateless/02515_cleanup_async_insert_block_ids.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# Tags: zookeeper, no-parallel, long, no-fasttest, no-replicated-database + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +# Check that if the background cleanup thread works correctly. +CLICKHOUSE_TEST_ZOOKEEPER_PREFIX="${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}" + +$CLICKHOUSE_CLIENT -n --query " + DROP TABLE IF EXISTS t_async_insert_cleanup NO DELAY; + CREATE TABLE t_async_insert_cleanup ( + KeyID UInt32 + ) Engine = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup', '{replica}') + ORDER BY (KeyID) SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, replicated_deduplication_window_for_async_inserts=10 +" + +for i in {1..100}; do + $CLICKHOUSE_CLIENT --async_insert 1 --async_insert_deduplicate 1 --wait_for_async_insert 0 --query "insert into t_async_insert_cleanup values ($i), ($((i + 1))), ($((i + 2)))" +done + +sleep 1 + +old_answer=$($CLICKHOUSE_CLIENT --query "SELECT count(*) FROM system.zookeeper WHERE path like '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup/async_blocks%' settings allow_unrestricted_reads_from_keeper = 'true'") + +for i in {1..300}; do + answer=$($CLICKHOUSE_CLIENT --query "SELECT count(*) FROM system.zookeeper WHERE path like '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup/async_blocks%' settings allow_unrestricted_reads_from_keeper = 'true'") + if [ $answer == '10' ]; then + $CLICKHOUSE_CLIENT -n --query "DROP TABLE t_async_insert_cleanup NO DELAY;" + exit 0 + fi + sleep 1 +done + +$CLICKHOUSE_CLIENT --query "SELECT count(*) FROM t_async_insert_cleanup" +echo $old_answer +$CLICKHOUSE_CLIENT --query "SELECT count(*) FROM system.zookeeper WHERE path like '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup/async_blocks%' settings allow_unrestricted_reads_from_keeper = 'true'" +$CLICKHOUSE_CLIENT -n --query "DROP TABLE t_async_insert_cleanup NO DELAY;"