Added removing query cache by tag.

This commit is contained in:
Michal Tabaszewski 2024-08-16 22:54:09 +02:00 committed by Robert Schulze
parent d80b37001c
commit 4f799467ec
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
13 changed files with 98 additions and 2 deletions

View File

@ -197,6 +197,12 @@ public:
cache_policy->remove(key);
}
void removeWithPredicate(std::function<bool(const Key&, const MappedPtr &)> predicate)
{
std::lock_guard lock(mutex);
cache_policy->removeWithPredicate(predicate);
}
size_t sizeInBytes() const
{
std::lock_guard lock(mutex);

View File

@ -55,6 +55,7 @@ public:
virtual void set(const Key & key, const MappedPtr & mapped) = 0;
virtual void remove(const Key & key) = 0;
virtual void removeWithPredicate(std::function<bool(const Key&, const MappedPtr &)> predicate) = 0;
virtual void clear() = 0;
virtual std::vector<KeyMapped> dump() const = 0;

View File

@ -68,6 +68,23 @@ public:
current_size_in_bytes = 0;
}
void removeWithPredicate(std::function<bool(const Key&, const MappedPtr&)> predicate) override
{
for(auto it = cells.begin(); it != cells.end();)
{
if(predicate(it->first, it->second.value))
{
auto & cell = it->second;
current_size_in_bytes -= cell.size;
queue.erase(cell.queue_iterator);
cells.erase(it);
it = cells.erase(it);
}
else
++it;
}
}
void remove(const Key & key) override
{
auto it = cells.find(key);

View File

@ -95,6 +95,27 @@ public:
cells.erase(it);
}
void removeWithPredicate(std::function<bool(const Key&, const MappedPtr&)> predicate) override
{
for(auto it = cells.begin(); it != cells.end();)
{
if(predicate(it->first, it->second.value))
{
auto & cell = it->second;
current_size_in_bytes -= cell.size;
if (cell.is_protected)
current_protected_size -= cell.size;
auto & queue = cell.is_protected ? protected_queue : probationary_queue;
queue.erase(cell.queue_iterator);
it = cells.erase(it);
}
else
++it;
}
}
MappedPtr get(const Key & key) override
{
auto it = cells.find(key);

View File

@ -133,6 +133,23 @@ public:
Base::user_quotas->clear();
}
void removeWithPredicate(std::function<bool(const Key&, const MappedPtr&)> predicate) override
{
for(auto it = cache.begin(); it != cache.end();)
{
if(predicate(it->first, it->second))
{
size_t sz = weight_function(*it->second);
if (it->first.user_id.has_value())
Base::user_quotas->decreaseActual(*it->first.user_id, sz);
it = cache.erase(it);
size_in_bytes -= sz;
}
else
++it;
}
}
void remove(const Key & key) override
{
auto it = cache.find(key);

View File

@ -619,6 +619,15 @@ QueryCache::Writer QueryCache::createWriter(const Key & key, std::chrono::millis
return Writer(cache, key, max_entry_size_in_bytes, max_entry_size_in_rows, min_query_runtime, squash_partial_results, max_block_size);
}
void QueryCache::clearWithTag(const String & tag)
{
auto removeWithTag = [tag](const Key & k, const Cache::MappedPtr & _){
return k.tag == tag;
};
cache.removeWithPredicate(removeWithTag);
std::lock_guard lock(mutex);
}
void QueryCache::clear()
{
cache.clear();

View File

@ -212,6 +212,7 @@ public:
Writer createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size, size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota);
void clear();
void clearWithTag(const String & tag);
size_t sizeInBytes() const;
size_t count() const;

View File

@ -3236,6 +3236,14 @@ void Context::clearQueryCache() const
shared->query_cache->clear();
}
void Context::clearQueryCacheWithTag(const String & tag) const
{
std::lock_guard lock(shared->mutex);
if (shared->query_cache)
shared->query_cache->clearWithTag(tag);
}
void Context::clearCaches() const
{
std::lock_guard lock(shared->mutex);

View File

@ -1069,6 +1069,7 @@ public:
void updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<QueryCache> getQueryCache() const;
void clearQueryCache() const;
void clearQueryCacheWithTag(const String & tag) const;
/** Clear the caches of the uncompressed blocks and marks.
* This is usually done when renaming tables, changing the type of columns, deleting a table.

View File

@ -369,9 +369,12 @@ BlockIO InterpreterSystemQuery::execute()
system_context->clearMMappedFileCache();
break;
case Type::DROP_QUERY_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_QUERY_CACHE);
getContext()->clearQueryCache();
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_QUERY_CACHE);
!query.query_cache_tag.empty() ? getContext()->clearQueryCacheWithTag(query.query_cache_tag): getContext()->clearQueryCache();
break;
}
case Type::DROP_COMPILED_EXPRESSION_CACHE:
#if USE_EMBEDDED_COMPILER
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);

View File

@ -129,6 +129,7 @@ public:
String storage_policy;
String volume;
String disk;
String query_cache_tag;
UInt64 seconds{};
String filesystem_cache_name;

View File

@ -471,6 +471,7 @@ namespace DB
MR_MACROS(TABLE, "TABLE") \
MR_MACROS(TABLES, "TABLES") \
MR_MACROS(TAGS, "TAGS") \
MR_MACROS(TAG, "TAG") \
MR_MACROS(TAGS_INNER_UUID, "TAGS INNER UUID") \
MR_MACROS(TEMPORARY_TABLE, "TEMPORARY TABLE") \
MR_MACROS(TEMPORARY, "TEMPORARY") \

View File

@ -489,6 +489,16 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
return false;
break;
}
case Type::DROP_QUERY_CACHE:
{
ParserLiteral tag_parser;
ASTPtr ast;
if (ParserKeyword{Keyword::TAG}.ignore(pos, expected) && tag_parser.parse(pos, ast, expected))
res->query_cache_tag = ast->as<ASTLiteral>()->value.safeGet<String>();
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
break;
}
case Type::SYNC_FILESYSTEM_CACHE:
{
ParserLiteral path_parser;