diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index 2e05e7a7202..5d5851d0b84 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -57,7 +57,7 @@ String IFileCache::getPathInLocalCache(const Key & key) return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str; } -bool IFileCache::shouldBypassCache() +bool IFileCache::isReadOnly() { return !CurrentThread::isInitialized() || !CurrentThread::get().getQueryContext() @@ -708,7 +708,7 @@ bool LRUFileCache::isLastFileSegmentHolder( return cell->file_segment.use_count() == 2; } -FileSegmentsHolder LRUFileCache::getAll() +FileSegments LRUFileCache::getSnapshot() const { std::lock_guard cache_lock(mutex); @@ -717,10 +717,10 @@ FileSegmentsHolder LRUFileCache::getAll() for (const auto & [key, cells_by_offset] : files) { for (const auto & [offset, cell] : cells_by_offset) - file_segments.push_back(cell.file_segment); + file_segments.push_back(FileSegment::getSnapshot(cell.file_segment)); } - return FileSegmentsHolder(std::move(file_segments)); + return file_segments; } std::vector LRUFileCache::tryGetCachePaths(const Key & key) diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index 089bdb633c0..e706376bc89 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -44,7 +44,7 @@ public: virtual void tryRemoveAll() = 0; - static bool shouldBypassCache(); + static bool isReadOnly(); /// Cache capacity in bytes. size_t capacity() const { return max_size; } @@ -72,10 +72,10 @@ public: */ virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) = 0; - virtual FileSegmentsHolder getAll() = 0; - virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0; + virtual FileSegments getSnapshot() const = 0; + /// For debug. 
virtual String dumpStructure(const Key & key) = 0; @@ -124,7 +124,7 @@ public: FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override; - FileSegmentsHolder getAll() override; + FileSegments getSnapshot() const override; FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override; diff --git a/src/Common/FileCacheSettings.h b/src/Common/FileCacheSettings.h index 53c28400c86..0b34e1e3d82 100644 --- a/src/Common/FileCacheSettings.h +++ b/src/Common/FileCacheSettings.h @@ -2,7 +2,7 @@ #include -namespace Poco { namespace Util { class AbstractConfiguration; }} +namespace Poco { namespace Util { class AbstractConfiguration; } } namespace DB { diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index d8e7a994df4..4def08c6817 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -73,6 +73,12 @@ size_t FileSegment::getDownloadOffset() const return range().left + getDownloadedSize(segment_lock); } +size_t FileSegment::getDownloadedSize() const +{ + std::lock_guard segment_lock(mutex); + return getDownloadedSize(segment_lock); +} + size_t FileSegment::getDownloadedSize(std::lock_guard & /* segment_lock */) const { if (download_state == State::DOWNLOADED) @@ -84,24 +90,15 @@ size_t FileSegment::getDownloadedSize(std::lock_guard & /* segment_l String FileSegment::getCallerId() { - return getCallerIdImpl(false); + return getCallerIdImpl(); } -String FileSegment::getCallerIdImpl(bool allow_non_strict_checking) +String FileSegment::getCallerIdImpl() { - if (IFileCache::shouldBypassCache()) - { - /// getCallerId() can be called from completeImpl(), which can be called from complete(). - /// complete() is called from destructor of CachedReadBufferFromRemoteFS when there is no query id anymore. - /// Allow non strict checking in this case. 
This works correctly as if getCallerIdImpl() is called from destructor, - /// then we know that caller is not a downloader, because downloader is reset each nextImpl() call either - /// manually or via SCOPE_EXIT. - - if (allow_non_strict_checking) - return "None"; - - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot use cache without query id"); - } + if (!CurrentThread::isInitialized() + || !CurrentThread::get().getQueryContext() + || CurrentThread::getQueryId().size == 0) + return "None:" + toString(getThreadId()); return CurrentThread::getQueryId().toString() + ":" + toString(getThreadId()); } @@ -244,15 +241,9 @@ void FileSegment::write(const char * from, size_t size, size_t offset_) { std::lock_guard segment_lock(mutex); - auto info = getInfoForLogImpl(segment_lock); - e.addMessage("while writing into cache, info: " + info); + wrapWithCacheInfo(e, "while writing into cache", segment_lock); - LOG_ERROR(log, "Failed to write to cache. File segment info: {}", info); - - download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; - - cache_writer->finalize(); - cache_writer.reset(); + setDownloadFailed(segment_lock); cv.notify_all(); @@ -265,7 +256,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_) void FileSegment::writeInMemory(const char * from, size_t size) { if (!size) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed"); + throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Attempt to write zero size cache file"); if (availableSize() < size) throw Exception( @@ -284,14 +275,13 @@ void FileSegment::writeInMemory(const char * from, size_t size) { cache_writer->write(from, size); } - catch (...) + catch (Exception & e) { - LOG_ERROR(log, "Failed to write to cache. 
File segment info: {}", getInfoForLogImpl(segment_lock)); + wrapWithCacheInfo(e, "while writing into cache", segment_lock); - download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; + setDownloadFailed(segment_lock); - cache_writer->finalize(); - cache_writer.reset(); + cv.notify_all(); throw; } @@ -313,23 +303,23 @@ size_t FileSegment::finalizeWrite() { cache_writer->next(); } - catch (...) + catch (Exception & e) { - download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; + wrapWithCacheInfo(e, "while writing into cache", segment_lock); - cache_writer->finalize(); - cache_writer.reset(); + setDownloadFailed(segment_lock); + + cv.notify_all(); throw; } downloaded_size += size; - cache_writer.reset(); - downloader_id.clear(); - download_state = State::DOWNLOADED; if (downloaded_size != range().size()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} == {}", downloaded_size, range().size()); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected downloaded size to equal file segment size ({} == {})", downloaded_size, range().size()); + + setDownloaded(segment_lock); return size; } @@ -398,6 +388,20 @@ void FileSegment::setDownloaded(std::lock_guard & /* segment_lock */ { download_state = State::DOWNLOADED; is_downloaded = true; + downloader_id.clear(); + + if (cache_writer) + { + cache_writer->finalize(); + cache_writer.reset(); + remote_file_reader.reset(); + } +} + +void FileSegment::setDownloadFailed(std::lock_guard & /* segment_lock */) +{ + download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; + downloader_id.clear(); if (cache_writer) { @@ -455,7 +459,7 @@ void FileSegment::complete(State state) } catch (...) 
{ - if (!downloader_id.empty() && downloader_id == getCallerIdImpl(true)) + if (!downloader_id.empty() && downloader_id == getCallerIdImpl()) downloader_id.clear(); cv.notify_all(); @@ -480,7 +484,7 @@ void FileSegment::complete(std::lock_guard & cache_lock) /// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the /// downloader or the only owner of the segment. - bool can_update_segment_state = downloader_id == getCallerIdImpl(true) + bool can_update_segment_state = downloader_id == getCallerIdImpl() || cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock); if (can_update_segment_state) @@ -489,11 +493,11 @@ void FileSegment::complete(std::lock_guard & cache_lock) try { - completeImpl(cache_lock, segment_lock, /* allow_non_strict_checking */true); + completeImpl(cache_lock, segment_lock); } catch (...) { - if (!downloader_id.empty() && downloader_id == getCallerIdImpl(true)) + if (!downloader_id.empty() && downloader_id == getCallerIdImpl()) downloader_id.clear(); cv.notify_all(); @@ -503,7 +507,7 @@ void FileSegment::complete(std::lock_guard & cache_lock) cv.notify_all(); } -void FileSegment::completeImpl(std::lock_guard & cache_lock, std::lock_guard & segment_lock, bool allow_non_strict_checking) +void FileSegment::completeImpl(std::lock_guard & cache_lock, std::lock_guard & segment_lock) { bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock); @@ -539,7 +543,7 @@ void FileSegment::completeImpl(std::lock_guard & cache_lock, std::lo } } - if (!downloader_id.empty() && (downloader_id == getCallerIdImpl(allow_non_strict_checking) || is_last_holder)) + if (!downloader_id.empty() && (downloader_id == getCallerIdImpl() || is_last_holder)) { LOG_TEST(log, "Clearing downloader id: {}, current state: {}", downloader_id, stateToString(download_state)); downloader_id.clear(); @@ -566,6 +570,11 @@ String FileSegment::getInfoForLogImpl(std::lock_guard & segment_lock return info.str(); } 
+void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard & segment_lock) const +{ + e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogImpl(segment_lock))); +} + String FileSegment::stateToString(FileSegment::State state) { switch (state) @@ -599,6 +608,22 @@ void FileSegment::assertCorrectnessImpl(std::lock_guard & /* segment assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0); } +FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment) +{ + auto snapshot = std::make_shared( + file_segment->offset(), + file_segment->range().size(), + file_segment->key(), + nullptr, + file_segment->state()); + + snapshot->hits_count = file_segment->getHitsCount(); + snapshot->ref_count = file_segment.use_count(); + snapshot->downloaded_size = file_segment->getDownloadedSize(); + + return snapshot; +} + FileSegmentsHolder::~FileSegmentsHolder() { /// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index c9e4146c726..a02d8e85a46 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -97,6 +97,11 @@ public: void write(const char * from, size_t size, size_t offset_); + /** + * writeInMemory and finalizeWrite are used together to write a single file with delay. + * Both can be called only once, one after another. Used for writing cache via threadpool + * on write operations. TODO: this solution is temporary, until adding a separate cache layer. 
+ */ void writeInMemory(const char * from, size_t size); size_t finalizeWrite(); @@ -121,18 +126,24 @@ public: size_t getDownloadOffset() const; + size_t getDownloadedSize() const; + void completeBatchAndResetDownloader(); void complete(State state); String getInfoForLog() const; - size_t hits() const { return hits_num; } + size_t getHitsCount() const { return hits_count; } - void hit() { ++hits_num; } + size_t getRefCount() const { return ref_count; } + + void incrementHitsCount() { ++hits_count; } void assertCorrectness() const; + static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment); + private: size_t availableSize() const { return reserved_size - downloaded_size; } @@ -141,6 +152,9 @@ private: void assertCorrectnessImpl(std::lock_guard & segment_lock) const; void setDownloaded(std::lock_guard & segment_lock); + void setDownloadFailed(std::lock_guard & segment_lock); + + void wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard & segment_lock) const; bool lastFileSegmentHolder() const; @@ -152,9 +166,9 @@ private: void completeImpl( std::lock_guard & cache_lock, - std::lock_guard & segment_lock, bool allow_non_strict_checking = false); + std::lock_guard & segment_lock); - static String getCallerIdImpl(bool allow_non_strict_checking = false); + static String getCallerIdImpl(); void resetDownloaderImpl(std::lock_guard & segment_lock); @@ -188,7 +202,8 @@ private: bool detached = false; std::atomic is_downloaded{false}; - std::atomic hits_num = 0; /// cache hits. + std::atomic hits_count = 0; /// cache hits. 
+ std::atomic ref_count = 0; /// Used for getting snapshot state }; struct FileSegmentsHolder : private boost::noncopyable diff --git a/src/Disks/DiskCacheWrapper.cpp b/src/Disks/DiskCacheWrapper.cpp index 178caa0c496..a86f13f55af 100644 --- a/src/Disks/DiskCacheWrapper.cpp +++ b/src/Disks/DiskCacheWrapper.cpp @@ -206,6 +206,8 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode return DiskDecorator::writeFile(path, buf_size, mode, settings); WriteSettings current_settings = settings; + /// There are two different cache implementations. Disable second one if the first is enabled. + /// The first will soon be removed, this disabling is temporary. current_settings.enable_filesystem_cache_on_write_operations = false; LOG_TEST(log, "Write file {} to cache", backQuote(path)); diff --git a/src/Disks/DiskWebServer.h b/src/Disks/DiskWebServer.h index 94ba32939da..6341b582174 100644 --- a/src/Disks/DiskWebServer.h +++ b/src/Disks/DiskWebServer.h @@ -77,7 +77,7 @@ public: UInt64 getTotalSpace() const final override { return std::numeric_limits::max(); } UInt64 getAvailableSpace() const final override { return std::numeric_limits::max(); } -UInt64 getUnreservedSpace() const final override { return std::numeric_limits::max(); } + UInt64 getUnreservedSpace() const final override { return std::numeric_limits::max(); } /// Read-only part diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp index 0f430e69a5e..fb1c0ddc378 100644 --- a/src/Disks/IDiskRemote.cpp +++ b/src/Disks/IDiskRemote.cpp @@ -343,9 +343,9 @@ void IDiskRemote::removeMetadataRecursive(const String & path, RemoteFSPathKeepe } } -std::vector IDiskRemote::getRemotePaths(const String & path) const +std::vector IDiskRemote::getRemotePaths(const String & local_path) const { - auto metadata = readMetadata(path); + auto metadata = readMetadata(local_path); std::vector remote_paths; for (const auto & [remote_path, _] : metadata.remote_fs_objects) @@ -354,16 +354,16 @@ 
std::vector IDiskRemote::getRemotePaths(const String & path) const return remote_paths; } -void IDiskRemote::getRemotePathsRecursive(const String & path, std::vector & paths_map) +void IDiskRemote::getRemotePathsRecursive(const String & local_path, std::vector & paths_map) { - if (metadata_disk->isFile(path)) + if (metadata_disk->isFile(local_path)) { - paths_map.emplace_back(path, getRemotePaths(path)); + paths_map.emplace_back(local_path, getRemotePaths(local_path)); } else { - for (auto it = iterateDirectory(path); it->isValid(); it->next()) - IDiskRemote::getRemotePathsRecursive(fs::path(path) / it->name(), paths_map); + for (auto it = iterateDirectory(local_path); it->isValid(); it->next()) + IDiskRemote::getRemotePathsRecursive(fs::path(local_path) / it->name(), paths_map); } } diff --git a/src/Disks/IDiskRemote.h b/src/Disks/IDiskRemote.h index 6b16a1f753c..a8a299391bf 100644 --- a/src/Disks/IDiskRemote.h +++ b/src/Disks/IDiskRemote.h @@ -68,9 +68,9 @@ public: String getCacheBasePath() const final override; - std::vector getRemotePaths(const String & path) const final override; + std::vector getRemotePaths(const String & local_path) const final override; - void getRemotePathsRecursive(const String & path, std::vector & paths_map) override; + void getRemotePathsRecursive(const String & local_path, std::vector & paths_map) override; /// Methods for working with metadata. 
For some operations (like hardlink /// creation) metadata can be updated concurrently from multiple threads diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp index a2e60a1937e..b9f7457447e 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp @@ -389,7 +389,7 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext() implementation_buffer = getImplementationBuffer(*current_file_segment_it); if (read_type == ReadType::CACHED) - (*current_file_segment_it)->hit(); + (*current_file_segment_it)->incrementHitsCount(); LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString()); return true; @@ -573,8 +573,6 @@ bool CachedReadBufferFromRemoteFS::nextImpl() bool CachedReadBufferFromRemoteFS::nextImplStep() { - assertCacheAllowed(); - last_caller_id = FileSegment::getCallerId(); if (!initialized) @@ -623,7 +621,7 @@ bool CachedReadBufferFromRemoteFS::nextImplStep() implementation_buffer = getImplementationBuffer(*current_file_segment_it); if (read_type == ReadType::CACHED) - (*current_file_segment_it)->hit(); + (*current_file_segment_it)->incrementHitsCount(); } assert(!internal_buffer.empty()); @@ -820,12 +818,6 @@ std::optional CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset() return std::nullopt; } -void CachedReadBufferFromRemoteFS::assertCacheAllowed() const -{ - if (IFileCache::shouldBypassCache() && !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache used when not allowed"); -} - String CachedReadBufferFromRemoteFS::getInfoForLog() { auto implementation_buffer_read_range_str = diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.h b/src/Disks/IO/CachedReadBufferFromRemoteFS.h index 5d632e62c0f..5fc9ec39246 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.h +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.h @@ -50,8 +50,6 @@ private: bool 
nextImplStep(); - void assertCacheAllowed() const; - enum class ReadType { CACHED, diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 18c61e1d704..7014b21e8b4 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -38,7 +38,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S current_path = path; auto cache = settings.remote_fs_cache; - bool with_cache = cache && settings.enable_filesystem_cache && !IFileCache::shouldBypassCache(); + bool with_cache = cache && settings.enable_filesystem_cache; auto remote_file_reader_creator = [=, this]() { diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 15d0eece624..b1ae42d03d6 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -1,6 +1,5 @@ #include "ThreadPoolRemoteFSReader.h" -#include #include #include #include @@ -51,25 +50,6 @@ std::future ThreadPoolRemoteFSReader::submit(Reques if (CurrentThread::isInitialized()) query_context = CurrentThread::get().getQueryContext(); - if (!query_context) - { - if (!shared_query_context) - { - ContextPtr global_context = CurrentThread::isInitialized() ? 
CurrentThread::get().getGlobalContext() : nullptr; - if (global_context) - { - shared_query_context = Context::createCopy(global_context); - shared_query_context->makeQueryContext(); - } - } - - if (shared_query_context) - { - shared_query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4())); - query_context = shared_query_context; - } - } - auto task = std::make_shared>([request, running_group, query_context] { ThreadStatus thread_status; diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.h b/src/Disks/IO/ThreadPoolRemoteFSReader.h index a2a1e77c834..b2d5f11724a 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.h +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.h @@ -15,7 +15,6 @@ class ThreadPoolRemoteFSReader : public IAsynchronousReader private: ThreadPool pool; - ContextMutablePtr shared_query_context; public: ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_); diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 07d27f67d1e..d879953bd9e 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -230,7 +230,7 @@ std::unique_ptr DiskS3::readFile(const String & path, co ReadSettings disk_read_settings{read_settings}; if (cache) { - if (IFileCache::shouldBypassCache()) + if (IFileCache::isReadOnly()) disk_read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true; disk_read_settings.remote_fs_cache = cache; @@ -272,7 +272,8 @@ std::unique_ptr DiskS3::writeFile(const String & path, LOG_TRACE(log, "{} to file by path: {}. S3 path: {}", mode == WriteMode::Rewrite ? 
"Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + blob_name); - bool cache_on_insert = fs::path(path).extension() != ".tmp" + bool cache_on_write = cache + && fs::path(path).extension() != ".tmp" && write_settings.enable_filesystem_cache_on_write_operations && FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_write_operations; @@ -285,7 +286,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, settings->s3_upload_part_size_multiply_parts_count_threshold, settings->s3_max_single_part_upload_size, std::move(object_metadata), - buf_size, threadPoolCallbackRunner(getThreadPoolWriter()), blob_name, cache_on_insert ? cache : nullptr); + buf_size, threadPoolCallbackRunner(getThreadPoolWriter()), blob_name, cache_on_write ? cache : nullptr); auto create_metadata_callback = [this, path, blob_name, mode] (size_t count) { diff --git a/src/IO/ParallelReadBuffer.cpp b/src/IO/ParallelReadBuffer.cpp index 64550e9430b..f036d6a08c8 100644 --- a/src/IO/ParallelReadBuffer.cpp +++ b/src/IO/ParallelReadBuffer.cpp @@ -33,7 +33,7 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock & /*buffer auto worker = read_workers.emplace_back(std::make_shared(std::move(reader))); - schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }, nullptr); + schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }); return true; } diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 86f4366ec8d..c85f3989531 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -100,28 +100,6 @@ void WriteBufferFromS3::nextImpl() ? 
CurrentThread::get().getThreadGroup() : MainThreadStatus::getInstance().getThreadGroup(); - if (CurrentThread::isInitialized()) - query_context = CurrentThread::get().getQueryContext(); - - if (!query_context) - { - if (!shared_query_context) - { - ContextPtr global_context = CurrentThread::isInitialized() ? CurrentThread::get().getGlobalContext() : nullptr; - if (global_context) - { - shared_query_context = Context::createCopy(global_context); - shared_query_context->makeQueryContext(); - } - } - - if (shared_query_context) - { - shared_query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4())); - query_context = shared_query_context; - } - } - if (cacheEnabled()) { if (blob_name.empty()) @@ -132,8 +110,10 @@ void WriteBufferFromS3::nextImpl() current_download_offset += size; size_t remaining_size = size; - for (const auto & file_segment : file_segments_holder->file_segments) + auto & file_segments = file_segments_holder->file_segments; + for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end(); ++file_segment_it) { + auto & file_segment = *file_segment_it; size_t current_size = std::min(file_segment->range().size(), remaining_size); remaining_size -= current_size; @@ -143,6 +123,7 @@ void WriteBufferFromS3::nextImpl() } else { + file_segments.erase(file_segment_it, file_segments.end()); break; } } @@ -190,7 +171,7 @@ WriteBufferFromS3::~WriteBufferFromS3() bool WriteBufferFromS3::cacheEnabled() const { - return cache != nullptr && !IFileCache::shouldBypassCache(); + return cache != nullptr; } void WriteBufferFromS3::preFinalize() @@ -317,7 +298,7 @@ void WriteBufferFromS3::writePart() /// Releasing lock and condvar notification. bg_tasks_condvar.notify_one(); } - }, query_context); + }); } else { @@ -454,7 +435,7 @@ void WriteBufferFromS3::makeSinglepartUpload() /// Releasing lock and condvar notification. 
bg_tasks_condvar.notify_one(); } - }, query_context); + }); } else { diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index d1e51b0c7f9..8e91bbc04da 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -33,7 +33,7 @@ namespace Aws::S3::Model namespace DB { -using ScheduleFunc = std::function, ContextPtr)>; +using ScheduleFunc = std::function)>; class WriteBufferFromFile; /** @@ -128,8 +128,6 @@ private: size_t current_download_offset = 0; std::optional file_segments_holder; static void finalizeCacheIfNeeded(std::optional &); - ContextMutablePtr shared_query_context; - ContextPtr query_context; }; } diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index af26452e8e6..3464bb31664 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -3,6 +3,7 @@ namespace DB { +/// Settings to be passed to IDisk::writeFile() struct WriteSettings { bool enable_filesystem_cache_on_write_operations = false; diff --git a/src/Interpreters/threadPoolCallbackRunner.cpp b/src/Interpreters/threadPoolCallbackRunner.cpp index 9eeea986d09..288079e49d2 100644 --- a/src/Interpreters/threadPoolCallbackRunner.cpp +++ b/src/Interpreters/threadPoolCallbackRunner.cpp @@ -9,19 +9,14 @@ namespace DB CallbackRunner threadPoolCallbackRunner(ThreadPool & pool) { - return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback, ContextPtr query_context) mutable + return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback) mutable { pool->scheduleOrThrow( - [&, callback = std::move(callback), thread_group, query_context]() + [&, callback = std::move(callback), thread_group]() { if (thread_group) CurrentThread::attachTo(thread_group); - std::optional query_scope; - - if (query_context && !CurrentThread::get().getQueryContext()) - query_scope.emplace(query_context); - SCOPE_EXIT_SAFE({ if (thread_group) CurrentThread::detachQueryIfNotDetached(); diff --git a/src/Interpreters/threadPoolCallbackRunner.h 
b/src/Interpreters/threadPoolCallbackRunner.h index 8d9d5d4d45b..59d06f2f1bc 100644 --- a/src/Interpreters/threadPoolCallbackRunner.h +++ b/src/Interpreters/threadPoolCallbackRunner.h @@ -7,7 +7,7 @@ namespace DB { /// High-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously -using CallbackRunner = std::function, ContextPtr)>; +using CallbackRunner = std::function)>; /// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()' CallbackRunner threadPoolCallbackRunner(ThreadPool & pool); diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 005d8093bba..4fb993bfcc7 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -27,7 +27,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( MergeTreeWriterSettings writer_settings( global_settings, - WriteSettings{}, + data_part->storage.getContext()->getWriteSettings(), storage_settings, index_granularity_info ? 
index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(), /* rewrite_primary_key = */false); diff --git a/src/Storages/System/StorageSystemFilesystemCache.cpp b/src/Storages/System/StorageSystemFilesystemCache.cpp index 08a62c47f27..f3ead8a95f0 100644 --- a/src/Storages/System/StorageSystemFilesystemCache.cpp +++ b/src/Storages/System/StorageSystemFilesystemCache.cpp @@ -22,6 +22,7 @@ NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes() {"state", std::make_shared()}, {"cache_hits", std::make_shared()}, {"references", std::make_shared()}, + {"downloaded_size", std::make_shared()}, }; } @@ -37,9 +38,9 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex for (const auto & [cache_base_path, cache_data] : caches) { const auto & cache = cache_data.cache; - auto holder = cache->getAll(); + auto file_segments = cache->getSnapshot(); - for (const auto & file_segment : holder.file_segments) + for (const auto & file_segment : file_segments) { res_columns[0]->insert(cache_base_path); res_columns[1]->insert(cache->getPathInLocalCache(file_segment->key(), file_segment->offset())); @@ -49,8 +50,9 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex res_columns[3]->insert(range.right); res_columns[4]->insert(range.size()); res_columns[5]->insert(FileSegment::stateToString(file_segment->state())); - res_columns[6]->insert(file_segment->hits()); - res_columns[7]->insert(file_segment.use_count()); + res_columns[6]->insert(file_segment->getHitsCount()); + res_columns[7]->insert(file_segment->getRefCount()); + res_columns[8]->insert(file_segment->getDownloadedSize()); } } } diff --git a/src/Storages/System/StorageSystemFilesystemCache.h b/src/Storages/System/StorageSystemFilesystemCache.h index 0f0bd81e760..1d9d28d7b50 100644 --- a/src/Storages/System/StorageSystemFilesystemCache.h +++ b/src/Storages/System/StorageSystemFilesystemCache.h @@ -7,12 +7,14 @@ namespace DB { /** + * Usage 
example. How to get mapping from local paths to remote paths: * SELECT * cache_path, * cache_hits, * remote_path, * local_path, - * file_segment_range, + * file_segment_range_begin, + * file_segment_range_end, * size, * state * FROM