Merge pull request #41437 from kssenii/fix-logical-error-write-through-cache

Fix assertion in write-through cache
This commit is contained in:
Kseniia Sumarokova 2022-09-20 11:51:19 +02:00 committed by GitHub
commit 3c7bbad0b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 230 additions and 270 deletions

View File

@ -11,12 +11,16 @@ namespace ProfileEvents
{
extern const Event CachedWriteBufferCacheWriteBytes;
extern const Event CachedWriteBufferCacheWriteMicroseconds;
extern const Event FileSegmentWriteMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
class SwapHelper
@ -31,6 +35,178 @@ namespace
};
}
/// Creates a writer that has no file segment allocated yet; the first one is
/// allocated lazily on the first call to write().
/// `cache_log_` may be null, in which case nothing is added to the filesystem cache log.
FileSegmentRangeWriter::FileSegmentRangeWriter(
    FileCache * cache_, const FileSegment::Key & key_,
    std::shared_ptr<FilesystemCacheLog> cache_log_, const String & query_id_, const String & source_path_)
    : cache(cache_), key(key_)
    , cache_log(cache_log_), query_id(query_id_), source_path(source_path_)
    /// end() of the still empty holder is used as the "no current file segment" marker.
    , current_file_segment_it(file_segments_holder.file_segments.end())
{
}
/**
 * Write `size` bytes from `data` into the cache at position `offset`.
 *
 * The first call lazily allocates a file segment starting at the writer's own write offset.
 * Every later call requires `offset` to be equal to the writer's current write offset;
 * when the current file segment is completely downloaded, it is completed and a new
 * file segment is allocated to continue the write.
 *
 * Returns false if the writer is already finalized or if space reservation failed
 * (in the latter case the file segment is completed as PARTIALLY_DOWNLOADED_NO_CONTINUATION
 * and a cache log record is appended). Returns true if the data was written.
 *
 * Throws LOGICAL_ERROR if `offset` does not match the current write offset or
 * if the caller could not become the downloader of the file segment.
 */
bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent)
{
    if (finalized)
        return false;

    auto & file_segments = file_segments_holder.file_segments;

    if (current_file_segment_it == file_segments.end())
    {
        /// Nothing is allocated yet.
        current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
    }
    else
    {
        auto file_segment = *current_file_segment_it;
        assert(file_segment->getCurrentWriteOffset() == current_file_segment_write_offset);

        if (current_file_segment_write_offset != offset)
        {
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Cannot write file segment at offset {}, because current write offset is: {}",
                offset, current_file_segment_write_offset);
        }

        /// The current file segment is full: complete it and continue in a new one.
        if (file_segment->range().size() == file_segment->getDownloadedSize())
        {
            completeFileSegment(*file_segment);
            current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
        }
    }

    auto & file_segment = *current_file_segment_it;

    auto downloader = file_segment->getOrSetDownloader();
    if (downloader != FileSegment::getCallerId())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog());

    /// Safety net: do not leave this scope while still being the downloader.
    SCOPE_EXIT({
        if (file_segment->isDownloader())
            file_segment->completePartAndResetDownloader();
    });

    bool reserved = file_segment->reserve(size);
    if (!reserved)
    {
        file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
        appendFilesystemCacheLog(*file_segment);

        /// The closing parenthesis was missing in the message.
        LOG_DEBUG(
            &Poco::Logger::get("FileSegmentRangeWriter"),
            "Unsuccessful space reservation attempt (size: {}, file segment info: {})",
            size, file_segment->getInfoForLog());

        return false;
    }

    try
    {
        file_segment->write(data, size, offset);
    }
    catch (...)
    {
        file_segment->completePartAndResetDownloader();
        throw;
    }

    file_segment->completePartAndResetDownloader();
    current_file_segment_write_offset += size;

    return true;
}
void FileSegmentRangeWriter::finalize()
{
if (finalized)
return;
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty() || current_file_segment_it == file_segments.end())
return;
completeFileSegment(**current_file_segment_it);
finalized = true;
}
/// Finalizes the writer if it was not done explicitly.
/// Exceptions are logged instead of being propagated out of the destructor.
FileSegmentRangeWriter::~FileSegmentRangeWriter()
{
    try
    {
        if (finalized)
            return;

        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
/**
 * Allocate a new file segment starting at `offset` and add it to the file segments holder.
 * The file segment is requested with a size of `max_file_segment_size`.
 * Returns the iterator which the holder gives back for the added file segment.
 */
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
    CreateFileSegmentSettings settings{.is_persistent = is_persistent};

    /// The cache mutex is held both while creating the file segment and while adding it to the holder.
    std::lock_guard lock(cache->mutex);

    /// We set max_file_segment_size to be downloaded,
    /// if we have less size to write, file segment will be resized in complete() method.
    return file_segments_holder.add(
        cache->createFileSegmentForDownload(key, offset, cache->max_file_segment_size, settings, lock));
}
/// Adds a WRITE_THROUGH_CACHE record describing `file_segment` to the filesystem cache log.
/// Does nothing if the writer has no cache log.
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
{
    if (!cache_log)
        return;

    auto range = file_segment.range();

    /// The logged right bound is derived from the downloaded size, not from the range's own right bound.
    /// NOTE(review): if getDownloadedSize() returns 0 here, this unsigned subtraction wraps around —
    /// confirm whether that is reachable (e.g. space reservation failing on the first write to a segment).
    size_t right_bound = range.left + file_segment.getDownloadedSize() - 1;

    FilesystemCacheLogElement elem
    {
        .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
        .query_id = query_id,
        .source_file_path = source_path,
        .file_segment_range = { range.left, right_bound },
        .requested_range = {},
        .cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
        .file_segment_size = range.size(),
        .read_from_cache_attempted = false,
        .read_buffer_id = {},
        .profile_counters = nullptr,
    };

    cache_log->add(elem);
}
/// Completes `file_segment` and appends a record about it to the filesystem cache log.
/// A detached file segment is left untouched.
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
{
    /// File segment can be detached if space reservation failed.
    const bool detached = file_segment.isDetached();

    if (!detached)
    {
        file_segment.completeWithoutState();
        appendFilesystemCacheLog(file_segment);
    }
}
CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
std::unique_ptr<WriteBuffer> impl_,
FileCachePtr cache_,
@ -47,7 +223,6 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
, is_persistent_cache_file(is_persistent_cache_file_)
, query_id(query_id_)
, enable_cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log)
, cache_log(Context::getGlobalContextInstance()->getFilesystemCacheLog())
{
}
@ -82,8 +257,11 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
if (!cache_writer)
{
cache_writer = std::make_unique<FileSegmentRangeWriter>(
cache.get(), key, [this](const FileSegment & file_segment) { appendFilesystemCacheLog(file_segment); });
std::shared_ptr<FilesystemCacheLog> cache_log;
if (enable_cache_log)
cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog();
cache_writer = std::make_unique<FileSegmentRangeWriter>(cache.get(), key, cache_log, query_id, source_path);
}
Stopwatch watch(CLOCK_MONOTONIC);
@ -119,37 +297,9 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteBytes, size);
ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteMicroseconds, watch.elapsedMicroseconds());
current_file_segment_counters.increment(
ProfileEvents::FileSegmentWriteMicroseconds, watch.elapsedMicroseconds());
cache_in_error_state_or_disabled = false;
}
/// Adds a WRITE_THROUGH_CACHE record for `file_segment` to the filesystem cache log,
/// attaching a snapshot of the per file segment profile counters, which are reset afterwards.
/// Does nothing if there is no cache log.
void CachedOnDiskWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment & file_segment)
{
    if (!cache_log)
        return;

    auto range = file_segment.range();

    FilesystemCacheLogElement elem
    {
        .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
        .query_id = query_id,
        .source_file_path = source_path,
        .file_segment_range = { range.left, range.right },
        .requested_range = {},
        .cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
        .file_segment_size = range.size(),
        .read_from_cache_attempted = false,
        .read_buffer_id = {},
        .profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(current_file_segment_counters.getPartiallyAtomicSnapshot()),
    };

    /// The snapshot above is taken before the counters are reset.
    current_file_segment_counters.reset();

    cache_log->add(elem);
}
void CachedOnDiskWriteBufferFromFile::finalizeImpl()
{
try

View File

@ -13,11 +13,57 @@ class Logger;
namespace DB
{
/**
 * We want to eventually write some size, which is not known until the very end.
 * Therefore we allocate file segments lazily. Each file segment is assigned capacity
 * of max_file_segment_size, but reserved_size remains 0, until call to reserve().
 * Once current file segment is full (reached max_file_segment_size), we allocate a
 * new file segment. All allocated file segments reside in file segments holder.
 * If at the end of all writes, the last file segment is not full, then it is resized.
 */
class FileSegmentRangeWriter
{
public:
/// `cache_log_` may be null: then nothing is added to the filesystem cache log.
/// `query_id_` and `source_path_` are copied into the cache log records.
FileSegmentRangeWriter(
FileCache * cache_, const FileSegment::Key & key_,
std::shared_ptr<FilesystemCacheLog> cache_log_, const String & query_id_, const String & source_path_);
/**
* Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to
* it until it is full and then allocate next file segment.
* Returns false if the writer is finalized or if space reservation failed, true otherwise.
*/
bool write(const char * data, size_t size, size_t offset, bool is_persistent);
/// Completes the current file segment (if any was allocated); after that write() returns false.
void finalize();
/// Calls finalize() if it was not called yet; exceptions are logged, not propagated.
~FileSegmentRangeWriter();
private:
/// Creates a file segment starting at `offset` and adds it to `file_segments_holder`.
FileSegments::iterator allocateFileSegment(size_t offset, bool is_persistent);
/// Adds a record about `file_segment` to `cache_log` (no-op if `cache_log` is null).
void appendFilesystemCacheLog(const FileSegment & file_segment);
/// Completes `file_segment` unless it is detached, then logs it.
void completeFileSegment(FileSegment & file_segment);
FileCache * cache;
FileSegment::Key key;
std::shared_ptr<FilesystemCacheLog> cache_log;
String query_id;
String source_path;
/// Must be declared before `current_file_segment_it`: the constructor initializes
/// the iterator from this holder, and members are initialized in declaration order.
FileSegmentsHolder file_segments_holder{};
/// Points to the file segment currently being written; equals end() until the first allocation.
FileSegments::iterator current_file_segment_it;
/// Advanced by the number of bytes of every successful write().
size_t current_file_segment_write_offset = 0;
bool finalized = false;
};
/**
* Write buffer for filesystem caching on write operations.
*/
class FileSegmentRangeWriter;
class CachedOnDiskWriteBufferFromFile final : public WriteBufferFromFileDecorator
{
public:
@ -36,7 +82,6 @@ public:
private:
void cacheData(char * data, size_t size);
void appendFilesystemCacheLog(const FileSegment & file_segment);
Poco::Logger * log;
@ -49,11 +94,9 @@ private:
const String query_id;
bool enable_cache_log;
std::shared_ptr<FilesystemCacheLog> cache_log;
bool cache_in_error_state_or_disabled = false;
ProfileEvents::Counters current_file_segment_counters;
std::unique_ptr<FileSegmentRangeWriter> cache_writer;
};

View File

@ -125,12 +125,6 @@ size_t FileSegment::getDownloadedSizeUnlocked(std::unique_lock<std::mutex> & /*
return downloaded_size;
}
/// Returns the size of the file segment range minus the already downloaded size.
size_t FileSegment::getRemainingSizeToDownload() const
{
    std::unique_lock lock(mutex);

    const size_t downloaded = getDownloadedSizeUnlocked(lock);
    return range().size() - downloaded;
}
bool FileSegment::isDownloaded() const
{
std::lock_guard segment_lock(mutex);
@ -836,186 +830,4 @@ String FileSegmentsHolder::toString()
return ranges;
}
/// Creates a writer that has no file segment allocated yet.
/// `on_complete_file_segment_func_` is stored and invoked by the writer for file segments it completes.
FileSegmentRangeWriter::FileSegmentRangeWriter(
    FileCache * cache_, const FileSegment::Key & key_, OnCompleteFileSegmentCallback && on_complete_file_segment_func_)
    : cache(cache_), key(key_)
    /// end() of the still empty holder is used as the "no current file segment" marker.
    , current_file_segment_it(file_segments_holder.file_segments.end())
    , on_complete_file_segment_func(on_complete_file_segment_func_)
{
}
/**
 * Allocate a new file segment starting at `offset` and add it to the file segments holder.
 * The file segment is requested with a size of `max_file_segment_size`.
 * Returns the iterator which the holder gives back for the added file segment.
 */
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
    CreateFileSegmentSettings settings{.is_persistent = is_persistent};

    /// The cache mutex is held both while creating the file segment and while adding it to the holder.
    std::lock_guard lock(cache->mutex);

    /// We set max_file_segment_size to be downloaded,
    /// if we have less size to write, file segment will be resized in complete() method.
    return file_segments_holder.add(
        cache->createFileSegmentForDownload(key, offset, cache->max_file_segment_size, settings, lock));
}
/**
 * Complete file segment based on downloaded size: shrink its range to what was
 * actually downloaded, complete it and invoke the completion callback.
 * A detached file segment is left untouched.
 */
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
{
    /// File segment can be detached if space reservation failed.
    if (file_segment.isDetached())
        return;

    const size_t downloaded_size = file_segment.getDownloadedSize();

    /// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity
    /// was initially set with a margin as `max_file_segment_size`. => We need to always
    /// resize to actual size after download finished.
    const bool needs_resize = downloaded_size != file_segment.range().size();

    if (needs_resize)
    {
        /// Current file segment is downloaded as a part of write-through cache
        /// and therefore cannot be concurrently accessed. Nevertheless, it can be
        /// accessed by cache system tables if someone read from them,
        /// therefore we need a mutex.
        std::unique_lock segment_lock(file_segment.mutex);

        assert(downloaded_size <= file_segment.range().size());

        const auto left = file_segment.segment_range.left;
        file_segment.segment_range = FileSegment::Range(left, left + downloaded_size - 1);
        file_segment.reserved_size = downloaded_size;

        file_segment.setDownloadedUnlocked(segment_lock);
    }

    file_segment.completeWithoutState();
    on_complete_file_segment_func(file_segment);
}
/**
 * Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to
 * it until it is full and then allocate next file segment.
 *
 * Returns false if the writer is already finalized or if space reservation failed
 * (the file segment is then completed as PARTIALLY_DOWNLOADED_NO_CONTINUATION and the
 * completion callback is invoked). Returns true if the data was written.
 *
 * Throws LOGICAL_ERROR if `offset` does not match the expected write offset or
 * if the caller could not become the downloader of the file segment.
 */
bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent)
{
    if (finalized)
        return false;

    auto & file_segments = file_segments_holder.file_segments;

    if (current_file_segment_it == file_segments.end())
    {
        /// Nothing is allocated yet.
        current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
    }
    else
    {
        if (current_file_segment_write_offset != offset)
        {
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Cannot write file segment at offset {}, because current write offset is: {}",
                offset, current_file_segment_write_offset);
        }

        size_t current_write_offset = (*current_file_segment_it)->getCurrentWriteOffset();

        auto current_file_segment = *current_file_segment_it;
        if (current_file_segment->getRemainingSizeToDownload() == 0)
        {
            /// The current file segment is full: complete it and continue in a new one.
            completeFileSegment(*current_file_segment);
            current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
        }
        else if (current_write_offset != offset)
        {
            /// The message used to start with a stray "Cannot".
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "File segment download offset {} does not match current write offset {}",
                current_write_offset, offset);
        }
    }

    auto & file_segment = *current_file_segment_it;

    auto downloader = file_segment->getOrSetDownloader();
    if (downloader != FileSegment::getCallerId())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog());

    /// Safety net: do not leave this scope while still being the downloader.
    SCOPE_EXIT({
        if (file_segment->isDownloader())
        {
            file_segment->completePartAndResetDownloader();
        }
    });

    bool reserved = file_segment->reserve(size);
    if (!reserved)
    {
        file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
        on_complete_file_segment_func(*file_segment);

        /// The closing parenthesis was missing in the message.
        LOG_DEBUG(
            &Poco::Logger::get("FileSegmentRangeWriter"),
            "Unsuccessful space reservation attempt (size: {}, file segment info: {})",
            size, file_segment->getInfoForLog());

        return false;
    }

    try
    {
        file_segment->write(data, size, offset);
    }
    catch (...)
    {
        file_segment->completePartAndResetDownloader();
        throw;
    }

    file_segment->completePartAndResetDownloader();
    current_file_segment_write_offset += size;

    return true;
}
void FileSegmentRangeWriter::finalize()
{
if (finalized)
return;
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty() || current_file_segment_it == file_segments.end())
return;
completeFileSegment(**current_file_segment_it);
finalized = true;
}
/// Finalizes the writer if it was not done explicitly.
/// Exceptions are logged instead of being propagated out of the destructor.
FileSegmentRangeWriter::~FileSegmentRangeWriter()
{
    try
    {
        if (finalized)
            return;

        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}

View File

@ -217,8 +217,6 @@ public:
void resetRemoteFileReader();
size_t getRemainingSizeToDownload() const;
private:
size_t getFirstNonDownloadedOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
size_t getCurrentWriteOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
@ -327,47 +325,4 @@ struct FileSegmentsHolder : private boost::noncopyable
FileSegments file_segments{};
};
/**
 * We want to eventually write some size, which is not known until the very end.
 * Therefore we allocate file segments lazily. Each file segment is assigned capacity
 * of max_file_segment_size, but reserved_size remains 0, until call to reserve().
 * Once current file segment is full (reached max_file_segment_size), we allocate a
 * new file segment. All allocated file segments reside in file segments holder.
 * If at the end of all writes, the last file segment is not full, then it is resized.
 */
