This commit is contained in:
kssenii 2022-08-19 20:13:46 +02:00
parent 90f17a9440
commit 26384b2543
6 changed files with 98 additions and 151 deletions

View File

@ -430,19 +430,20 @@ void FileSegment::completeBatchAndResetDownloader()
cv.notify_all();
}
void FileSegment::completeWithState(State state, bool auto_resize)
void FileSegment::completeWithState(State state)
{
std::lock_guard cache_lock(cache->mutex);
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
bool is_downloader = isDownloaderImpl(segment_lock);
if (!is_downloader)
auto caller_id = getCallerId();
if (caller_id != downloader_id)
{
cv.notify_all();
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"File segment can be completed only by downloader or downloader's FileSegmentsHodler");
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"File segment completion can be done only by downloader. (CallerId: {}, downloader id: {}",
caller_id, downloader_id);
}
if (state != State::DOWNLOADED
@ -450,140 +451,46 @@ void FileSegment::completeWithState(State state, bool auto_resize)
&& state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
{
cv.notify_all();
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cannot complete file segment with state: {}", stateToString(state));
}
if (state == State::DOWNLOADED)
{
if (auto_resize && downloaded_size != range().size())
{
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
assert(downloaded_size <= range().size());
segment_range = Range(segment_range.left, segment_range.left + downloaded_size - 1);
}
/// Update states and finalize cache write buffer.
setDownloaded(segment_lock);
if (downloaded_size != range().size())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cannot complete file segment as DOWNLOADED, because downloaded size ({}) does not match expected size ({})",
downloaded_size, range().size());
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cannot complete file segment with state: {}", stateToString(state));
}
download_state = state;
try
{
completeImpl(cache_lock, segment_lock);
}
catch (...)
{
if (!downloader_id.empty() && is_downloader)
downloader_id.clear();
cv.notify_all();
throw;
}
completeBasedOnCurrentState(cache_lock, segment_lock);
cv.notify_all();
}
void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock)
void FileSegment::completeWithoutState(std::lock_guard<std::mutex> & cache_lock)
{
std::lock_guard segment_lock(mutex);
completeBasedOnCurrentState(cache_lock, segment_lock);
}
void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
if (is_detached)
return;
assertNotDetached(segment_lock);
SCOPE_EXIT({
cv.notify_one();
});
completeBasedOnCurrentStateUnlocked(cache_lock, segment_lock);
}
void FileSegment::completeBasedOnCurrentStateUnlocked(
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
bool is_downloader = isDownloaderImpl(segment_lock);
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
bool can_update_segment_state = is_downloader || is_last_holder;
size_t current_downloaded_size = getDownloadedSize(segment_lock);
if (is_last_holder && download_state == State::SKIP_CACHE)
LOG_TEST(log, "Complete without state (is_last_holder: {}). File segment info: {}", is_last_holder, getInfoForLogImpl(segment_lock));
if (can_update_segment_state)
{
cache->remove(key(), offset(), cache_lock, segment_lock);
return;
}
if (download_state == State::SKIP_CACHE || is_detached)
return;
if (isDownloaderImpl(segment_lock)
&& download_state != State::DOWNLOADED
&& getDownloadedSize(segment_lock) == range().size())
{
setDownloaded(segment_lock);
}
assertNotDetached(segment_lock);
if (download_state == State::DOWNLOADING || download_state == State::EMPTY)
{
/// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the
/// downloader or the only owner of the segment.
bool can_update_segment_state = isDownloaderImpl(segment_lock) || is_last_holder;
if (can_update_segment_state)
download_state = State::PARTIALLY_DOWNLOADED;
}
try
{
completeImpl(cache_lock, segment_lock);
}
catch (...)
{
if (!downloader_id.empty() && isDownloaderImpl(segment_lock))
downloader_id.clear();
cv.notify_all();
throw;
}
cv.notify_all();
}
void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
if (is_last_holder
&& (download_state == State::PARTIALLY_DOWNLOADED || download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION))
{
size_t current_downloaded_size = getDownloadedSize(segment_lock);
if (current_downloaded_size == 0)
{
download_state = State::SKIP_CACHE;
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
cache->remove(key(), offset(), cache_lock, segment_lock);
}
if (current_downloaded_size == range().size())
setDownloaded(segment_lock);
else
{
/**
* Only last holder of current file segment can resize the cell,
* because there is an invariant that file segments returned to users
* in FileSegmentsHolder represent a contiguous range, so we can resize
* it only when nobody needs it.
*/
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
/// Resize this file segment by creating a copy file segment with DOWNLOADED state,
/// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
/// (this will be crucial for other file segment holder, not for current one).
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
}
download_state = State::PARTIALLY_DOWNLOADED;
markAsDetached(segment_lock);
resetDownloaderImpl(segment_lock);
if (cache_writer)
{
@ -593,10 +500,62 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
}
}
if (!downloader_id.empty() && (isDownloaderImpl(segment_lock) || is_last_holder))
switch (download_state)
{
LOG_TEST(log, "Clearing downloader id: {}, current state: {}", downloader_id, stateToString(download_state));
downloader_id.clear();
case State::SKIP_CACHE:
{
if (is_last_holder)
cache->remove(key(), offset(), cache_lock, segment_lock);
return;
}
case State::DOWNLOADED:
{
assert(downloaded_size == range().size());
assert(is_downloaded);
break;
}
case State::DOWNLOADING:
case State::EMPTY:
{
assert(!is_last_holder);
break;
}
case State::PARTIALLY_DOWNLOADED:
case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
{
if (is_last_holder)
{
if (current_downloaded_size == 0)
{
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
download_state = State::SKIP_CACHE;
cache->remove(key(), offset(), cache_lock, segment_lock);
}
else
{
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), current_downloaded_size);
/**
* Only last holder of current file segment can resize the cell,
* because there is an invariant that file segments returned to users
* in FileSegmentsHolder represent a contiguous range, so we can resize
* it only when nobody needs it.
*/
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
/// Resize this file segment by creating a copy file segment with DOWNLOADED state,
/// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
/// (this will be crucial for other file segment holder, not for current one).
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
}
markAsDetached(segment_lock);
}
break;
}
}
LOG_TEST(log, "Completed file segment: {}", getInfoForLogImpl(segment_lock));
@ -793,7 +752,7 @@ FileSegmentsHolder::~FileSegmentsHolder()
/// under the same mutex, because complete() checks for segment pointers.
std::lock_guard cache_lock(cache->mutex);
file_segment->completeBasedOnCurrentState(cache_lock);
file_segment->completeWithoutState(cache_lock);
file_segment_it = file_segments.erase(current_file_segment_it);
}
@ -859,13 +818,20 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
/// was initially set with a margin as `max_file_segment_size`. => We need to always
/// resize to actual size after download finished.
file_segment.getOrSetDownloader();
file_segment.completeWithState(FileSegment::State::DOWNLOADED, /* auto_resize */true);
assert(file_segment.downloaded_size <= file_segment.range().size());
file_segment.segment_range = FileSegment::Range(
file_segment.segment_range.left, file_segment.segment_range.left + file_segment.downloaded_size - 1);
file_segment.reserved_size = file_segment.downloaded_size;
file_segment.completeWithState(FileSegment::State::DOWNLOADED);
on_complete_file_segment_func(file_segment);
}
else
{
std::lock_guard cache_lock(cache->mutex);
file_segment.completeBasedOnCurrentState(cache_lock);
file_segment.completeWithoutState(cache_lock);
}
}

