Merge pull request #40759 from kssenii/fix-write-through-cache-logical-error

Fix logical error from write-though cache from stress test
This commit is contained in:
Kseniia Sumarokova 2022-08-31 11:55:18 +02:00 committed by GitHub
commit 9abcfce529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 39 additions and 35 deletions

View File

@ -840,37 +840,31 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
return;
size_t current_downloaded_size = file_segment.getDownloadedSize();
if (current_downloaded_size > 0)
/// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity
/// was initially set with a margin as `max_file_segment_size`. => We need to always
/// resize to actual size after download finished.
if (current_downloaded_size != file_segment.range().size())
{
file_segment.getOrSetDownloader();
/// Current file segment is downloaded as a part of write-through cache
/// and therefore cannot be concurrently accessed. Nevertheless, it can be
/// accessed by cache system tables if someone read from them,
/// therefore we need a mutex.
std::lock_guard segment_lock(file_segment.mutex);
{
/// file_segment->complete(DOWNLOADED) is not enough, because file segment capacity
/// was initially set with a margin as `max_file_segment_size`. => We need to always
/// resize to actual size after download finished.
/// Current file segment is downloaded as a part of write-through cache
/// and therefore cannot be concurrently accessed. Nevertheless, it can be
/// accessed by cache system tables if someone read from them,
/// therefore we need a mutex.
std::lock_guard segment_lock(file_segment.mutex);
assert(file_segment.downloaded_size <= file_segment.range().size());
file_segment.segment_range = FileSegment::Range(
file_segment.segment_range.left,
file_segment.segment_range.left + file_segment.downloaded_size - 1);
file_segment.reserved_size = file_segment.downloaded_size;
}
file_segment.completeWithState(FileSegment::State::DOWNLOADED);
on_complete_file_segment_func(file_segment);
assert(current_downloaded_size <= file_segment.range().size());
file_segment.segment_range = FileSegment::Range(
file_segment.segment_range.left,
file_segment.segment_range.left + current_downloaded_size - 1);
file_segment.reserved_size = current_downloaded_size;
}
else
{
std::lock_guard cache_lock(cache->mutex);
file_segment.completeWithoutState(cache_lock);
}
on_complete_file_segment_func(file_segment);
}
bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset, bool is_persistent)
@ -899,22 +893,27 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
offset, current_file_segment_write_offset);
}
if ((*current_file_segment_it)->getRemainingSizeToDownload() == 0)
auto current_file_segment = *current_file_segment_it;
if (current_file_segment->getRemainingSizeToDownload() == 0)
{
completeFileSegment(**current_file_segment_it);
completeFileSegment(*current_file_segment);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
}
else if ((*current_file_segment_it)->getDownloadOffset() != offset)
else if (current_file_segment->getDownloadOffset() != offset)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot file segment download offset {} does not match current write offset {}",
(*current_file_segment_it)->getDownloadOffset(), offset);
current_file_segment->getDownloadOffset(), offset);
}
}
auto & file_segment = *current_file_segment_it;
file_segment->getOrSetDownloader();
auto downloader = file_segment->getOrSetDownloader();
if (downloader != FileSegment::getCallerId())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog());
SCOPE_EXIT({
file_segment->resetDownloader();
});

View File

@ -318,6 +318,7 @@ The server successfully detected this situation and will download merged part fr
\
M(FileSegmentWaitReadBufferMicroseconds, "Metric per file segment. Time spend waiting for internal read buffer (includes cache waiting)") \
M(FileSegmentReadMicroseconds, "Metric per file segment. Time spend reading from file") \
M(FileSegmentWriteMicroseconds, "Metric per file segment. Time spend writing cache") \
M(FileSegmentCacheWriteMicroseconds, "Metric per file segment. Time spend writing data to cache") \
M(FileSegmentPredownloadMicroseconds, "Metric per file segment. Time spent predownloading data to cache (predownloading - finishing file segment download (after someone who failed to do that) up to the point current thread was requested to do)") \
M(FileSegmentUsedBytes, "Metric per file segment. How many bytes were actually used from current file segment") \

View File

@ -80,7 +80,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_size = file_segment_range.size(),
.cache_attempted = true,
.read_from_cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(
current_file_segment_counters.getPartiallyAtomicSnapshot()),

View File

@ -11,6 +11,7 @@ namespace ProfileEvents
{
extern const Event CachedWriteBufferCacheWriteBytes;
extern const Event CachedWriteBufferCacheWriteMicroseconds;
extern const Event FileSegmentWriteMicroseconds;
}
namespace DB
@ -118,6 +119,9 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteBytes, size);
ProfileEvents::increment(ProfileEvents::CachedWriteBufferCacheWriteMicroseconds, watch.elapsedMicroseconds());
current_file_segment_counters.increment(
ProfileEvents::FileSegmentWriteMicroseconds, watch.elapsedMicroseconds());
}
void CachedOnDiskWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment & file_segment)
@ -134,7 +138,7 @@ void CachedOnDiskWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment
.requested_range = {},
.cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
.file_segment_size = file_segment_range.size(),
.cache_attempted = false,
.read_from_cache_attempted = false,
.read_buffer_id = {},
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(current_file_segment_counters.getPartiallyAtomicSnapshot()),
};

View File

@ -81,7 +81,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.file_segment_range = { 0, current_file_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file,
.cache_attempted = false,
.read_from_cache_attempted = false,
};
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())

View File

@ -42,7 +42,7 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
{"total_requested_range", std::make_shared<DataTypeTuple>(types)},
{"size", std::make_shared<DataTypeUInt64>()},
{"read_type", std::make_shared<DataTypeString>()},
{"cache_attempted", std::make_shared<DataTypeUInt8>()},
{"read_from_cache_attempted", std::make_shared<DataTypeUInt8>()},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
{"read_buffer_id", std::make_shared<DataTypeString>()},
};
@ -62,7 +62,7 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(Tuple{requested_range.first, requested_range.second});
columns[i++]->insert(file_segment_size);
columns[i++]->insert(typeToString(cache_type));
columns[i++]->insert(cache_attempted);
columns[i++]->insert(read_from_cache_attempted);
if (profile_counters)
{

View File

@ -41,7 +41,7 @@ struct FilesystemCacheLogElement
std::pair<size_t, size_t> requested_range{};
CacheType cache_type{};
size_t file_segment_size;
bool cache_attempted;
bool read_from_cache_attempted;
String read_buffer_id;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;