Merge pull request #63348 from ClickHouse/small-refactoring-for-tmp-data-in-cache

Make write to temporary data in cache do all checks and assertions as during write to ordinary cache
This commit is contained in:
Kseniia Sumarokova 2024-06-29 09:40:50 +00:00 committed by GitHub
commit 5a6666d330
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 95 additions and 62 deletions

View File

@ -187,13 +187,6 @@ size_t FileSegment::getDownloadedSize() const
return downloaded_size;
}
void FileSegment::setDownloadedSize(size_t delta)
{
auto lk = lock();
downloaded_size += delta;
assert(downloaded_size == std::filesystem::file_size(getPath()));
}
bool FileSegment::isDownloaded() const
{
auto lk = lock();
@ -311,6 +304,11 @@ FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
return remote_file_reader;
}
FileSegment::LocalCacheWriterPtr FileSegment::getLocalCacheWriter()
{
return cache_writer;
}
void FileSegment::resetRemoteFileReader()
{
auto lk = lock();
@ -340,33 +338,31 @@ void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
remote_file_reader = remote_file_reader_;
}
void FileSegment::write(char * from, size_t size, size_t offset)
void FileSegment::write(char * from, size_t size, size_t offset_in_file)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentWriteMicroseconds);
if (!size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
auto file_segment_path = getPath();
{
auto lk = lock();
assertIsDownloaderUnlocked("write", lk);
assertNotDetachedUnlocked(lk);
}
if (!size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
const auto file_segment_path = getPath();
{
auto lk = lock();
assertIsDownloaderUnlocked("write", lk);
assertNotDetachedUnlocked(lk);
}
{
if (download_state != State::DOWNLOADING)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected DOWNLOADING state, got {}", stateToString(download_state));
const size_t first_non_downloaded_offset = getCurrentWriteOffset();
if (offset != first_non_downloaded_offset)
if (offset_in_file != first_non_downloaded_offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
size, offset, first_non_downloaded_offset);
size, offset_in_file, first_non_downloaded_offset);
const size_t current_downloaded_size = getDownloadedSize();
chassert(reserved_size >= current_downloaded_size);
@ -396,10 +392,10 @@ void FileSegment::write(char * from, size_t size, size_t offset)
#endif
if (!cache_writer)
cache_writer = std::make_unique<WriteBufferFromFile>(file_segment_path, /* buf_size */0);
cache_writer = std::make_unique<WriteBufferFromFile>(getPath(), /* buf_size */0);
/// Size is equal to offset as offset for write buffer points to data end.
cache_writer->set(from, size, /* offset */size);
cache_writer->set(from, /* size */size, /* offset */size);
/// Reset the buffer when finished.
SCOPE_EXIT({ cache_writer->set(nullptr, 0); });
/// Flush the buffer.
@ -435,7 +431,6 @@ void FileSegment::write(char * from, size_t size, size_t offset)
}
throw;
}
catch (Exception & e)
{
@ -445,7 +440,7 @@ void FileSegment::write(char * from, size_t size, size_t offset)
throw;
}
chassert(getCurrentWriteOffset() == offset + size);
chassert(getCurrentWriteOffset() == offset_in_file + size);
}
FileSegment::State FileSegment::wait(size_t offset)
@ -828,7 +823,7 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
};
const auto file_path = getPath();
if (segment_kind != FileSegmentKind::Temporary)
{
std::lock_guard lk(write_mutex);
if (downloaded_size == 0)

View File

@ -48,7 +48,7 @@ friend class FileCache; /// Because of reserved_size in tryReserve().
public:
using Key = FileCacheKey;
using RemoteFileReaderPtr = std::shared_ptr<ReadBufferFromFileBase>;
using LocalCacheWriterPtr = std::unique_ptr<WriteBufferFromFile>;
using LocalCacheWriterPtr = std::shared_ptr<WriteBufferFromFile>;
using Downloader = std::string;
using DownloaderId = std::string;
using Priority = IFileCachePriority;
@ -204,7 +204,7 @@ public:
bool reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat = nullptr);
/// Write data into reserved space.
void write(char * from, size_t size, size_t offset);
void write(char * from, size_t size, size_t offset_in_file);
// Invariant: if state() != DOWNLOADING and remote file reader is present, the reader's
// available() == 0, and getFileOffsetOfBufferEnd() == our getCurrentWriteOffset().
@ -212,6 +212,7 @@ public:
// The reader typically requires its internal_buffer to be assigned from the outside before
// calling next().
RemoteFileReaderPtr getRemoteFileReader();
LocalCacheWriterPtr getLocalCacheWriter();
RemoteFileReaderPtr extractRemoteFileReader();
@ -219,8 +220,6 @@ public:
void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_);
void setDownloadedSize(size_t delta);
void setDownloadFailed();
private:

View File

