Han Fei 2023-01-17 15:47:52 +01:00
parent 30a798182a
commit 8a74238fe0
6 changed files with 44 additions and 9 deletions

src/Common/CurrentMetrics.cpp View File

@@ -100,6 +100,7 @@
     M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
     M(FilesystemCacheSize, "Filesystem cache size in bytes") \
     M(FilesystemCacheElements, "Filesystem cache elements (file segments)") \
+    M(AsyncInsertCacheSize, "Number of async insert hash ids in cache") \
     M(S3Requests, "S3 requests") \
     M(KeeperAliveConnections, "Number of alive connections") \
     M(KeeperOutstandingRequets, "Number of outstanding requests") \
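A note on the metric kind (not part of the diff): CurrentMetrics entries such as the new AsyncInsertCacheSize are gauges that move up and down with live server state, and this commit drives the gauge through the RAII CurrentMetrics::Increment member added in AsyncBlockIDsCache.cpp below. A minimal self-contained sketch of that gauge-plus-guard pattern, with illustrative names rather than ClickHouse's real API:

    #include <atomic>
    #include <cstddef>

    // Illustrative stand-in for a CurrentMetrics-style gauge: a process-wide
    // atomic that an RAII guard raises on construction and lowers again on
    // destruction, so the gauge cannot leak on early returns or exceptions.
    std::atomic<std::size_t> async_insert_cache_size{0};

    struct MetricIncrement
    {
        std::atomic<std::size_t> & metric;
        std::size_t amount;

        MetricIncrement(std::atomic<std::size_t> & metric_, std::size_t amount_)
            : metric(metric_), amount(amount_)
        {
            metric.fetch_add(amount, std::memory_order_relaxed);
        }

        ~MetricIncrement()
        {
            metric.fetch_sub(amount, std::memory_order_relaxed);
        }
    };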

src/Common/ProfileEvents.cpp View File

@@ -10,6 +10,7 @@
     M(InsertQuery, "Same as Query, but only for INSERT queries.") \
     M(AsyncInsertQuery, "Same as InsertQuery, but only for asynchronous INSERT queries.") \
     M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.") \
+    M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in the asynchronous INSERT hash id cache.") \
     M(FailedQuery, "Number of failed queries.") \
     M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \
     M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.") \
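Unlike the gauge above, ProfileEvents entries are monotonic counters: AsyncInsertCacheHits only ever grows, and rates come from sampling system.events over time. A sketch of the counter side, again with illustrative names:

    #include <atomic>
    #include <cstdint>

    // Illustrative stand-in for a ProfileEvents-style counter: increment-only,
    // read by sampling (the test at the bottom of this commit reads the
    // accumulated value once and asserts it is positive).
    std::atomic<std::uint64_t> async_insert_cache_hits{0};

    inline void onAsyncInsertCacheHit()
    {
        // Mirrors ProfileEvents::increment(ProfileEvents::AsyncInsertCacheHits).
        async_insert_cache_hits.fetch_add(1, std::memory_order_relaxed);
    }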

src/Storages/MergeTree/AsyncBlockIDsCache.cpp View File

@@ -1,12 +1,32 @@
+#include <Common/CurrentMetrics.h>
+#include <Common/ProfileEvents.h>
 #include <Storages/MergeTree/AsyncBlockIDsCache.h>
 #include <Storages/StorageReplicatedMergeTree.h>
-#include <chrono>
+#include <unordered_set>
+#include <mutex>
+
+namespace ProfileEvents
+{
+    extern const Event AsyncInsertCacheHits;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric AsyncInsertCacheSize;
+}
 
 namespace DB
 {
 
+struct AsyncBlockIDsCache::Cache : public std::unordered_set<String>
+{
+    CurrentMetrics::Increment cache_size_increment;
+    explicit Cache(std::unordered_set<String> && set_)
+        : std::unordered_set<String>(std::move(set_))
+        , cache_size_increment(CurrentMetrics::AsyncInsertCacheSize, size())
+    {}
+};
+
 std::vector<String> AsyncBlockIDsCache::getChildren()
 {
     auto zookeeper = storage.getZooKeeper();
@@ -32,15 +52,14 @@ void AsyncBlockIDsCache::update()
     try
     {
         std::vector<String> paths = getChildren();
-        Cache cache;
+        std::unordered_set<String> set;
        for (String & p : paths)
        {
-            cache.insert(std::move(p));
+            set.insert(std::move(p));
        }
-        LOG_TRACE(log, "Updating async block succeed. {} block ids has been updated.", cache.size());
        {
            std::lock_guard lock(mu);
-            cache_ptr = std::make_shared<Cache>(std::move(cache));
+            cache_ptr = std::make_shared<Cache>(std::move(set));
            ++version;
        }
        cv.notify_all();
@@ -95,6 +114,8 @@ Strings AsyncBlockIDsCache::detectConflicts(const Strings & paths, UInt64 & last
        }
    }
 
+    ProfileEvents::increment(ProfileEvents::AsyncInsertCacheHits);
+
    return conflicts;
 }
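The central trick in this file: Cache now derives from std::unordered_set<String> and owns a CurrentMetrics::Increment initialized to size(), so AsyncInsertCacheSize follows the published snapshot's lifetime by itself. Constructing a new snapshot adds its size; releasing the last shared_ptr to the old one subtracts the old size. A self-contained sketch of the same lifetime coupling, using plain standard types and hypothetical names:

    #include <atomic>
    #include <cstddef>
    #include <memory>
    #include <string>
    #include <unordered_set>

    std::atomic<std::size_t> cache_size_metric{0};

    // RAII guard: adds n to the gauge now and subtracts it on destruction.
    struct SizeIncrement
    {
        std::size_t n;
        explicit SizeIncrement(std::size_t n_) : n(n_) { cache_size_metric += n; }
        ~SizeIncrement() { cache_size_metric -= n; }
    };

    // Same shape as AsyncBlockIDsCache::Cache above: a set snapshot whose
    // gauge contribution is tied to its own lifetime.
    struct CacheSnapshot : std::unordered_set<std::string>
    {
        SizeIncrement size_increment;
        explicit CacheSnapshot(std::unordered_set<std::string> && set_)
            : std::unordered_set<std::string>(std::move(set_))
            , size_increment(size())
        {}
    };

    int main()
    {
        auto snapshot = std::make_shared<CacheSnapshot>(
            std::unordered_set<std::string>{"a", "b", "c"});
        // cache_size_metric == 3 here.
        snapshot = std::make_shared<CacheSnapshot>(
            std::unordered_set<std::string>{"d"});
        // The new snapshot added 1, the old one's destructor subtracted 3,
        // so cache_size_metric == 1: the gauge tracked the swap by itself.
    }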

src/Storages/MergeTree/AsyncBlockIDsCache.h View File

@@ -5,7 +5,6 @@
 #include <Core/BackgroundSchedulePool.h>
 #include <chrono>
-#include <unordered_set>
 
 namespace DB
 {
@@ -14,7 +13,7 @@ class StorageReplicatedMergeTree;
 class AsyncBlockIDsCache
 {
-    using Cache = std::unordered_set<String>;
+    struct Cache;
     using CachePtr = std::shared_ptr<Cache>;
 
     std::vector<String> getChildren();
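Swapping the header's using-alias for a bare struct Cache; is what allows dropping <unordered_set> from the header: std::shared_ptr type-erases its deleter, so, unlike std::unique_ptr, it can be declared and destroyed with an incomplete element type as long as the pointer is created where the type is complete (the .cpp). A minimal sketch of the idiom with hypothetical names, with both "files" collapsed into one block:

    #include <memory>
    #include <string>
    #include <unordered_set>

    // "widget.h" part: clients never see the container behind Cache.
    class Widget
    {
        struct Cache;                            // defined below ("in the .cpp")
        using CachePtr = std::shared_ptr<Cache>;
        CachePtr cache_ptr;                      // fine with an incomplete Cache

    public:
        Widget();
    };

    // "widget.cpp" part: the only place that needs the full definition.
    struct Widget::Cache : std::unordered_set<std::string> {};

    Widget::Widget() : cache_ptr(std::make_shared<Cache>()) {}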

src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp View File

@@ -682,7 +682,10 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
            /// prefilter by cache
            conflict_block_ids = storage.async_block_ids_cache.detectConflicts(block_id, cache_version);
            if (!conflict_block_ids.empty())
+            {
+                cache_version = 0;
                return;
+            }
            for (const auto & single_block_id : block_id)
                block_id_path.push_back(storage.zookeeper_path + "/async_blocks/" + single_block_id);
        }
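Here cache_version is the stamp the sink received from its previous detectConflicts call. The producer side is visible in AsyncBlockIDsCache.cpp above (bump version and cv.notify_all() under mu); resetting the stamp to 0 after a conflict plausibly lets the retry read the cache immediately rather than waiting for a fresh snapshot, though detectConflicts' wait logic is outside this diff. A self-contained sketch of the version-stamped publish/read pattern under that assumption:

    #include <chrono>
    #include <condition_variable>
    #include <cstdint>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <unordered_set>

    // Mirrors the mu/version/cv trio from the AsyncBlockIDsCache diff; the
    // consumer's bounded wait is an assumption, names are illustrative.
    struct VersionedCache
    {
        std::mutex mu;
        std::condition_variable cv;
        std::uint64_t version = 0;
        std::shared_ptr<const std::unordered_set<std::string>> cache_ptr;

        // Producer (the update loop): publish a fresh snapshot, wake readers.
        void publish(std::unordered_set<std::string> fresh)
        {
            {
                std::lock_guard lock(mu);
                cache_ptr = std::make_shared<const std::unordered_set<std::string>>(std::move(fresh));
                ++version;
            }
            cv.notify_all();
        }

        // Consumer (the sink): wait briefly for a version other than the one
        // it saw last, then record what it read. Once anything has been
        // published, last_version == 0 satisfies the predicate immediately,
        // so a caller that reset its stamp to 0 does not block.
        std::shared_ptr<const std::unordered_set<std::string>> read(std::uint64_t & last_version)
        {
            std::unique_lock lock(mu);
            cv.wait_for(lock, std::chrono::milliseconds(100),
                        [&] { return version != last_version; });
            last_version = version;
            return cache_ptr;
        }
    };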

tests/queries/0_stateless/02481_async_insert_dedup.python View File

@@ -98,7 +98,7 @@ gen.join()
 retry = 0
-while (True):
+while True:
     time.sleep(5)
     result = client.query("select KeyID from t_async_insert_dedup order by KeyID")
     result = result.split()
@@ -124,6 +124,16 @@ while (True):
     else:
         print(len(result), flush=True)
         break
 
+result = client.query("SELECT value FROM system.metrics where metric = 'AsyncInsertCacheSize'")
+result = int(result.split()[0])
+if result <= 0:
+    raise Exception(f"AsyncInsertCacheSize should be > 0, but got {result}")
+
+result = client.query("SELECT value FROM system.events where event = 'AsyncInsertCacheHits'")
+result = int(result.split()[0])
+if result <= 0:
+    raise Exception(f"AsyncInsertCacheHits should be > 0, but got {result}")
+
 client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY")
 os._exit(os.EX_OK)