From 692b247379f9c333ee9bcf3976d536b7817c2c0c Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 7 Mar 2022 14:30:57 +0100 Subject: [PATCH] Fix some corner cases --- src/Common/FileCache.cpp | 34 +++++++++++++++++++ src/Common/FileCache.h | 4 +++ src/Disks/IDiskRemote.cpp | 13 +++++++ ...chronousReadIndirectBufferFromRemoteFS.cpp | 16 +++++---- src/Disks/IO/CachedReadBufferFromRemoteFS.cpp | 12 ++++--- src/Disks/IO/CachedReadBufferFromRemoteFS.h | 2 ++ src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 8 +++++ src/Disks/IO/ReadBufferFromRemoteFSGather.h | 2 ++ src/Disks/IO/ThreadPoolRemoteFSReader.cpp | 2 ++ src/IO/ThreadPoolReader.cpp | 22 +++++++++++- 10 files changed, 104 insertions(+), 11 deletions(-) diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index 0cf1fd9d27a..1476337459c 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -429,6 +429,40 @@ bool LRUFileCache::tryReserve( return true; } +void LRUFileCache::remove(const Key & key) +{ + std::lock_guard cache_lock(mutex); + + auto it = files.find(key); + if (it == files.end()) + return; + + auto & offsets = it->second; + + std::vector to_remove; + to_remove.reserve(offsets.size()); + + for (auto & [offset, cell] : offsets) + to_remove.push_back(&cell); + + for (auto & cell : to_remove) + { + auto file_segment = cell->file_segment; + if (file_segment) + { + std::lock_guard segment_lock(file_segment->mutex); + remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); + } + } + + auto key_path = getPathInLocalCache(key); + + files.erase(key); + + if (fs::exists(key_path)) + fs::remove(key_path); +} + void LRUFileCache::remove( Key key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */) diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index f6295bd73f5..d86c13361fd 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -41,6 +41,8 @@ public: /// Restore cache from local filesystem. virtual void initialize() = 0; + virtual void remove(const Key & key) = 0; + static bool shouldBypassCache(); /// Cache capacity in bytes. @@ -115,6 +117,8 @@ public: void initialize() override; + void remove(const Key & key) override; + private: using FileKeyAndOffset = std::pair; using LRUQueue = std::list; diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp index f62542acbad..5fa0f9c703c 100644 --- a/src/Disks/IDiskRemote.cpp +++ b/src/Disks/IDiskRemote.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB @@ -281,7 +282,16 @@ void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_p if (metadata.ref_count == 0) { for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects) + { fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path); + + if (cache) + { + auto key = cache->hash(remote_fs_object_path); + cache->remove(key); + } + } + return false; } else /// In other case decrement number of references, save metadata and delete hardlink. @@ -441,6 +451,7 @@ void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_onl { RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper(); removeMetadata(path, fs_paths_keeper); + if (!delete_metadata_only) removeFromRemoteFS(fs_paths_keeper); } @@ -449,6 +460,7 @@ void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_onl void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only) { RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper(); + if (metadata_disk->exists(path)) { removeMetadata(path, fs_paths_keeper); @@ -475,6 +487,7 @@ void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadat { RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper(); removeMetadataRecursive(path, fs_paths_keeper); + if (!delete_metadata_only) removeFromRemoteFS(fs_paths_keeper); } diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index a5cd3845e4b..e693a8e9ea8 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -87,8 +87,8 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead() return false; if (file_offset_of_buffer_end > *read_until_position) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})", - file_offset_of_buffer_end, *read_until_position); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {}, info: {})", + file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog()); } else if (must_read_until_position) throw Exception(ErrorCodes::LOGICAL_ERROR, @@ -136,8 +136,11 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos if (prefetch_future.valid()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition"); - read_until_position = position; - impl->setReadUntilPosition(*read_until_position); + if (position > read_until_position) + { + read_until_position = position; + impl->setReadUntilPosition(*read_until_position); + } } @@ -171,7 +174,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() LOG_TEST(log, "Current size: {}, offset: {}", size, offset); /// If prefetch_future is valid, size should always be greater than zero. - assert(offset < size && size > 0); + assert(offset < size); ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds()); } @@ -188,7 +191,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() auto offset = result.offset; LOG_TEST(log, "Current size: {}, offset: {}", size, offset); - assert(offset < size || size == 0); + assert(offset < size); if (size) { @@ -198,6 +201,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() } file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + assert(file_offset_of_buffer_end == impl->getImplementationBufferOffset()); prefetch_future = {}; return size; diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp index d430a65debe..9356719097a 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp @@ -604,17 +604,16 @@ bool CachedReadBufferFromRemoteFS::nextImpl() } } - /// Local filesystem does not support bounded reads. - if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end()) + if (std::next(current_file_segment_it) == file_segments_holder->file_segments.end()) { size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1; size = std::min(size, remaining_size_to_read); - implementation_buffer->buffer().resize(size); + implementation_buffer->buffer().resize(nextimpl_working_buffer_offset + size); } file_offset_of_buffer_end += size; - assert(read_type == ReadType::CACHED || file_offset_of_buffer_end == implementation_buffer->getFileOffsetOfBufferEnd()); + // assert(read_type == ReadType::CACHED || file_offset_of_buffer_end == implementation_buffer->getFileOffsetOfBufferEnd()); } swap(*implementation_buffer); @@ -671,6 +670,11 @@ size_t CachedReadBufferFromRemoteFS::getTotalSizeToRead() return read_until_position - file_offset_of_buffer_end; } +void CachedReadBufferFromRemoteFS::setReadUntilPosition(size_t) +{ + throw Exception(ErrorCodes::LOGICAL_ERROR, "Method `setReadUntilPosition()` not allowed"); +} + off_t CachedReadBufferFromRemoteFS::getPosition() { return file_offset_of_buffer_end - available(); diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.h b/src/Disks/IO/CachedReadBufferFromRemoteFS.h index 6d48a3a5f64..f4dea9d9fa6 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.h +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.h @@ -31,6 +31,8 @@ public: String getInfoForLog() override; + void setReadUntilPosition(size_t position) override; + private: void initialize(size_t offset, size_t size); diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 9a529c70cf7..06e31799abf 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -269,5 +269,13 @@ String ReadBufferFromRemoteFSGather::getInfoForLog() return current_buf->getInfoForLog(); } +size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const +{ + if (!current_buf) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer not initialized"); + + return current_buf->getFileOffsetOfBufferEnd(); +} + } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index 5c1ef07afbd..25bfe0b7e16 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -52,6 +52,8 @@ public: String getInfoForLog(); + size_t getImplementationBufferOffset() const; + protected: virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) = 0; diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 590ce9f8f6a..3bfaaf514fa 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -74,6 +74,8 @@ std::future ThreadPoolRemoteFSReader::submit(Reques ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read); + thread_status.detachQuery(); + return Result{ .size = bytes_read, .offset = offset }; }); diff --git a/src/IO/ThreadPoolReader.cpp b/src/IO/ThreadPoolReader.cpp index 0c2791c6f68..c06a769dfbc 100644 --- a/src/IO/ThreadPoolReader.cpp +++ b/src/IO/ThreadPoolReader.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -184,9 +185,26 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss); - auto task = std::make_shared>([request, fd] + ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup() + ? CurrentThread::get().getThreadGroup() + : MainThreadStatus::getInstance().getThreadGroup(); + + ContextPtr query_context; + if (CurrentThread::isInitialized) + query_context = CurrentThread::get().getQueryContext(); + + auto task = std::make_shared>([request, fd, running_group, query_context] { + ThreadStatus thread_status; + + if (query_context) + thread_status.attachQueryContext(query_context); + + if (running_group) + thread_status.attachQuery(running_group); + setThreadName("ThreadPoolRead"); + Stopwatch watch(CLOCK_MONOTONIC); size_t bytes_read = 0; @@ -219,6 +237,8 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + thread_status.detachQuery(); + return Result{ .size = bytes_read, .offset = request.ignore }; });