support_max_query_cache_size

This commit is contained in:
KinderRiven 2022-06-05 16:21:36 +08:00
parent f860407af5
commit 3ffb0b3549
8 changed files with 424 additions and 1 deletions

View File

@ -112,6 +112,7 @@ void LRUFileCache::useCell(
{
/// Move to the end of the queue. The iterator remains valid.
queue.moveToEnd(*cell.queue_iterator, cache_lock);
updateQueryContext(cell.file_segment->key(), cell.file_segment->offset(), 0, cache_lock);
}
}
@ -480,7 +481,43 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
return FileSegmentsHolder(std::move(file_segments));
}
bool LRUFileCache::tryReserve(
bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
auto res = tryReserveForQuery(key, offset, size, cache_lock);
if (res == ReserveResult::FINISHED)
{
/// When the maximum cache size of the query is reached, the cache will be
/// evicted from the history cache accessed by the current query.
updateQueryContext(key, offset, size, cache_lock);
return true;
}
else
{
if (res == ReserveResult::NO_ENOUGH_SPACE)
{
/// The query currently does not have enough space to reserve.
/// It returns false and reads data directly from the remote end.
return false;
}
else if (res == ReserveResult::NO_NEED)
{
/// When the maximum cache capacity of the request is not reached, the cache
/// block is evicted from the main LRU queue.
if (tryReserveForMainList(key, offset, size, cache_lock))
{
updateQueryContext(key, offset, size, cache_lock);
return true;
}
else
return false;
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
}
}
bool LRUFileCache::tryReserveForMainList(
const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
auto removed_size = 0;
@ -598,6 +635,163 @@ bool LRUFileCache::tryReserve(
return true;
}
LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (!CurrentThread::isInitialized() || !CurrentThread::get().getQueryContext() || CurrentThread::getQueryId().size == 0)
return ReserveResult::NO_NEED;
auto query_id = CurrentThread::getQueryId().toString();
auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
auto query_context = query_iter->second;
/// The maximum cache capacity of the request is not reached, thus the
//// cache block is evicted from the main LRU queue by tryReserveForMainList().
if (query_context->cache_size + size < query_context->max_cache_size)
{
return ReserveResult::NO_NEED;
}
/// The maximum cache size of the query is reached, the cache will be
/// evicted from the history cache accessed by the current query.
else
{
size_t removed_size = 0;
size_t queue_size = queue.getElementsNum(cache_lock);
auto * cell_for_reserve = getCell(key, offset, cache_lock);
std::vector<FileSegmentCell *> trash;
std::vector<FileSegmentCell *> to_evict;
std::vector<LRUQueue::Iterator> ghost;
auto is_overflow = [&]
{
return (max_size != 0 && queue.getTotalWeight(cache_lock) + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size)
|| (query_context->max_cache_size != 0 && query_context->cache_size + size - removed_size > query_context->max_cache_size);
};
/// Select the cache from the LRU queue held by query for expulsion.
for (auto iter = query_context->queue.begin(); iter != query_context->queue.end(); iter++)
{
if (!is_overflow())
break;
auto cell = getCell(iter->key, iter->offset, cache_lock);
if (!cell)
{
/// The cache corresponding to this record may be swapped out by
/// other queries, so it has become invalid.
removed_size += iter->size;
ghost.push_back(iter);
}
else
{
size_t cell_size = cell->size();
assert(iter->size == cell_size);
if (cell->releasable())
{
auto & file_segment = cell->file_segment;
std::lock_guard segment_lock(file_segment->mutex);
switch (file_segment->download_state)
{
case FileSegment::State::DOWNLOADED:
{
to_evict.push_back(cell);
break;
}
default:
{
trash.push_back(cell);
break;
}
}
removed_size += cell_size;
--queue_size;
}
}
}
for (auto & iter : ghost)
{
query_context->remove(iter->key, iter->offset, cache_lock);
}
assert(trash.empty());
for (auto & cell : trash)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
query_context->remove(file_segment->key(), file_segment->offset(), cache_lock);
}
}
if (is_overflow())
{
return ReserveResult::NO_ENOUGH_SPACE;
}
if (cell_for_reserve)
{
auto queue_iterator = cell_for_reserve->queue_iterator;
if (queue_iterator)
queue.incrementSize(*queue_iterator, size, cache_lock);
else
cell_for_reserve->queue_iterator = queue.add(key, offset, size, cache_lock);
}
for (auto & cell : to_evict)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
query_context->remove(file_segment->key(), file_segment->offset(), cache_lock);
}
}
return ReserveResult::FINISHED;
}
}
void LRUFileCache::updateQueryContext(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (!CurrentThread::isInitialized() || !CurrentThread::get().getQueryContext() || CurrentThread::getQueryId().size == 0)
return;
auto query_id = CurrentThread::getQueryId().toString();
auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
auto query_context = query_iter->second;
auto record_iter = query_context->records.find({key, offset});
if (size)
{
if (record_iter == query_context->records.end())
{
auto queue_iter = query_context->queue.add(key, offset, 0, cache_lock);
record_iter = query_context->records.insert({{key, offset}, queue_iter}).first;
}
record_iter->second->size += size;
query_context->cache_size += size;
}
/// Triggers the LRU to move the cache to the end of the queue.
else if (record_iter != query_context->records.end())
query_context->queue.moveToEnd(record_iter->second, cache_lock);
}
void LRUFileCache::remove(const Key & key)
{
assertInitialized();
@ -1067,4 +1261,38 @@ void LRUFileCache::assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lo
queue.assertCorrectness(this, cache_lock);
}
void LRUFileCache::createOrSetQueryContext(const ReadSettings & settings)
{
if (!CurrentThread::isInitialized() || !CurrentThread::get().getQueryContext() || CurrentThread::getQueryId().size == 0)
return;
auto query_id = CurrentThread::getQueryId().toString();
auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end())
query_iter = query_map.insert({query_id, std::make_shared<QueryContext>(settings.max_query_cache_size)}).first;
auto query_context = query_iter->second;
query_context->ref_count++;
}
void LRUFileCache::tryReleaseQueryContext()
{
if (!CurrentThread::isInitialized() || !CurrentThread::get().getQueryContext() || CurrentThread::getQueryId().size == 0)
return;
auto query_id = CurrentThread::getQueryId().toString();
auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end())
return;
auto query_context = query_iter->second;
query_context->ref_count--;
/// The query has been completed, removing the relevant context.
if (!query_context->ref_count)
query_map.erase(query_iter);
}
}

