diff --git a/src/Interpreters/Cache/QueryResultCache.cpp b/src/Interpreters/Cache/QueryResultCache.cpp index 6c9ae832321..42a42918d83 100644 --- a/src/Interpreters/Cache/QueryResultCache.cpp +++ b/src/Interpreters/Cache/QueryResultCache.cpp @@ -177,71 +177,74 @@ QueryResultCache::Writer::Writer(std::mutex & mutex_, Cache & cache_, const Key } QueryResultCache::Writer::~Writer() -try { - if (skip_insert) - return; - - if (std::chrono::duration_cast(std::chrono::system_clock::now() - query_start_time) < min_query_runtime) - return; - - auto to_single_chunk = [](const Chunks & chunks_) -> Chunk + try { - if (chunks_.empty()) - return {}; + if (skip_insert) + return; - Chunk res = chunks_[0].clone(); - for (size_t i = 1; i != chunks_.size(); ++i) - res.append(chunks_[i]); - return res; - }; + if (std::chrono::duration_cast(std::chrono::system_clock::now() - query_start_time) < min_query_runtime) + return; - auto query_result = to_single_chunk(partial_query_results); - new_entry_size_in_bytes = query_result.allocatedBytes(); // updated because compression potentially affects the size of the single chunk vs the aggregate size of individual chunks + auto to_single_chunk = [](const Chunks & chunks_) -> Chunk + { + if (chunks_.empty()) + return {}; - std::lock_guard lock(mutex); + Chunk res = chunks_[0].clone(); + for (size_t i = 1; i != chunks_.size(); ++i) + res.append(chunks_[i]); + return res; + }; - if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first)) - return; /// same check as in ctor because a parallel Writer could have inserted the current key in the meantime + auto query_result = to_single_chunk(partial_query_results); + new_entry_size_in_bytes = query_result.allocatedBytes(); // updated because compression potentially affects the size of the single chunk vs the aggregate size of individual chunks - auto sufficient_space_in_cache = [this]() TSA_REQUIRES(mutex) - { - return (cache_size_in_bytes + new_entry_size_in_bytes <= max_cache_size_in_bytes) && (cache.size() + 1 <= max_cache_entries); - }; + std::lock_guard lock(mutex); - if (!sufficient_space_in_cache()) - { - size_t removed_items = 0; - /// Remove stale entries - for (auto it = cache.begin(); it != cache.end();) - if (is_stale(it->first)) - { - cache_size_in_bytes -= it->second.allocatedBytes(); - it = cache.erase(it); - ++removed_items; - } - else - ++it; - LOG_DEBUG(&Poco::Logger::get("QueryResultCache"), "Removed {} stale entries", removed_items); + if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first)) + return; /// same check as in ctor because a parallel Writer could have inserted the current key in the meantime + + auto sufficient_space_in_cache = [this]() TSA_REQUIRES(mutex) + { + return (cache_size_in_bytes + new_entry_size_in_bytes <= max_cache_size_in_bytes) && (cache.size() + 1 <= max_cache_entries); + }; + + if (!sufficient_space_in_cache()) + { + size_t removed_items = 0; + /// Remove stale entries + for (auto it = cache.begin(); it != cache.end();) + if (is_stale(it->first)) + { + cache_size_in_bytes -= it->second.allocatedBytes(); + it = cache.erase(it); + ++removed_items; + } + else + ++it; + LOG_DEBUG(&Poco::Logger::get("QueryResultCache"), "Removed {} stale entries", removed_items); + } + + /// Insert or replace if enough space + if (sufficient_space_in_cache()) + { + cache_size_in_bytes += query_result.allocatedBytes(); + if (auto it = cache.find(key); it != cache.end()) + cache_size_in_bytes -= it->second.allocatedBytes(); /// key replacement + + /// cache[key] = query_result; /// does no replacement for unclear reasons + cache.erase(key); + cache[key] = std::move(query_result); + + LOG_DEBUG(&Poco::Logger::get("QueryResultCache"), "Stored result of query {}", key.queryStringFromAst()); + } } - - /// Insert or replace if enough space - if (sufficient_space_in_cache()) + catch (...) { - cache_size_in_bytes += query_result.allocatedBytes(); - if (auto it = cache.find(key); it != cache.end()) - cache_size_in_bytes -= it->second.allocatedBytes(); /// key replacement - - /// cache[key] = query_result; /// does no replacement for unclear reasons - cache.erase(key); - cache[key] = std::move(query_result); - - LOG_DEBUG(&Poco::Logger::get("QueryResultCache"), "Stored result of query {}", key.queryStringFromAst()); + tryLogCurrentException(&Poco::Logger::get("QueryResultCache"), __PRETTY_FUNCTION__); } } -catch (const std::exception &) -{ -} void QueryResultCache::Writer::buffer(Chunk && partial_query_result) {