Merge pull request #53422 from rschu1ze/consistency-fixes-for-caches

Refactorings for configuration of in-memory caches
This commit is contained in:
Robert Schulze 2023-08-17 11:08:32 +02:00 committed by GitHub
commit 1215a23de6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 239 additions and 184 deletions

View File

@ -2,6 +2,8 @@
#include <sys/resource.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <base/getMemoryAmount.h>
#include <base/errnoToString.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/String.h>
@ -655,43 +657,66 @@ void LocalServer::processConfig()
/// There is no need for concurrent queries, override max_concurrent_queries.
global_context->getProcessList().setMaxSize(0);
/// Size of cache for uncompressed blocks. Zero means disabled.
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
const size_t memory_amount = getMemoryAmount();
const double cache_size_to_ram_max_ratio = config().getDouble("cache_size_to_ram_max_ratio", 0.5);
const size_t max_cache_size = static_cast<size_t>(memory_amount * cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (uncompressed_cache_size)
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
/// Size of cache for marks (index of MergeTree family of tables).
String mark_cache_policy = config().getString("mark_cache_policy", "");
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
String mark_cache_policy = config().getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
size_t mark_cache_size = config().getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
if (mark_cache_size)
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices).
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(mmap_cache_size);
/// In Server.cpp (./clickhouse-server), we would initialize the query cache here.
/// Intentionally not doing this in clickhouse-local as it doesn't make sense.
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size
= config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// NOTE: it is important to apply any overrides before

View File

@ -29,6 +29,7 @@
#include <Common/ShellCommand.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/formatReadable.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getExecutablePath.h>
@ -658,7 +659,7 @@ try
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
const auto memory_amount = getMemoryAmount();
const size_t memory_amount = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",
formatReadableSizeWithBinarySuffix(memory_amount),
@ -1485,16 +1486,14 @@ try
/// Set up caches.
size_t max_cache_size = static_cast<size_t>(memory_amount * server_settings.cache_size_to_ram_max_ratio);
const size_t max_cache_size = static_cast<size_t>(memory_amount * server_settings.cache_size_to_ram_max_ratio);
String uncompressed_cache_policy = server_settings.uncompressed_cache_policy;
LOG_INFO(log, "Uncompressed cache policy name {}", uncompressed_cache_policy);
size_t uncompressed_cache_size = server_settings.uncompressed_cache_size;
if (uncompressed_cache_size > max_cache_size)
{
uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
@ -1513,39 +1512,59 @@ try
server_settings.async_insert_queue_flush_on_shutdown));
}
size_t mark_cache_size = server_settings.mark_cache_size;
String mark_cache_policy = server_settings.mark_cache_policy;
size_t mark_cache_size = server_settings.mark_cache_size;
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
{
mark_cache_size = max_cache_size;
LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(mark_cache_size));
LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size));
}
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
if (server_settings.index_uncompressed_cache_size)
size_t index_uncompressed_cache_size = server_settings.index_uncompressed_cache_size;
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(server_settings.index_uncompressed_cache_size);
if (server_settings.index_mark_cache_size)
size_t index_mark_cache_size = server_settings.index_mark_cache_size;
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (index_mark_cache_size)
global_context->setIndexMarkCache(server_settings.index_mark_cache_size);
if (server_settings.mmap_cache_size)
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
if (mmap_cache_size)
global_context->setMMappedFileCache(server_settings.mmap_cache_size);
/// A cache for query results.
global_context->setQueryCache(config());
size_t query_cache_max_size_in_bytes = config().getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t query_cache_query_cache_max_entry_size_in_bytes = config().getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t query_cache_max_entry_size_in_rows = config().getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
if (query_cache_max_size_in_bytes > max_cache_size)
{
query_cache_max_size_in_bytes = max_cache_size;
LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows);
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
size_t compiled_expression_cache_size = config().getUInt64("compiled_expression_cache_size", compiled_expression_cache_size_default);
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
size_t compiled_expression_cache_elements_size = config().getUInt64("compiled_expression_cache_elements_size", compiled_expression_cache_elements_size_default);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
#endif
/// Set path for format schema files

