Fix TemporaryDataOnDisk with cache

This commit is contained in:
kssenii 2023-01-06 18:24:38 +01:00
parent ce3014669b
commit 4fd0be7f03
6 changed files with 14 additions and 26 deletions

View File

@ -843,6 +843,7 @@ bool FileCache::tryReserveInCache(
for (const auto & offset_to_delete : transaction->delete_offsets)
{
auto * cell = transaction->getOffsets().get(offset_to_delete);
transaction->queue_lock = lock;
transaction->remove(cell->file_segment);
}
}

View File

@ -301,21 +301,6 @@ void FileSegment::resetRemoteFileReader()
remote_file_reader.reset();
}
std::unique_ptr<WriteBufferFromFile> FileSegment::detachWriter()
{
auto lock = segment_guard.lock();
if (!cache_writer)
{
if (detached_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writer is already detached");
cache_writer = std::make_unique<WriteBufferFromFile>(file_path);
}
detached_writer = true;
return std::move(cache_writer);
}
void FileSegment::write(const char * from, size_t size, size_t offset)
{
if (!size)
@ -359,9 +344,6 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
"Cache writer was finalized (downloaded size: {}, state: {})",
current_downloaded_size, stateToString(download_state));
if (detached_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer was detached");
cache_writer = std::make_unique<WriteBufferFromFile>(file_path);
}
}
@ -836,7 +818,7 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
return file_segments.erase(file_segments.begin());
}
auto lock = file_segment.cache->main_priority->lock();
auto lock = file_segment.cache->main_priority->lockShared();
/// File segment pointer must be reset right after calling complete() and
/// under the same mutex, because complete() checks for segment pointers.
auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false);
@ -844,10 +826,13 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
{
auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator;
if (queue_iter)
queue_iter->use(lock);
queue_iter->use(*lock);
if (!file_segment.isCompleted())
{
key_transaction->queue_lock = lock;
file_segment.completeUnlocked(*key_transaction);
}
}
return file_segments.erase(file_segments.begin());

View File

@ -258,8 +258,6 @@ public:
void setDownloadedSize(size_t delta);
LocalCacheWriterPtr detachWriter();
private:
size_t getFirstNonDownloadedOffsetUnlocked(const FileSegmentGuard::Lock &) const;
size_t getCurrentWriteOffsetUnlocked(const FileSegmentGuard::Lock &) const;
@ -313,7 +311,6 @@ private:
RemoteFileReaderPtr remote_file_reader;
LocalCacheWriterPtr cache_writer;
bool detached_writer = false;
/// downloaded_size should always be less or equal to reserved_size
size_t downloaded_size = 0;

View File

@ -27,7 +27,7 @@ namespace
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(file_segment_->detachWriter()), file_segment(file_segment_)
: WriteBufferFromFileDecorator(std::make_unique<WriteBufferFromFile>(file_segment_->getPathInLocalCache())), file_segment(file_segment_)
{
auto downloader = file_segment->getOrSetDownloader();
if (downloader != FileSegment::getCallerId())

View File

@ -60,9 +60,11 @@ TemporaryFileStream & TemporaryDataOnDisk::createStreamToCacheFile(const Block &
if (!file_cache)
throw Exception("TemporaryDataOnDiskScope has no cache", ErrorCodes::LOGICAL_ERROR);
auto holder = file_cache->set(FileSegment::Key::random(), 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true));
auto key = FileSegment::Key::random();
auto holder = file_cache->set(key, 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true));
std::lock_guard lock(mutex);
std::filesystem::create_directories(file_cache->getPathInLocalCache(key));
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(holder), header, this));
return *tmp_stream;
}

View File

@ -536,7 +536,10 @@ TEST_F(FileCacheTest, writeBuffer)
segment_settings.kind = FileSegmentKind::Temporary;
segment_settings.unbounded = true;
auto holder = cache.set(cache.createKeyForPath(key), 0, 3, segment_settings);
auto cache_key = cache.createKeyForPath(key);
auto holder = cache.set(cache_key, 0, 3, segment_settings);
/// The same is done in TemporaryDataOnDisk::createStreamToCacheFile.
std::filesystem::create_directories(cache.getPathInLocalCache(cache_key));
EXPECT_EQ(holder->size(), 1);
auto & segment = holder->front();
WriteBufferToFileSegment out(&segment);