Merge pull request #37859 from KinderRiven/support_max_request_cache_size

Support to set max cache size for per query in local cache
This commit is contained in:
Kseniia Sumarokova 2022-06-10 16:17:18 +02:00 committed by GitHub
commit 580a30c6ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 544 additions and 106 deletions

View File

@ -30,6 +30,11 @@ namespace
}
}
static bool isQueryInitialized()
{
return CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() && CurrentThread::getQueryId().size != 0;
}
IFileCache::IFileCache(
const String & cache_base_path_,
const FileCacheSettings & cache_settings_)
@ -37,6 +42,7 @@ IFileCache::IFileCache(
, max_size(cache_settings_.max_size)
, max_element_size(cache_settings_.max_elements)
, max_file_segment_size(cache_settings_.max_file_segment_size)
, enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
{
}
@ -59,9 +65,7 @@ String IFileCache::getPathInLocalCache(const Key & key)
bool IFileCache::isReadOnly()
{
return !CurrentThread::isInitialized()
|| !CurrentThread::get().getQueryContext()
|| CurrentThread::getQueryId().size == 0;
return (!isQueryInitialized());
}
void IFileCache::assertInitialized() const
@ -70,6 +74,73 @@ void IFileCache::assertInitialized() const
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
}
IFileCache::QueryContextPtr IFileCache::getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock)
{
if (!isQueryInitialized())
return nullptr;
return getQueryContext(CurrentThread::getQueryId().toString(), cache_lock);
}
IFileCache::QueryContextPtr IFileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> &)
{
auto query_iter = query_map.find(query_id);
return (query_iter == query_map.end()) ? nullptr : query_iter->second;
}
void IFileCache::removeQueryContext(const String & query_id)
{
std::lock_guard cache_lock(mutex);
auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to release query context that does not exist");
query_map.erase(query_iter);
}
IFileCache::QueryContextPtr IFileCache::getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> & cache_lock)
{
if (query_id.empty())
return nullptr;
auto context = getQueryContext(query_id, cache_lock);
if (!context)
{
auto query_iter = query_map.insert({query_id, std::make_shared<QueryContext>(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache)}).first;
context = query_iter->second;
}
return context;
}
IFileCache::QueryContextHolder IFileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings)
{
std::lock_guard cache_lock(mutex);
/// if enable_filesystem_query_cache_limit is true, and max_query_cache_size large than zero,
/// we create context query for current query.
if (enable_filesystem_query_cache_limit && settings.max_query_cache_size)
{
auto context = getOrSetQueryContext(query_id, settings, cache_lock);
return QueryContextHolder(query_id, this, context);
}
else
return QueryContextHolder();
}
IFileCache::QueryContextHolder::QueryContextHolder(const String & query_id_, IFileCache * cache_, IFileCache::QueryContextPtr context_)
: query_id(query_id_), cache(cache_), context(context_)
{
}
IFileCache::QueryContextHolder::~QueryContextHolder()
{
/// If only the query_map and the current holder hold the context_query,
/// the query has been completed and the query_context is released.
if (context && context.use_count() == 2)
cache->removeQueryContext(query_id);
}
LRUFileCache::LRUFileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_)
: IFileCache(cache_base_path_, cache_settings_)
, max_stash_element_size(cache_settings_.max_elements)
@ -480,8 +551,170 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
return FileSegmentsHolder(std::move(file_segments));
}
bool LRUFileCache::tryReserve(
const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
auto query_context = enable_filesystem_query_cache_limit ? getCurrentQueryContext(cache_lock) : nullptr;
/// If the context can be found, subsequent cache replacements are made through the Query context.
if (query_context)
{
auto res = tryReserveForQuery(key, offset, size, query_context, cache_lock);
switch (res)
{
case ReserveResult::FITS_IN_QUERY_LIMIT_AND_RESERVATION_COMPLETED :
{
/// When the maximum cache size of the query is reached, the cache will be
/// evicted from the history cache accessed by the current query.
return true;
}
case ReserveResult::EXCEEDS_QUERY_LIMIT :
{
/// The query currently does not have enough space to reserve.
/// It returns false and reads data directly from the remote fs.
return false;
}
case ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST :
{
/// When the maximum cache capacity of the request is not reached, the cache
/// block is evicted from the main LRU queue.
return tryReserveForMainList(key, offset, size, query_context, cache_lock);
}
}
__builtin_unreachable();
}
else
{
return tryReserveForMainList(key, offset, size, query_context, cache_lock);
}
}
LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock)
{
/// The maximum cache capacity of the request is not reached, thus the
//// cache block is evicted from the main LRU queue by tryReserveForMainList().
if (query_context->getCacheSize() + size <= query_context->getMaxCacheSize())
{
return ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST;
}
/// When skip_download_if_exceeds_query_cache is true, there is no need
/// to evict old data, skip the cache and read directly from remote fs.
else if (query_context->isSkipDownloadIfExceed())
{
return ReserveResult::EXCEEDS_QUERY_LIMIT;
}
/// The maximum cache size of the query is reached, the cache will be
/// evicted from the history cache accessed by the current query.
else
{
size_t removed_size = 0;
size_t queue_size = queue.getElementsNum(cache_lock);
auto * cell_for_reserve = getCell(key, offset, cache_lock);
std::vector<IFileCache::LRUQueue::Iterator> ghost;
std::vector<FileSegmentCell *> trash;
std::vector<FileSegmentCell *> to_evict;
auto is_overflow = [&]
{
return (max_size != 0 && queue.getTotalWeight(cache_lock) + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size)
|| (query_context->getCacheSize() + size - removed_size > query_context->getMaxCacheSize());
};
/// Select the cache from the LRU queue held by query for expulsion.
for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++)
{
if (!is_overflow())
break;
auto * cell = getCell(iter->key, iter->offset, cache_lock);
if (!cell)
{
/// The cache corresponding to this record may be swapped out by
/// other queries, so it has become invalid.
ghost.push_back(iter);
removed_size += iter->size;
}
else
{
size_t cell_size = cell->size();
assert(iter->size == cell_size);
if (cell->releasable())
{
auto & file_segment = cell->file_segment;
std::lock_guard segment_lock(file_segment->mutex);
switch (file_segment->download_state)
{
case FileSegment::State::DOWNLOADED:
{
to_evict.push_back(cell);
break;
}
default:
{
trash.push_back(cell);
break;
}
}
removed_size += cell_size;
--queue_size;
}
}
}
assert(trash.empty());
for (auto & cell : trash)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
query_context->remove(file_segment->key(), file_segment->offset(), cell->size(), cache_lock);
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
}
for (auto & iter : ghost)
query_context->remove(iter->key, iter->offset, iter->size, cache_lock);
if (is_overflow())
{
return ReserveResult::EXCEEDS_QUERY_LIMIT;
}
if (cell_for_reserve)
{
auto queue_iterator = cell_for_reserve->queue_iterator;
if (queue_iterator)
queue.incrementSize(*queue_iterator, size, cache_lock);
else
cell_for_reserve->queue_iterator = queue.add(key, offset, size, cache_lock);
}
for (auto & cell : to_evict)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
query_context->remove(file_segment->key(), file_segment->offset(), cell->size(), cache_lock);
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
}
query_context->reserve(key, offset, size, cache_lock);
return ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST;
}
}
bool LRUFileCache::tryReserveForMainList(
const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock)
{
auto removed_size = 0;
size_t queue_size = queue.getElementsNum(cache_lock);
@ -595,6 +828,9 @@ bool LRUFileCache::tryReserve(
if (queue.getTotalWeight(cache_lock) > (1ull << 63))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
if (query_context)
query_context->reserve(key, offset, size, cache_lock);
return true;
}
@ -852,7 +1088,6 @@ FileSegments LRUFileCache::getSnapshot() const
for (const auto & [offset, cell] : cells_by_offset)
file_segments.push_back(FileSegment::getSnapshot(cell.file_segment, cache_lock));
}
return file_segments;
}
@ -938,7 +1173,7 @@ LRUFileCache::FileSegmentCell::FileSegmentCell(
}
}
LRUFileCache::LRUQueue::Iterator LRUFileCache::LRUQueue::add(
IFileCache::LRUQueue::Iterator IFileCache::LRUQueue::add(
const IFileCache::Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & /* cache_lock */)
{
#ifndef NDEBUG
@ -956,30 +1191,30 @@ LRUFileCache::LRUQueue::Iterator LRUFileCache::LRUQueue::add(
return queue.insert(queue.end(), FileKeyAndOffset(key, offset, size));
}
void LRUFileCache::LRUQueue::remove(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
void IFileCache::LRUQueue::remove(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
{
cache_size -= queue_it->size;
queue.erase(queue_it);
}
void LRUFileCache::LRUQueue::removeAll(std::lock_guard<std::mutex> & /* cache_lock */)
void IFileCache::LRUQueue::removeAll(std::lock_guard<std::mutex> & /* cache_lock */)
{
queue.clear();
cache_size = 0;
}
void LRUFileCache::LRUQueue::moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
void IFileCache::LRUQueue::moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
{
queue.splice(queue.end(), queue, queue_it);
}
void LRUFileCache::LRUQueue::incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & /* cache_lock */)
void IFileCache::LRUQueue::incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & /* cache_lock */)
{
cache_size += size_increment;
queue_it->size += size_increment;
}
bool LRUFileCache::LRUQueue::contains(
bool IFileCache::LRUQueue::contains(
const IFileCache::Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */) const
{
/// This method is used for assertions in debug mode.
@ -992,31 +1227,7 @@ bool LRUFileCache::LRUQueue::contains(
return false;
}
void LRUFileCache::LRUQueue::assertCorrectness(LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock)
{
[[maybe_unused]] size_t total_size = 0;
for (auto it = queue.begin(); it != queue.end();)
{
auto & [key, offset, size, _] = *it++;
auto * cell = cache->getCell(key, offset, cache_lock);
if (!cell)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache is in inconsistent state: LRU queue contains entries with no cache cell (assertCorrectness())");
}
assert(cell->size() == size);
total_size += size;
}
assert(total_size == cache_size);
assert(cache_size <= cache->max_size);
assert(queue.size() <= cache->max_element_size);
}
String LRUFileCache::LRUQueue::toString(std::lock_guard<std::mutex> & /* cache_lock */) const
String IFileCache::LRUQueue::toString(std::lock_guard<std::mutex> & /* cache_lock */) const
{
String result;
for (const auto & [key, offset, size, _] : queue)
@ -1065,14 +1276,38 @@ void LRUFileCache::assertCacheCellsCorrectness(
void LRUFileCache::assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock)
{
assertCacheCellsCorrectness(files[key], cache_lock);
queue.assertCorrectness(this, cache_lock);
assertQueueCorrectness(cache_lock);
}
void LRUFileCache::assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock)
{
for (const auto & [key, cells_by_offset] : files)
assertCacheCellsCorrectness(files[key], cache_lock);
queue.assertCorrectness(this, cache_lock);
assertQueueCorrectness(cache_lock);
}
void LRUFileCache::assertQueueCorrectness(std::lock_guard<std::mutex> & cache_lock)
{
[[maybe_unused]] size_t total_size = 0;
for (auto it = queue.begin(); it != queue.end();)
{
auto & [key, offset, size, _] = *it++;
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache is in inconsistent state: LRU queue contains entries with no cache cell (assertCorrectness())");
}
assert(cell->size() == size);
total_size += size;
}
assert(total_size == queue.getTotalWeight(cache_lock));
assert(queue.getTotalWeight(cache_lock) <= max_size);
assert(queue.getElementsNum(cache_lock) <= max_element_size);
}
}

View File

@ -12,6 +12,7 @@
#include <map>
#include "FileCache_fwd.h"
#include <IO/ReadSettings.h>
#include <Common/logger_useful.h>
#include <Common/FileSegment.h>
#include <Core/Types.h>
@ -20,6 +21,14 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class IFileCache;
using FileCachePtr = std::shared_ptr<IFileCache>;
/**
* Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
*/
@ -106,58 +115,6 @@ protected:
mutable std::mutex mutex;
virtual bool tryReserve(
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock) = 0;
virtual void remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
virtual bool isLastFileSegmentHolder(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
/// If file segment was partially downloaded and then space reservation fails (because of no
/// space left), then update corresponding cache cell metadata (file segment size).
virtual void reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
void assertInitialized() const;
};
using FileCachePtr = std::shared_ptr<IFileCache>;
class LRUFileCache final : public IFileCache
{
public:
LRUFileCache(
const String & cache_base_path_,
const FileCacheSettings & cache_settings_);
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
FileSegmentsHolder get(const Key & key, size_t offset, size_t size) override;
FileSegments getSnapshot() const override;
void initialize() override;
void remove(const Key & key) override;
void remove() override;
std::vector<String> tryGetCachePaths(const Key & key) override;
size_t getUsedCacheSize() const override;
size_t getFileSegmentsNum() const override;
private:
class LRUQueue
{
public:
@ -186,8 +143,6 @@ private:
/// Space reservation for a file segment is incremental, so we need to be able to increment size of the queue entry.
void incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & cache_lock);
void assertCorrectness(LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock);
String toString(std::lock_guard<std::mutex> & cache_lock) const;
bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock) const;
@ -203,6 +158,171 @@ private:
size_t cache_size = 0;
};
using AccessKeyAndOffset = std::pair<Key, size_t>;
struct KeyAndOffsetHash
{
std::size_t operator()(const AccessKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first) ^ std::hash<UInt64>()(key.second);
}
};
using AccessRecord = std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;
/// Used to track and control the cache access of each query.
/// Through it, we can realize the processing of different queries by the cache layer.
struct QueryContext
{
LRUQueue lru_queue;
AccessRecord records;
size_t cache_size = 0;
size_t max_cache_size;
bool skip_download_if_exceeds_query_cache;
QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_)
: max_cache_size(max_cache_size_)
, skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) {}
void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size < size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size");
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
{
lru_queue.remove(record->second, cache_lock);
records.erase({key, offset});
}
}
cache_size -= size;
}
void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (cache_size + size > max_cache_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Reserved cache size exceeds the remaining cache size");
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record == records.end())
{
auto queue_iter = lru_queue.add(key, offset, 0, cache_lock);
record = records.insert({{key, offset}, queue_iter}).first;
}
record->second->size += size;
}
cache_size += size;
}
void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
{
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
lru_queue.moveToEnd(record->second, cache_lock);
}
}
size_t getMaxCacheSize() { return max_cache_size; }
size_t getCacheSize() { return cache_size; }
LRUQueue & queue() { return lru_queue; }
bool isSkipDownloadIfExceed() { return skip_download_if_exceeds_query_cache; }
};
using QueryContextPtr = std::shared_ptr<QueryContext>;
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
QueryContextMap query_map;
bool enable_filesystem_query_cache_limit;
QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock);
QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock);
void removeQueryContext(const String & query_id);
QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> &);
virtual bool tryReserve(
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock) = 0;
virtual void remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
virtual bool isLastFileSegmentHolder(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
/// If file segment was partially downloaded and then space reservation fails (because of no
/// space left), then update corresponding cache cell metadata (file segment size).
virtual void reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
void assertInitialized() const;
public:
/// Save a query context information, and adopt different cache policies
/// for different queries through the context cache layer.
struct QueryContextHolder : private boost::noncopyable
{
explicit QueryContextHolder(const String & query_id_, IFileCache * cache_, QueryContextPtr context_);
QueryContextHolder() = default;
~QueryContextHolder();
String query_id {};
IFileCache * cache = nullptr;
QueryContextPtr context = nullptr;
};
QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings);
};
class LRUFileCache final : public IFileCache
{
public:
LRUFileCache(
const String & cache_base_path_,
const FileCacheSettings & cache_settings_);
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
FileSegmentsHolder get(const Key & key, size_t offset, size_t size) override;
FileSegments getSnapshot() const override;
void initialize() override;
void remove(const Key & key) override;
void remove() override;
std::vector<String> tryGetCachePaths(const Key & key) override;
size_t getUsedCacheSize() const override;
size_t getFileSegmentsNum() const override;
private:
struct FileSegmentCell : private boost::noncopyable
{
FileSegmentPtr file_segment;
@ -227,26 +347,22 @@ private:
using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;
using AccessKeyAndOffset = std::pair<Key, size_t>;
struct KeyAndOffsetHash
{
std::size_t operator()(const AccessKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first) ^ std::hash<UInt64>()(key.second);
}
};
using AccessRecord = std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;
CachedFiles files;
LRUQueue queue;
LRUQueue stash_queue;
AccessRecord records;
size_t max_stash_element_size;
size_t enable_cache_hits_threshold;
enum class ReserveResult
{
FITS_IN_QUERY_LIMIT_AND_RESERVATION_COMPLETED,
EXCEEDS_QUERY_LIMIT,
FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST,
};
Poco::Logger * log;
FileSegments getImpl(
@ -266,6 +382,17 @@ private:
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock) override;
bool tryReserveForMainList(
const Key & key, size_t offset, size_t size,
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
/// Limit the maximum cache size for current query.
LRUFileCache::ReserveResult tryReserveForQuery(
const Key & key, size_t offset, size_t size,
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
void remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
@ -309,6 +436,8 @@ public:
void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);
void assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock);
void assertQueueCorrectness(std::lock_guard<std::mutex> & cache_lock);
};
}