View File

@ -51,10 +51,11 @@ public:
{
auto on_weight_loss_function = [&](size_t weight_loss) { onRemoveOverflowWeightLoss(weight_loss); };
static constexpr std::string_view default_cache_policy = "SLRU";
if (cache_policy_name.empty())
{
static constexpr auto default_cache_policy = "SLRU";
cache_policy_name = default_cache_policy;
}
if (cache_policy_name == "LRU")
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/defines.h>
#include <base/unit.h>
#define DBMS_DEFAULT_PORT 9000
#define DBMS_DEFAULT_SECURE_PORT 9440
@ -64,6 +65,21 @@
/// Max depth of hierarchical dictionary
#define DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH 1000
/// Default maximum (total and entry) sizes and policies of various caches
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_MARK_CACHE_MAX_SIZE = 5368_MiB;
static constexpr auto DEFAULT_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES = 10'000;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_SIZE = 1_GiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRIES = 1024uz;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES = 1_MiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS = 30'000'000uz;
/// Query profiler cannot work with sanitizers.
/// Sanitizers are using quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
/// And they do unwinding frequently (on every malloc/free, thread/mutex operations, etc).

View File

@ -2,6 +2,7 @@
#include <Core/BaseSettings.h>
#include <Core/Defines.h>
namespace Poco::Util
@ -56,13 +57,13 @@ namespace DB
M(UInt64, max_concurrent_select_queries, 0, "Limit on total number of concurrently select queries. Zero means Unlimited.", 0) \
\
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size ro ram max ratio. Allows to lower cache size on low-memory systems.", 0) \
M(String, uncompressed_cache_policy, "SLRU", "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, 0, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(UInt64, mark_cache_size, 5368709120, "Size of cache for marks (index of MergeTree family of tables).", 0) \
M(String, mark_cache_policy, "SLRU", "Mark cache policy name.", 0) \
M(UInt64, index_uncompressed_cache_size, 0, "Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.", 0) \
M(UInt64, index_mark_cache_size, 0, "Size of cache for index marks. Zero means disabled.", 0) \
M(UInt64, mmap_cache_size, 1000, "A cache for mmapped files.", 0) /* The choice of default is arbitrary. */ \
M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for marks (index of MergeTree family of tables).", 0) \
M(String, mark_cache_policy, DEFAULT_MARK_CACHE_POLICY, "Mark cache policy name.", 0) \
M(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.", 0) \
M(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Size of cache for index marks. Zero means disabled.", 0) \
M(UInt64, mmap_cache_size, DEFAULT_MMAP_CACHE_MAX_SIZE, "A cache for mmapped files.", 0) \
\
M(Bool, disable_internal_dns_cache, false, "Disable internal DNS caching at all.", 0) \
M(Int32, dns_cache_update_period, 15, "Internal DNS cache update period in seconds.", 0) \

View File

@ -493,7 +493,6 @@ void QueryCache::reset()
cache.reset();
std::lock_guard lock(mutex);
times_executed.clear();
cache_size_in_bytes = 0;
}
size_t QueryCache::weight() const
@ -511,7 +510,7 @@ size_t QueryCache::recordQueryRun(const Key & key)
std::lock_guard lock(mutex);
size_t times = ++times_executed[key];
// Regularly drop times_executed to avoid DOS-by-unlimited-growth.
static constexpr size_t TIMES_EXECUTED_MAX_SIZE = 10'000;
static constexpr auto TIMES_EXECUTED_MAX_SIZE = 10'000uz;
if (times_executed.size() > TIMES_EXECUTED_MAX_SIZE)
times_executed.clear();
return times;
@ -522,23 +521,19 @@ std::vector<QueryCache::Cache::KeyMapped> QueryCache::dump() const
return cache.dump();
}
QueryCache::QueryCache()
QueryCache::QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
: cache(std::make_unique<TTLCachePolicy<Key, Entry, KeyHasher, QueryCacheEntryWeight, IsStale>>(std::make_unique<PerUserTTLCachePolicyUserQuota>()))
{
updateConfiguration(max_size_in_bytes, max_entries, max_entry_size_in_bytes_, max_entry_size_in_rows_);
}
void QueryCache::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
void QueryCache::updateConfiguration(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
{
std::lock_guard lock(mutex);
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size_in_bytes", 1_GiB);
cache.setMaxSize(max_size_in_bytes);
size_t max_entries = config.getUInt64("query_cache.max_entries", 1024);
cache.setMaxCount(max_entries);
max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size_in_bytes", 1_MiB);
max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows_in_rows", 30'000'000);
max_entry_size_in_bytes = max_entry_size_in_bytes_;
max_entry_size_in_rows = max_entry_size_in_rows_;
}
}

