This commit is contained in:
kssenii 2024-11-11 12:36:09 +01:00
parent e18ff6e56b
commit 386e16bee2
6 changed files with 63 additions and 36 deletions

View File

@ -8,6 +8,7 @@
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/SLRUFileCachePriority.h>
#include <Interpreters/Cache/FileCacheUtils.h>
#include <Interpreters/Cache/EvictionCandidates.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
@ -53,16 +54,6 @@ namespace ErrorCodes
namespace
{
size_t roundDownToMultiple(size_t num, size_t multiple)
{
return (num / multiple) * multiple;
}
size_t roundUpToMultiple(size_t num, size_t multiple)
{
return roundDownToMultiple(num + multiple - 1, multiple);
}
std::string getCommonUserID()
{
auto user_from_context = DB::Context::getGlobalContextInstance()->getFilesystemCacheUser();
@ -605,8 +596,8 @@ FileCache::getOrSet(
/// 2. max_file_segments_limit
FileSegment::Range result_range = initial_range;
const auto aligned_offset = roundDownToMultiple(initial_range.left, boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(initial_range.right + 1, boundary_alignment), file_size) - 1;
const auto aligned_offset = FileCacheUtils::roundDownToMultiple(initial_range.left, boundary_alignment);
auto aligned_end_offset = std::min(FileCacheUtils::roundUpToMultiple(initial_range.right + 1, boundary_alignment), file_size) - 1;
chassert(aligned_offset <= initial_range.left);
chassert(aligned_end_offset >= initial_range.right);

View File

@ -163,6 +163,8 @@ public:
size_t getBackgroundDownloadMaxFileSegmentSize() const { return metadata.getBackgroundDownloadMaxFileSegmentSize(); }
size_t getBoundaryAlignment() const { return boundary_alignment; }
bool tryReserve(
FileSegment & file_segment,
size_t size,

View File

@ -0,0 +1,17 @@
#pragma once
#include <Core/Types.h>
namespace FileCacheUtils
{
static size_t roundDownToMultiple(size_t num, size_t multiple)
{
return (num / multiple) * multiple;
}
static size_t roundUpToMultiple(size_t num, size_t multiple)
{
return roundDownToMultiple(num + multiple - 1, multiple);
}
}

View File

@ -4,6 +4,7 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheUtils.h>
#include <base/getThreadId.h>
#include <base/hex.h>
#include <Common/CurrentThread.h>
@ -629,6 +630,31 @@ void FileSegment::completePartAndResetDownloader()
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lk));
}
size_t FileSegment::getSizeForBackgroundDownload() const
{
auto lk = lock();
return getSizeForBackgroundDownloadUnlocked(lk);
}
size_t FileSegment::getSizeForBackgroundDownloadUnlocked(const FileSegmentGuard::Lock &) const
{
if (!background_download_enabled
|| !downloaded_size
|| !remote_file_reader)
{
return 0;
}
const size_t background_download_max_file_segment_size = cache->getBackgroundDownloadMaxFileSegmentSize();
size_t desired_size;
if (downloaded_size >= background_download_max_file_segment_size)
desired_size = FileCacheUtils::roundUpToMultiple(downloaded_size, cache->getBoundaryAlignment());
else
desired_size = FileCacheUtils::roundUpToMultiple(background_download_max_file_segment_size, cache->getBoundaryAlignment());
return desired_size - downloaded_size;
}
void FileSegment::complete(bool allow_background_download)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentCompleteMicroseconds);
@ -708,10 +734,8 @@ void FileSegment::complete(bool allow_background_download)
if (is_last_holder)
{
bool added_to_download_queue = false;
if (allow_background_download
&& background_download_enabled
&& remote_file_reader
&& downloaded_size < cache->getBackgroundDownloadMaxFileSegmentSize())
size_t background_download_size = allow_background_download ? getSizeForBackgroundDownloadUnlocked(segment_lock) : 0;
if (background_download_size)
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheBackgroundDownloadQueuePush);
added_to_download_queue = locked_key->addToDownloadQueue(offset(), segment_lock); /// Finish download in background.

View File

@ -185,6 +185,8 @@ public:
bool assertCorrectness() const;
size_t getSizeForBackgroundDownload() const;
/**
* ========== Methods that must do cv.notify() ==================
*/
@ -230,6 +232,7 @@ private:
String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const;
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;
void resetDownloaderUnlocked(const FileSegmentGuard::Lock &);
size_t getSizeForBackgroundDownloadUnlocked(const FileSegmentGuard::Lock &) const;
void setDownloadState(State state, const FileSegmentGuard::Lock &);
void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &);

View File

@ -632,9 +632,6 @@ void CacheMetadata::downloadThreadFunc(const bool & stop_flag)
auto & file_segment = holder->front();
if (file_segment.getDownloadedSize() >= download_max_file_segment_size)
continue;
if (file_segment.getOrSetDownloader() != FileSegment::getCallerId())
continue;
@ -681,6 +678,10 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
log, "Downloading {} bytes for file segment {}",
file_segment.range().size() - file_segment.getDownloadedSize(), file_segment.getInfoForLog());
size_t size_to_download = file_segment.getSizeForBackgroundDownload();
if (!size_to_download)
return;
auto reader = file_segment.getRemoteFileReader();
if (!reader)
{
@ -695,7 +696,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
if (reader->internalBuffer().empty())
{
if (!memory)
memory.emplace(DBMS_DEFAULT_BUFFER_SIZE);
memory.emplace(std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size_to_download));
reader->set(memory->data(), memory->size());
}
@ -706,24 +707,13 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
if (offset != static_cast<size_t>(reader->getPosition()))
reader->seek(offset, SEEK_SET);
bool stop = false;
const size_t max_file_segment_size = download_max_file_segment_size.load();
while (!stop && !reader->eof())
while (size_to_download && !reader->eof())
{
auto size = reader->available();
const auto available = reader->available();
chassert(available);
const size_t downloaded_size = file_segment.getDownloadedSize();
if (downloaded_size >= max_file_segment_size)
break;
if (downloaded_size + size > max_file_segment_size)
{
/// Do not download more than download_max_file_segment_size
/// because we want to leave right boundary of file segment aligned.
size = max_file_segment_size - downloaded_size;
stop = true;
}
const auto size = std::min(available, size_to_download);
size_to_download -= size;
std::string failure_reason;
if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds, failure_reason))