Merge pull request #57743 from ClickHouse/other_timeout_for_wait

Increase async block cache deduplication timeout
This commit is contained in:
Alexander Gololobov 2023-12-12 15:40:04 +01:00 committed by GitHub
commit 09d6e6b945
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 52 deletions

View File

@ -18,6 +18,7 @@ namespace CurrentMetrics
namespace DB namespace DB
{ {
static constexpr int FAILURE_RETRY_MS = 3000;
template <typename TStorage> template <typename TStorage>
struct AsyncBlockIDsCache<TStorage>::Cache : public std::unordered_set<String> struct AsyncBlockIDsCache<TStorage>::Cache : public std::unordered_set<String>
@ -29,35 +30,12 @@ struct AsyncBlockIDsCache<TStorage>::Cache : public std::unordered_set<String>
{} {}
}; };
template <typename TStorage>
std::vector<String> AsyncBlockIDsCache<TStorage>::getChildren()
{
auto zookeeper = storage.getZooKeeper();
auto watch_callback = [last_time = this->last_updatetime.load()
, my_update_min_interval = this->update_min_interval
, my_task = task->shared_from_this()](const Coordination::WatchResponse &)
{
auto now = std::chrono::steady_clock::now();
if (now - last_time < my_update_min_interval)
{
std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(my_update_min_interval - (now - last_time));
my_task->scheduleAfter(sleep_time.count());
}
else
my_task->schedule();
};
std::vector<String> children;
Coordination::Stat stat;
zookeeper->tryGetChildrenWatch(path, children, &stat, watch_callback);
return children;
}
template <typename TStorage> template <typename TStorage>
void AsyncBlockIDsCache<TStorage>::update() void AsyncBlockIDsCache<TStorage>::update()
try try
{ {
std::vector<String> paths = getChildren(); auto zookeeper = storage.getZooKeeper();
std::vector<String> paths = zookeeper->getChildren(path);
std::unordered_set<String> set; std::unordered_set<String> set;
for (String & p : paths) for (String & p : paths)
{ {
@ -69,21 +47,20 @@ try
++version; ++version;
} }
cv.notify_all(); cv.notify_all();
last_updatetime = std::chrono::steady_clock::now();
} }
catch (...) catch (...)
{ {
LOG_INFO(log, "Updating async block ids cache failed. Reason: {}", getCurrentExceptionMessage(false)); LOG_INFO(log, "Updating async block ids cache failed. Reason: {}", getCurrentExceptionMessage(false));
task->scheduleAfter(update_min_interval.count()); task->scheduleAfter(FAILURE_RETRY_MS);
} }
template <typename TStorage> template <typename TStorage>
AsyncBlockIDsCache<TStorage>::AsyncBlockIDsCache(TStorage & storage_) AsyncBlockIDsCache<TStorage>::AsyncBlockIDsCache(TStorage & storage_)
: storage(storage_), : storage(storage_)
update_min_interval(storage.getSettings()->async_block_ids_cache_min_update_interval_ms), , update_wait(storage.getSettings()->async_block_ids_cache_update_wait_ms)
path(storage.getZooKeeperPath() + "/async_blocks"), , path(storage.getZooKeeperPath() + "/async_blocks")
log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)"), , log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)")
log(&Poco::Logger::get(log_name)) , log(&Poco::Logger::get(log_name))
{ {
task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ update(); }); task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ update(); });
} }
@ -95,6 +72,16 @@ void AsyncBlockIDsCache<TStorage>::start()
task->activateAndSchedule(); task->activateAndSchedule();
} }
template <typename TStorage>
void AsyncBlockIDsCache<TStorage>::triggerCacheUpdate()
{
/// Trigger task update. Watch-based updates may produce a lot of
/// redundant work in case of multiple replicas, so we use manually controlled updates
/// in case of duplicates
if (!task->schedule())
LOG_TRACE(log, "Task is already scheduled, will wait for update for {}ms", update_wait.count());
}
/// Caller will keep the version of last call. When the caller calls again, it will wait util gets a newer version. /// Caller will keep the version of last call. When the caller calls again, it will wait util gets a newer version.
template <typename TStorage> template <typename TStorage>
Strings AsyncBlockIDsCache<TStorage>::detectConflicts(const Strings & paths, UInt64 & last_version) Strings AsyncBlockIDsCache<TStorage>::detectConflicts(const Strings & paths, UInt64 & last_version)
@ -102,21 +89,20 @@ Strings AsyncBlockIDsCache<TStorage>::detectConflicts(const Strings & paths, UIn
if (!storage.getSettings()->use_async_block_ids_cache) if (!storage.getSettings()->use_async_block_ids_cache)
return {}; return {};
std::unique_lock lk(mu);
/// For first time access of this cache, the `last_version` is zero, so it will not block here.
/// For retrying request, We compare the request version and cache version, because zk only returns
/// incomplete information of duplication, we need to update the cache to find out more duplication.
/// The timeout here is to prevent deadlock, just in case.
cv.wait_for(lk, update_min_interval * 2, [&]{return version != last_version;});
if (version == last_version)
LOG_INFO(log, "Read cache with a old version {}", last_version);
CachePtr cur_cache; CachePtr cur_cache;
cur_cache = cache_ptr; {
last_version = version; std::unique_lock lk(mu);
/// For first time access of this cache, the `last_version` is zero, so it will not block here.
/// For retrying request, We compare the request version and cache version, because zk only returns
/// incomplete information of duplication, we need to update the cache to find out more duplication.
cv.wait_for(lk, update_wait, [&]{return version != last_version;});
lk.unlock(); if (version == last_version)
LOG_INFO(log, "Read cache with a old version {}", last_version);
cur_cache = cache_ptr;
last_version = version;
}
if (cur_cache == nullptr) if (cur_cache == nullptr)
return {}; return {};

View File

@ -14,8 +14,6 @@ class AsyncBlockIDsCache
struct Cache; struct Cache;
using CachePtr = std::shared_ptr<Cache>; using CachePtr = std::shared_ptr<Cache>;
std::vector<String> getChildren();
void update(); void update();
public: public:
@ -27,12 +25,13 @@ public:
Strings detectConflicts(const Strings & paths, UInt64 & last_version); Strings detectConflicts(const Strings & paths, UInt64 & last_version);
void triggerCacheUpdate();
private: private:
TStorage & storage; TStorage & storage;
std::atomic<std::chrono::steady_clock::time_point> last_updatetime; const std::chrono::milliseconds update_wait;
const std::chrono::milliseconds update_min_interval;
std::mutex mu; std::mutex mu;
CachePtr cache_ptr; CachePtr cache_ptr;

View File

@ -95,7 +95,7 @@ struct Settings;
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \ M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \ M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \ M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "Minimum interval between updates of async_block_ids_cache", 0) \ M(Milliseconds, async_block_ids_cache_update_wait_ms, 100, "How long each insert iteration will wait for async_block_ids_cache update", 0) \
M(Bool, use_async_block_ids_cache, true, "Use in-memory cache to filter duplicated async inserts based on block ids", 0) \ M(Bool, use_async_block_ids_cache, true, "Use in-memory cache to filter duplicated async inserts based on block ids", 0) \
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \ M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \ M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
@ -214,6 +214,7 @@ struct Settings;
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Bool, use_metadata_cache, false) \ MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Bool, use_metadata_cache, false) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, merge_tree_enable_clear_old_broken_detached, 0) \ MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, merge_tree_enable_clear_old_broken_detached, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30) \ MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, async_block_ids_cache_min_update_interval_ms, 1000) \
/// Settings that should not change after the creation of a table. /// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE /// NOLINTNEXTLINE

