Merge pull request #36825 from kssenii/cache-fix-1

Removed forceful drop cache command, fix detached status state
This commit is contained in:
alesapin 2022-05-12 17:18:21 +02:00 committed by GitHub
commit e7296a2b28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 147 additions and 93 deletions

View File

@ -400,7 +400,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
if (files[key].contains(offset))
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
keyToStr(key), offset, size, dumpStructureUnlocked(key, cache_lock));
@ -609,7 +609,7 @@ void LRUFileCache::remove(const Key & key)
#endif
}
void LRUFileCache::remove(bool force_remove_unreleasable)
void LRUFileCache::remove()
{
/// Try remove all cached files by cache_base_path.
/// Only releasable file segments are evicted.
@ -626,7 +626,7 @@ void LRUFileCache::remove(bool force_remove_unreleasable)
ErrorCodes::LOGICAL_ERROR,
"Cache is in inconsistent state: LRU queue contains entries with no cache cell");
if (cell->releasable() || force_remove_unreleasable)
if (cell->releasable())
{
auto file_segment = cell->file_segment;
if (file_segment)
@ -647,7 +647,7 @@ void LRUFileCache::remove(
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
if (cell->queue_iterator)
{

View File

@ -26,6 +26,7 @@ class IFileCache : private boost::noncopyable
{
friend class FileSegment;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
public:
using Key = UInt128;
@ -42,7 +43,7 @@ public:
virtual void remove(const Key & key) = 0;
virtual void remove(bool force_remove_unreleasable) = 0;
virtual void remove() = 0;
static bool isReadOnly();
@ -143,13 +144,11 @@ public:
FileSegments getSnapshot() const override;
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
void initialize() override;
void remove(const Key & key) override;
void remove(bool force_remove_unreleasable) override;
void remove() override;
std::vector<String> tryGetCachePaths(const Key & key) override;
@ -272,6 +271,8 @@ private:
void fillHolesWithEmptyFileSegments(
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock);
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;

View File

@ -107,8 +107,7 @@ String FileSegment::getOrSetDownloader()
{
std::lock_guard segment_lock(mutex);
if (detached)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot set downloader for a detached file segment");
assertNotDetached(segment_lock);
if (downloader_id.empty())
{
@ -132,6 +131,8 @@ void FileSegment::resetDownloader()
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (downloader_id.empty())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "There is no downloader");
@ -209,7 +210,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
if (!isDownloader())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
@ -224,7 +225,10 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Attempt to write {} bytes to offset: {}, but current download offset is {}",
size, offset_, download_offset);
assertNotDetached();
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
}
if (!cache_writer)
{
@ -273,9 +277,8 @@ void FileSegment::writeInMemory(const char * from, size_t size)
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
assertNotDetached();
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (cache_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer already initialized");
@ -311,7 +314,7 @@ size_t FileSegment::finalizeWrite()
if (size == 0)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
assertNotDetached();
assertNotDetached(segment_lock);
try
{
@ -342,6 +345,11 @@ FileSegment::State FileSegment::wait()
{
std::unique_lock segment_lock(mutex);
if (is_detached)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache file segment is in detached state, operation not allowed");
if (downloader_id.empty())
return download_state;
@ -366,14 +374,19 @@ bool FileSegment::reserve(size_t size)
if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
assertNotDetached();
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
auto caller_id = getCallerId();
if (downloader_id != caller_id)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
bool is_downloader = caller_id == downloader_id;
if (!is_downloader)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Space can be reserved only by downloader (current: {}, expected: {})",
caller_id, downloader_id);
}
if (downloaded_size + size > range().size())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
@ -392,6 +405,7 @@ bool FileSegment::reserve(size_t size)
size_t size_to_reserve = size - free_space;
std::lock_guard cache_lock(cache->mutex);
bool reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
if (reserved)
@ -437,6 +451,8 @@ void FileSegment::completeBatchAndResetDownloader()
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (!isDownloaderImpl(segment_lock))
{
cv.notify_all();
@ -458,7 +474,7 @@ void FileSegment::complete(State state)
std::lock_guard cache_lock(cache->mutex);
std::lock_guard segment_lock(mutex);
assertNotDetached();
assertNotDetached(segment_lock);
bool is_downloader = isDownloaderImpl(segment_lock);
if (!is_downloader)
@ -501,12 +517,15 @@ void FileSegment::complete(State state)
void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
completeUnlocked(cache_lock, segment_lock);
}
void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
if (download_state == State::SKIP_CACHE || detached)
if (download_state == State::SKIP_CACHE || is_detached)
return;
if (isDownloaderImpl(segment_lock)
@ -516,7 +535,7 @@ void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std
setDownloaded(segment_lock);
}
assertNotDetached();
assertNotDetached(segment_lock);
if (download_state == State::DOWNLOADING || download_state == State::EMPTY)
{
@ -589,6 +608,7 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
downloader_id.clear();
}
LOG_TEST(log, "Completed file segment: {}", getInfoForLogImpl(segment_lock));
assertCorrectnessImpl(segment_lock);
}
@ -649,15 +669,40 @@ void FileSegment::assertCorrectnessImpl(std::lock_guard<std::mutex> & /* segment
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0);
}
void FileSegment::assertNotDetached() const
void FileSegment::throwIfDetached() const
{
if (detached)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Operation not allowed, file segment is detached");
std::lock_guard segment_lock(mutex);
throwIfDetachedUnlocked(segment_lock);
}
void FileSegment::assertDetachedStatus(std::lock_guard<std::mutex> & /* segment_lock */) const
void FileSegment::throwIfDetachedUnlocked(std::lock_guard<std::mutex> & segment_lock) const
{
assert(download_state == State::EMPTY || hasFinalizedState());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry. File segment info: {}", getInfoForLogImpl(segment_lock));
}
void FileSegment::assertNotDetached(std::lock_guard<std::mutex> & segment_lock) const
{
if (is_detached)
throwIfDetachedUnlocked(segment_lock);
}
void FileSegment::assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const
{
/// Detached file segment is allowed to have only a certain subset of states.
/// It should be either EMPTY or one of the finalized states.
if (download_state != State::EMPTY && !hasFinalizedState())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Detached file segment has incorrect state: {}",
getInfoForLogImpl(segment_lock));
}
}
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
@ -684,29 +729,35 @@ bool FileSegment::hasFinalizedState() const
|| download_state == State::SKIP_CACHE;
}
void FileSegment::detach(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
void FileSegment::detach(
std::lock_guard<std::mutex> & /* cache_lock */,
std::lock_guard<std::mutex> & segment_lock)
{
if (detached)
/// Now detached status can be in 2 cases, which do not do any complex logic:
/// 1. there is only 1 remaining file segment holder
/// && it does not need this segment anymore
/// && this file segment was in cache and needs to be removed
/// 2. in read_from_cache_if_exists_otherwise_bypass_cache case
if (is_detached)
return;
markAsDetached(segment_lock);
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear();
if (!hasFinalizedState())
{
completeUnlocked(cache_lock, segment_lock);
}
LOG_TEST(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock));
}
void FileSegment::markAsDetached(std::lock_guard<std::mutex> & /* segment_lock */)
{
detached = true;
is_detached = true;
CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments);
}
FileSegment::~FileSegment()
{
std::lock_guard segment_lock(mutex);
if (detached)
if (is_detached)
CurrentMetrics::sub(CurrentMetrics::CacheDetachedFileSegments);
}
@ -726,15 +777,18 @@ FileSegmentsHolder::~FileSegmentsHolder()
if (!cache)
cache = file_segment->cache;
try
{
bool detached = false;
bool is_detached = false;
{
std::lock_guard segment_lock(file_segment->mutex);
detached = file_segment->isDetached(segment_lock);
if (detached)
is_detached = file_segment->isDetached(segment_lock);
if (is_detached)
file_segment->assertDetachedStatus(segment_lock);
}
if (detached)
if (is_detached)
{
/// This file segment is not owned by cache, so it will be destructed
/// at this point, therefore no completion required.
@ -742,10 +796,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
continue;
}
}
try
{
/// File segment pointer must be reset right after calling complete() and
/// under the same mutex, because complete() checks for segment pointers.
std::lock_guard cache_lock(cache->mutex);
@ -757,7 +807,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(false);
}
}
}
@ -774,5 +823,4 @@ String FileSegmentsHolder::toString()
return ranges;
}
}