View File

@ -11,6 +11,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
max_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS);
max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false);
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
enable_cache_hits_threshold = config.getUInt64(config_prefix + ".enable_cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD);
}

View File

@ -13,6 +13,7 @@ struct FileCacheSettings
size_t max_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS;
size_t max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;
bool cache_on_write_operations = false;
bool enable_filesystem_query_cache_limit = false;
size_t enable_cache_hits_threshold = REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD;

View File

@ -200,6 +200,7 @@ private:
const Range segment_range;
State download_state;
String downloader_id;
RemoteFileReaderPtr remote_file_reader;

View File

@ -98,9 +98,10 @@ TEST(LRUFileCache, get)
DB::ThreadStatus thread_status;
/// To work with cache need query_id and query context.
std::string query_id = "query_id";
auto query_context = DB::Context::createCopy(getContext().context);
query_context->makeQueryContext();
query_context->setCurrentQueryId("query_id");
query_context->setCurrentQueryId(query_id);
DB::CurrentThread::QueryScope query_scope_holder(query_context);
DB::FileCacheSettings settings;
@ -513,4 +514,5 @@ TEST(LRUFileCache, get)
assertRange(49, segments1[1], DB::FileSegment::Range(10, 19), DB::FileSegment::State::EMPTY);
assertRange(50, segments1[2], DB::FileSegment::Range(20, 24), DB::FileSegment::State::EMPTY);
}
}

