mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
Merge pull request #48284 from ClickHouse/rs/qc-quota
Query Cache: Allow per-user quotas
This commit is contained in:
commit
ded8eca041
@ -88,6 +88,33 @@ If the query was aborted due to an exception or user cancellation, no entry is w
|
||||
The size of the query cache in bytes, the maximum number of cache entries and the maximum size of individual cache entries (in bytes and in
|
||||
records) can be configured using different [server configuration options](server-configuration-parameters/settings.md#server_configuration_parameters_query-cache).
|
||||
|
||||
It is also possible to limit the cache usage of individual users using [settings profiles](settings/settings-profiles.md) and [settings
|
||||
constraints](settings/constraints-on-settings.md). More specifically, you can restrict the maximum amount of memory (in bytes) a user may
|
||||
allocate in the query cache and the the maximum number of stored query results. For that, first provide configurations
|
||||
[query_cache_max_size_in_bytes](settings/settings.md#query-cache-max-size-in-bytes) and
|
||||
[query_cache_max_entries](settings/settings.md#query-cache-size-max-items) in a user profile in `users.xml`, then make both settings
|
||||
readonly:
|
||||
|
||||
``` xml
|
||||
<profiles>
|
||||
<default>
|
||||
<!-- The maximum cache size in bytes for user/profile 'default' -->
|
||||
<query_cache_max_size_in_bytes>10000</query_cache_max_size_in_bytes>
|
||||
<!-- The maximum number of SELECT query results stored in the cache for user/profile 'default' -->
|
||||
<query_cache_max_entries>100</query_cache_max_entries>
|
||||
<!-- Make both settings read-only so the user cannot change them -->
|
||||
<constraints>
|
||||
<query_cache_max_size_in_bytes>
|
||||
<readonly/>
|
||||
</query_cache_max_size_in_bytes>
|
||||
<query_cache_max_entries>
|
||||
<readonly/>
|
||||
<query_cache_max_entries>
|
||||
</constraints>
|
||||
</default>
|
||||
</profiles>
|
||||
```
|
||||
|
||||
To define how long a query must run at least such that its result can be cached, you can use setting
|
||||
[query_cache_min_query_duration](settings/settings.md#query-cache-min-query-duration). For example, the result of query
|
||||
|
||||
|
@ -1382,25 +1382,25 @@ If the table does not exist, ClickHouse will create it. If the structure of the
|
||||
|
||||
The following settings are available:
|
||||
|
||||
- `max_size`: The maximum cache size in bytes. 0 means the query cache is disabled. Default value: `1073741824` (1 GiB).
|
||||
- `max_size_in_bytes`: The maximum cache size in bytes. 0 means the query cache is disabled. Default value: `1073741824` (1 GiB).
|
||||
- `max_entries`: The maximum number of `SELECT` query results stored in the cache. Default value: `1024`.
|
||||
- `max_entry_size`: The maximum size in bytes `SELECT` query results may have to be saved in the cache. Default value: `1048576` (1 MiB).
|
||||
- `max_entry_rows`: The maximum number of rows `SELECT` query results may have to be saved in the cache. Default value: `30000000` (30 mil).
|
||||
- `max_entry_size_in_bytes`: The maximum size in bytes `SELECT` query results may have to be saved in the cache. Default value: `1048576` (1 MiB).
|
||||
- `max_entry_size_in_rows`: The maximum number of rows `SELECT` query results may have to be saved in the cache. Default value: `30000000` (30 mil).
|
||||
|
||||
Changed settings take effect immediately.
|
||||
|
||||
:::note
|
||||
Data for the query cache is allocated in DRAM. If memory is scarce, make sure to set a small value for `max_size` or disable the query cache altogether.
|
||||
Data for the query cache is allocated in DRAM. If memory is scarce, make sure to set a small value for `max_size_in_bytes` or disable the query cache altogether.
|
||||
:::
|
||||
|
||||
**Example**
|
||||
|
||||
```xml
|
||||
<query_cache>
|
||||
<max_size>1073741824</max_size>
|
||||
<max_size_in_bytes>1073741824</max_size_in_bytes>
|
||||
<max_entries>1024</max_entries>
|
||||
<max_entry_size>1048576</max_entry_size>
|
||||
<max_entry_rows>30000000</max_entry_rows>
|
||||
<max_entry_size_in_bytes>1048576</max_entry_size_in_bytes>
|
||||
<max_entry_size_in_rows>30000000</max_entry_size_in_rows>
|
||||
</query_cache>
|
||||
```
|
||||
|
||||
|
@ -1512,6 +1512,26 @@ Possible values:
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
## query_cache_max_size_in_bytes {#query-cache-max-size-in-bytes}
|
||||
|
||||
The maximum amount of memory (in bytes) the current user may allocate in the query cache. 0 means unlimited.
|
||||
|
||||
Possible values:
|
||||
|
||||
- Positive integer >= 0.
|
||||
|
||||
Default value: 0 (no restriction).
|
||||
|
||||
## query_cache_max_entries {#query-cache-max-entries}
|
||||
|
||||
The maximum number of query results the current user may store in the query cache. 0 means unlimited.
|
||||
|
||||
Possible values:
|
||||
|
||||
- Positive integer >= 0.
|
||||
|
||||
Default value: 0 (no restriction).
|
||||
|
||||
## insert_quorum {#settings-insert_quorum}
|
||||
|
||||
Enables the quorum writes.
|
||||
|
@ -1517,10 +1517,10 @@
|
||||
|
||||
<!-- Configuration for the query cache -->
|
||||
<!-- <query_cache> -->
|
||||
<!-- <max_size>1073741824</max_size> -->
|
||||
<!-- <max_size_in_bytes>1073741824</max_size_in_bytes> -->
|
||||
<!-- <max_entries>1024</max_entries> -->
|
||||
<!-- <max_entry_size>1048576</max_entry_size> -->
|
||||
<!-- <max_entry_rows>30000000</max_entry_rows> -->
|
||||
<!-- <max_entry_size_in_bytes>1048576</max_entry_size_in_bytes> -->
|
||||
<!-- <max_entry_size_in_rows>30000000</max_entry_size_in_rows> -->
|
||||
<!-- </query_cache> -->
|
||||
|
||||
<!-- Uncomment if enable merge tree metadata cache -->
|
||||
|
@ -214,13 +214,19 @@ public:
|
||||
void setMaxCount(size_t max_count)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return cache_policy->setMaxCount(max_count, lock);
|
||||
cache_policy->setMaxCount(max_count, lock);
|
||||
}
|
||||
|
||||
void setMaxSize(size_t max_size_in_bytes)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return cache_policy->setMaxSize(max_size_in_bytes, lock);
|
||||
cache_policy->setMaxSize(max_size_in_bytes, lock);
|
||||
}
|
||||
|
||||
void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
cache_policy->setQuotaForUser(user_name, max_size_in_bytes, max_entries, lock);
|
||||
}
|
||||
|
||||
virtual ~CacheBase() = default;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ICachePolicyUserQuota.h>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
@ -38,12 +39,16 @@ public:
|
||||
MappedPtr mapped;
|
||||
};
|
||||
|
||||
virtual size_t weight(std::lock_guard<std::mutex> & /* cache_lock */) const = 0;
|
||||
virtual size_t count(std::lock_guard<std::mutex> & /* cache_lock */) const = 0;
|
||||
virtual size_t maxSize(std::lock_guard<std::mutex>& /* cache_lock */) const = 0;
|
||||
explicit ICachePolicy(CachePolicyUserQuotaPtr user_quotas_) : user_quotas(std::move(user_quotas_)) {}
|
||||
virtual ~ICachePolicy() = default;
|
||||
|
||||
virtual size_t weight(std::lock_guard<std::mutex> & /*cache_lock*/) const = 0;
|
||||
virtual size_t count(std::lock_guard<std::mutex> & /*cache_lock*/) const = 0;
|
||||
virtual size_t maxSize(std::lock_guard<std::mutex>& /*cache_lock*/) const = 0;
|
||||
|
||||
virtual void setMaxCount(size_t /*max_count*/, std::lock_guard<std::mutex> & /* cache_lock */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for cache policy"); }
|
||||
virtual void setMaxSize(size_t /*max_size_in_bytes*/, std::lock_guard<std::mutex> & /* cache_lock */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for cache policy"); }
|
||||
virtual void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries, std::lock_guard<std::mutex> & /*cache_lock*/) { user_quotas->setQuotaForUser(user_name, max_size_in_bytes, max_entries); }
|
||||
|
||||
/// HashFunction usually hashes the entire key and the found key will be equal the provided key. In such cases, use get(). It is also
|
||||
/// possible to store other, non-hashed data in the key. In that case, the found key is potentially different from the provided key.
|
||||
@ -51,14 +56,15 @@ public:
|
||||
virtual MappedPtr get(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) = 0;
|
||||
virtual std::optional<KeyMapped> getWithKey(const Key &, std::lock_guard<std::mutex> & /*cache_lock*/) = 0;
|
||||
|
||||
virtual void set(const Key & key, const MappedPtr & mapped, std::lock_guard<std::mutex> & /* cache_lock */) = 0;
|
||||
virtual void set(const Key & key, const MappedPtr & mapped, std::lock_guard<std::mutex> & /*cache_lock*/) = 0;
|
||||
|
||||
virtual void remove(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) = 0;
|
||||
virtual void remove(const Key & key, std::lock_guard<std::mutex> & /*cache_lock*/) = 0;
|
||||
|
||||
virtual void reset(std::lock_guard<std::mutex> & /* cache_lock */) = 0;
|
||||
virtual void reset(std::lock_guard<std::mutex> & /*cache_lock*/) = 0;
|
||||
virtual std::vector<KeyMapped> dump() const = 0;
|
||||
|
||||
virtual ~ICachePolicy() = default;
|
||||
protected:
|
||||
CachePolicyUserQuotaPtr user_quotas;
|
||||
};
|
||||
|
||||
}
|
||||
|
43
src/Common/ICachePolicyUserQuota.h
Normal file
43
src/Common/ICachePolicyUserQuota.h
Normal file
@ -0,0 +1,43 @@
|
||||
#pragma once
|
||||
|
||||
#include <base/types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Per-user quotas for usage of shared caches, used by ICachePolicy.
|
||||
/// Currently allows to limit
|
||||
/// - the maximum amount of cache memory a user may consume
|
||||
/// - the maximum number of items a user can store in the cache
|
||||
/// Note that caches usually also have global limits which restrict these values at cache level. Per-user quotas have no effect if they
|
||||
/// exceed the global thresholds.
|
||||
class ICachePolicyUserQuota
|
||||
{
|
||||
public:
|
||||
/// Register or update the user's quota for the given resource.
|
||||
virtual void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries) = 0;
|
||||
|
||||
/// Update the actual resource usage for the given user.
|
||||
virtual void increaseActual(const String & user_name, size_t entry_size_in_bytes) = 0;
|
||||
virtual void decreaseActual(const String & user_name, size_t entry_size_in_bytes) = 0;
|
||||
|
||||
/// Is the user allowed to write a new entry into the cache?
|
||||
virtual bool approveWrite(const String & user_name, size_t entry_size_in_bytes) const = 0;
|
||||
|
||||
virtual ~ICachePolicyUserQuota() = default;
|
||||
};
|
||||
|
||||
using CachePolicyUserQuotaPtr = std::unique_ptr<ICachePolicyUserQuota>;
|
||||
|
||||
|
||||
class NoCachePolicyUserQuota : public ICachePolicyUserQuota
|
||||
{
|
||||
public:
|
||||
void setQuotaForUser(const String & /*user_name*/, size_t /*max_size_in_bytes*/, size_t /*max_entries*/) override {}
|
||||
void increaseActual(const String & /*user_name*/, size_t /*entry_size_in_bytes*/) override {}
|
||||
void decreaseActual(const String & /*user_name*/, size_t /*entry_size_in_bytes*/) override {}
|
||||
bool approveWrite(const String & /*user_name*/, size_t /*entry_size_in_bytes*/) const override { return true; }
|
||||
};
|
||||
|
||||
|
||||
}
|
@ -27,7 +27,8 @@ public:
|
||||
* max_count == 0 means no elements size restrictions.
|
||||
*/
|
||||
LRUCachePolicy(size_t max_size_in_bytes_, size_t max_count_, OnWeightLossFunction on_weight_loss_function_)
|
||||
: max_size_in_bytes(std::max(1uz, max_size_in_bytes_))
|
||||
: Base(std::make_unique<NoCachePolicyUserQuota>())
|
||||
, max_size_in_bytes(std::max(1uz, max_size_in_bytes_))
|
||||
, max_count(max_count_)
|
||||
, on_weight_loss_function(on_weight_loss_function_)
|
||||
{
|
||||
|
@ -31,7 +31,8 @@ public:
|
||||
*/
|
||||
/// TODO: construct from special struct with cache policy parameters (also with max_protected_size).
|
||||
SLRUCachePolicy(size_t max_size_in_bytes_, size_t max_count_, double size_ratio, OnWeightLossFunction on_weight_loss_function_)
|
||||
: max_protected_size(static_cast<size_t>(max_size_in_bytes_ * std::min(1.0, size_ratio)))
|
||||
: Base(std::make_unique<NoCachePolicyUserQuota>())
|
||||
, max_protected_size(static_cast<size_t>(max_size_in_bytes_ * std::min(1.0, size_ratio)))
|
||||
, max_size_in_bytes(max_size_in_bytes_)
|
||||
, max_count(max_count_)
|
||||
, on_weight_loss_function(on_weight_loss_function_)
|
||||
|
@ -2,11 +2,80 @@
|
||||
|
||||
#include <Common/ICachePolicy.h>
|
||||
|
||||
#include <limits>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class PerUserTTLCachePolicyUserQuota : public ICachePolicyUserQuota
|
||||
{
|
||||
public:
|
||||
void setQuotaForUser(const String & user_name, size_t max_size_in_bytes, size_t max_entries) override
|
||||
{
|
||||
quotas[user_name] = {max_size_in_bytes, max_entries};
|
||||
}
|
||||
|
||||
void increaseActual(const String & user_name, size_t entry_size_in_bytes) override
|
||||
{
|
||||
auto & actual_for_user = actual[user_name];
|
||||
actual_for_user.size_in_bytes += entry_size_in_bytes;
|
||||
actual_for_user.num_items += 1;
|
||||
}
|
||||
|
||||
void decreaseActual(const String & user_name, size_t entry_size_in_bytes) override
|
||||
{
|
||||
chassert(actual.contains(user_name));
|
||||
|
||||
chassert(actual[user_name].size_in_bytes >= entry_size_in_bytes);
|
||||
actual[user_name].size_in_bytes -= entry_size_in_bytes;
|
||||
|
||||
chassert(actual[user_name].num_items >= 1);
|
||||
actual[user_name].num_items -= 1;
|
||||
}
|
||||
|
||||
bool approveWrite(const String & user_name, size_t entry_size_in_bytes) const override
|
||||
{
|
||||
auto it_actual = actual.find(user_name);
|
||||
Resources actual_for_user{.size_in_bytes = 0, .num_items = 0}; /// assume zero actual resource consumption is user isn't found
|
||||
if (it_actual != actual.end())
|
||||
actual_for_user = it_actual->second;
|
||||
|
||||
auto it_quota = quotas.find(user_name);
|
||||
Resources quota_for_user{.size_in_bytes = std::numeric_limits<size_t>::max(), .num_items = std::numeric_limits<size_t>::max()}; /// assume no threshold if no quota is found
|
||||
if (it_quota != quotas.end())
|
||||
quota_for_user = it_quota->second;
|
||||
|
||||
/// Special case: A quota configured as 0 means no threshold
|
||||
if (quota_for_user.size_in_bytes == 0)
|
||||
quota_for_user.size_in_bytes = std::numeric_limits<UInt64>::max();
|
||||
if (quota_for_user.num_items == 0)
|
||||
quota_for_user.num_items = std::numeric_limits<UInt64>::max();
|
||||
|
||||
/// Check size quota
|
||||
if (actual_for_user.size_in_bytes + entry_size_in_bytes >= quota_for_user.size_in_bytes)
|
||||
return false;
|
||||
|
||||
/// Check items quota
|
||||
if (quota_for_user.num_items + 1 >= quota_for_user.num_items)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
struct Resources
|
||||
{
|
||||
size_t size_in_bytes = 0;
|
||||
size_t num_items = 0;
|
||||
};
|
||||
|
||||
/// user name --> cache size quota (in bytes) / number of items quota
|
||||
std::map<String, Resources> quotas;
|
||||
/// user name --> actual cache usage (in bytes) / number of items
|
||||
std::map<String, Resources> actual;
|
||||
};
|
||||
|
||||
|
||||
/// TTLCachePolicy evicts entries for which IsStaleFunction returns true.
|
||||
/// The cache size (in bytes and number of entries) can be changed at runtime. It is expected to set both sizes explicitly after construction.
|
||||
template <typename Key, typename Mapped, typename HashFunction, typename WeightFunction, typename IsStaleFunction>
|
||||
@ -18,8 +87,9 @@ public:
|
||||
using typename Base::KeyMapped;
|
||||
using typename Base::OnWeightLossFunction;
|
||||
|
||||
TTLCachePolicy()
|
||||
: max_size_in_bytes(0)
|
||||
explicit TTLCachePolicy(CachePolicyUserQuotaPtr quotas_)
|
||||
: Base(std::move(quotas_))
|
||||
, max_size_in_bytes(0)
|
||||
, max_count(0)
|
||||
{
|
||||
}
|
||||
@ -61,8 +131,10 @@ public:
|
||||
auto it = cache.find(key);
|
||||
if (it == cache.end())
|
||||
return;
|
||||
size_in_bytes -= weight_function(*it->second);
|
||||
size_t sz = weight_function(*it->second);
|
||||
Base::user_quotas->decreaseActual(it->first.user_name, sz);
|
||||
cache.erase(it);
|
||||
size_in_bytes -= sz;
|
||||
}
|
||||
|
||||
MappedPtr get(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) override
|
||||
@ -88,35 +160,47 @@ public:
|
||||
|
||||
const size_t entry_size_in_bytes = weight_function(*mapped);
|
||||
|
||||
/// Checks against per-cache limits
|
||||
auto sufficient_space_in_cache = [&]()
|
||||
{
|
||||
return (size_in_bytes + entry_size_in_bytes <= max_size_in_bytes) && (cache.size() + 1 <= max_count);
|
||||
};
|
||||
|
||||
if (!sufficient_space_in_cache())
|
||||
/// Checks against per-user limits
|
||||
auto sufficient_space_in_cache_for_user = [&]()
|
||||
{
|
||||
return Base::user_quotas->approveWrite(key.user_name, entry_size_in_bytes);
|
||||
};
|
||||
|
||||
if (!sufficient_space_in_cache() || !sufficient_space_in_cache_for_user())
|
||||
{
|
||||
/// Remove stale entries
|
||||
for (auto it = cache.begin(); it != cache.end();)
|
||||
if (is_stale_function(it->first))
|
||||
{
|
||||
size_in_bytes -= weight_function(*it->second);
|
||||
size_t sz = weight_function(*it->second);
|
||||
Base::user_quotas->decreaseActual(it->first.user_name, sz);
|
||||
it = cache.erase(it);
|
||||
size_in_bytes -= sz;
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
|
||||
if (sufficient_space_in_cache())
|
||||
if (sufficient_space_in_cache() && sufficient_space_in_cache_for_user())
|
||||
{
|
||||
/// Insert or replace key
|
||||
if (auto it = cache.find(key); it != cache.end())
|
||||
{
|
||||
size_in_bytes -= weight_function(*it->second);
|
||||
size_t sz = weight_function(*it->second);
|
||||
Base::user_quotas->decreaseActual(it->first.user_name, sz);
|
||||
cache.erase(it); // stupid bug: (*) doesn't replace existing entries (likely due to custom hash function), need to erase explicitly
|
||||
size_in_bytes -= sz;
|
||||
}
|
||||
|
||||
cache[key] = std::move(mapped); // (*)
|
||||
size_in_bytes += entry_size_in_bytes;
|
||||
Base::user_quotas->increaseActual(key.user_name, entry_size_in_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -565,6 +565,8 @@ class IColumn;
|
||||
M(Bool, enable_writes_to_query_cache, true, "Enable storing results of SELECT queries in the query cache", 0) \
|
||||
M(Bool, enable_reads_from_query_cache, true, "Enable reading results of SELECT queries from the query cache", 0) \
|
||||
M(Bool, query_cache_store_results_of_queries_with_nondeterministic_functions, false, "Store results of queries with non-deterministic functions (e.g. rand(), now()) in the query cache", 0) \
|
||||
M(UInt64, query_cache_max_size_in_bytes, 0, "The maximum amount of memory (in bytes) the current user may allocate in the query cache. 0 means unlimited. ", 0) \
|
||||
M(UInt64, query_cache_max_entries, 0, "The maximum number of query results the current user may store in the query cache. 0 means unlimited.", 0) \
|
||||
M(UInt64, query_cache_min_query_runs, 0, "Minimum number a SELECT query must run before its result is stored in the query cache", 0) \
|
||||
M(Milliseconds, query_cache_min_query_duration, 0, "Minimum time in milliseconds for a query to run for its result to be stored in the query cache.", 0) \
|
||||
M(Bool, query_cache_compress_entries, true, "Compress cache entries.", 0) \
|
||||
|
@ -123,12 +123,13 @@ ASTPtr removeQueryCacheSettings(ASTPtr ast)
|
||||
QueryCache::Key::Key(
|
||||
ASTPtr ast_,
|
||||
Block header_,
|
||||
const std::optional<String> & username_,
|
||||
const String & user_name_, bool is_shared_,
|
||||
std::chrono::time_point<std::chrono::system_clock> expires_at_,
|
||||
bool is_compressed_)
|
||||
: ast(removeQueryCacheSettings(ast_))
|
||||
, header(header_)
|
||||
, username(username_)
|
||||
, user_name(user_name_)
|
||||
, is_shared(is_shared_)
|
||||
, expires_at(expires_at_)
|
||||
, is_compressed(is_compressed_)
|
||||
{
|
||||
@ -169,7 +170,8 @@ bool QueryCache::IsStale::operator()(const Key & key) const
|
||||
return (key.expires_at < std::chrono::system_clock::now());
|
||||
};
|
||||
|
||||
QueryCache::Writer::Writer(Cache & cache_, const Key & key_,
|
||||
QueryCache::Writer::Writer(
|
||||
Cache & cache_, const Key & key_,
|
||||
size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_,
|
||||
std::chrono::milliseconds min_query_runtime_,
|
||||
bool squash_partial_results_,
|
||||
@ -307,7 +309,7 @@ QueryCache::Reader::Reader(Cache & cache_, const Key & key, const std::lock_guar
|
||||
return;
|
||||
}
|
||||
|
||||
if (entry->key.username.has_value() && entry->key.username != key.username)
|
||||
if (!entry->key.is_shared && entry->key.user_name != key.user_name)
|
||||
{
|
||||
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Inaccessible entry found for query {}", key.queryStringFromAst());
|
||||
return;
|
||||
@ -368,8 +370,13 @@ QueryCache::Reader QueryCache::createReader(const Key & key)
|
||||
return Reader(cache, key, lock);
|
||||
}
|
||||
|
||||
QueryCache::Writer QueryCache::createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size)
|
||||
QueryCache::Writer QueryCache::createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size, size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota)
|
||||
{
|
||||
/// Update the per-user cache quotas with the values stored in the query context. This happens per query which writes into the query
|
||||
/// cache. Obviously, this is overkill but I could find the good place to hook into which is called when the settings profiles in
|
||||
/// users.xml change.
|
||||
cache.setQuotaForUser(key.user_name, max_query_cache_size_in_bytes_quota, max_query_cache_entries_quota);
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
return Writer(cache, key, max_entry_size_in_bytes, max_entry_size_in_rows, min_query_runtime, squash_partial_results, max_block_size);
|
||||
}
|
||||
@ -399,7 +406,7 @@ std::vector<QueryCache::Cache::KeyMapped> QueryCache::dump() const
|
||||
}
|
||||
|
||||
QueryCache::QueryCache()
|
||||
: cache(std::make_unique<TTLCachePolicy<Key, Chunks, KeyHasher, QueryResultWeight, IsStale>>())
|
||||
: cache(std::make_unique<TTLCachePolicy<Key, Chunks, KeyHasher, QueryResultWeight, IsStale>>(std::make_unique<PerUserTTLCachePolicyUserQuota>()))
|
||||
{
|
||||
}
|
||||
|
||||
@ -407,14 +414,14 @@ void QueryCache::updateConfiguration(const Poco::Util::AbstractConfiguration & c
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size", 1_GiB);
|
||||
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size_in_bytes", 1_GiB);
|
||||
cache.setMaxSize(max_size_in_bytes);
|
||||
|
||||
size_t max_entries = config.getUInt64("query_cache.max_entries", 1024);
|
||||
cache.setMaxCount(max_entries);
|
||||
|
||||
max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size", 1_MiB);
|
||||
max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows", 30'000'000);
|
||||
max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size_in_bytes", 1_MiB);
|
||||
max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows_in_rows", 30'000'000);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -42,10 +42,13 @@ public:
|
||||
/// Result metadata for constructing the pipe.
|
||||
const Block header;
|
||||
|
||||
/// std::nullopt means that the associated entry can be read by other users. In general, sharing is a bad idea: First, it is
|
||||
/// unlikely that different users pose the same queries. Second, sharing potentially breaches security. E.g. User A should not be
|
||||
/// able to bypass row policies on some table by running the same queries as user B for whom no row policies exist.
|
||||
const std::optional<String> username;
|
||||
/// The user who executed the query.
|
||||
const String user_name;
|
||||
|
||||
/// If the associated entry can be read by other users. In general, sharing is a bad idea: First, it is unlikely that different
|
||||
/// users pose the same queries. Second, sharing potentially breaches security. E.g. User A should not be able to bypass row
|
||||
/// policies on some table by running the same queries as user B for whom no row policies exist.
|
||||
bool is_shared;
|
||||
|
||||
/// When does the entry expire?
|
||||
const std::chrono::time_point<std::chrono::system_clock> expires_at;
|
||||
@ -55,7 +58,7 @@ public:
|
||||
|
||||
Key(ASTPtr ast_,
|
||||
Block header_,
|
||||
const std::optional<String> & username_,
|
||||
const String & user_name_, bool is_shared_,
|
||||
std::chrono::time_point<std::chrono::system_clock> expires_at_,
|
||||
bool is_compressed);
|
||||
|
||||
@ -144,7 +147,7 @@ public:
|
||||
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
Reader createReader(const Key & key);
|
||||
Writer createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size);
|
||||
Writer createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size, size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota);
|
||||
|
||||
void reset();
|
||||
|
||||
|
@ -725,9 +725,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
{
|
||||
QueryCache::Key key(
|
||||
ast, res.pipeline.getHeader(),
|
||||
std::make_optional<String>(context->getUserName()),
|
||||
context->getUserName(), /*dummy for is_shared*/ false,
|
||||
/*dummy value for expires_at*/ std::chrono::system_clock::from_time_t(1),
|
||||
/*dummy value for is_compressed*/ true);
|
||||
/*dummy value for is_compressed*/ false);
|
||||
QueryCache::Reader reader = query_cache->createReader(key);
|
||||
if (reader.hasCacheEntryForKey())
|
||||
{
|
||||
@ -748,7 +748,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
{
|
||||
QueryCache::Key key(
|
||||
ast, res.pipeline.getHeader(),
|
||||
settings.query_cache_share_between_users ? std::nullopt : std::make_optional<String>(context->getUserName()),
|
||||
context->getUserName(), settings.query_cache_share_between_users,
|
||||
std::chrono::system_clock::now() + std::chrono::seconds(settings.query_cache_ttl),
|
||||
settings.query_cache_compress_entries);
|
||||
|
||||
@ -760,7 +760,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
res.pipeline.getHeader(), query_cache, key,
|
||||
std::chrono::milliseconds(context->getSettings().query_cache_min_query_duration.totalMilliseconds()),
|
||||
context->getSettings().query_cache_squash_partial_results,
|
||||
context->getSettings().max_block_size);
|
||||
context->getSettings().max_block_size,
|
||||
context->getSettings().query_cache_max_size_in_bytes,
|
||||
context->getSettings().query_cache_max_entries);
|
||||
res.pipeline.streamIntoQueryCache(stream_in_query_cache_transform);
|
||||
}
|
||||
}
|
||||
|
@ -9,9 +9,10 @@ StreamInQueryCacheTransform::StreamInQueryCacheTransform(
|
||||
const QueryCache::Key & cache_key,
|
||||
std::chrono::milliseconds min_query_duration,
|
||||
bool squash_partial_results,
|
||||
size_t max_block_size)
|
||||
size_t max_block_size,
|
||||
size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota)
|
||||
: ISimpleTransform(header_, header_, false)
|
||||
, cache_writer(cache->createWriter(cache_key, min_query_duration, squash_partial_results, max_block_size))
|
||||
, cache_writer(cache->createWriter(cache_key, min_query_duration, squash_partial_results, max_block_size, max_query_cache_size_in_bytes_quota, max_query_cache_entries_quota))
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,8 @@ public:
|
||||
const QueryCache::Key & cache_key,
|
||||
std::chrono::milliseconds min_query_duration,
|
||||
bool squash_partial_results,
|
||||
size_t max_block_size);
|
||||
size_t max_block_size,
|
||||
size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota);
|
||||
|
||||
protected:
|
||||
void transform(Chunk & chunk) override;
|
||||
|
@ -36,18 +36,18 @@ void StorageSystemQueryCache::fillData(MutableColumns & res_columns, ContextPtr
|
||||
|
||||
std::vector<QueryCache::Cache::KeyMapped> content = query_cache->dump();
|
||||
|
||||
const String & username = context->getUserName();
|
||||
const String & user_name = context->getUserName();
|
||||
|
||||
for (const auto & [key, query_result] : content)
|
||||
{
|
||||
/// Showing other user's queries is considered a security risk
|
||||
if (key.username.has_value() && key.username != username)
|
||||
if (!key.is_shared && key.user_name != user_name)
|
||||
continue;
|
||||
|
||||
res_columns[0]->insert(key.queryStringFromAst()); /// approximates the original query string
|
||||
res_columns[1]->insert(QueryCache::QueryResultWeight()(*query_result));
|
||||
res_columns[2]->insert(key.expires_at < std::chrono::system_clock::now());
|
||||
res_columns[3]->insert(!key.username.has_value());
|
||||
res_columns[3]->insert(!key.is_shared);
|
||||
res_columns[4]->insert(key.is_compressed);
|
||||
res_columns[5]->insert(std::chrono::system_clock::to_time_t(key.expires_at));
|
||||
res_columns[6]->insert(key.ast->getTreeHash().first);
|
||||
|
@ -0,0 +1,10 @@
|
||||
Run SELECT with quota that current user may use only 1 byte in the query cache 1
|
||||
Expect no entries in the query cache 0
|
||||
Run SELECT again but w/o quota 1
|
||||
Expect one entry in the query cache 1
|
||||
---
|
||||
Run SELECT which writes its result in the query cache 1
|
||||
Run another SELECT with quota that current user may write only 1 entry in the query cache 1
|
||||
Expect one entry in the query cache 1
|
||||
Run another SELECT w/o quota 1
|
||||
Expect two entries in the query cache 2
|
31
tests/queries/0_stateless/02494_query_cache_user_quotas.sql
Normal file
31
tests/queries/0_stateless/02494_query_cache_user_quotas.sql
Normal file
@ -0,0 +1,31 @@
|
||||
-- Tags: no-parallel
|
||||
-- Tag no-parallel: Messes with internal cache
|
||||
|
||||
-- Tests per-user quotas of the query cache. Settings 'query_cache_max_size_in_bytes' and 'query_cache_max_entries' are actually supposed to
|
||||
-- be used in a settings profile, together with a readonly constraint. For simplicity, test both settings stand-alone in a stateless test
|
||||
-- instead of an integration test - the relevant logic will still be covered by that.
|
||||
|
||||
SET allow_experimental_query_cache = true;
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
||||
SET query_cache_max_size_in_bytes = 1;
|
||||
SELECT 'Run SELECT with quota that current user may use only 1 byte in the query cache', 1 SETTINGS use_query_cache = true;
|
||||
SELECT 'Expect no entries in the query cache', count(*) FROM system.query_cache;
|
||||
|
||||
SET query_cache_max_size_in_bytes = DEFAULT;
|
||||
SELECT 'Run SELECT again but w/o quota', 1 SETTINGS use_query_cache = true;
|
||||
SELECT 'Expect one entry in the query cache', count(*) FROM system.query_cache;
|
||||
|
||||
SELECT '---';
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
||||
SELECT 'Run SELECT which writes its result in the query cache', 1 SETTINGS use_query_cache = true;
|
||||
SET query_cache_max_entries = 1;
|
||||
SELECT 'Run another SELECT with quota that current user may write only 1 entry in the query cache', 1 SETTINGS use_query_cache = true;
|
||||
SELECT 'Expect one entry in the query cache', count(*) FROM system.query_cache;
|
||||
SET query_cache_max_entries = DEFAULT;
|
||||
SELECT 'Run another SELECT w/o quota', 1 SETTINGS use_query_cache = true;
|
||||
SELECT 'Expect two entries in the query cache', count(*) FROM system.query_cache;
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
Loading…
Reference in New Issue
Block a user