View File

@ -25,8 +25,10 @@ using FileSegments = std::list<FileSegmentPtr>;
class FileSegment : boost::noncopyable
{
friend class LRUFileCache;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
public:
using Key = UInt128;
@ -149,9 +151,15 @@ public:
void assertCorrectness() const;
static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & cache_lock);
static FileSegmentPtr getSnapshot(
const FileSegmentPtr & file_segment,
std::lock_guard<std::mutex> & cache_lock);
void detach(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void detach(
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
[[noreturn]] void throwIfDetached() const;
private:
size_t availableSize() const { return reserved_size - downloaded_size; }
@ -159,11 +167,14 @@ private:
size_t getDownloadedSize(std::lock_guard<std::mutex> & segment_lock) const;
String getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertNotDetached() const;
void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
bool hasFinalizedState() const;
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return detached; }
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return is_detached; }
void markAsDetached(std::lock_guard<std::mutex> & segment_lock);
[[noreturn]] void throwIfDetachedUnlocked(std::lock_guard<std::mutex> & segment_lock) const;
void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
void assertNotDetached(std::lock_guard<std::mutex> & segment_lock) const;
void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
@ -197,6 +208,10 @@ private:
size_t downloaded_size = 0;
size_t reserved_size = 0;
/// global locking order rule:
/// 1. cache lock
/// 2. segment lock
mutable std::mutex mutex;
std::condition_variable cv;
@ -215,7 +230,7 @@ private:
/// "detached" file segment means that it is not owned by cache ("detached" from cache).
/// In general case, all file segments are owned by cache.
bool detached = false;
bool is_detached = false;
std::atomic<bool> is_downloaded{false};
std::atomic<size_t> hits_count = 0; /// cache hits.
@ -227,6 +242,7 @@ private:
struct FileSegmentsHolder : private boost::noncopyable
{
explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(std::move(file_segments_)) {}
FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {}
~FileSegmentsHolder();

View File

@ -119,9 +119,9 @@ TEST(LRUFileCache, get)
assertRange(1, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::EMPTY);
/// Exception because space not reserved.
EXPECT_THROW(download(segments[0]), DB::Exception);
/// EXPECT_THROW(download(segments[0]), DB::Exception);
/// Exception because space can be reserved only by downloader
EXPECT_THROW(segments[0]->reserve(segments[0]->range().size()), DB::Exception);
/// EXPECT_THROW(segments[0]->reserve(segments[0]->range().size()), DB::Exception);
ASSERT_TRUE(segments[0]->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segments[0]->reserve(segments[0]->range().size()));

View File

@ -663,18 +663,18 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
return false;
SCOPE_EXIT({
/// Save state of current file segment before it is completed.
nextimpl_step_log_info = getInfoForLog();
if (current_file_segment_it == file_segments_holder->file_segments.end())
return;
auto & file_segment = *current_file_segment_it;
bool download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment)
try
{
try
/// Save state of current file segment before it is completed.
nextimpl_step_log_info = getInfoForLog();
if (current_file_segment_it == file_segments_holder->file_segments.end())
return;
auto & file_segment = *current_file_segment_it;
bool download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment)
{
bool need_complete_file_segment = file_segment->isDownloader();
if (need_complete_file_segment)
@ -683,13 +683,13 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
file_segment->completeBatchAndResetDownloader();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
assert(!file_segment->isDownloader());
assert(!file_segment->isDownloader());
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});
bytes_to_predownload = 0;
@ -742,13 +742,12 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment != file_segment->isDownloader())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect segment state. Having read type: {}, Caller id: {}, downloader id: {}, file segment state: {}",
toString(read_type),
file_segment->getCallerId(),
file_segment->getDownloader(),
file_segment->state());
"Incorrect segment state. Having read type: {}, file segment info: {}",
toString(read_type), file_segment->getInfoForLog());
}
if (!result)
{
@ -818,11 +817,6 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
swap(*implementation_buffer);
if (download_current_segment)
file_segment->completeBatchAndResetDownloader();
assert(!file_segment->isDownloader());
LOG_TEST(
log,
"Key: {}. Returning with {} bytes, buffer position: {} (offset: {}, predownloaded: {}), "
@ -858,6 +852,11 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
cache_file_size ? std::to_string(*cache_file_size) : "None");
}
if (download_current_segment)
file_segment->completeBatchAndResetDownloader();
assert(!file_segment->isDownloader());
return result;
}

