Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-10 01:25:21 +00:00)
Merge pull request #37516 from KinderRiven/improve_local_cache
Control cache downloads to avoid negative optimization of local caches
Commit: f5d69506b4
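
The idea of the change: previously every remote read was written into the local file cache, so a large one-off scan could evict genuinely hot data (the "negative optimization" of the title). This PR adds a separate LRU "stash" that counts accesses per (key, offset); a file segment is admitted into the cache only once it has been requested `enable_cache_hits_threshold` times, and colder segments are served directly from remote storage in a new SKIP_CACHE state. The threshold defaults to 0, which keeps the old cache-on-first-read behaviour. Below is a minimal, self-contained sketch of the admission policy; all names here (`AdmissionStash`, `shouldCache`) are hypothetical simplifications, not the ClickHouse API — the real logic lives in `LRUFileCache::addCell` in the first hunk:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <list>
#include <map>
#include <utility>

/// Hypothetical sketch of the admission policy introduced by this PR.
class AdmissionStash
{
public:
    AdmissionStash(std::size_t hits_threshold_, std::size_t max_elements_)
        : hits_threshold(hits_threshold_), max_elements(max_elements_) {}

    /// Returns true if the segment (key, offset) should be downloaded into the
    /// cache, false if it should be read directly (the SKIP_CACHE path).
    bool shouldCache(std::uint64_t key, std::size_t offset)
    {
        if (hits_threshold == 0)
            return true; /// Feature disabled: cache everything, as before.

        const std::pair<std::uint64_t, std::size_t> id{key, offset};
        auto it = records.find(id);
        if (it == records.end())
        {
            /// First sighting: remember it, evict the oldest record if the
            /// stash is over its bound, and skip the cache for now.
            queue.push_back({key, offset, /*hits=*/0});
            records.emplace(id, std::prev(queue.end()));
            if (queue.size() > max_elements)
            {
                const Record & oldest = queue.front();
                records.erase({oldest.key, oldest.offset});
                queue.pop_front();
            }
            return false;
        }

        /// Seen before: bump the hit count and refresh the LRU position.
        auto queue_it = it->second;
        ++queue_it->hits;
        queue.splice(queue.end(), queue, queue_it);
        return queue_it->hits >= hits_threshold;
    }

private:
    struct Record
    {
        std::uint64_t key;
        std::size_t offset;
        std::size_t hits;
    };

    std::size_t hits_threshold;
    std::size_t max_elements;
    std::list<Record> queue; /// LRU order: front = oldest access record.
    std::map<std::pair<std::uint64_t, std::size_t>, std::list<Record>::iterator> records;
};

int main()
{
    AdmissionStash stash(/*hits_threshold=*/1, /*max_elements=*/1024);
    std::cout << stash.shouldCache(42, 0) << '\n'; /// 0: first access -> skip the cache
    std::cout << stash.shouldCache(42, 0) << '\n'; /// 1: second access -> admit to cache
}
```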
@@ -72,6 +72,8 @@ void IFileCache::assertInitialized() const
 LRUFileCache::LRUFileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_)
     : IFileCache(cache_base_path_, cache_settings_)
+    , max_stash_element_size(cache_settings_.max_elements)
+    , enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold)
     , log(&Poco::Logger::get("LRUFileCache"))
 {
 }
@@ -404,9 +406,42 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
             "Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
             keyToStr(key), offset, size, dumpStructureUnlocked(key, cache_lock));

-    auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, state);
-    FileSegmentCell cell(std::move(file_segment), this, cache_lock);
+    auto skip_or_download = [&]() -> FileSegmentPtr
+    {
+        if (state == FileSegment::State::EMPTY && enable_cache_hits_threshold)
+        {
+            auto record = records.find({key, offset});
+
+            if (record == records.end())
+            {
+                auto queue_iter = stash_queue.add(key, offset, 0, cache_lock);
+                records.insert({{key, offset}, queue_iter});
+
+                if (stash_queue.getElementsNum(cache_lock) > max_stash_element_size)
+                {
+                    auto remove_queue_iter = stash_queue.begin();
+                    records.erase({remove_queue_iter->key, remove_queue_iter->offset});
+                    stash_queue.remove(remove_queue_iter, cache_lock);
+                }
+
+                /// For segments that do not reach the download threshold, we do not download them, but directly read them
+                return std::make_shared<FileSegment>(offset, size, key, this, FileSegment::State::SKIP_CACHE);
+            }
+            else
+            {
+                auto queue_iter = record->second;
+                queue_iter->hits++;
+                stash_queue.moveToEnd(queue_iter, cache_lock);
+
+                state = queue_iter->hits >= enable_cache_hits_threshold ? FileSegment::State::EMPTY : FileSegment::State::SKIP_CACHE;
+                return std::make_shared<FileSegment>(offset, size, key, this, state);
+            }
+        }
+        else
+            return std::make_shared<FileSegment>(offset, size, key, this, state);
+    };
+
+    FileSegmentCell cell(skip_or_download(), this, cache_lock);

     auto & offsets = files[key];

     if (offsets.empty())
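
Note that the stash itself is bounded by `max_stash_element_size` (which reuses `max_elements` from the settings): once it overflows, the oldest access record is evicted, so a rarely-touched segment can lose its accumulated hits and start over. A short usage trace with a threshold of 2, using the hypothetical `AdmissionStash` sketch from the top of this page:

```cpp
AdmissionStash stash(/*hits_threshold=*/2, /*max_elements=*/1024);
stash.shouldCache(7, 0); /// false: first access creates a record with hits = 0 (SKIP_CACHE)
stash.shouldCache(7, 0); /// false: hits = 1, still below the threshold of 2 (SKIP_CACHE)
stash.shouldCache(7, 0); /// true:  hits = 2 >= 2, the segment is admitted and downloaded
```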
@@ -471,7 +506,7 @@ bool LRUFileCache::tryReserve(
     std::vector<FileSegmentCell *> to_evict;
     std::vector<FileSegmentCell *> trash;

-    for (const auto & [entry_key, entry_offset, entry_size] : queue)
+    for (const auto & [entry_key, entry_offset, entry_size, _] : queue)
     {
         if (!is_overflow())
             break;
@@ -619,7 +654,7 @@ void LRUFileCache::remove()
     std::vector<FileSegment *> to_remove;
     for (auto it = queue.begin(); it != queue.end();)
     {
-        const auto & [key, offset, size] = *it++;
+        const auto & [key, offset, size, _] = *it++;
         auto * cell = getCell(key, offset, cache_lock);
         if (!cell)
             throw Exception(
@@ -637,6 +672,10 @@ void LRUFileCache::remove()
             }
         }
     }
+
+    /// Remove all access information.
+    records.clear();
+    stash_queue.removeAll(cache_lock);
 }

 void LRUFileCache::remove(
@@ -882,6 +921,7 @@ LRUFileCache::FileSegmentCell::FileSegmentCell(
             queue_iterator = cache->queue.add(file_segment->key(), file_segment->offset(), file_segment->range().size(), cache_lock);
             break;
         }
+        case FileSegment::State::SKIP_CACHE:
         case FileSegment::State::EMPTY:
         case FileSegment::State::DOWNLOADING:
         {
@@ -898,7 +938,7 @@ LRUFileCache::LRUQueue::Iterator LRUFileCache::LRUQueue::add(
     const IFileCache::Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & /* cache_lock */)
 {
 #ifndef NDEBUG
-    for (const auto & [entry_key, entry_offset, _] : queue)
+    for (const auto & [entry_key, entry_offset, entry_size, entry_hits] : queue)
     {
         if (entry_key == key && entry_offset == offset)
             throw Exception(
@@ -918,6 +958,12 @@ void LRUFileCache::LRUQueue::remove(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
     queue.erase(queue_it);
 }

+void LRUFileCache::LRUQueue::removeAll(std::lock_guard<std::mutex> & /* cache_lock */)
+{
+    queue.clear();
+    cache_size = 0;
+}
+
 void LRUFileCache::LRUQueue::moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
 {
     queue.splice(queue.end(), queue, queue_it);
@@ -934,7 +980,7 @@ bool LRUFileCache::LRUQueue::contains(
 {
     /// This method is used for assertions in debug mode.
     /// So we do not care about complexity here.
-    for (const auto & [entry_key, entry_offset, size] : queue)
+    for (const auto & [entry_key, entry_offset, size, _] : queue)
     {
         if (key == entry_key && offset == entry_offset)
             return true;
@@ -947,7 +993,7 @@ void LRUFileCache::LRUQueue::assertCorrectness(LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock)
     [[maybe_unused]] size_t total_size = 0;
     for (auto it = queue.begin(); it != queue.end();)
     {
-        auto & [key, offset, size] = *it++;
+        auto & [key, offset, size, _] = *it++;

         auto * cell = cache->getCell(key, offset, cache_lock);
         if (!cell)
@@ -969,7 +1015,7 @@ void LRUFileCache::LRUQueue::assertCorrectness(LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock)
 String LRUFileCache::LRUQueue::toString(std::lock_guard<std::mutex> & /* cache_lock */) const
 {
     String result;
-    for (const auto & [key, offset, size] : queue)
+    for (const auto & [key, offset, size, _] : queue)
     {
         if (!result.empty())
             result += ", ";
@@ -7,6 +7,7 @@
 #include <mutex>
 #include <unordered_map>
 #include <unordered_set>
+#include <boost/functional/hash.hpp>
 #include <boost/noncopyable.hpp>
 #include <map>

@@ -165,6 +166,7 @@ private:
         Key key;
         size_t offset;
         size_t size;
+        size_t hits = 0;

         FileKeyAndOffset(const Key & key_, size_t offset_, size_t size_) : key(key_), offset(offset_), size(size_) {}
     };
@@ -194,6 +196,8 @@ private:

         Iterator end() { return queue.end(); }

+        void removeAll(std::lock_guard<std::mutex> & cache_lock);
+
     private:
         std::list<FileKeyAndOffset> queue;
         size_t cache_size = 0;
@@ -223,8 +227,26 @@ private:
     using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
     using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;

+    using AccessKeyAndOffset = std::pair<Key, size_t>;
+
+    struct KeyAndOffsetHash
+    {
+        std::size_t operator()(const AccessKeyAndOffset & key) const
+        {
+            return std::hash<UInt128>()(key.first) ^ std::hash<UInt64>()(key.second);
+        }
+    };
+
+    using AccessRecord = std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;
+
     CachedFiles files;
     LRUQueue queue;
+
+    LRUQueue stash_queue;
+    AccessRecord records;
+    size_t max_stash_element_size;
+    size_t enable_cache_hits_threshold;

     Poco::Logger * log;

     FileSegments getImpl(
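
A side note on `KeyAndOffsetHash`: XOR-ing the two component hashes is cheap, but it discards ordering information and maps any pair whose two hashes happen to be equal to zero. If that ever mattered, a hash_combine-style mix is the usual alternative, and `boost/functional/hash.hpp` (the include added above) provides one. A hypothetical variant, feeding the already-computed hashes into the mix so no `boost::hash` specialization for `Key` is required:

```cpp
struct KeyAndOffsetHash
{
    std::size_t operator()(const AccessKeyAndOffset & key) const
    {
        std::size_t seed = 0;
        boost::hash_combine(seed, std::hash<UInt128>()(key.first)); /// mix in the key hash
        boost::hash_combine(seed, key.second);                      /// then the offset
        return seed;
    }
};
```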
@@ -11,6 +11,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
     max_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS);
     max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
     cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false);
+    enable_cache_hits_threshold = config.getUInt64(config_prefix + ".enable_cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD);
 }

 }
@@ -14,6 +14,8 @@ struct FileCacheSettings
     size_t max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;
     bool cache_on_write_operations = false;

+    size_t enable_cache_hits_threshold = REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD;
+
     void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
 };
@@ -7,6 +7,7 @@ namespace DB
 static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_CACHE_SIZE = 1024 * 1024 * 1024;
 static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100 * 1024 * 1024;
 static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024;
+static constexpr int REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD = 0;

 class IFileCache;
 using FileCachePtr = std::shared_ptr<IFileCache>;
@@ -59,6 +59,10 @@ FileSegment::FileSegment(
             downloader_id = getCallerId();
             break;
         }
+        case (State::SKIP_CACHE):
+        {
+            break;
+        }
         default:
         {
             throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state");
@@ -525,6 +529,14 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)

 void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
 {
+    bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
+
+    if (is_last_holder && download_state == State::SKIP_CACHE)
+    {
+        cache->remove(key(), offset(), cache_lock, segment_lock);
+        return;
+    }
+
     if (download_state == State::SKIP_CACHE || is_detached)
         return;

@@ -542,8 +554,7 @@ void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
     /// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the
     /// downloader or the only owner of the segment.

-    bool can_update_segment_state = isDownloaderImpl(segment_lock)
-        || cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
+    bool can_update_segment_state = isDownloaderImpl(segment_lock) || is_last_holder;

     if (can_update_segment_state)
         download_state = State::PARTIALLY_DOWNLOADED;
@@ -491,7 +491,10 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()

     /// Do not hold pointer to file segment if it is not needed anymore
     /// so can become releasable and can be evicted from cache.
-    file_segments_holder->file_segments.erase(file_segment_it);
+    /// If the status of filesegment state is SKIP_CACHE, it will not be deleted.
+    /// It will be deleted from the cache when the holder is destructed.
+    if ((*file_segment_it)->state() != FileSegment::State::SKIP_CACHE)
+        file_segments_holder->file_segments.erase(file_segment_it);

     if (current_file_segment_it == file_segments_holder->file_segments.end())
         return false;
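
Taken together with the FileSegment::completeUnlocked() change above, this gives a SKIP_CACHE segment a well-defined lifetime: the read buffer keeps it inside its FileSegmentsHolder so the data stays readable for the current query, and the corresponding cache entry is removed once the last holder completes the segment or the holder is destructed.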
@@ -22,6 +22,17 @@
                 <data_cache_max_size>22548578304</data_cache_max_size>
                 <cache_on_write_operations>0</cache_on_write_operations>
             </s3_cache_2>
+            <s3_cache_3>
+                <type>s3</type>
+                <endpoint>http://localhost:11111/test/00170_test/</endpoint>
+                <access_key_id>clickhouse</access_key_id>
+                <secret_access_key>clickhouse</secret_access_key>
+                <data_cache_enabled>1</data_cache_enabled>
+                <cache_enabled>0</cache_enabled>
+                <data_cache_max_size>22548578304</data_cache_max_size>
+                <cache_on_write_operations>1</cache_on_write_operations>
+                <enable_cache_hits_threshold>1</enable_cache_hits_threshold>
+            </s3_cache_3>
         </disks>
         <policies>
             <s3_cache>
@@ -38,6 +49,13 @@
                     </main>
                 </volumes>
             </s3_cache_2>
+            <s3_cache_3>
+                <volumes>
+                    <main>
+                        <disk>s3_cache_3</disk>
+                    </main>
+                </volumes>
+            </s3_cache_3>
         </policies>
     </storage_configuration>
 </clickhouse>
@@ -17,3 +17,19 @@ SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
 0 745 746
 SYSTEM DROP FILESYSTEM CACHE;
 SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
+DROP TABLE IF EXISTS test;
+CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_3', min_bytes_for_wide_part = 10485760;
+INSERT INTO test SELECT number, toString(number) FROM numbers(100);
+SELECT * FROM test FORMAT Null;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
+0 0 1
+SELECT * FROM test FORMAT Null;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
+0 0 1
+0 745 746
+SYSTEM DROP FILESYSTEM CACHE;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
+SELECT * FROM test FORMAT Null;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
+SYSTEM DROP FILESYSTEM CACHE;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
@@ -16,3 +16,18 @@ SELECT * FROM test FORMAT Null;
 SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
 SYSTEM DROP FILESYSTEM CACHE;
 SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
+
+DROP TABLE IF EXISTS test;
+CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_3', min_bytes_for_wide_part = 10485760;
+INSERT INTO test SELECT number, toString(number) FROM numbers(100);
+
+SELECT * FROM test FORMAT Null;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
+SELECT * FROM test FORMAT Null;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
+SYSTEM DROP FILESYSTEM CACHE;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
+SELECT * FROM test FORMAT Null;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
+SYSTEM DROP FILESYSTEM CACHE;
+SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache;
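
Reading the new test against its reference output: with enable_cache_hits_threshold = 1 on the s3_cache_3 policy, the first SELECT after the INSERT leaves only the one-byte segment (`0 0 1`) in system.filesystem_cache, while the 746-byte data segment (`0 745 746`) appears only after the second SELECT, i.e. once it has been accessed a second time and crossed the threshold. After SYSTEM DROP FILESYSTEM CACHE the access records are cleared as well (see the records.clear() / stash_queue.removeAll() addition in LRUFileCache::remove() above), so a single further SELECT caches nothing: every segment is back to its first access.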