View File

@ -4,7 +4,6 @@
#include <Core/Block.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/Sources/SourceFromChunks.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Processors/Chunk.h>
#include <QueryPipeline/Pipe.h>
@ -110,9 +109,6 @@ private:
/// query --> query result
using Cache = CacheBase<Key, Entry, KeyHasher, QueryCacheEntryWeight>;
/// query --> query execution count
using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
public:
/// Buffers multiple partial query result chunks (buffer()) and eventually stores them as cache entry (finalizeWrite()).
///
@ -177,9 +173,9 @@ public:
friend class QueryCache; /// for createReader()
};
QueryCache();
QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_);
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
void updateConfiguration(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_);
Reader createReader(const Key & key);
Writer createWriter(const Key & key, std::chrono::milliseconds min_query_runtime, bool squash_partial_results, size_t max_block_size, size_t max_query_cache_size_in_bytes_quota, size_t max_query_cache_entries_quota);
@ -199,14 +195,15 @@ private:
Cache cache; /// has its own locking --> not protected by mutex
mutable std::mutex mutex;
/// query --> query execution count
using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
TimesExecuted times_executed TSA_GUARDED_BY(mutex);
/// Cache configuration
size_t max_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0;
size_t max_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0;
size_t cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; /// Updated in each cache insert/delete
friend class StorageSystemQueryCache;
};

View File