View File

@ -307,7 +307,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
auto profile_events_scope = std::make_unique<ProfileEventsScope>(&part_counters); auto profile_events_scope = std::make_unique<ProfileEventsScope>(&part_counters);
/// Some merging algorithms can mofidy the block which loses the information about the async insert offsets /// Some merging algorithms can mofidy the block which loses the information about the async insert offsets
/// when preprocessing or filtering data for asnyc inserts deduplication we want to use the initial, unmerged block /// when preprocessing or filtering data for async inserts deduplication we want to use the initial, unmerged block
std::optional<BlockWithPartition> unmerged_block; std::optional<BlockWithPartition> unmerged_block;
if constexpr (async_insert) if constexpr (async_insert)
@ -456,7 +456,7 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
if (!delayed_chunk) if (!delayed_chunk)
return; return;
for (auto & partition: delayed_chunk->partitions) for (auto & partition : delayed_chunk->partitions)
{ {
int retry_times = 0; int retry_times = 0;
/// users may have lots of same inserts. It will be helpful to deduplicate in advance. /// users may have lots of same inserts. It will be helpful to deduplicate in advance.
@ -469,6 +469,7 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
} }
/// reset the cache version to zero for every partition write. /// reset the cache version to zero for every partition write.
/// Version zero allows to avoid wait on first iteration
cache_version = 0; cache_version = 0;
while (true) while (true)
{ {
@ -476,6 +477,8 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num, false).first; auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num, false).first;
if (conflict_block_ids.empty()) if (conflict_block_ids.empty())
break; break;
storage.async_block_ids_cache.triggerCacheUpdate();
++retry_times; ++retry_times;
LOG_DEBUG(log, "Found duplicate block IDs: {}, retry times {}", toString(conflict_block_ids), retry_times); LOG_DEBUG(log, "Found duplicate block IDs: {}, retry times {}", toString(conflict_block_ids), retry_times);
/// partition clean conflict /// partition clean conflict