class FileSegmentRangeWriter
{
public:
using OnCompleteFileSegmentCallback = std::function<void(const FileSegment & file_segment)>;
FileSegmentRangeWriter(
FileCache * cache_,
const FileSegment::Key & key_,
/// A callback which is called right after each file segment is completed
/// (including completion caused by a failed space reservation).
/// It is used to write into filesystem cache log.
OnCompleteFileSegmentCallback && on_complete_file_segment_func_);
/// Calls finalize() if it was not called yet; exceptions are logged, not propagated.
~FileSegmentRangeWriter();
/// Returns false if the writer is finalized or if space reservation failed, true otherwise.
bool write(const char * data, size_t size, size_t offset, bool is_persistent);
/// Completes the current file segment (if any was allocated); after that write() returns false.
void finalize();
private:
/// Creates a file segment starting at `offset` and adds it to `file_segments_holder`.
FileSegments::iterator allocateFileSegment(size_t offset, bool is_persistent);
/// Resizes `file_segment` to its downloaded size, completes it and invokes the callback.
void completeFileSegment(FileSegment & file_segment);
FileCache * cache;
FileSegment::Key key;
/// Must be declared before `current_file_segment_it`: the constructor initializes
/// the iterator from this holder, and members are initialized in declaration order.
FileSegmentsHolder file_segments_holder;
/// Points to the file segment currently being written; equals end() until the first allocation.
FileSegments::iterator current_file_segment_it;
/// Advanced by the number of bytes of every successful write().
size_t current_file_segment_write_offset = 0;
bool finalized = false;
OnCompleteFileSegmentCallback on_complete_file_segment_func;
};
}