Cosmetics

This commit is contained in:
Robert Schulze 2023-01-03 20:10:07 +00:00
parent e4b2ed6d6f
commit 0dda4921ba
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
2 changed files with 13 additions and 14 deletions

View File

@ -161,7 +161,7 @@ QueryResultCache::Writer::Writer(std::mutex & mutex_, Cache & cache_, const Key
size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_, size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_,
size_t max_cache_entries_, size_t max_cache_entries_,
size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_,
std::chrono::milliseconds min_query_duration_) std::chrono::milliseconds min_query_runtime_)
: mutex(mutex_) : mutex(mutex_)
, cache(cache_) , cache(cache_)
, key(key_) , key(key_)
@ -170,7 +170,7 @@ QueryResultCache::Writer::Writer(std::mutex & mutex_, Cache & cache_, const Key
, max_cache_entries(max_cache_entries_) , max_cache_entries(max_cache_entries_)
, max_entry_size_in_bytes(max_entry_size_in_bytes_) , max_entry_size_in_bytes(max_entry_size_in_bytes_)
, max_entry_size_in_rows(max_entry_size_in_rows_) , max_entry_size_in_rows(max_entry_size_in_rows_)
, min_query_duration(min_query_duration_) , min_query_runtime(min_query_runtime_)
{ {
if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first)) if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first))
skip_insert = true; /// Key already contained in cache and did not expire yet --> don't replace it skip_insert = true; /// Key already contained in cache and did not expire yet --> don't replace it
@ -182,7 +182,7 @@ try
if (skip_insert) if (skip_insert)
return; return;
if (auto query_duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - query_start_time); query_duration < min_query_duration) if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - query_start_time) < min_query_runtime)
return; return;
auto to_single_chunk = [](const Chunks & chunks_) -> Chunk auto to_single_chunk = [](const Chunks & chunks_) -> Chunk
@ -273,11 +273,10 @@ QueryResultCache::Reader::Reader(const Cache & cache_, const Key & key, size_t &
return; return;
} }
if (it->first.expires_at < std::chrono::system_clock::now()) if (is_stale(it->first))
{ {
Cache & cache_rw = const_cast<Cache &>(cache_);
cache_size_in_bytes_ -= it->second.allocatedBytes(); cache_size_in_bytes_ -= it->second.allocatedBytes();
cache_rw.erase(it); const_cast<Cache &>(cache_).erase(it);
LOG_DEBUG(&Poco::Logger::get("QueryResultCache"), "Stale entry found and removed for query {}", key.queryStringFromAst()); LOG_DEBUG(&Poco::Logger::get("QueryResultCache"), "Stale entry found and removed for query {}", key.queryStringFromAst());
return; return;
} }
@ -318,10 +317,10 @@ QueryResultCache::Reader QueryResultCache::createReader(const Key & key)
return Reader(cache, key, cache_size_in_bytes); return Reader(cache, key, cache_size_in_bytes);
} }
QueryResultCache::Writer QueryResultCache::createWriter(const Key & key, std::chrono::milliseconds min_query_duration) QueryResultCache::Writer QueryResultCache::createWriter(const Key & key, std::chrono::milliseconds min_query_runtime)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
return Writer(mutex, cache, key, cache_size_in_bytes, max_cache_size_in_bytes, max_cache_entries, max_cache_entry_size_in_bytes, max_cache_entry_size_in_rows, min_query_duration); return Writer(mutex, cache, key, cache_size_in_bytes, max_cache_size_in_bytes, max_cache_entries, max_cache_entry_size_in_bytes, max_cache_entry_size_in_rows, min_query_runtime);
} }
void QueryResultCache::reset() void QueryResultCache::reset()

View File

@ -70,7 +70,7 @@ private:
using Cache = std::unordered_map<Key, Chunk, KeyHasher>; using Cache = std::unordered_map<Key, Chunk, KeyHasher>;
/// query --> query execution count /// query --> query execution count
using TimesExecutedMap = std::unordered_map<Key, size_t, KeyHasher>; using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
public: public:
/// Buffers multiple result chunks and stores them during destruction as a cache entry. /// Buffers multiple result chunks and stores them during destruction as a cache entry.
@ -91,7 +91,7 @@ public:
size_t new_entry_size_in_rows = 0; size_t new_entry_size_in_rows = 0;
const size_t max_entry_size_in_rows; const size_t max_entry_size_in_rows;
const std::chrono::time_point<std::chrono::system_clock> query_start_time = std::chrono::system_clock::now(); /// Writer construction/destruction coincides with query start/end const std::chrono::time_point<std::chrono::system_clock> query_start_time = std::chrono::system_clock::now(); /// Writer construction/destruction coincides with query start/end
const std::chrono::milliseconds min_query_duration; const std::chrono::milliseconds min_query_runtime;
Chunks partial_query_results; Chunks partial_query_results;
std::atomic<bool> skip_insert = false; std::atomic<bool> skip_insert = false;
@ -99,7 +99,7 @@ public:
size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_, size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_,
size_t max_cache_entries_, size_t max_cache_entries_,
size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_,
std::chrono::milliseconds min_query_duration_); std::chrono::milliseconds min_query_runtime_);
friend class QueryResultCache; /// for createWriter() friend class QueryResultCache; /// for createWriter()
}; };
@ -109,7 +109,7 @@ public:
{ {
public: public:
bool hasCacheEntryForKey() const; bool hasCacheEntryForKey() const;
Pipe && getPipe(); Pipe && getPipe(); /// must be called only if hasCacheEntryForKey() returns true
private: private:
Reader(const Cache & cache_, const Key & key, size_t & cache_size_in_bytes_); Reader(const Cache & cache_, const Key & key, size_t & cache_size_in_bytes_);
Pipe pipe; Pipe pipe;
@ -119,7 +119,7 @@ public:
QueryResultCache(size_t max_cache_size_in_bytes_, size_t max_cache_entries_, size_t max_cache_entry_size_in_bytes_, size_t max_cache_entry_size_in_rows_); QueryResultCache(size_t max_cache_size_in_bytes_, size_t max_cache_entries_, size_t max_cache_entry_size_in_bytes_, size_t max_cache_entry_size_in_rows_);
Reader createReader(const Key & key); Reader createReader(const Key & key);
Writer createWriter(const Key & key, std::chrono::milliseconds min_query_duration); Writer createWriter(const Key & key, std::chrono::milliseconds min_query_runtime);
void reset(); void reset();
@ -136,7 +136,7 @@ private:
/// binary search on the sorted container and erase all left of the found key. /// binary search on the sorted container and erase all left of the found key.
mutable std::mutex mutex; mutable std::mutex mutex;
Cache cache TSA_GUARDED_BY(mutex); Cache cache TSA_GUARDED_BY(mutex);
TimesExecutedMap times_executed TSA_GUARDED_BY(mutex); TimesExecuted times_executed TSA_GUARDED_BY(mutex);
size_t cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; /// updated in each cache insert/delete size_t cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; /// updated in each cache insert/delete
const size_t max_cache_size_in_bytes; const size_t max_cache_size_in_bytes;