View File

@ -12,6 +12,7 @@
#include <map>
#include "FileCache_fwd.h"
#include <IO/ReadSettings.h>
#include <Common/logger_useful.h>
#include <Common/FileSegment.h>
#include <Core/Types.h>
@ -96,6 +97,10 @@ public:
virtual size_t getFileSegmentsNum() const = 0;
virtual void createOrSetQueryContext(const ReadSettings & settings) = 0;
virtual void tryReleaseQueryContext() = 0;
protected:
String cache_base_path;
size_t max_size;
@ -157,6 +162,10 @@ public:
size_t getFileSegmentsNum() const override;
void createOrSetQueryContext(const ReadSettings & settings) override;
void tryReleaseQueryContext() override;
private:
class LRUQueue
{
@ -247,6 +256,38 @@ private:
size_t max_stash_element_size;
size_t enable_cache_hits_threshold;
struct QueryContext
{
LRUQueue queue;
AccessRecord records;
size_t cache_size = 0;
size_t max_cache_size;
size_t ref_count = 0;
QueryContext(size_t max_cache_size_) : max_cache_size(max_cache_size_) { }
void remove(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
{
auto record = records.find({key, offset});
cache_size -= record->second->size;
queue.remove(record->second, cache_lock);
records.erase({key, offset});
}
};
using QueryContextPtr = std::shared_ptr<QueryContext>;
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
enum class ReserveResult
{
FINISHED,
NO_ENOUGH_SPACE,
NO_NEED,
};
QueryContextMap query_map;
Poco::Logger * log;
FileSegments getImpl(
@ -266,6 +307,17 @@ private:
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock) override;
bool tryReserveForMainList(
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock);
/// Limit the maximum cache size for current query.
LRUFileCache::ReserveResult tryReserveForQuery(
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock);
void updateQueryContext(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,

View File

@ -0,0 +1,89 @@
#include "MultilevelFileCacheWrapper.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
MultiplelevelFileCacheWarpper::MultiplelevelFileCacheWarpper(
const String & cache_base_path_, const FileCacheSettings & cache_settings_, FileCachePtr cache_)
: IFileCache(cache_base_path_, cache_settings_), cache(cache_)
{
}
void MultiplelevelFileCacheWarpper::initialize()
{
cache->initialize();
}
void MultiplelevelFileCacheWarpper::remove(const Key & key)
{
cache->remove(key);
}
std::vector<String> MultiplelevelFileCacheWarpper::tryGetCachePaths(const Key & key)
{
return cache->tryGetCachePaths(key);
}
FileSegmentsHolder MultiplelevelFileCacheWarpper::getOrSet(const Key & key, size_t offset, size_t size)
{
return cache->getOrSet(key, offset, size);
}
FileSegmentsHolder MultiplelevelFileCacheWarpper::get(const Key & key, size_t offset, size_t size)
{
return cache->get(key, offset, size);
}
FileSegmentsHolder MultiplelevelFileCacheWarpper::setDownloading(const Key & key, size_t offset, size_t size)
{
return cache->setDownloading(key, offset, size);
}
FileSegments MultiplelevelFileCacheWarpper::getSnapshot() const
{
return cache->getSnapshot();
}
String MultiplelevelFileCacheWarpper::dumpStructure(const Key & key)
{
return cache->dumpStructure(key);
}
size_t MultiplelevelFileCacheWarpper::getUsedCacheSize() const
{
return cache->getUsedCacheSize();
}
size_t MultiplelevelFileCacheWarpper::getFileSegmentsNum() const
{
return cache->getFileSegmentsNum();
}
bool MultiplelevelFileCacheWarpper::tryReserve(const Key &, size_t, size_t, std::lock_guard<std::mutex> &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "tryReserve not implemented");
}
void MultiplelevelFileCacheWarpper::remove(Key, size_t, std::lock_guard<std::mutex> &, std::lock_guard<std::mutex> &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "remove");
}
bool MultiplelevelFileCacheWarpper::isLastFileSegmentHolder(
const Key &, size_t, std::lock_guard<std::mutex> &, std::lock_guard<std::mutex> &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "isLastFileSegmentHolder");
}
void MultiplelevelFileCacheWarpper::reduceSizeToDownloaded(
const Key &, size_t, std::lock_guard<std::mutex> &, std::lock_guard<std::mutex> &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "reduceSizeToDownloaded");
}
};