View File

@ -574,6 +574,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "", 0) \
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, max_query_cache_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be used by a single query", 0) \
\
M(Bool, use_structure_from_insertion_table_in_table_functions, false, "Use structure from insertion table instead of schema inference from data", 0) \
\

View File

@ -55,6 +55,7 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
, query_context_holder(cache_->getQueryContextHolder(query_id, settings_))
{
}

View File

@ -123,6 +123,8 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::FilesystemCacheReadBuffers};
ProfileEvents::Counters current_file_segment_counters;
IFileCache::QueryContextHolder query_context_holder;
};
}

View File

@ -82,6 +82,9 @@ struct ReadSettings
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;
size_t max_query_cache_size = (128UL * 1024 * 1024 * 1024);
bool skip_download_if_exceeds_query_cache = true;
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
FileCachePtr remote_fs_cache;

View File

@ -3419,6 +3419,9 @@ ReadSettings Context::getReadSettings() const
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.max_query_cache_size = settings.max_query_cache_size;
res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
res.local_fs_buffer_size = settings.max_read_buffer_size;

View File

@ -33,6 +33,17 @@
<cache_on_write_operations>1</cache_on_write_operations>
<enable_cache_hits_threshold>1</enable_cache_hits_threshold>
</s3_cache_3>
<s3_cache_4>
<type>s3</type>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<data_cache_enabled>1</data_cache_enabled>
<cache_enabled>0</cache_enabled>
<data_cache_max_size>22548578304</data_cache_max_size>
<cache_on_write_operations>1</cache_on_write_operations>
<enable_filesystem_query_cache_limit>1</enable_filesystem_query_cache_limit>
</s3_cache_4>
</disks>
<policies>
<s3_cache>
@ -56,6 +67,13 @@
</main>
</volumes>
</s3_cache_3>
<s3_cache_4>
<volumes>
<main>
<disk>s3_cache_4</disk>
</main>
</volumes>
</s3_cache_4>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,19 @@
-- { echo }
SYSTEM DROP FILESYSTEM CACHE;
SET enable_filesystem_cache_on_write_operations=0;
SET skip_download_if_exceeds_query_cache=1;
SET max_query_cache_size=128;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
0 0 1
0 79 80
SYSTEM DROP FILESYSTEM CACHE;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
SYSTEM DROP FILESYSTEM CACHE;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;

View File

@ -0,0 +1,21 @@
-- Tags: no-parallel, no-fasttest, no-s3-storage
-- { echo }
SYSTEM DROP FILESYSTEM CACHE;
SET enable_filesystem_cache_on_write_operations=0;
SET skip_download_if_exceeds_query_cache=1;
SET max_query_cache_size=128;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
SYSTEM DROP FILESYSTEM CACHE;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
SYSTEM DROP FILESYSTEM CACHE;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;