mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge pull request #55158 from kssenii/fs-cache-improvement
fs cache improvement for big reads
This commit is contained in:
commit
e4f66b8469
@ -78,6 +78,7 @@ remove_keeper_config "create_if_not_exists" "[01]"
|
||||
rm /etc/clickhouse-server/config.d/merge_tree.xml
|
||||
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
|
||||
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
|
||||
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
|
||||
|
||||
start
|
||||
stop
|
||||
@ -114,6 +115,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
|
||||
rm /etc/clickhouse-server/config.d/merge_tree.xml
|
||||
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
|
||||
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
|
||||
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
|
||||
|
||||
start
|
||||
|
||||
|
@ -729,6 +729,7 @@ class IColumn;
|
||||
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
|
||||
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
|
||||
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
|
||||
M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
|
||||
\
|
||||
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
|
||||
M(Bool, enable_filesystem_read_prefetches_log, false, "Log to system.filesystem prefetch_log during query. Should be used only for testing or debugging, not recommended to be turned on by default", 0) \
|
||||
|
@ -114,30 +114,41 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
|
||||
cache_log->add(std::move(elem));
|
||||
}
|
||||
|
||||
void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size)
|
||||
bool CachedOnDiskReadBufferFromFile::nextFileSegmentsBatch()
|
||||
{
|
||||
chassert(!file_segments || file_segments->empty());
|
||||
size_t size = getRemainingSizeToRead();
|
||||
if (!size)
|
||||
return false;
|
||||
|
||||
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
|
||||
{
|
||||
file_segments = cache->get(cache_key, file_offset_of_buffer_end, size, settings.filesystem_cache_segments_batch_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
CreateFileSegmentSettings create_settings(FileSegmentKind::Regular);
|
||||
file_segments = cache->getOrSet(cache_key, file_offset_of_buffer_end, size, file_size.value(), create_settings, settings.filesystem_cache_segments_batch_size);
|
||||
}
|
||||
return !file_segments->empty();
|
||||
}
|
||||
|
||||
void CachedOnDiskReadBufferFromFile::initialize()
|
||||
{
|
||||
if (initialized)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Caching buffer already initialized");
|
||||
|
||||
implementation_buffer.reset();
|
||||
|
||||
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
|
||||
{
|
||||
file_segments = cache->get(cache_key, offset, size);
|
||||
}
|
||||
else
|
||||
{
|
||||
CreateFileSegmentSettings create_settings(FileSegmentKind::Regular);
|
||||
file_segments = cache->getOrSet(cache_key, offset, size, file_size.value(), create_settings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Segments in returned list are ordered in ascending order and represent a full contiguous
|
||||
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
|
||||
*/
|
||||
if (file_segments->empty())
|
||||
if (!nextFileSegmentsBatch())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "List of file segments cannot be empty");
|
||||
|
||||
chassert(!file_segments->empty());
|
||||
|
||||
LOG_TEST(
|
||||
log,
|
||||
"Having {} file segments to read: {}, current offset: {}",
|
||||
@ -512,7 +523,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
|
||||
cache_file_reader.reset();
|
||||
|
||||
file_segments->popFront();
|
||||
if (file_segments->empty())
|
||||
if (file_segments->empty() && !nextFileSegmentsBatch())
|
||||
return false;
|
||||
|
||||
current_file_segment = &file_segments->front();
|
||||
@ -788,9 +799,9 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|
||||
return false;
|
||||
|
||||
if (!initialized)
|
||||
initialize(file_offset_of_buffer_end, getTotalSizeToRead());
|
||||
initialize();
|
||||
|
||||
if (file_segments->empty())
|
||||
if (file_segments->empty() && !nextFileSegmentsBatch())
|
||||
return false;
|
||||
|
||||
const size_t original_buffer_size = internal_buffer.size();
|
||||
@ -1159,7 +1170,7 @@ off_t CachedOnDiskReadBufferFromFile::seek(off_t offset, int whence)
|
||||
return new_pos;
|
||||
}
|
||||
|
||||
size_t CachedOnDiskReadBufferFromFile::getTotalSizeToRead()
|
||||
size_t CachedOnDiskReadBufferFromFile::getRemainingSizeToRead()
|
||||
{
|
||||
/// Last position should be guaranteed to be set, as at least we always know file size.
|
||||
if (!read_until_position)
|
||||
|
@ -63,7 +63,7 @@ public:
|
||||
private:
|
||||
using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFileBase>;
|
||||
|
||||
void initialize(size_t offset, size_t size);
|
||||
void initialize();
|
||||
|
||||
/**
|
||||
* Return a list of file segments ordered in ascending order. This list represents
|
||||
@ -85,7 +85,7 @@ private:
|
||||
|
||||
bool nextImplStep();
|
||||
|
||||
size_t getTotalSizeToRead();
|
||||
size_t getRemainingSizeToRead();
|
||||
|
||||
bool completeFileSegmentAndGetNext();
|
||||
|
||||
@ -95,6 +95,8 @@ private:
|
||||
|
||||
static bool canStartFromCache(size_t current_offset, const FileSegment & file_segment);
|
||||
|
||||
bool nextFileSegmentsBatch();
|
||||
|
||||
Poco::Logger * log;
|
||||
FileCache::Key cache_key;
|
||||
String source_file_path;
|
||||
|
@ -100,6 +100,7 @@ struct ReadSettings
|
||||
bool enable_filesystem_cache_log = false;
|
||||
/// Don't populate cache when the read is not part of query execution (e.g. background thread).
|
||||
bool avoid_readthrough_cache_outside_query_context = true;
|
||||
size_t filesystem_cache_segments_batch_size = 20;
|
||||
|
||||
size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
|
||||
bool skip_download_if_exceeds_query_cache = true;
|
||||
|
@ -147,7 +147,7 @@ CacheGuard::Lock FileCache::lockCache() const
|
||||
return cache_guard.lock();
|
||||
}
|
||||
|
||||
FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range) const
|
||||
FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const
|
||||
{
|
||||
/// Given range = [left, right] and non-overlapping ordered set of file segments,
|
||||
/// find list [segment1, ..., segmentN] of segments which intersect with given range.
|
||||
@ -165,6 +165,9 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
|
||||
FileSegments result;
|
||||
auto add_to_result = [&](const FileSegmentMetadata & file_segment_metadata)
|
||||
{
|
||||
if (file_segments_limit && result.size() == file_segments_limit)
|
||||
return false;
|
||||
|
||||
FileSegmentPtr file_segment;
|
||||
if (!file_segment_metadata.evicting())
|
||||
{
|
||||
@ -180,6 +183,7 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
|
||||
}
|
||||
|
||||
result.push_back(file_segment);
|
||||
return true;
|
||||
};
|
||||
|
||||
const auto & file_segments = locked_key;
|
||||
@ -197,7 +201,8 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
|
||||
if (file_segment_metadata.file_segment->range().right < range.left)
|
||||
return {};
|
||||
|
||||
add_to_result(file_segment_metadata);
|
||||
if (!add_to_result(file_segment_metadata))
|
||||
return result;
|
||||
}
|
||||
else /// segment_it <-- segmment{k}
|
||||
{
|
||||
@ -213,7 +218,8 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
|
||||
/// [___________
|
||||
/// ^
|
||||
/// range.left
|
||||
add_to_result(prev_file_segment_metadata);
|
||||
if (!add_to_result(prev_file_segment_metadata))
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@ -229,7 +235,9 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
|
||||
if (range.right < file_segment_metadata.file_segment->range().left)
|
||||
break;
|
||||
|
||||
add_to_result(file_segment_metadata);
|
||||
if (!add_to_result(file_segment_metadata))
|
||||
return result;
|
||||
|
||||
++segment_it;
|
||||
}
|
||||
}
|
||||
@ -237,11 +245,34 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size)
|
||||
{
|
||||
assert(size > 0);
|
||||
std::vector<FileSegment::Range> ranges;
|
||||
|
||||
size_t current_pos = offset;
|
||||
size_t end_pos_non_included = offset + size;
|
||||
size_t remaining_size = size;
|
||||
|
||||
FileSegments file_segments;
|
||||
while (current_pos < end_pos_non_included)
|
||||
{
|
||||
auto current_file_segment_size = std::min(remaining_size, max_file_segment_size);
|
||||
ranges.emplace_back(current_pos, current_pos + current_file_segment_size - 1);
|
||||
|
||||
remaining_size -= current_file_segment_size;
|
||||
current_pos += current_file_segment_size;
|
||||
}
|
||||
|
||||
return ranges;
|
||||
}
|
||||
|
||||
FileSegments FileCache::splitRangeIntoFileSegments(
|
||||
LockedKey & locked_key,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
FileSegment::State state,
|
||||
size_t file_segments_limit,
|
||||
const CreateFileSegmentSettings & settings)
|
||||
{
|
||||
assert(size > 0);
|
||||
@ -253,7 +284,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
|
||||
size_t remaining_size = size;
|
||||
|
||||
FileSegments file_segments;
|
||||
while (current_pos < end_pos_non_included)
|
||||
while (current_pos < end_pos_non_included && (!file_segments_limit || file_segments.size() < file_segments_limit))
|
||||
{
|
||||
current_file_segment_size = std::min(remaining_size, max_file_segment_size);
|
||||
remaining_size -= current_file_segment_size;
|
||||
@ -265,7 +296,6 @@ FileSegments FileCache::splitRangeIntoFileSegments(
|
||||
current_pos += current_file_segment_size;
|
||||
}
|
||||
|
||||
assert(file_segments.empty() || offset + size - 1 == file_segments.back()->range().right);
|
||||
return file_segments;
|
||||
}
|
||||
|
||||
@ -273,6 +303,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
|
||||
LockedKey & locked_key,
|
||||
FileSegments & file_segments,
|
||||
const FileSegment::Range & range,
|
||||
size_t file_segments_limit,
|
||||
bool fill_with_detached_file_segments,
|
||||
const CreateFileSegmentSettings & settings)
|
||||
{
|
||||
@ -289,6 +320,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
|
||||
assert(!file_segments.empty());
|
||||
|
||||
auto it = file_segments.begin();
|
||||
size_t processed_count = 0;
|
||||
auto segment_range = (*it)->range();
|
||||
|
||||
size_t current_pos;
|
||||
@ -301,11 +333,17 @@ void FileCache::fillHolesWithEmptyFileSegments(
|
||||
|
||||
current_pos = segment_range.right + 1;
|
||||
++it;
|
||||
++processed_count;
|
||||
}
|
||||
else
|
||||
current_pos = range.left;
|
||||
|
||||
while (current_pos <= range.right && it != file_segments.end())
|
||||
auto is_limit_reached = [&]() -> bool
|
||||
{
|
||||
return file_segments_limit && processed_count >= file_segments_limit;
|
||||
};
|
||||
|
||||
while (current_pos <= range.right && it != file_segments.end() && !is_limit_reached())
|
||||
{
|
||||
segment_range = (*it)->range();
|
||||
|
||||
@ -313,6 +351,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
|
||||
{
|
||||
current_pos = segment_range.right + 1;
|
||||
++it;
|
||||
++processed_count;
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -326,18 +365,47 @@ void FileCache::fillHolesWithEmptyFileSegments(
|
||||
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings);
|
||||
|
||||
file_segments.insert(it, file_segment);
|
||||
++processed_count;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto split = splitRangeIntoFileSegments(
|
||||
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
|
||||
file_segments.splice(it, std::move(split));
|
||||
auto ranges = splitRange(current_pos, hole_size);
|
||||
FileSegments hole;
|
||||
for (const auto & r : ranges)
|
||||
{
|
||||
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
|
||||
hole.push_back(metadata_it->second->file_segment);
|
||||
++processed_count;
|
||||
|
||||
if (is_limit_reached())
|
||||
break;
|
||||
}
|
||||
file_segments.splice(it, std::move(hole));
|
||||
}
|
||||
|
||||
if (is_limit_reached())
|
||||
break;
|
||||
|
||||
current_pos = segment_range.right + 1;
|
||||
++it;
|
||||
++processed_count;
|
||||
}
|
||||
|
||||
auto erase_unprocessed = [&]()
|
||||
{
|
||||
chassert(file_segments.size() >= file_segments_limit);
|
||||
file_segments.erase(it, file_segments.end());
|
||||
chassert(file_segments.size() == file_segments_limit);
|
||||
};
|
||||
|
||||
if (is_limit_reached())
|
||||
{
|
||||
erase_unprocessed();
|
||||
return;
|
||||
}
|
||||
|
||||
chassert(!file_segments_limit || file_segments.size() < file_segments_limit);
|
||||
|
||||
if (current_pos <= range.right)
|
||||
{
|
||||
/// ________] -- requested range
|
||||
@ -356,9 +424,21 @@ void FileCache::fillHolesWithEmptyFileSegments(
|
||||
}
|
||||
else
|
||||
{
|
||||
auto split = splitRangeIntoFileSegments(
|
||||
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
|
||||
file_segments.splice(file_segments.end(), std::move(split));
|
||||
auto ranges = splitRange(current_pos, hole_size);
|
||||
FileSegments hole;
|
||||
for (const auto & r : ranges)
|
||||
{
|
||||
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
|
||||
hole.push_back(metadata_it->second->file_segment);
|
||||
++processed_count;
|
||||
|
||||
if (is_limit_reached())
|
||||
break;
|
||||
}
|
||||
file_segments.splice(it, std::move(hole));
|
||||
|
||||
if (is_limit_reached())
|
||||
erase_unprocessed();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -374,7 +454,7 @@ FileSegmentsHolderPtr FileCache::set(
|
||||
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY);
|
||||
FileSegment::Range range(offset, offset + size - 1);
|
||||
|
||||
auto file_segments = getImpl(*locked_key, range);
|
||||
auto file_segments = getImpl(*locked_key, range, /* file_segments_limit */0);
|
||||
if (!file_segments.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Having intersection with already existing cache");
|
||||
|
||||
@ -388,7 +468,7 @@ FileSegmentsHolderPtr FileCache::set(
|
||||
else
|
||||
{
|
||||
file_segments = splitRangeIntoFileSegments(
|
||||
*locked_key, offset, size, FileSegment::State::EMPTY, settings);
|
||||
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, settings);
|
||||
}
|
||||
|
||||
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
|
||||
@ -400,43 +480,137 @@ FileCache::getOrSet(
|
||||
size_t offset,
|
||||
size_t size,
|
||||
size_t file_size,
|
||||
const CreateFileSegmentSettings & settings)
|
||||
const CreateFileSegmentSettings & settings,
|
||||
size_t file_segments_limit)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetOrSetMicroseconds);
|
||||
|
||||
assertInitialized();
|
||||
|
||||
const auto aligned_offset = roundDownToMultiple(offset, boundary_alignment);
|
||||
const auto aligned_end = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size);
|
||||
const auto aligned_size = aligned_end - aligned_offset;
|
||||
FileSegment::Range range(offset, offset + size - 1);
|
||||
|
||||
FileSegment::Range range(aligned_offset, aligned_offset + aligned_size - 1);
|
||||
const auto aligned_offset = roundDownToMultiple(range.left, boundary_alignment);
|
||||
auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1;
|
||||
|
||||
chassert(aligned_offset <= range.left);
|
||||
chassert(aligned_end_offset >= range.right);
|
||||
|
||||
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY);
|
||||
|
||||
/// Get all segments which intersect with the given range.
|
||||
auto file_segments = getImpl(*locked_key, range);
|
||||
auto file_segments = getImpl(*locked_key, range, file_segments_limit);
|
||||
|
||||
if (file_segments_limit)
|
||||
{
|
||||
chassert(file_segments.size() <= file_segments_limit);
|
||||
if (file_segments.size() == file_segments_limit)
|
||||
range.right = aligned_end_offset = file_segments.back()->range().right;
|
||||
}
|
||||
|
||||
/// Check case if we have uncovered prefix, e.g.
|
||||
///
|
||||
/// [_______________]
|
||||
/// ^ ^
|
||||
/// range.left range.right
|
||||
/// [___] [__________] <-- current cache (example)
|
||||
/// [ ]
|
||||
/// ^----^
|
||||
/// uncovered prefix.
|
||||
const bool has_uncovered_prefix = file_segments.empty() || range.left < file_segments.front()->range().left;
|
||||
|
||||
if (aligned_offset < range.left && has_uncovered_prefix)
|
||||
{
|
||||
auto prefix_range = FileSegment::Range(aligned_offset, file_segments.empty() ? range.left - 1 : file_segments.front()->range().left - 1);
|
||||
auto prefix_file_segments = getImpl(*locked_key, prefix_range, /* file_segments_limit */0);
|
||||
|
||||
if (prefix_file_segments.empty())
|
||||
{
|
||||
/// [____________________][_______________]
|
||||
/// ^ ^ ^
|
||||
/// aligned_offset range.left range.right
|
||||
/// [___] [__________] <-- current cache (example)
|
||||
range.left = aligned_offset;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// [____________________][_______________]
|
||||
/// ^ ^ ^
|
||||
/// aligned_offset range.left range.right
|
||||
/// ____] [____] [___] [__________] <-- current cache (example)
|
||||
/// ^
|
||||
/// prefix_file_segments.back().right
|
||||
|
||||
chassert(prefix_file_segments.back()->range().right < range.left);
|
||||
chassert(prefix_file_segments.back()->range().right >= aligned_offset);
|
||||
|
||||
range.left = prefix_file_segments.back()->range().right + 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// Check case if we have uncovered suffix.
|
||||
///
|
||||
/// [___________________]
|
||||
/// ^ ^
|
||||
/// range.left range.right
|
||||
/// [___] [___] <-- current cache (example)
|
||||
/// [___]
|
||||
/// ^---^
|
||||
/// uncovered_suffix
|
||||
const bool has_uncovered_suffix = file_segments.empty() || file_segments.back()->range().right < range.right;
|
||||
|
||||
if (range.right < aligned_end_offset && has_uncovered_suffix)
|
||||
{
|
||||
auto suffix_range = FileSegment::Range(range.right, aligned_end_offset);
|
||||
/// We need to get 1 file segment, so file_segments_limit = 1 here.
|
||||
auto suffix_file_segments = getImpl(*locked_key, suffix_range, /* file_segments_limit */1);
|
||||
|
||||
if (suffix_file_segments.empty())
|
||||
{
|
||||
/// [__________________][ ]
|
||||
/// ^ ^ ^
|
||||
/// range.left range.right aligned_end_offset
|
||||
/// [___] [___] <-- current cache (example)
|
||||
|
||||
range.right = aligned_end_offset;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// [__________________][ ]
|
||||
/// ^ ^ ^
|
||||
/// range.left range.right aligned_end_offset
|
||||
/// [___] [___] [_________] <-- current cache (example)
|
||||
/// ^
|
||||
/// suffix_file_segments.front().left
|
||||
range.right = suffix_file_segments.front()->range().left - 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (file_segments.empty())
|
||||
{
|
||||
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, settings);
|
||||
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
chassert(file_segments.front()->range().right >= range.left);
|
||||
chassert(file_segments.back()->range().left <= range.right);
|
||||
|
||||
fillHolesWithEmptyFileSegments(
|
||||
*locked_key, file_segments, range, /* fill_with_detached */false, settings);
|
||||
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, settings);
|
||||
|
||||
if (!file_segments.front()->range().contains(offset))
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} to include {} "
|
||||
"(end offset: {}, aligned offset: {}, aligned end offset: {})",
|
||||
file_segments.front()->range().toString(), offset, range.right, aligned_offset, aligned_end_offset);
|
||||
}
|
||||
}
|
||||
|
||||
while (!file_segments.empty() && file_segments.front()->range().right < offset)
|
||||
file_segments.pop_front();
|
||||
chassert(file_segments_limit ? file_segments.back()->range().left <= range.right : file_segments.back()->range().contains(range.right));
|
||||
chassert(!file_segments_limit || file_segments.size() <= file_segments_limit);
|
||||
|
||||
while (!file_segments.empty() && file_segments.back()->range().left >= offset + size)
|
||||
file_segments.pop_back();
|
||||
|
||||
chassert(!file_segments.empty());
|
||||
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
|
||||
}
|
||||
|
||||
FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size)
|
||||
FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size, size_t file_segments_limit)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetMicroseconds);
|
||||
|
||||
@ -448,12 +622,20 @@ FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size
|
||||
FileSegment::Range range(offset, offset + size - 1);
|
||||
|
||||
/// Get all segments which intersect with the given range.
|
||||
auto file_segments = getImpl(*locked_key, range);
|
||||
auto file_segments = getImpl(*locked_key, range, file_segments_limit);
|
||||
if (!file_segments.empty())
|
||||
{
|
||||
fillHolesWithEmptyFileSegments(
|
||||
*locked_key, file_segments, range, /* fill_with_detached */true, CreateFileSegmentSettings{});
|
||||
if (file_segments_limit)
|
||||
{
|
||||
chassert(file_segments.size() <= file_segments_limit);
|
||||
if (file_segments.size() == file_segments_limit)
|
||||
range.right = file_segments.back()->range().right;
|
||||
}
|
||||
|
||||
fillHolesWithEmptyFileSegments(
|
||||
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */true, CreateFileSegmentSettings{});
|
||||
|
||||
chassert(!file_segments_limit || file_segments.size() <= file_segments_limit);
|
||||
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
|
||||
}
|
||||
}
|
||||
|
@ -80,8 +80,13 @@ public:
|
||||
* As long as pointers to returned file segments are held
|
||||
* it is guaranteed that these file segments are not removed from cache.
|
||||
*/
|
||||
FileSegmentsHolderPtr
|
||||
getOrSet(const Key & key, size_t offset, size_t size, size_t file_size, const CreateFileSegmentSettings & settings);
|
||||
FileSegmentsHolderPtr getOrSet(
|
||||
const Key & key,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
size_t file_size,
|
||||
const CreateFileSegmentSettings & settings,
|
||||
size_t file_segments_limit = 0);
|
||||
|
||||
/**
|
||||
* Segments in returned list are ordered in ascending order and represent a full contiguous
|
||||
@ -92,7 +97,7 @@ public:
|
||||
* with the destruction of the holder, while in getOrSet() EMPTY file segments can eventually change
|
||||
* it's state (and become DOWNLOADED).
|
||||
*/
|
||||
FileSegmentsHolderPtr get(const Key & key, size_t offset, size_t size);
|
||||
FileSegmentsHolderPtr get(const Key & key, size_t offset, size_t size, size_t file_segments_limit);
|
||||
|
||||
FileSegmentsHolderPtr set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
|
||||
|
||||
@ -204,26 +209,41 @@ private:
|
||||
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
|
||||
|
||||
void assertInitialized() const;
|
||||
|
||||
void assertCacheCorrectness();
|
||||
|
||||
void loadMetadata();
|
||||
void loadMetadataImpl();
|
||||
void loadMetadataForKeys(const std::filesystem::path & keys_dir);
|
||||
|
||||
FileSegments getImpl(const LockedKey & locked_key, const FileSegment::Range & range) const;
|
||||
/// Get all file segments from cache which intersect with `range`.
|
||||
/// If `file_segments_limit` > 0, return no more than first file_segments_limit
|
||||
/// file segments.
|
||||
FileSegments getImpl(
|
||||
const LockedKey & locked_key,
|
||||
const FileSegment::Range & range,
|
||||
size_t file_segments_limit) const;
|
||||
|
||||
/// Split range into subranges by max_file_segment_size,
|
||||
/// each subrange size must be less or equal to max_file_segment_size.
|
||||
std::vector<FileSegment::Range> splitRange(size_t offset, size_t size);
|
||||
|
||||
/// Split range into subranges by max_file_segment_size (same as in splitRange())
|
||||
/// and create a new file segment for each subrange.
|
||||
/// If `file_segments_limit` > 0, create no more than first file_segments_limit
|
||||
/// file segments.
|
||||
FileSegments splitRangeIntoFileSegments(
|
||||
LockedKey & locked_key,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
FileSegment::State state,
|
||||
size_t file_segments_limit,
|
||||
const CreateFileSegmentSettings & create_settings);
|
||||
|
||||
void fillHolesWithEmptyFileSegments(
|
||||
LockedKey & locked_key,
|
||||
FileSegments & file_segments,
|
||||
const FileSegment::Range & range,
|
||||
size_t file_segments_limit,
|
||||
bool fill_with_detached_file_segments,
|
||||
const CreateFileSegmentSettings & settings);
|
||||
|
||||
|
@ -58,6 +58,9 @@ void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetStrin
|
||||
|
||||
if (has("load_metadata_threads"))
|
||||
load_metadata_threads = get_uint("load_metadata_threads");
|
||||
|
||||
if (boundary_alignment > max_file_segment_size)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `boundary_alignment` cannot exceed `max_file_segment_size`");
|
||||
}
|
||||
|
||||
void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
|
||||
|
@ -924,8 +924,8 @@ void FileSegment::use()
|
||||
}
|
||||
}
|
||||
|
||||
FileSegmentsHolder::FileSegmentsHolder(FileSegments && file_segments_, bool complete_on_dtor_)
|
||||
: file_segments(std::move(file_segments_)), complete_on_dtor(complete_on_dtor_)
|
||||
FileSegmentsHolder::FileSegmentsHolder(FileSegments && file_segments_)
|
||||
: file_segments(std::move(file_segments_))
|
||||
{
|
||||
CurrentMetrics::add(CurrentMetrics::FilesystemCacheHoldFileSegments, file_segments.size());
|
||||
ProfileEvents::increment(ProfileEvents::FilesystemCacheHoldFileSegments, file_segments.size());
|
||||
@ -935,9 +935,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentHolderCompleteMicroseconds);
|
||||
|
||||
if (!complete_on_dtor)
|
||||
return;
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::FilesystemCacheUnusedHoldFileSegments, file_segments.size());
|
||||
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
|
||||
file_segment_it = completeAndPopFrontImpl();
|
||||
|
@ -136,6 +136,8 @@ public:
|
||||
size_t size() const { return right - left + 1; }
|
||||
|
||||
String toString() const { return fmt::format("[{}, {}]", std::to_string(left), std::to_string(right)); }
|
||||
|
||||
bool contains(size_t offset) const { return left <= offset && offset <= right; }
|
||||
};
|
||||
|
||||
static String getCallerId();
|
||||
@ -324,7 +326,7 @@ struct FileSegmentsHolder : private boost::noncopyable
|
||||
{
|
||||
FileSegmentsHolder() = default;
|
||||
|
||||
explicit FileSegmentsHolder(FileSegments && file_segments_, bool complete_on_dtor_ = true);
|
||||
explicit FileSegmentsHolder(FileSegments && file_segments_);
|
||||
|
||||
~FileSegmentsHolder();
|
||||
|
||||
@ -350,7 +352,6 @@ struct FileSegmentsHolder : private boost::noncopyable
|
||||
|
||||
private:
|
||||
FileSegments file_segments{};
|
||||
const bool complete_on_dtor = true;
|
||||
|
||||
FileSegments::iterator completeAndPopFrontImpl();
|
||||
};
|
||||
|
@ -4813,6 +4813,7 @@ ReadSettings Context::getReadSettings() const
|
||||
res.enable_filesystem_cache = settings.enable_filesystem_cache;
|
||||
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
|
||||
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
|
||||
res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size;
|
||||
|
||||
res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size;
|
||||
res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;
|
||||
|
@ -242,7 +242,7 @@ TEST_F(FileCacheTest, get)
|
||||
settings.max_elements = 5;
|
||||
settings.boundary_alignment = 1;
|
||||
|
||||
const size_t file_size = -1; // the value doesn't really matter because boundary_alignment == 1.
|
||||
const size_t file_size = INT_MAX; // the value doesn't really matter because boundary_alignment == 1.
|
||||
|
||||
{
|
||||
std::cerr << "Step 1\n";
|
||||
|
@ -577,6 +577,11 @@ class SettingsRandomizer:
|
||||
),
|
||||
"remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
|
||||
"local_filesystem_read_prefetch": lambda: random.randint(0, 1),
|
||||
"filesystem_cache_segments_batch_size": lambda: random.choice([0, 3, 10, 50]),
|
||||
"read_from_filesystem_cache_if_exists_otherwise_bypass_cache": lambda: random.randint(
|
||||
0, 1
|
||||
),
|
||||
"throw_on_error_from_cache_on_write_operations": lambda: random.randint(0, 1),
|
||||
"remote_filesystem_read_prefetch": lambda: random.randint(0, 1),
|
||||
"allow_prefetched_read_pool_for_remote_filesystem": lambda: random.randint(
|
||||
0, 1
|
||||
|
@ -153,6 +153,7 @@ if [[ -n "$EXPORT_S3_STORAGE_POLICIES" ]]; then
|
||||
|
||||
ln -sf $SRC_PATH/config.d/storage_conf.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/users.d/s3_cache.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/s3_cache_new.xml $DEST_SERVER_PATH/users.d/
|
||||
fi
|
||||
|
||||
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
|
||||
|
7
tests/config/users.d/s3_cache_new.xml
Normal file
7
tests/config/users.d/s3_cache_new.xml
Normal file
@ -0,0 +1,7 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<filesystem_cache_segments_batch_size>10</filesystem_cache_segments_batch_size>
|
||||
</default>
|
||||
</profiles>
|
||||
</clickhouse>
|
@ -46,7 +46,7 @@ def test_parallel_cache_loading_on_startup(cluster, node_name):
|
||||
path = 'paralel_loading_test',
|
||||
disk = 'hdd_blob',
|
||||
max_file_segment_size = '1Ki',
|
||||
boundary_alignemt = '1Ki',
|
||||
boundary_alignment = '1Ki',
|
||||
max_size = '1Gi',
|
||||
max_elements = 10000000,
|
||||
load_metadata_threads = 30);
|
||||
|
@ -12,6 +12,7 @@
|
||||
<path>/tiny_local_cache/</path>
|
||||
<max_size>10M</max_size>
|
||||
<max_file_segment_size>1M</max_file_segment_size>
|
||||
<boundary_alignment>1M</boundary_alignment>
|
||||
<cache_on_write_operations>1</cache_on_write_operations>
|
||||
</tiny_local_cache>
|
||||
|
||||
|
@ -22,6 +22,7 @@ SETTINGS min_bytes_for_wide_part = 0,
|
||||
type = cache,
|
||||
max_size = '128Mi',
|
||||
max_file_segment_size = '10Ki',
|
||||
boundary_alignment = '5Ki',
|
||||
path = '${CLICKHOUSE_TEST_UNIQUE_NAME}',
|
||||
cache_on_write_operations = 1,
|
||||
enable_filesystem_query_cache_limit = 1,
|
||||
|
Loading…
Reference in New Issue
Block a user