Merge pull request #44651 from hanfei1991/hanfei/fix-async-insert-cleanup

Fix a bug where cleanup of async blocks did not work
This commit is contained in:
Han Fei 2023-01-02 21:58:27 +01:00 committed by GitHub
commit 7d7de3833b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 13 deletions

View File

@ -75,8 +75,8 @@ void ReplicatedMergeTreeCleanupThread::iterate()
{
clearOldLogs();
auto storage_settings = storage.getSettings();
clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds, storage_settings->replicated_deduplication_window);
clearOldBlocks("async_blocks", storage_settings->replicated_deduplication_window_seconds_for_async_inserts, storage_settings->replicated_deduplication_window_for_async_inserts);
clearOldBlocks("blocks", storage_settings->replicated_deduplication_window_seconds, storage_settings->replicated_deduplication_window, cached_block_stats_for_sync_inserts);
clearOldBlocks("async_blocks", storage_settings->replicated_deduplication_window_seconds_for_async_inserts, storage_settings->replicated_deduplication_window_for_async_inserts, cached_block_stats_for_async_inserts);
clearOldMutations();
storage.clearEmptyParts();
}
@ -323,12 +323,12 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
}
};
void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size)
void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats)
{
auto zookeeper = storage.getZooKeeper();
std::vector<NodeWithStat> timed_blocks;
getBlocksSortedByTime(*zookeeper, timed_blocks);
getBlocksSortedByTime(blocks_dir_name, *zookeeper, timed_blocks, cached_block_stats);
if (timed_blocks.empty())
return;
@ -391,14 +391,14 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_
}
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks)
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats)
{
timed_blocks.clear();
Strings blocks;
Coordination::Stat stat;
if (Coordination::Error::ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (Coordination::Error::ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/" + blocks_dir_name, blocks, &stat))
throw Exception(ErrorCodes::NOT_FOUND_NODE, "{}/{} doesn't exist", storage.zookeeper_path, blocks_dir_name);
/// Seems like this code is obsolete, because we delete blocks from cache
/// when they are deleted from zookeeper. But we don't know about all (maybe future) places in code
@ -417,7 +417,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
auto not_cached_blocks = stat.numChildren - cached_block_stats.size();
if (not_cached_blocks)
{
LOG_TRACE(log, "Checking {} blocks ({} are not cached){}", stat.numChildren, not_cached_blocks, " to clear old ones from ZooKeeper.");
LOG_TRACE(log, "Checking {} {} ({} are not cached){}, path is {}", stat.numChildren, blocks_dir_name, not_cached_blocks, " to clear old ones from ZooKeeper.", storage.zookeeper_path + "/" + blocks_dir_name);
}
std::vector<std::string> exists_paths;
@ -427,7 +427,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
if (it == cached_block_stats.end())
{
/// New block. Fetch its stat asynchronously.
exists_paths.emplace_back(storage.zookeeper_path + "/blocks/" + block);
exists_paths.emplace_back(storage.zookeeper_path + "/" + blocks_dir_name + "/" + block);
}
else
{

View File

@ -52,18 +52,19 @@ private:
const std::unordered_map<String, String> & log_pointers_candidate_lost_replicas,
size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper);
using NodeCTimeAndVersionCache = std::map<String, std::pair<Int64, Int32>>;
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size);
void clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats);
/// Remove old mutations that are done from ZooKeeper. This is done by the leader replica.
void clearOldMutations();
using NodeCTimeAndVersionCache = std::map<String, std::pair<Int64, Int32>>;
NodeCTimeAndVersionCache cached_block_stats;
NodeCTimeAndVersionCache cached_block_stats_for_sync_inserts;
NodeCTimeAndVersionCache cached_block_stats_for_async_inserts;
struct NodeWithStat;
/// Returns list of blocks (with their stat) sorted by ctime in descending order.
void getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks);
void getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats);
/// TODO Removing old quorum/failed_parts
};

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
# Tags: zookeeper, no-parallel, long, no-fasttest, no-replicated-database

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Check that the background cleanup thread works correctly for async inserts:
# after many deduplicated async inserts, the number of nodes reported under the
# table's async_blocks path must drop to the configured window size (10).

# Append the database name to the ZooKeeper prefix used in the table path below.
CLICKHOUSE_TEST_ZOOKEEPER_PREFIX="${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}"

# Print the number of system.zookeeper rows whose path matches the table's async_blocks path.
function count_async_blocks()
{
    $CLICKHOUSE_CLIENT --query "SELECT count(*) FROM system.zookeeper WHERE path like '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup/async_blocks%' settings allow_unrestricted_reads_from_keeper = 'true'"
}

# Drop the test table.
function drop_test_table()
{
    $CLICKHOUSE_CLIENT -n --query "DROP TABLE t_async_insert_cleanup NO DELAY;"
}

$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS t_async_insert_cleanup NO DELAY;
CREATE TABLE t_async_insert_cleanup (
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_async_insert_cleanup', '{replica}')
ORDER BY (KeyID) SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 1, replicated_deduplication_window_for_async_inserts=10
"

# Issue 100 async inserts with deduplication enabled, without waiting for each one to be flushed.
for i in {1..100}; do
    $CLICKHOUSE_CLIENT --async_insert 1 --async_insert_deduplicate 1 --wait_for_async_insert 0 --query "insert into t_async_insert_cleanup values ($i), ($((i + 1))), ($((i + 2)))"
done

sleep 1

# Remember the count seen right after the inserts; it is only printed if the check below times out.
old_answer=$(count_async_blocks)

# Poll once per second (up to 300 attempts) until exactly 10 entries remain.
for i in {1..300}; do
    answer=$(count_async_blocks)
    # Quote the expansion: an empty result (e.g. a failed query) must compare as
    # "not 10" rather than make '[' fail with a syntax error.
    if [ "$answer" == '10' ]; then
        drop_test_table
        exit 0
    fi
    sleep 1
done

# The expected count was never reached: print diagnostics and clean up.
$CLICKHOUSE_CLIENT --query "SELECT count(*) FROM t_async_insert_cleanup"
echo "$old_answer"
count_async_blocks
drop_test_table