diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index 357fe10b908..9bc3c1cf521 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -843,6 +843,7 @@ bool FileCache::tryReserveInCache( for (const auto & offset_to_delete : transaction->delete_offsets) { auto * cell = transaction->getOffsets().get(offset_to_delete); + transaction->queue_lock = lock; transaction->remove(cell->file_segment); } } diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index 4066a303505..e859f566c35 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -301,21 +301,6 @@ void FileSegment::resetRemoteFileReader() remote_file_reader.reset(); } -std::unique_ptr FileSegment::detachWriter() -{ - auto lock = segment_guard.lock(); - - if (!cache_writer) - { - if (detached_writer) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Writer is already detached"); - - cache_writer = std::make_unique(file_path); - } - detached_writer = true; - return std::move(cache_writer); -} - void FileSegment::write(const char * from, size_t size, size_t offset) { if (!size) @@ -359,9 +344,6 @@ void FileSegment::write(const char * from, size_t size, size_t offset) "Cache writer was finalized (downloaded size: {}, state: {})", current_downloaded_size, stateToString(download_state)); - if (detached_writer) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer was detached"); - cache_writer = std::make_unique(file_path); } } @@ -836,7 +818,7 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl() return file_segments.erase(file_segments.begin()); } - auto lock = file_segment.cache->main_priority->lock(); + auto lock = file_segment.cache->main_priority->lockShared(); /// File segment pointer must be reset right after calling complete() and /// under the same mutex, because complete() checks for segment pointers. auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false); @@ -844,10 +826,13 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl() { auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator; if (queue_iter) - queue_iter->use(lock); + queue_iter->use(*lock); if (!file_segment.isCompleted()) + { + key_transaction->queue_lock = lock; file_segment.completeUnlocked(*key_transaction); + } } return file_segments.erase(file_segments.begin()); diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index c81b490c2ca..4df28de524c 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -258,8 +258,6 @@ public: void setDownloadedSize(size_t delta); - LocalCacheWriterPtr detachWriter(); - private: size_t getFirstNonDownloadedOffsetUnlocked(const FileSegmentGuard::Lock &) const; size_t getCurrentWriteOffsetUnlocked(const FileSegmentGuard::Lock &) const; @@ -313,7 +311,6 @@ private: RemoteFileReaderPtr remote_file_reader; LocalCacheWriterPtr cache_writer; - bool detached_writer = false; /// downloaded_size should always be less or equal to reserved_size size_t downloaded_size = 0; diff --git a/src/Interpreters/Cache/WriteBufferToFileSegment.cpp b/src/Interpreters/Cache/WriteBufferToFileSegment.cpp index 16906e9440e..9f9358951e8 100644 --- a/src/Interpreters/Cache/WriteBufferToFileSegment.cpp +++ b/src/Interpreters/Cache/WriteBufferToFileSegment.cpp @@ -27,7 +27,7 @@ namespace } WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_) - : WriteBufferFromFileDecorator(file_segment_->detachWriter()), file_segment(file_segment_) + : WriteBufferFromFileDecorator(std::make_unique(file_segment_->getPathInLocalCache())), file_segment(file_segment_) { auto downloader = file_segment->getOrSetDownloader(); if (downloader != FileSegment::getCallerId()) diff --git a/src/Interpreters/TemporaryDataOnDisk.cpp b/src/Interpreters/TemporaryDataOnDisk.cpp index b9c3a78984b..901c81ae40f 100644 --- a/src/Interpreters/TemporaryDataOnDisk.cpp +++ b/src/Interpreters/TemporaryDataOnDisk.cpp @@ -60,9 +60,11 @@ TemporaryFileStream & TemporaryDataOnDisk::createStreamToCacheFile(const Block & if (!file_cache) throw Exception("TemporaryDataOnDiskScope has no cache", ErrorCodes::LOGICAL_ERROR); - auto holder = file_cache->set(FileSegment::Key::random(), 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true)); + auto key = FileSegment::Key::random(); + auto holder = file_cache->set(key, 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true)); std::lock_guard lock(mutex); + std::filesystem::create_directories(file_cache->getPathInLocalCache(key)); TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique(std::move(holder), header, this)); return *tmp_stream; } diff --git a/src/Interpreters/tests/gtest_lru_file_cache.cpp b/src/Interpreters/tests/gtest_lru_file_cache.cpp index f0eca094d98..e896605c629 100644 --- a/src/Interpreters/tests/gtest_lru_file_cache.cpp +++ b/src/Interpreters/tests/gtest_lru_file_cache.cpp @@ -536,7 +536,10 @@ TEST_F(FileCacheTest, writeBuffer) segment_settings.kind = FileSegmentKind::Temporary; segment_settings.unbounded = true; - auto holder = cache.set(cache.createKeyForPath(key), 0, 3, segment_settings); + auto cache_key = cache.createKeyForPath(key); + auto holder = cache.set(cache_key, 0, 3, segment_settings); + /// The same is done in TemporaryDataOnDisk::createStreamToCacheFile. + std::filesystem::create_directories(cache.getPathInLocalCache(cache_key)); EXPECT_EQ(holder->size(), 1); auto & segment = holder->front(); WriteBufferToFileSegment out(&segment);