View File

@ -142,7 +142,7 @@ public:
void completeBatchAndResetDownloader();
void completeWithState(State state, bool auto_resize = false);
void completeWithState(State state);
String getInfoForLog() const;
@ -195,12 +195,8 @@ private:
/// FileSegmentsHolder. complete() might check if the caller of the method
/// is the last alive holder of the segment. Therefore, complete() and destruction
/// of the file segment pointer must be done under the same cache mutex.
void completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock);
void completeBasedOnCurrentStateUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void completeImpl(
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
void completeBasedOnCurrentState(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void completeWithoutState(std::lock_guard<std::mutex> & cache_lock);
void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock);

View File

@ -588,7 +588,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
M(Bool, enable_filesystem_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must be done via disk config), but allows to bypass cache for some queries if intended", 0) \
M(UInt64, filesystem_cache_max_wait_sec, 5, "Allow to wait at most this number of seconds for download of current remote_fs_buffer_size bytes, and skip cache if exceeded", 0) \
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "", 0) \

View File

@ -222,9 +222,6 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
{
auto range = file_segment->range();
size_t wait_download_max_tries = settings.filesystem_cache_max_wait_sec;
size_t wait_download_tries = 0;
auto download_state = file_segment->state();
LOG_TEST(log, "getReadBufferForFileSegment: {}", file_segment->getInfoForLog());
@ -274,16 +271,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
return getCacheReadBuffer(range.left);
}
if (wait_download_tries++ < wait_download_max_tries)
{
download_state = file_segment->wait();
}
else
{
LOG_DEBUG(log, "Retries to wait for file segment download exceeded ({})", wait_download_tries);
download_state = FileSegment::State::SKIP_CACHE;
}
download_state = file_segment->wait();
continue;
}
case FileSegment::State::DOWNLOADED:

View File

@ -80,7 +80,6 @@ struct ReadSettings
size_t remote_fs_read_backoff_max_tries = 4;
bool enable_filesystem_cache = true;
size_t filesystem_cache_max_wait_sec = 1;
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;
bool is_file_cache_persistent = false; /// Some files can be made non-evictable.

View File

@ -3451,7 +3451,6 @@ ReadSettings Context::getReadSettings() const
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.enable_filesystem_cache = settings.enable_filesystem_cache;
res.filesystem_cache_max_wait_sec = settings.filesystem_cache_max_wait_sec;
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.enable_filesystem_cache_on_lower_level = settings.enable_filesystem_cache_on_lower_level;