@ -944,14 +944,7 @@ KeyMetadata::iterator LockedKey::removeFileSegmentImpl(
try
{
const auto path = key_metadata->getFileSegmentPath(*file_segment);
if (file_segment->segment_kind == FileSegmentKind::Temporary)
{
/// FIXME: For temporary file segment the requirement is not as strong because
/// the implementation of "temporary data in cache" creates files in advance.
if (fs::exists(path))
fs::remove(path);
}
else if (file_segment->downloaded_size == 0)
if (file_segment->downloaded_size == 0)
{
chassert(!fs::exists(path));
}

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <IO/SwapHelper.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/EmptyReadBuffer.h>
#include <base/scope_guard.h>
@ -33,21 +34,20 @@ namespace
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(std::make_unique<WriteBufferFromFile>(file_segment_->getPath()))
: WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, file_segment(file_segment_)
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolderPtr segment_holder_)
: WriteBufferFromFileDecorator(
segment_holder_->size() == 1
? std::make_unique<WriteBufferFromFile>(segment_holder_->front().getPath())
: throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment"))
: WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, file_segment(&segment_holder_->front())
, segment_holder(std::move(segment_holder_))
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
if (segment_holder->size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment");
}
/// If it throws an exception, the file segment will be incomplete, so you should not use it in the future.
@ -82,9 +82,6 @@ void WriteBufferToFileSegment::nextImpl()
reserve_stat_msg += fmt::format("{} hold {}, can release {}; ",
toString(kind), ReadableSize(stat.non_releasable_size), ReadableSize(stat.releasable_size));
if (std::filesystem::exists(file_segment->getPath()))
std::filesystem::remove(file_segment->getPath());
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Failed to reserve {} bytes for {}: {}(segment info: {})",
bytes_to_write,
file_segment->getKind() == FileSegmentKind::Temporary ? "temporary file" : "the file in cache",
@ -95,17 +92,37 @@ void WriteBufferToFileSegment::nextImpl()
try
{
SwapHelper swap(*this, *impl);
/// Write data to the underlying buffer.
impl->next();
file_segment->write(working_buffer.begin(), bytes_to_write, written_bytes);
written_bytes += bytes_to_write;
}
catch (...)
{
LOG_WARNING(getLogger("WriteBufferToFileSegment"), "Failed to write to the underlying buffer ({})", file_segment->getInfoForLog());
throw;
}
}
file_segment->setDownloadedSize(bytes_to_write);
void WriteBufferToFileSegment::finalizeImpl()
{
next();
auto cache_writer = file_segment->getLocalCacheWriter();
if (cache_writer)
{
SwapHelper swap(*this, *cache_writer);
cache_writer->finalize();
}
}
void WriteBufferToFileSegment::sync()
{
next();
auto cache_writer = file_segment->getLocalCacheWriter();
if (cache_writer)
{
SwapHelper swap(*this, *cache_writer);
cache_writer->sync();
}
}
std::unique_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
@ -114,7 +131,10 @@ std::unique_ptr<ReadBuffer> WriteBufferToFileSegment::getReadBufferImpl()
* because in case destructor called without `getReadBufferImpl` called, data won't be read.
*/
finalize();
return std::make_unique<ReadBufferFromFile>(file_segment->getPath());
if (file_segment->getDownloadedSize() > 0)
return std::make_unique<ReadBufferFromFile>(file_segment->getPath());
else
return std::make_unique<EmptyReadBuffer>();
}
}

View File

@ -9,7 +9,7 @@ namespace DB
class FileSegment;
class WriteBufferToFileSegment : public WriteBufferFromFileDecorator, public IReadableWriteBuffer
class WriteBufferToFileSegment : public WriteBufferFromFileBase, public IReadableWriteBuffer
{
public:
explicit WriteBufferToFileSegment(FileSegment * file_segment_);
@ -17,6 +17,13 @@ public:
void nextImpl() override;
std::string getFileName() const override { return file_segment->getPath(); }
void sync() override;
protected:
void finalizeImpl() override;
private:
std::unique_ptr<ReadBuffer> getReadBufferImpl() override;
@ -29,6 +36,7 @@ private:
FileSegmentsHolderPtr segment_holder;
const size_t reserve_space_lock_wait_timeout_milliseconds;
size_t written_bytes = 0;
};

View File

@ -3,6 +3,8 @@
#include <Interpreters/TemporaryDataOnDisk.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromEmptyFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Formats/NativeWriter.h>
@ -224,25 +226,37 @@ struct TemporaryFileStream::OutputWriter
bool finalized = false;
};
TemporaryFileStream::Reader::Reader(const String & path, const Block & header_, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
TemporaryFileStream::Reader::Reader(const String & path_, const Block & header_, size_t size_)
: path(path_)
, size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
, header(header_)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
}
TemporaryFileStream::Reader::Reader(const String & path, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
TemporaryFileStream::Reader::Reader(const String & path_, size_t size_)
: path(path_)
, size(size_ ? std::min<size_t>(size_, DBMS_DEFAULT_BUFFER_SIZE) : DBMS_DEFAULT_BUFFER_SIZE)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
}
Block TemporaryFileStream::Reader::read()
{
return in_reader.read();
if (!in_reader)
{
if (fs::exists(path))
in_file_buf = std::make_unique<ReadBufferFromFile>(path, size);
else
in_file_buf = std::make_unique<ReadBufferFromEmptyFile>();
in_compressed_buf = std::make_unique<CompressedReadBuffer>(*in_file_buf);
if (header.has_value())
in_reader = std::make_unique<NativeReader>(*in_compressed_buf, header.value(), DBMS_TCP_PROTOCOL_VERSION);
else
in_reader = std::make_unique<NativeReader>(*in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION);
}
return in_reader->read();
}
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)

View File

@ -151,9 +151,13 @@ public:
Block read();
ReadBufferFromFile in_file_buf;
CompressedReadBuffer in_compressed_buf;
NativeReader in_reader;
const std::string path;
const size_t size;
const std::optional<Block> header;
std::unique_ptr<ReadBufferFromFileBase> in_file_buf;
std::unique_ptr<CompressedReadBuffer> in_compressed_buf;
std::unique_ptr<NativeReader> in_reader;
};
struct Stat