mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
Merge pull request #35275 from kssenii/fix-logical-error-in-cache
Fix logical error in remote fs cache
This commit is contained in:
commit
663c8e9e4a
@ -46,7 +46,16 @@ FileSegment::State FileSegment::state() const
|
||||
size_t FileSegment::getDownloadOffset() const
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
return range().left + downloaded_size;
|
||||
return range().left + getDownloadedSize(segment_lock);
|
||||
}
|
||||
|
||||
size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const
|
||||
{
|
||||
if (download_state == State::DOWNLOADED)
|
||||
return downloaded_size;
|
||||
|
||||
std::lock_guard download_lock(download_mutex);
|
||||
return downloaded_size;
|
||||
}
|
||||
|
||||
String FileSegment::getCallerId()
|
||||
@ -174,7 +183,12 @@ void FileSegment::write(const char * from, size_t size)
|
||||
try
|
||||
{
|
||||
cache_writer->write(from, size);
|
||||
|
||||
std::lock_guard download_lock(download_mutex);
|
||||
|
||||
cache_writer->next();
|
||||
|
||||
downloaded_size += size;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -189,9 +203,6 @@ void FileSegment::write(const char * from, size_t size)
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
std::lock_guard segment_lock(mutex);
|
||||
downloaded_size += size;
|
||||
}
|
||||
|
||||
FileSegment::State FileSegment::wait()
|
||||
@ -225,15 +236,15 @@ bool FileSegment::reserve(size_t size)
|
||||
{
|
||||
std::lock_guard segment_lock(mutex);
|
||||
|
||||
auto caller_id = getCallerId();
|
||||
if (downloader_id != caller_id)
|
||||
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
|
||||
|
||||
if (downloaded_size + size > range().size())
|
||||
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
|
||||
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
|
||||
size, range().toString(), downloaded_size);
|
||||
|
||||
auto caller_id = getCallerId();
|
||||
if (downloader_id != caller_id)
|
||||
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
|
||||
|
||||
assert(reserved_size >= downloaded_size);
|
||||
}
|
||||
|
||||
@ -323,7 +334,7 @@ void FileSegment::complete()
|
||||
if (download_state == State::SKIP_CACHE || detached)
|
||||
return;
|
||||
|
||||
if (downloaded_size == range().size() && download_state != State::DOWNLOADED)
|
||||
if (download_state != State::DOWNLOADED && getDownloadedSize(segment_lock) == range().size())
|
||||
setDownloaded(segment_lock);
|
||||
|
||||
if (download_state == State::DOWNLOADING || download_state == State::EMPTY)
|
||||
@ -350,10 +361,11 @@ void FileSegment::completeImpl(bool allow_non_strict_checking)
|
||||
|
||||
if (!download_can_continue)
|
||||
{
|
||||
if (!downloaded_size)
|
||||
size_t current_downloaded_size = getDownloadedSize(segment_lock);
|
||||
if (current_downloaded_size == 0)
|
||||
{
|
||||
download_state = State::SKIP_CACHE;
|
||||
LOG_TEST(log, "Remove cell {} (downloaded: {})", range().toString(), downloaded_size);
|
||||
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
|
||||
cache->remove(key(), offset(), cache_lock, segment_lock);
|
||||
|
||||
detached = true;
|
||||
@ -366,7 +378,7 @@ void FileSegment::completeImpl(bool allow_non_strict_checking)
|
||||
* in FileSegmentsHolder represent a contiguous range, so we can resize
|
||||
* it only when nobody needs it.
|
||||
*/
|
||||
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
|
||||
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), current_downloaded_size);
|
||||
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
|
||||
|
||||
detached = true;
|
||||
@ -397,7 +409,7 @@ String FileSegment::getInfoForLog() const
|
||||
WriteBufferFromOwnString info;
|
||||
info << "File segment: " << range().toString() << ", ";
|
||||
info << "state: " << download_state << ", ";
|
||||
info << "downloaded size: " << downloaded_size << ", ";
|
||||
info << "downloaded size: " << getDownloadedSize(segment_lock) << ", ";
|
||||
info << "downloader id: " << downloader_id << ", ";
|
||||
info << "caller id: " << getCallerId();
|
||||
|
||||
|
@ -129,6 +129,7 @@ private:
|
||||
void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
|
||||
static String getCallerIdImpl(bool allow_non_strict_checking = false);
|
||||
void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock);
|
||||
size_t getDownloadedSize(std::lock_guard<std::mutex> & segment_lock) const;
|
||||
|
||||
const Range segment_range;
|
||||
|
||||
@ -144,6 +145,14 @@ private:
|
||||
mutable std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
|
||||
/// Protects downloaded_size access with actual write into fs.
|
||||
/// downloaded_size is not protected by download_mutex in methods which
|
||||
/// can never be run in parallel to FileSegment::write() method
|
||||
/// as downloaded_size is updated only in FileSegment::write() method.
|
||||
/// Such methods are identified by isDownloader() check at their start,
|
||||
/// e.g. they are executed strictly by the same thread, sequentially.
|
||||
mutable std::mutex download_mutex;
|
||||
|
||||
Key file_key;
|
||||
IFileCache * cache;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user