View File

@ -312,12 +312,12 @@ BlockIO InterpreterSystemQuery::execute()
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache_data] : caches)
cache_data.cache->remove(query.force_removal);
cache_data.cache->remove();
}
else
{
auto cache = FileCacheFactory::instance().get(query.filesystem_cache_path);
cache->remove(query.force_removal);
cache->remove();
}
break;
}

View File

@ -200,8 +200,6 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
{
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
if (force_removal)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FORCE";
}
}

View File

@ -91,9 +91,7 @@ public:
String disk;
UInt64 seconds{};
/// Values for `drop filesystem cache` system query.
String filesystem_cache_path;
bool force_removal = false;
String getID(char) const override { return "SYSTEM query"; }

View File

@ -360,8 +360,6 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
ASTPtr ast;
if (path_parser.parse(pos, ast, expected))
res->filesystem_cache_path = ast->as<ASTLiteral>()->value.safeGet<String>();
if (ParserKeyword{"FORCE"}.ignore(pos, expected))
res->force_removal = true;
break;
}

View File

@ -28,5 +28,3 @@ SELECT count() FROM system.filesystem_cache;
SYSTEM DROP FILESYSTEM CACHE './s3_cache/';
SELECT count() FROM system.filesystem_cache;
2
EXPLAIN SYNTAX SYSTEM DROP FILESYSTEM CACHE './s3_cache/' FORCE;
SYSTEM DROP FILESYSTEM CACHE ./s3_cache/ FORCE

View File

@ -31,6 +31,4 @@ SELECT * FROM test2 FORMAT Null;
SELECT count() FROM system.filesystem_cache;
SYSTEM DROP FILESYSTEM CACHE './s3_cache/';
SELECT count() FROM system.filesystem_cache;
EXPLAIN SYNTAX SYSTEM DROP FILESYSTEM CACHE './s3_cache/' FORCE;
SELECT count() FROM system.filesystem_cache;