Backport #71261 to 24.10: Fix memory usage increase bug in 24.10 during remote read

This commit is contained in:
robot-clickhouse 2024-11-05 12:11:25 +00:00
parent d5af396e9a
commit f6e8b350d3
9 changed files with 43 additions and 22 deletions

View File

@ -46,11 +46,13 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
size_t buffer_size_,
AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_)
: ReadBufferFromFileBase(0, nullptr, 0)
, impl(std::move(impl_))
, read_settings(settings_)
, buffer_size(buffer_size_)
, reader(reader_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
@ -112,7 +114,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
last_prefetch_info.submit_time = std::chrono::system_clock::now();
last_prefetch_info.priority = priority;
prefetch_buffer.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
prefetch_buffer.resize(buffer_size);
prefetch_future = readAsync(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -211,7 +213,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
}
else
{
memory.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
memory.resize(buffer_size);
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);

View File

@ -27,6 +27,7 @@ public:
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
size_t buffer_size_,
AsyncReadCountersPtr async_read_counters_ = nullptr,
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);
@ -53,6 +54,7 @@ public:
private:
const ImplPtr impl;
const ReadSettings read_settings;
const size_t buffer_size;
IAsynchronousReader & reader;
size_t file_offset_of_buffer_end = 0;

View File

@ -41,6 +41,8 @@ public:
~CachedOnDiskReadBufferFromFile() override;
bool isCached() const override { return true; }
bool nextImpl() override;
off_t seek(off_t off, int whence) override;

View File

@ -18,24 +18,14 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
}
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task.
if (!settings.enable_filesystem_cache)
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file.
return std::min<size_t>(std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE), file_size);
}
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : chooseBufferSizeForRemoteReading(
settings_, getTotalSize(blobs_to_read_)), nullptr, 0)
bool use_external_buffer_,
size_t buffer_size)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : buffer_size, nullptr, 0)
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))

View File

@ -28,7 +28,8 @@ public:
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_);
bool use_external_buffer_,
size_t buffer_size);
~ReadBufferFromRemoteFSGather() override;
@ -84,6 +85,4 @@ private:
LoggerPtr log;
};
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size);
}

View File

@ -532,19 +532,33 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
return impl;
};
/// Avoid cache fragmentation by choosing bigger buffer size.
bool prefer_bigger_buffer_size = object_storage->supportsCache() && read_settings.enable_filesystem_cache;
size_t buffer_size = prefer_bigger_buffer_size
? std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE)
: settings.remote_fs_buffer_size;
size_t total_objects_size = file_size ? *file_size : getTotalSize(storage_objects);
if (total_objects_size)
buffer_size = std::min(buffer_size, total_objects_size);
const bool use_async_buffer = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
storage_objects,
read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */use_async_buffer);
/* use_external_buffer */use_async_buffer,
/* buffer_size */use_async_buffer ? 0 : buffer_size);
if (use_async_buffer)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
std::move(impl),
reader,
read_settings,
buffer_size,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());

View File

@ -51,7 +51,7 @@ TEST_F(AsynchronousBoundedReadBufferTest, setReadUntilPosition)
for (bool with_prefetch : {false, true})
{
AsynchronousBoundedReadBuffer read_buffer(createReadBufferFromFileBase(file_path, {}), remote_fs_reader, {});
AsynchronousBoundedReadBuffer read_buffer(createReadBufferFromFileBase(file_path, {}), remote_fs_reader, {}, DBMS_DEFAULT_BUFFER_SIZE);
read_buffer.setReadUntilPosition(20);
auto try_read = [&](size_t count)

View File

@ -60,6 +60,8 @@ public:
/// file offset and what getPosition() returns.
virtual bool isRegularLocalFile(size_t * /*out_view_offsee*/) { return false; }
virtual bool isCached() const { return false; }
protected:
std::optional<size_t> file_size;
ProfileCallback profile_callback;

View File

@ -517,9 +517,19 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
bool prefer_bigger_buffer_size = impl->isCached();
size_t buffer_size = prefer_bigger_buffer_size
? std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE)
: read_settings.remote_fs_buffer_size;
if (object_size)
buffer_size = std::min<size_t>(object_size, buffer_size);
auto & reader = context_->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
impl = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, modified_read_settings,
std::move(impl),
reader,
modified_read_settings,
buffer_size,
context_->getAsyncReadCounters(),
context_->getFilesystemReadPrefetchesLog());