View File

@ -0,0 +1,48 @@
#pragma once
#include "FileCache.h"
namespace DB
{
class MultiplelevelFileCacheWarpper : public IFileCache
{
public:
MultiplelevelFileCacheWarpper(const String & cache_base_path_, const FileCacheSettings & cache_settings_, FileCachePtr cache_);
void initialize() override;
void remove(const Key & key) override;
std::vector<String> tryGetCachePaths(const Key & key) override;
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
FileSegmentsHolder get(const Key & key, size_t offset, size_t size) override;
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
FileSegments getSnapshot() const override;
String dumpStructure(const Key & key) override;
size_t getUsedCacheSize() const override;
size_t getFileSegmentsNum() const override;
private:
bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) override;
void remove(Key key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock) override;
bool isLastFileSegmentHolder(
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock) override;
void reduceSizeToDownloaded(
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock) override;
private:
FileCachePtr cache;
};
};

View File

@ -574,6 +574,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "", 0) \
M(UInt64, max_query_cache_size, 1073741824, "Max remote filesystem cache size that can be used by a single query", 0) \
\
M(Bool, use_structure_from_insertion_table_in_table_functions, false, "Use structure from insertion table instead of schema inference from data", 0) \
\

View File

@ -56,6 +56,7 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
{
cache->createOrSetQueryContext(settings);
}
void CachedReadBufferFromRemoteFS::appendFilesystemCacheLog(
@ -485,6 +486,7 @@ CachedReadBufferFromRemoteFS::~CachedReadBufferFromRemoteFS()
{
appendFilesystemCacheLog((*current_file_segment_it)->range(), read_type);
}
cache->tryReleaseQueryContext();
}
void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)

View File

@ -81,6 +81,7 @@ struct ReadSettings
size_t filesystem_cache_max_wait_sec = 1;
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;
size_t max_query_cache_size = 1073741824;
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;

View File

@ -3418,6 +3418,8 @@ ReadSettings Context::getReadSettings() const
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.max_query_cache_size = settings.max_query_cache_size;
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
res.local_fs_buffer_size = settings.max_read_buffer_size;