Fix write-through-cache possible write finish

kssenii 2022-12-15 19:39:41 +01:00
parent 40c7ee6aa2
commit dfefd8dfcd
11 changed files with 143 additions and 86 deletions

View File

@@ -619,6 +619,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, enable_filesystem_cache_on_lower_level, true, "If read buffer supports caching inside threadpool, allow it to do it, otherwise cache outside of threadpool. Do not use this setting, it is needed for testing", 0) \
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, max_query_cache_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be used by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Throw an error instead of ignoring it if an error occurs in cache during write operations (INSERT, merges)", 0) \
\
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
\

View File

@@ -44,10 +44,10 @@ FileSegmentRangeWriter::FileSegmentRangeWriter(
const String & source_path_)
: cache(cache_)
, key(key_)
, log(&Poco::Logger::get("FileSegmentRangeWriter"))
, cache_log(cache_log_)
, query_id(query_id_)
, source_path(source_path_)
, current_file_segment_it(file_segments_holder.file_segments.end())
{
}
@@ -56,69 +56,66 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
if (finalized)
return false;
if (expected_write_offset != offset)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot write file segment at offset {}, because expected write offset is: {}",
offset, expected_write_offset);
}
auto & file_segments = file_segments_holder.file_segments;
if (current_file_segment_it == file_segments.end())
if (file_segments.empty() || file_segments.back()->isDownloaded())
{
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
allocateFileSegment(expected_write_offset, is_persistent);
}
else
auto & file_segment = file_segments.back();
while (size > 0)
{
auto file_segment = *current_file_segment_it;
assert(file_segment->getCurrentWriteOffset() == current_file_segment_write_offset);
if (current_file_segment_write_offset != offset)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot write file segment at offset {}, because current write offset is: {}",
offset, current_file_segment_write_offset);
}
if (file_segment->range().size() == file_segment->getDownloadedSize())
size_t available_size = file_segment->range().size() - file_segment->getDownloadedSize();
if (available_size == 0)
{
completeFileSegment(*file_segment);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
file_segment = allocateFileSegment(expected_write_offset, is_persistent);
continue;
}
}
auto & file_segment = *current_file_segment_it;
if (!file_segment->isDownloader()
&& file_segment->getOrSetDownloader() != FileSegment::getCallerId())
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Failed to set a downloader. ({})", file_segment->getInfoForLog());
}
auto downloader = file_segment->getOrSetDownloader();
if (downloader != FileSegment::getCallerId())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog());
size_t size_to_write = std::min(available_size, size);
SCOPE_EXIT({
if (file_segment->isDownloader())
file_segment->completePartAndResetDownloader();
});
bool reserved = file_segment->reserve(size_to_write);
if (!reserved)
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
appendFilesystemCacheLog(*file_segment);
bool reserved = file_segment->reserve(size);
if (!reserved)
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
appendFilesystemCacheLog(*file_segment);
LOG_DEBUG(
log, "Failed to reserve space in cache (size: {}, file segment info: {})",
size, file_segment->getInfoForLog());
LOG_DEBUG(
&Poco::Logger::get("FileSegmentRangeWriter"),
"Unsuccessful space reservation attempt (size: {}, file segment info: {})",
size, file_segment->getInfoForLog());
return false;
}
return false;
}
try
{
file_segment->write(data, size, offset);
}
catch (...)
{
file_segment->write(data, size_to_write, offset);
file_segment->completePartAndResetDownloader();
throw;
size -= size_to_write;
expected_write_offset += size_to_write;
offset += size_to_write;
data += size_to_write;
}
file_segment->completePartAndResetDownloader();
current_file_segment_write_offset += size;
if (file_segments.back()->isDownloader())
file_segments.back()->completePartAndResetDownloader();
return true;
}
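Editor's note on the rewrite above: the new write() no longer tracks a single current_file_segment_it cursor; it splits the incoming buffer across file segments in a loop, allocating a fresh segment whenever the current one has no free capacity. A minimal standalone sketch of that chunking logic, with mock types standing in for the real FileSegment API (hypothetical names, not ClickHouse code):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

struct MockSegment
{
    size_t capacity;        // stands in for file_segment->range().size()
    size_t downloaded = 0;  // stands in for file_segment->getDownloadedSize()
};

int main()
{
    const size_t max_segment_size = 10;  // cf. max_file_segment_size = 10Ki in the test config below
    std::vector<MockSegment> segments;
    size_t expected_write_offset = 0;

    auto write = [&](size_t size)
    {
        if (segments.empty() || segments.back().downloaded == segments.back().capacity)
            segments.push_back({max_segment_size});

        while (size > 0)
        {
            MockSegment & segment = segments.back();
            size_t available_size = segment.capacity - segment.downloaded;
            if (available_size == 0)
            {
                segments.push_back({max_segment_size});  // completeFileSegment + allocateFileSegment
                continue;
            }
            size_t size_to_write = std::min(available_size, size);
            segment.downloaded += size_to_write;  // stands in for file_segment->write(data, size_to_write, offset)
            size -= size_to_write;
            expected_write_offset += size_to_write;
        }
    };

    write(25);  // spans three 10-byte segments
    std::cout << segments.size() << " segments, offset " << expected_write_offset << "\n";  // 3 segments, offset 25
}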
@@ -129,10 +126,10 @@ void FileSegmentRangeWriter::finalize()
return;
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty() || current_file_segment_it == file_segments.end())
if (file_segments.empty())
return;
completeFileSegment(**current_file_segment_it);
completeFileSegment(*file_segments.back());
finalized = true;
}
@@ -149,7 +146,7 @@ FileSegmentRangeWriter::~FileSegmentRangeWriter()
}
}
FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
FileSegmentPtr & FileSegmentRangeWriter::allocateFileSegment(size_t offset, bool is_persistent)
{
/**
* Allocate a new file segment starting at `offset`.
@@ -168,7 +165,8 @@ FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset
auto file_segment = cache->createFileSegmentForDownload(
key, offset, cache->max_file_segment_size, create_settings, cache_lock);
return file_segments_holder.add(std::move(file_segment));
auto & file_segments = file_segments_holder.file_segments;
return *file_segments.insert(file_segments.end(), file_segment);
}
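The new FileSegmentPtr & return type relies on a property of std::list worth spelling out: inserting at the end never relocates existing elements, so the returned reference stays valid while later segments are appended. A small self-contained illustration (plain C++, no ClickHouse types):

#include <cassert>
#include <list>
#include <memory>

int main()
{
    std::list<std::shared_ptr<int>> file_segments;

    // Same shape as `return *file_segments.insert(file_segments.end(), file_segment);`
    std::shared_ptr<int> & first = *file_segments.insert(file_segments.end(), std::make_shared<int>(1));
    file_segments.insert(file_segments.end(), std::make_shared<int>(2));

    // The reference survives further insertions; std::vector would not guarantee this.
    assert(*first == 1);
}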
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
@@ -199,7 +197,7 @@ void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_s
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
{
/// File segment can be detached if space reservation failed.
if (file_segment.isDetached())
if (file_segment.isDetached() || file_segment.isCompleted())
return;
file_segment.completeWithoutState();
@@ -223,6 +221,7 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
, is_persistent_cache_file(is_persistent_cache_file_)
, query_id(query_id_)
, enable_cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log)
, throw_on_error_from_cache(settings_.throw_on_error_from_cache)
{
}
@@ -246,11 +245,11 @@ void CachedOnDiskWriteBufferFromFile::nextImpl()
}
/// Write data to cache.
cacheData(working_buffer.begin(), size);
cacheData(working_buffer.begin(), size, throw_on_error_from_cache);
current_download_offset += size;
}
void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool throw_on_error)
{
if (cache_in_error_state_or_disabled)
return;
@@ -285,11 +284,17 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size)
return;
}
if (throw_on_error)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
return;
}
catch (...)
{
if (throw_on_error)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
return;
}
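The conditional rethrow added above is the core of the new setting: when throw_on_error is set, a cache failure propagates out of the write path instead of being swallowed. A compilable sketch of just that pattern (hypothetical function name, std::runtime_error standing in for the real cache exception):

#include <iostream>
#include <stdexcept>

void cacheDataSketch(bool throw_on_error)
{
    try
    {
        throw std::runtime_error("cache write failed");  // stands in for file segment I/O
    }
    catch (...)
    {
        if (throw_on_error)
            throw;  // surface the cache error to the INSERT / merge
        std::cerr << "cache error ignored\n";  // stands in for tryLogCurrentException
    }
}

int main()
{
    cacheDataSketch(false);  // logs and continues, data is still written to remote storage
    try { cacheDataSketch(true); }
    catch (const std::exception & e) { std::cerr << "rethrown: " << e.what() << "\n"; }
}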

View File

@@ -39,7 +39,7 @@ public:
~FileSegmentRangeWriter();
private:
FileSegments::iterator allocateFileSegment(size_t offset, bool is_persistent);
FileSegmentPtr & allocateFileSegment(size_t offset, bool is_persistent);
void appendFilesystemCacheLog(const FileSegment & file_segment);
@@ -48,14 +48,14 @@ private:
FileCache * cache;
FileSegment::Key key;
Poco::Logger * log;
std::shared_ptr<FilesystemCacheLog> cache_log;
String query_id;
String source_path;
FileSegmentsHolder file_segments_holder{};
FileSegments::iterator current_file_segment_it;
size_t current_file_segment_write_offset = 0;
size_t expected_write_offset = 0;
bool finalized = false;
};
@@ -81,7 +81,7 @@ public:
void finalizeImpl() override;
private:
void cacheData(char * data, size_t size);
void cacheData(char * data, size_t size, bool throw_on_error);
Poco::Logger * log;
@@ -95,6 +95,7 @@ private:
bool enable_cache_log;
bool throw_on_error_from_cache;
bool cache_in_error_state_or_disabled = false;
std::unique_ptr<FileSegmentRangeWriter> cache_writer;

View File

@@ -15,6 +15,8 @@ struct WriteSettings
bool enable_filesystem_cache_on_write_operations = false;
bool enable_filesystem_cache_log = false;
bool is_file_cache_persistent = false;
bool throw_on_error_from_cache = false;
bool s3_allow_parallel_part_upload = true;
/// Monitoring

View File

@@ -18,7 +18,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
extern const int LOGICAL_ERROR;
}
@@ -98,7 +97,7 @@ void FileCache::assertInitialized(std::lock_guard<std::mutex> & /* cache_lock */
if (initialization_exception)
std::rethrow_exception(initialization_exception);
else
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized");
}
}
@@ -541,12 +540,12 @@ FileSegmentPtr FileCache::createFileSegmentForDownload(
#endif
if (size > max_file_segment_size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Requested size exceeds max file segment size");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Requested size exceeds max file segment size");
auto * cell = getCell(key, offset, cache_lock);
if (cell)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache cell already exists for key `{}` and offset {}",
key.toString(), offset);
@@ -738,7 +737,7 @@ bool FileCache::tryReserveForMainList(
auto * cell = getCell(entry_key, entry_offset, cache_lock);
if (!cell)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache became inconsistent. Key: {}, offset: {}",
key.toString(), offset);
@@ -964,7 +963,7 @@ void FileCache::remove(
catch (...)
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
key.toString(), offset, cache_file_path, getCurrentExceptionMessage(false));
}
@@ -981,7 +980,7 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock
/// cache_base_path / key_prefix / key / offset
if (!files.empty())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cache initialization is partially made. "
"This can be a result of a failed first attempt to initialize cache. "
"Please, check log for error messages");
@@ -1212,7 +1211,7 @@ FileCache::FileSegmentCell::FileSegmentCell(
}
default:
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {}",
FileSegment::stateToString(file_segment->download_state));
}

View File

@@ -19,7 +19,6 @@ namespace DB
namespace ErrorCodes
{
extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
extern const int LOGICAL_ERROR;
}
@@ -65,7 +64,7 @@ FileSegment::FileSegment(
default:
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Can only create cell with either EMPTY, DOWNLOADED or SKIP_CACHE state");
}
}
@@ -277,7 +276,7 @@ void FileSegment::resetRemoteFileReader()
void FileSegment::write(const char * from, size_t size, size_t offset)
{
if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing zero size is not allowed");
{
std::unique_lock segment_lock(mutex);
@@ -293,7 +292,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
size_t first_non_downloaded_offset = getFirstNonDownloadedOffsetUnlocked(segment_lock);
if (offset != first_non_downloaded_offset)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
size, offset, first_non_downloaded_offset);
@@ -303,7 +302,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
if (free_reserved_size < size)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);
if (current_downloaded_size == range().size())
@@ -363,7 +362,7 @@ FileSegment::State FileSegment::wait()
return download_state;
if (download_state == State::EMPTY)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot wait on a file segment with empty state");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot wait on a file segment with empty state");
if (download_state == State::DOWNLOADING)
{
@@ -381,7 +380,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
bool FileSegment::reserve(size_t size_to_reserve)
{
if (!size_to_reserve)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed");
size_t expected_downloaded_size;
@@ -395,7 +394,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
if (expected_downloaded_size + size_to_reserve > range().size())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
size_to_reserve, range().toString(), downloaded_size);
@@ -433,9 +432,6 @@ void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock<std::m
if (is_downloaded)
return;
setDownloadState(State::DOWNLOADED);
is_downloaded = true;
if (cache_writer)
{
cache_writer->finalize();
@@ -497,7 +493,7 @@ void FileSegment::completeWithState(State state)
{
cv.notify_all();
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
ErrorCodes::LOGICAL_ERROR,
"Cannot complete file segment with state: {}", stateToString(state));
}
@@ -551,8 +547,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
{
if (is_last_holder)
cache->remove(key(), offset(), cache_lock, segment_lock);
return;
break;
}
case State::DOWNLOADED:
{
@@ -604,6 +599,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
}
}
is_completed = true;
LOG_TEST(log, "Completed file segment: {}", getInfoForLogUnlocked(segment_lock));
}
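A subtle part of the two hunks above is the return-to-break change in completeBasedOnCurrentState(): with return, control left the function before the code placed after the switch, so the new is_completed flag would never be set on that path; break falls through to it. A reduced sketch of the control-flow difference (hypothetical types, not the real class):

#include <iostream>

enum class State { PARTIALLY_DOWNLOADED_NO_CONTINUATION, DOWNLOADED };

struct Segment
{
    bool is_completed = false;

    void completeBasedOnCurrentState(State state)
    {
        switch (state)
        {
            case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
            {
                // cache->remove(...) on the last holder
                break;  // was `return;`, which skipped the line below
            }
            case State::DOWNLOADED:
                break;
        }
        is_completed = true;  // now reached on every path
    }
};

int main()
{
    Segment segment;
    segment.completeBasedOnCurrentState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
    std::cout << std::boolalpha << segment.is_completed << "\n";  // true
}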
@@ -739,6 +735,12 @@ bool FileSegment::isDetached() const
return is_detached;
}
bool FileSegment::isCompleted() const
{
std::unique_lock segment_lock(mutex);
return is_completed;
}
void FileSegment::detach(std::lock_guard<std::mutex> & /* cache_lock */, std::unique_lock<std::mutex> & segment_lock)
{
if (is_detached)

View File

@@ -181,6 +181,8 @@ public:
bool isDetached() const;
bool isCompleted() const;
void assertCorrectness() const;
/**
@@ -294,6 +296,7 @@ private:
/// "detached" file segment means that it is not owned by cache ("detached" from cache).
/// In general case, all file segments are owned by cache.
bool is_detached = false;
bool is_completed = false;
bool is_downloaded{false};
@@ -317,11 +320,6 @@ struct FileSegmentsHolder : private boost::noncopyable
String toString();
FileSegments::iterator add(FileSegmentPtr && file_segment)
{
return file_segments.insert(file_segments.end(), file_segment);
}
FileSegments file_segments{};
};

View File

@@ -3720,6 +3720,8 @@ WriteSettings Context::getWriteSettings() const
res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.throw_on_error_from_cache = settings.throw_on_error_from_cache_on_write_operations;
res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;
res.remote_throttler = getRemoteWriteThrottler();

View File

@@ -100,7 +100,7 @@
<max_size>22548578304</max_size>
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
<enable_bypass_cache_with_threashold>1</enable_bypass_cache_with_threashold>
<bypass_cache_threashold>100</bypass_cache_threashold>
<bypass_cache_threashold>100</bypass_cache_threashold>
</s3_cache_6>
<s3_cache_small>
<type>cache</type>
@@ -109,6 +109,14 @@
<max_size>1000</max_size>
<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>
</s3_cache_small>
<s3_cache_small_segment_size>
<type>cache</type>
<disk>s3_disk_6</disk>
<path>s3_cache_small_segment_size/</path>
<max_size>22548578304</max_size>
<max_file_segment_size>10Ki</max_file_segment_size>
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
</s3_cache_small_segment_size>
<!-- local disks -->
<local_disk>
<type>local</type>

View File

@@ -0,0 +1,3 @@
0
80
100000

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env bash
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function random {
cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z' | fold -w ${1:-8} | head -n 1
}
${CLICKHOUSE_CLIENT} --multiline --multiquery -q "
drop table if exists ttt;
create table ttt (id Int32, value String) engine=MergeTree() order by tuple() settings storage_policy='s3_cache_small_segment_size';
insert into ttt select number, toString(number) from numbers(100000) settings throw_on_error_from_cache_on_write_operations = 1;
"
query_id=$(random 8)
${CLICKHOUSE_CLIENT} --query_id "$query_id" -q "
select * from ttt format Null settings enable_filesystem_cache_log=1;
"
${CLICKHOUSE_CLIENT} --query_id "$query_id" -q " system flush logs"
${CLICKHOUSE_CLIENT} -q "
select count() from system.filesystem_cache_log where query_id = '$query_id' AND read_type != 'READ_FROM_CACHE';
"
${CLICKHOUSE_CLIENT} -q "
select count() from system.filesystem_cache_log where query_id = '$query_id' AND read_type == 'READ_FROM_CACHE';
"
${CLICKHOUSE_CLIENT} --multiline --multiquery -q "
select count() from ttt;
drop table ttt no delay;
"