Merge pull request #48386 from ClickHouse/vdimir/reset-downloader-tmp-data-cache

This commit is contained in:
Vladimir C 2023-04-06 11:23:16 +02:00 committed by GitHub
commit a2b71a0054
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 11 deletions

View File

@ -201,7 +201,7 @@ void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] std::unique_loc
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
/// range().size() can equal 0 in case of write-though cache.
if (current_downloaded_size != 0 && current_downloaded_size == range().size())
if (!is_unbound && current_downloaded_size != 0 && current_downloaded_size == range().size())
setDownloadedUnlocked(segment_lock);
else
setDownloadState(State::PARTIALLY_DOWNLOADED);
@ -343,7 +343,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
ErrorCodes::LOGICAL_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);
if (current_downloaded_size == range().size())
if (!is_unbound && current_downloaded_size == range().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");
if (!cache_writer)
@ -689,7 +689,8 @@ String FileSegment::getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment
info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(segment_lock) << ", ";
info << "caller id: " << getCallerId() << ", ";
info << "detached: " << is_detached << ", ";
info << "kind: " << toString(segment_kind);
info << "kind: " << toString(segment_kind) << ", ";
info << "unbound: " << is_unbound;
return info.str();
}
@ -785,6 +786,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock);
snapshot->download_state = file_segment->download_state;
snapshot->segment_kind = file_segment->getKind();
snapshot->is_unbound = file_segment->is_unbound;
return snapshot;
}
@ -905,6 +907,8 @@ String FileSegmentsHolder::toString()
if (!ranges.empty())
ranges += ", ";
ranges += file_segment->range().toString();
if (file_segment->is_unbound)
ranges += "(unbound)";
}
return ranges;
}

View File

@ -159,6 +159,7 @@ public:
FileSegmentKind getKind() const { return segment_kind; }
bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; }
bool isUnbound() const { return is_unbound; }
using UniqueId = std::pair<FileCacheKey, size_t>;
UniqueId getUniqueId() const { return std::pair(key(), offset()); }

View File

@ -2,6 +2,8 @@
#include <Interpreters/Cache/FileSegment.h>
#include <IO/SwapHelper.h>
#include <base/scope_guard.h>
#include <Common/logger_useful.h>
namespace DB
@ -10,20 +12,23 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
extern const int LOGICAL_ERROR;
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(file_segment_->detachWriter()), file_segment(file_segment_)
{
auto downloader = file_segment->getOrSetDownloader();
if (downloader != FileSegment::getCallerId())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog());
}
/// If it throws an exception, the file segment will be incomplete, so you should not use it in the future.
void WriteBufferToFileSegment::nextImpl()
{
auto downloader [[maybe_unused]] = file_segment->getOrSetDownloader();
chassert(downloader == FileSegment::getCallerId());
SCOPE_EXIT({
file_segment->completePartAndResetDownloader();
});
size_t bytes_to_write = offset();
/// In case of an error, we don't need to finalize the file segment

View File

@ -562,7 +562,7 @@ TEST_F(FileCacheTest, writeBuffer)
DB::FileCache cache(cache_base_path, settings);
cache.initialize();
auto write_to_cache = [&cache](const String & key, const Strings & data)
auto write_to_cache = [&cache](const String & key, const Strings & data, bool flush)
{
CreateFileSegmentSettings segment_settings;
segment_settings.kind = FileSegmentKind::Temporary;
@ -572,14 +572,31 @@ TEST_F(FileCacheTest, writeBuffer)
EXPECT_EQ(holder.file_segments.size(), 1);
auto & segment = holder.file_segments.front();
WriteBufferToFileSegment out(segment.get());
std::list<std::thread> threads;
std::mutex mu;
for (const auto & s : data)
{
/// Write from diffetent threads to check
/// that no assertions inside cache related to downloaderId are triggered
threads.emplace_back([&]
{
std::unique_lock lock(mu);
out.write(s.data(), s.size());
/// test different buffering scenarios
if (flush)
{
out.next();
}
});
}
for (auto & t : threads)
t.join();
return holder;
};
std::vector<fs::path> file_segment_paths;
{
auto holder = write_to_cache("key1", {"abc", "defg"});
auto holder = write_to_cache("key1", {"abc", "defg"}, false);
file_segment_paths.emplace_back(holder.file_segments.front()->getPathInLocalCache());
ASSERT_EQ(fs::file_size(file_segment_paths.back()), 7);
@ -587,7 +604,7 @@ TEST_F(FileCacheTest, writeBuffer)
ASSERT_EQ(cache.getUsedCacheSize(), 7);
{
auto holder2 = write_to_cache("key2", {"1", "22", "333", "4444", "55555"});
auto holder2 = write_to_cache("key2", {"1", "22", "333", "4444", "55555"}, true);
file_segment_paths.emplace_back(holder2.file_segments.front()->getPathInLocalCache());
ASSERT_EQ(fs::file_size(file_segment_paths.back()), 15);

View File

@ -26,6 +26,7 @@ NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes()
{"downloaded_size", std::make_shared<DataTypeUInt64>()},
{"persistent", std::make_shared<DataTypeNumber<UInt8>>()},
{"kind", std::make_shared<DataTypeString>()},
{"unbound", std::make_shared<DataTypeNumber<UInt8>>()},
};
}
@ -62,6 +63,7 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
res_columns[8]->insert(file_segment->getDownloadedSize());
res_columns[9]->insert(file_segment->isPersistent());
res_columns[10]->insert(toString(file_segment->getKind()));
res_columns[11]->insert(file_segment->isUnbound());
}
}
}