mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #40896 from kssenii/fix-memory-usage-for-remote
Fix extra memory allocation for remote read buffers
This commit is contained in:
commit
cadae5a2f6
@ -29,7 +29,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
|
||||
size_t max_single_download_retries_,
|
||||
bool use_external_buffer_,
|
||||
size_t read_until_position_)
|
||||
: ReadBufferFromFileBase(read_settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
, blob_container_client(blob_container_client_)
|
||||
, path(path_)
|
||||
, max_single_read_retries(max_single_read_retries_)
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "ReadIndirectBufferFromRemoteFS.h"
|
||||
|
||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -13,8 +14,8 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_)
|
||||
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_, const ReadSettings & settings)
|
||||
: ReadBufferFromFileBase(settings.remote_fs_buffer_size, nullptr, 0)
|
||||
, impl(impl_)
|
||||
{
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ namespace DB
|
||||
{
|
||||
|
||||
class ReadBufferFromRemoteFSGather;
|
||||
struct ReadSettings;
|
||||
|
||||
/**
|
||||
* Reads data from S3/HDFS/Web using stored paths in metadata.
|
||||
@ -18,7 +19,7 @@ class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
|
||||
{
|
||||
|
||||
public:
|
||||
explicit ReadIndirectBufferFromRemoteFS(std::shared_ptr<ReadBufferFromRemoteFSGather> impl_);
|
||||
explicit ReadIndirectBufferFromRemoteFS(std::shared_ptr<ReadBufferFromRemoteFSGather> impl_, const ReadSettings & settings);
|
||||
|
||||
off_t seek(off_t offset_, int whence) override;
|
||||
|
||||
|
@ -112,7 +112,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
|
||||
}
|
||||
else
|
||||
{
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(reader_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(reader_impl), disk_read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings_ptr->min_bytes_for_seek);
|
||||
}
|
||||
}
|
||||
|
@ -70,11 +70,12 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
|
||||
auto hdfs_path = path.substr(begin_of_path);
|
||||
auto hdfs_uri = path.substr(0, begin_of_path);
|
||||
|
||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_path, config, disk_read_settings);
|
||||
return std::make_unique<ReadBufferFromHDFS>(
|
||||
hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
|
||||
};
|
||||
|
||||
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings);
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl), read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
}
|
||||
else
|
||||
{
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(s3_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(s3_impl), disk_read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings_ptr->min_bytes_for_seek);
|
||||
}
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
|
||||
}
|
||||
else
|
||||
{
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(web_impl));
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(web_impl), read_settings);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), min_bytes_for_seek);
|
||||
}
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ ReadBufferFromS3::ReadBufferFromS3(
|
||||
size_t offset_,
|
||||
size_t read_until_position_,
|
||||
bool restricted_seek_)
|
||||
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, bucket(bucket_)
|
||||
, key(key_)
|
||||
|
@ -41,8 +41,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
||||
const std::string & hdfs_file_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
const ReadSettings & read_settings_,
|
||||
size_t read_until_position_)
|
||||
: BufferWithOwnMemory<SeekableReadBuffer>(read_settings_.remote_fs_buffer_size)
|
||||
size_t read_until_position_,
|
||||
bool use_external_buffer_)
|
||||
: BufferWithOwnMemory<SeekableReadBuffer>(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size)
|
||||
, hdfs_uri(hdfs_uri_)
|
||||
, hdfs_file_path(hdfs_file_path_)
|
||||
, builder(createHDFSBuilder(hdfs_uri_, config_))
|
||||
@ -132,10 +133,12 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
|
||||
const String & hdfs_file_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
const ReadSettings & read_settings_,
|
||||
size_t read_until_position_)
|
||||
size_t read_until_position_,
|
||||
bool use_external_buffer_)
|
||||
: ReadBufferFromFileBase(read_settings_.remote_fs_buffer_size, nullptr, 0)
|
||||
, impl(std::make_unique<ReadBufferFromHDFSImpl>(
|
||||
hdfs_uri_, hdfs_file_path_, config_, read_settings_, read_until_position_))
|
||||
hdfs_uri_, hdfs_file_path_, config_, read_settings_, read_until_position_, use_external_buffer_))
|
||||
, use_external_buffer(use_external_buffer_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -146,7 +149,18 @@ size_t ReadBufferFromHDFS::getFileSize()
|
||||
|
||||
bool ReadBufferFromHDFS::nextImpl()
|
||||
{
|
||||
impl->position() = impl->buffer().begin() + offset();
|
||||
if (use_external_buffer)
|
||||
{
|
||||
impl->set(internal_buffer.begin(), internal_buffer.size());
|
||||
assert(working_buffer.begin() != nullptr);
|
||||
assert(!internal_buffer.empty());
|
||||
}
|
||||
else
|
||||
{
|
||||
impl->position() = impl->buffer().begin() + offset();
|
||||
assert(!impl->hasPendingData());
|
||||
}
|
||||
|
||||
auto result = impl->next();
|
||||
|
||||
if (result)
|
||||
|
@ -29,7 +29,8 @@ public:
|
||||
const String & hdfs_file_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
const ReadSettings & read_settings_,
|
||||
size_t read_until_position_ = 0);
|
||||
size_t read_until_position_ = 0,
|
||||
bool use_external_buffer = false);
|
||||
|
||||
~ReadBufferFromHDFS() override;
|
||||
|
||||
@ -49,6 +50,7 @@ public:
|
||||
|
||||
private:
|
||||
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
|
||||
bool use_external_buffer;
|
||||
};
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user