From 545c6c8be4a1e66e27985d2a33242b7b2c6a047f Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 29 Aug 2022 17:50:27 +0200 Subject: [PATCH] Fix --- src/Common/FileSegment.cpp | 56 +++++++++---------- src/Common/ProfileEvents.cpp | 1 + .../IO/CachedOnDiskReadBufferFromFile.cpp | 2 +- .../IO/CachedOnDiskWriteBufferFromFile.cpp | 6 +- src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 2 +- src/Interpreters/FilesystemCacheLog.cpp | 4 +- src/Interpreters/FilesystemCacheLog.h | 2 +- 7 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 9252d27f754..b12cd7ab18e 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -818,37 +818,30 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment) if (file_segment.isDetached()) return; - if (file_segment.getDownloadedSize() > 0) + /// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity + /// was initially set with a margin as `max_file_segment_size`. => We need to always + /// resize to actual size after download finished. + if (file_segment.downloaded_size != file_segment.range().size()) { - file_segment.getOrSetDownloader(); + /// Current file segment is downloaded as a part of write-through cache + /// and therefore cannot be concurrently accessed. Nevertheless, it can be + /// accessed by cache system tables if someone read from them, + /// therefore we need a mutex. + std::lock_guard segment_lock(file_segment.mutex); - { - /// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity - /// was initially set with a margin as `max_file_segment_size`. => We need to always - /// resize to actual size after download finished. - - /// Current file segment is downloaded as a part of write-through cache - /// and therefore cannot be concurrently accessed. Nevertheless, it can be - /// accessed by cache system tables if someone read from them, - /// therefore we need a mutex. - std::lock_guard segment_lock(file_segment.mutex); - - assert(file_segment.downloaded_size <= file_segment.range().size()); - file_segment.segment_range = FileSegment::Range( - file_segment.segment_range.left, - file_segment.segment_range.left + file_segment.downloaded_size - 1); - file_segment.reserved_size = file_segment.downloaded_size; - } - - file_segment.completeWithState(FileSegment::State::DOWNLOADED); - - on_complete_file_segment_func(file_segment); + assert(file_segment.downloaded_size <= file_segment.range().size()); + file_segment.segment_range = FileSegment::Range( + file_segment.segment_range.left, + file_segment.segment_range.left + file_segment.downloaded_size - 1); + file_segment.reserved_size = file_segment.downloaded_size; } - else + { std::lock_guard cache_lock(cache->mutex); file_segment.completeWithoutState(cache_lock); } + + on_complete_file_segment_func(file_segment); } bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent) @@ -877,22 +870,27 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset offset, current_file_segment_write_offset); } - if ((*current_file_segment_it)->getRemainingSizeToDownload() == 0) + auto current_file_segment = *current_file_segment_it; + if (current_file_segment->getRemainingSizeToDownload() == 0) { - completeFileSegment(**current_file_segment_it); + completeFileSegment(*current_file_segment); current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent); } - else if ((*current_file_segment_it)->getDownloadOffset() != offset) + else if (current_file_segment->getDownloadOffset() != offset) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Cannot file segment download offset {} does not match current write offset {}", - (*current_file_segment_it)->getDownloadOffset(), offset); + current_file_segment->getDownloadOffset(), offset); } } auto & file_segment = *current_file_segment_it; - file_segment->getOrSetDownloader(); + + auto downloader = file_segment->getOrSetDownloader(); + if (downloader != FileSegment::getCallerId()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog()); + SCOPE_EXIT({ file_segment->resetDownloader(); }); diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 297b3bdb59d..5f6de294c51 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -318,6 +318,7 @@ The server successfully detected this situation and will download merged part fr \ M(FileSegmentWaitReadBufferMicroseconds, "Metric per file segment. Time spend waiting for internal read buffer (includes cache waiting)") \ M(FileSegmentReadMicroseconds, "Metric per file segment. Time spend reading from file") \ + M(FileSegmentWriteMicroseconds, "Metric per file segment. Time spend writing cache") \ M(FileSegmentCacheWriteMicroseconds, "Metric per file segment. Time spend writing data to cache") \ M(FileSegmentPredownloadMicroseconds, "Metric per file segment. Time spent predownloading data to cache (predownloading - finishing file segment download (after someone who failed to do that) up to the point current thread was requested to do)") \ M(FileSegmentUsedBytes, "Metric per file segment. How many bytes were actually used from current file segment") \ diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index dac59c596f5..991b4dae864 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -80,7 +80,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog( .file_segment_range = { file_segment_range.left, file_segment_range.right }, .requested_range = { first_offset, read_until_position }, .file_segment_size = file_segment_range.size(), - .cache_attempted = true, + .read_from_cache_attempted = true, .read_buffer_id = current_buffer_id, .profile_counters = std::make_shared( current_file_segment_counters.getPartiallyAtomicSnapshot()), diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp index 300ce774d61..21c120fd4c8 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp @@ -11,6 +11,7 @@ namespace ProfileEvents { extern const Event CachedWriteBufferCacheWriteBytes; extern const Event CachedWriteBufferCacheWriteMicroseconds; + extern const Event FileSegmentWriteMicroseconds; } namespace DB @@ -118,6 +119,9 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size) ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteBytes, size); ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteMicroseconds, watch.elapsedMicroseconds()); + + current_file_segment_counters.increment( + ProfileEvents::FileSegmentWriteMicroseconds, watch.elapsedMicroseconds()); } void CachedOnDiskWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment & file_segment) @@ -134,7 +138,7 @@ void CachedOnDiskWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment .requested_range = {}, .cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE, .file_segment_size = file_segment_range.size(), - .cache_attempted = false, + .read_from_cache_attempted = false, .read_buffer_id = {}, .profile_counters = std::make_shared(current_file_segment_counters.getPartiallyAtomicSnapshot()), }; diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 148a20a7ba0..01d4154199a 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -81,7 +81,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog() .file_segment_range = { 0, current_file_size }, .cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE, .file_segment_size = total_bytes_read_from_current_file, - .cache_attempted = false, + .read_from_cache_attempted = false, }; if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog()) diff --git a/src/Interpreters/FilesystemCacheLog.cpp b/src/Interpreters/FilesystemCacheLog.cpp index 547df77786e..a0c8986a8ca 100644 --- a/src/Interpreters/FilesystemCacheLog.cpp +++ b/src/Interpreters/FilesystemCacheLog.cpp @@ -42,7 +42,7 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes() {"total_requested_range", std::make_shared(types)}, {"size", std::make_shared()}, {"read_type", std::make_shared()}, - {"cache_attempted", std::make_shared()}, + {"read_from_cache_attempted", std::make_shared()}, {"ProfileEvents", std::make_shared(std::make_shared(), std::make_shared())}, {"read_buffer_id", std::make_shared()}, }; @@ -62,7 +62,7 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insert(Tuple{requested_range.first, requested_range.second}); columns[i++]->insert(file_segment_size); columns[i++]->insert(typeToString(cache_type)); - columns[i++]->insert(cache_attempted); + columns[i++]->insert(read_from_cache_attempted); if (profile_counters) { diff --git a/src/Interpreters/FilesystemCacheLog.h b/src/Interpreters/FilesystemCacheLog.h index 00863c36c79..bf5361ef324 100644 --- a/src/Interpreters/FilesystemCacheLog.h +++ b/src/Interpreters/FilesystemCacheLog.h @@ -41,7 +41,7 @@ struct FilesystemCacheLogElement std::pair requested_range{}; CacheType cache_type{}; size_t file_segment_size; - bool cache_attempted; + bool read_from_cache_attempted; String read_buffer_id; std::shared_ptr profile_counters;