Add option read_from_cache_if_exists_otherwise_bypass_cache (for merges)

kssenii 2022-03-23 19:46:28 +01:00
parent d2a3cfe5dc
commit d4161b5925
10 changed files with 38 additions and 9 deletions

View File

@@ -57,7 +57,7 @@ String IFileCache::getPathInLocalCache(const Key & key)
     return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
 }
 
-bool IFileCache::shouldBypassCache()
+bool IFileCache::isReadOnly()
 {
     return !CurrentThread::isInitialized()
         || !CurrentThread::get().getQueryContext()
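
The hunk above cuts off inside the renamed predicate. A minimal sketch of the complete check, for orientation (the first two clauses are verbatim from the hunk; the final query-id clause is an assumption, not shown in this diff):

// Sketch: the cache counts as read-only whenever no query is attached to the
// current thread, which is exactly the situation of a background merge.
bool IFileCache::isReadOnly()
{
    return !CurrentThread::isInitialized()
        || !CurrentThread::get().getQueryContext()
        || CurrentThread::getQueryId().size == 0; /// assumed final clause
}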

View File

@@ -43,7 +43,8 @@ public:
     virtual void tryRemoveAll() = 0;
 
-    static bool shouldBypassCache();
+    /// Whether the cache can be used as read-only (for merges, for example).
+    static bool isReadOnly();
 
     /// Cache capacity in bytes.
     size_t capacity() const { return max_size; }
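
A hypothetical call-site sketch for the new predicate, mirroring how the S3 gather buffer further down uses it (read_settings is an illustrative local, not from this hunk):

// Background operations such as merges run with no query context, so the
// cache reports read-only; opt them into "read if cached, else bypass" mode.
if (IFileCache::isReadOnly())
    read_settings.remote_fs_read_from_cache_if_exists_otherwise_bypass_cache = true;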

View File

@@ -89,7 +89,8 @@ String FileSegment::getCallerId()
 String FileSegment::getCallerIdImpl(bool allow_non_strict_checking)
 {
-    if (IFileCache::shouldBypassCache())
+    /// The cache is read-only when this is a read operation (which can potentially write to the cache) but no query is attached.
+    if (IFileCache::isReadOnly())
     {
         /// getCallerId() can be called from completeImpl(), which can be called from complete().
         /// complete() is called from destructor of CachedReadBufferFromRemoteFS when there is no query id anymore.
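
The hunk ends before the fallback branch itself. A hedged sketch of a plausible continuation (the fallback id format and the exception message are assumptions, not taken from this commit):

// Assumed continuation: without a query id, either fall back to a
// thread-based caller id or fail, depending on the strictness flag.
if (allow_non_strict_checking)
    return "None:" + toString(getThreadId()); /// hypothetical fallback format
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot use cache without a query id");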

View File

@@ -124,6 +124,21 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
     size_t wait_download_tries = 0;
 
     auto download_state = file_segment->state();
+
+    if (settings.remote_fs_read_from_cache_if_exists_otherwise_bypass_cache)
+    {
+        if (download_state == FileSegment::State::DOWNLOADED)
+        {
+            read_type = ReadType::CACHED;
+            return getCacheReadBuffer(range.left);
+        }
+        else
+        {
+            read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
+            return getRemoteFSReadBuffer(file_segment, read_type);
+        }
+    }
+
     while (true)
     {
         switch (download_state)
@@ -544,8 +559,7 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
 bool CachedReadBufferFromRemoteFS::nextImplStep()
 {
-    if (IFileCache::shouldBypassCache())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Using cache when not allowed");
+    assertCacheAllowed();
 
     if (!initialized)
         initialize(file_offset_of_buffer_end, getTotalSizeToRead());
@@ -758,6 +772,12 @@ std::optional<size_t> CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset()
     return std::nullopt;
 }
 
+void CachedReadBufferFromRemoteFS::assertCacheAllowed() const
+{
+    if (IFileCache::isReadOnly() && !settings.remote_fs_read_from_cache_if_exists_otherwise_bypass_cache)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache used when not allowed");
+}
+
 String CachedReadBufferFromRemoteFS::getInfoForLog()
 {
     return fmt::format("Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, internal buffer remaining read range: {}, file segment info: {}",
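
Restated, the invariant behind the new assertCacheAllowed() is that a cached read is legal either when a query is attached (the cache is not read-only) or when the caller explicitly opted into the bypass mode. As a hypothetical free predicate (not part of the commit):

// Equivalent predicate to the guard inside assertCacheAllowed() above.
static bool cachedReadAllowed(const ReadSettings & settings)
{
    return !IFileCache::isReadOnly()
        || settings.remote_fs_read_from_cache_if_exists_otherwise_bypass_cache;
}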

View File

@@ -50,6 +50,8 @@ private:
     bool nextImplStep();
 
+    void assertCacheAllowed() const;
+
     enum class ReadType
     {
         CACHED,

View File

@@ -38,7 +38,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
     current_path = path;
 
     auto cache = settings.remote_fs_cache;
-    bool with_cache = cache && settings.remote_fs_enable_cache && !IFileCache::shouldBypassCache();
+    bool with_cache = cache && settings.remote_fs_enable_cache;
 
     auto remote_file_reader_creator = [=, this]()
     {
@@ -49,6 +49,9 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
     if (with_cache)
     {
+        if (IFileCache::isReadOnly())
+            settings.remote_fs_read_from_cache_if_exists_otherwise_bypass_cache = true;
+
         return std::make_shared<CachedReadBufferFromRemoteFS>(
             path, cache, remote_file_reader_creator, settings, read_until_position ? read_until_position : file_size);
     }
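
Combining the two hunks, the cache wiring in createImplementationBuffer now reads roughly as follows (condensed from the diff, not the verbatim file):

// The static bypass check is gone from with_cache; instead, a read-only
// cache flips the per-read opt-in flag before the cached buffer is created.
bool with_cache = cache && settings.remote_fs_enable_cache;
if (with_cache)
{
    if (IFileCache::isReadOnly())
        settings.remote_fs_read_from_cache_if_exists_otherwise_bypass_cache = true;

    return std::make_shared<CachedReadBufferFromRemoteFS>(
        path, cache, remote_file_reader_creator, settings,
        read_until_position ? read_until_position : file_size);
}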

View File

@@ -79,6 +79,7 @@ struct ReadSettings
     size_t remote_fs_read_backoff_max_tries = 4;
     bool remote_fs_enable_cache = true;
     size_t remote_fs_cache_max_wait_sec = 1;
+    bool remote_fs_read_from_cache_if_exists_otherwise_bypass_cache = false;
 
     size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
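
A hedged usage sketch for the new setting (Context::getReadSettings() is assumed from the surrounding ClickHouse code base; it is not part of this diff):

// Enable the opt-in mode on a local copy of the read settings,
// leaving the context-wide defaults untouched.
ReadSettings read_settings = context->getReadSettings();
read_settings.remote_fs_read_from_cache_if_exists_otherwise_bypass_cache = true;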

View File

@@ -160,7 +160,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
 bool WriteBufferFromS3::cacheEnabled() const
 {
-    return cache && IFileCache::shouldBypassCache() == false;
+    return cache != nullptr;
 }
 
 void WriteBufferFromS3::preFinalize()

View File

@@ -261,7 +261,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
         MergeTreeIndexFactory::instance().getMany(global_ctx->metadata_snapshot->getSecondaryIndices()),
         ctx->compression_codec,
         /*reset_columns=*/ true,
-        ctx->blocks_are_granules_size);
+        ctx->blocks_are_granules_size,
+        global_ctx->context->getWriteSettings());
 
     global_ctx->rows_written = 0;
     ctx->initial_reservation = global_ctx->space_reservation ? global_ctx->space_reservation->getSize() : 0;

View File

@@ -9,7 +9,7 @@
             <data_cache_enabled>1</data_cache_enabled>
             <cache_enabled>0</cache_enabled>
             <data_cache_max_size>22548578304</data_cache_max_size>
-            <cache_on_insert>1</cache_on_insert>
+            <cache_on_write_operations>1</cache_on_write_operations>
         </s3_cache>
     </disks>
     <policies>