@ -245,27 +245,27 @@ struct ContextSharedPart : boost::noncopyable
std::optional<BackupsWorker> backups_worker;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
std::unique_ptr<AccessControl> access_control;
mutable ResourceManagerPtr resource_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
SessionTracker session_tracker;
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
mutable std::unique_ptr<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
@ -2269,7 +2269,7 @@ UncompressedCachePtr Context::getUncompressedCache() const
}
void Context::dropUncompressedCache() const
void Context::clearUncompressedCache() const
{
auto lock = getLock();
if (shared->uncompressed_cache)
@ -2293,7 +2293,7 @@ MarkCachePtr Context::getMarkCache() const
return shared->mark_cache;
}
void Context::dropMarkCache() const
void Context::clearMarkCache() const
{
auto lock = getLock();
if (shared->mark_cache)
@ -2315,32 +2315,6 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool;
}
static size_t getPrefetchThreadpoolSizeFromConfig(const Poco::Util::AbstractConfiguration & config)
{
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
size_t Context::getPrefetchThreadpoolSize() const
{
const auto & config = getConfigRef();
return getPrefetchThreadpoolSizeFromConfig(config);
}
ThreadPool & Context::getPrefetchThreadpool() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->prefetch_threadpool)
{
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, pool_size, queue_size);
}
return *shared->prefetch_threadpool;
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
@ -2351,7 +2325,6 @@ void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
shared->index_uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}
UncompressedCachePtr Context::getIndexUncompressedCache() const
{
auto lock = getLock();
@ -2359,7 +2332,7 @@ UncompressedCachePtr Context::getIndexUncompressedCache() const
}
void Context::dropIndexUncompressedCache() const
void Context::clearIndexUncompressedCache() const
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
@ -2383,44 +2356,13 @@ MarkCachePtr Context::getIndexMarkCache() const
return shared->index_mark_cache;
}
void Context::dropIndexMarkCache() const
void Context::clearIndexMarkCache() const
{
auto lock = getLock();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}
void Context::setQueryCache(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query cache has been already created.");
shared->query_cache = std::make_shared<QueryCache>();
shared->query_cache->updateConfiguration(config);
}
void Context::updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->updateConfiguration(config);
}
QueryCachePtr Context::getQueryCache() const
{
auto lock = getLock();
return shared->query_cache;
}
void Context::dropQueryCache() const
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->reset();
}
void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{
auto lock = getLock();
@ -2437,15 +2379,50 @@ MMappedFileCachePtr Context::getMMappedFileCache() const
return shared->mmap_cache;
}
void Context::dropMMappedFileCache() const
void Context::clearMMappedFileCache() const
{
auto lock = getLock();
if (shared->mmap_cache)
shared->mmap_cache->reset();
}
void Context::setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows)
{
auto lock = getLock();
void Context::dropCaches() const
if (shared->query_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query cache has been already created.");
shared->query_cache = std::make_shared<QueryCache>(max_size_in_bytes, max_entries, max_entry_size_in_bytes, max_entry_size_in_rows);
}
void Context::updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
if (shared->query_cache)
{
size_t max_size_in_bytes = config.getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t max_entries = config.getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
shared->query_cache->updateConfiguration(max_size_in_bytes, max_entries, max_entry_size_in_bytes, max_entry_size_in_rows);
}
}
QueryCachePtr Context::getQueryCache() const
{
auto lock = getLock();
return shared->query_cache;
}
void Context::clearQueryCache() const
{
auto lock = getLock();
if (shared->query_cache)
shared->query_cache->reset();
}
void Context::clearCaches() const
{
auto lock = getLock();
@ -2461,11 +2438,31 @@ void Context::dropCaches() const
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
if (shared->query_cache)
shared->query_cache->reset();
if (shared->mmap_cache)
shared->mmap_cache->reset();
/// Intentionally not dropping the query cache which is transactionally inconsistent by design.
}
ThreadPool & Context::getPrefetchThreadpool() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->prefetch_threadpool)
{
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, pool_size, queue_size);
}
return *shared->prefetch_threadpool;
}
size_t Context::getPrefetchThreadpoolSize() const
{
const auto & config = getConfigRef();
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const

View File

@ -915,44 +915,39 @@ public:
void setSystemZooKeeperLogAfterInitializationIfNeeded();
/// --- Caches ------------------------------------------------------------------------------------------
/// Create a cache of uncompressed blocks of specified size. This can be done only once.
void setUncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getUncompressedCache() const;
void dropUncompressedCache() const;
void clearUncompressedCache() const;
/// Create a cache of marks of specified size. This can be done only once.
void setMarkCache(const String & mark_cache_policy, size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;
void clearMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const;
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool is different from threadpoolReader
/// in the way that its tasks are - wait for marks to be loaded
/// and make a prefetch by putting a read task to threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
void dropIndexUncompressedCache() const;
void clearIndexUncompressedCache() const;
/// Create a cache of index marks of specified size. This can be done only once.
void setIndexMarkCache(size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void dropIndexMarkCache() const;
void clearIndexMarkCache() const;
/// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
void setMMappedFileCache(size_t cache_size_in_num_entries);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;
void dropMMappedFileCache() const;
void clearMMappedFileCache() const;
/// Create a cache of query results for statements which run repeatedly.
void setQueryCache(const Poco::Util::AbstractConfiguration & config);
void setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows);
void updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<QueryCache> getQueryCache() const;
void dropQueryCache() const;
void clearQueryCache() const;
/** Clear the caches of the uncompressed blocks and marks.
* This is usually done when renaming tables, changing the type of columns, deleting a table.
@ -960,7 +955,16 @@ public:
* (when deleting a table - it is necessary, since in its place another can appear)
* const - because the change in the cache is not considered significant.
*/
void dropCaches() const;
void clearCaches() const;
/// -----------------------------------------------------------------------------------------------------
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool is different from threadpoolReader
/// in the way that its tasks are - wait for marks to be loaded
/// and make a prefetch by putting a read task to threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Settings for MergeTree background tasks stored in config.xml
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;

View File

@ -247,10 +247,10 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
DatabaseCatalog::instance().removeDependencies(table_id, check_ref_deps, check_loading_deps, is_drop_or_detach_database);
database->dropTable(context_, table_id.table_name, query.sync);
/// We have to drop mmapio cache when dropping table from Ordinary database
/// We have to clear mmapio cache when dropping table from Ordinary database
/// to avoid reading old data if new table with the same name is created
if (database->getUUID() == UUIDHelpers::Nil)
context_->dropMMappedFileCache();
context_->clearMMappedFileCache();
}
db = database;

View File

@ -319,27 +319,27 @@ BlockIO InterpreterSystemQuery::execute()
}
case Type::DROP_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropMarkCache();
system_context->clearMarkCache();
break;
case Type::DROP_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropUncompressedCache();
system_context->clearUncompressedCache();
break;
case Type::DROP_INDEX_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropIndexMarkCache();
system_context->clearIndexMarkCache();
break;
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropIndexUncompressedCache();
system_context->clearIndexUncompressedCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->dropMMappedFileCache();
system_context->clearMMappedFileCache();
break;
case Type::DROP_QUERY_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_QUERY_CACHE);
getContext()->dropQueryCache();
getContext()->clearQueryCache();
break;
#if USE_EMBEDDED_COMPILER
case Type::DROP_COMPILED_EXPRESSION_CACHE:

View File

@ -19,7 +19,7 @@ public:
size_t getCompiledExpressionSize() const { return compiled_expression_size; }
virtual ~CompiledExpressionCacheEntry() {}
virtual ~CompiledExpressionCacheEntry() = default;
private:

View File

@ -2328,7 +2328,7 @@ size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
removePartsFinally(parts_to_remove);
/// This is needed to close files to avoid they reside on disk after being deleted.
/// NOTE: we can drop files from cache more selectively but this is good enough.
getContext()->dropMMappedFileCache();
getContext()->clearMMappedFileCache();
return parts_to_remove.size();
}
@ -2834,7 +2834,7 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
}
if (!getStorageID().hasUUID())
getContext()->dropCaches();
getContext()->clearCaches();
/// TODO: remove const_cast
for (const auto & part : data_parts_by_info)
@ -2875,9 +2875,9 @@ void MergeTreeData::dropAllData()
}
/// Tables in atomic databases have UUID and stored in persistent locations.
/// No need to drop caches (that are keyed by filesystem path) because collision is not possible.
/// No need to clear caches (that are keyed by filesystem path) because collision is not possible.
if (!getStorageID().hasUUID())
getContext()->dropCaches();
getContext()->clearCaches();
/// Removing of each data part before recursive removal of directory is to speed-up removal, because there will be less number of syscalls.
NameSet part_names_failed;

View File

@ -777,7 +777,7 @@ void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr
num_marks_saved = 0;
total_rows = 0;
total_bytes = 0;
getContext()->dropMMappedFileCache();
getContext()->clearMMappedFileCache();
}

View File

@ -1997,7 +1997,7 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
}
/// New parts with other data may appear in place of deleted parts.
local_context->dropCaches();
local_context->clearCaches();
return results;
}

View File

@ -426,7 +426,7 @@ void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
num_indices_saved = 0;
total_rows = 0;
total_bytes = 0;
getContext()->dropMMappedFileCache();
getContext()->clearMMappedFileCache();
}