Get rid of metadata in buffers

alesapin 2022-04-08 16:03:00 +02:00
parent ad9fdc1d79
commit 60e36abfe9
8 changed files with 64 additions and 35 deletions
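
In short: the gather read buffers previously received a whole metadata object but only ever used two of its fields. This commit passes those two fields explicitly (the common remote root path and the list of blobs with their sizes) and replaces the anonymous std::pair entries with a named struct. A minimal sketch of the data-shape change, simplified from the IDiskRemote.h hunk below:

    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <utility>
    #include <vector>

    // Before: entries were anonymous pairs, read via .first / .second.
    using PathAndSize = std::pair<std::string, size_t>;

    // After: a named struct, read via .relative_path / .bytes_size.
    struct BlobPathWithSize
    {
        std::string relative_path;
        uint64_t bytes_size;
    };
    using BlobsPathToSize = std::vector<BlobPathWithSize>;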

src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp

@ -71,8 +71,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
- blob_container_client, metadata, settings->max_single_read_retries,
- settings->max_single_download_retries, read_settings);
+ blob_container_client, metadata.remote_fs_root_path, metadata.remote_fs_objects,
+ settings->max_single_read_retries, settings->max_single_download_retries, read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{

src/Disks/DiskWebServer.cpp

@ -166,9 +166,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
remote_path = remote_path.string().substr(url.size());
RemoteMetadata meta(path, remote_path);
- meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
+ meta.remote_fs_objects.emplace_back(remote_path, iter->second.size);
- auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta, getContext(), read_settings);
+ auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta.remote_fs_root_path, meta.remote_fs_objects, getContext(), read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{

src/Disks/HDFS/DiskHDFS.cpp

@ -82,7 +82,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
"Read from file by path: {}. Existing HDFS objects: {}",
backQuote(metadata_disk->getPath() + path), metadata.remote_fs_objects.size());
- auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, remote_fs_root_path, metadata, read_settings);
+ auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, remote_fs_root_path, remote_fs_root_path, metadata.remote_fs_objects, read_settings);
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}
@ -91,8 +91,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings &)
{
/// Path to store new HDFS object.
- auto file_name = getRandomName();
- auto hdfs_path = fs::path(remote_fs_root_path) / file_name;
+ std::string file_name = getRandomName();
+ std::string hdfs_path = fs::path(remote_fs_root_path) / file_name;
LOG_TRACE(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
backQuote(metadata_disk->getPath() + path), hdfs_path);

src/Disks/IDiskRemote.cpp

@ -122,7 +122,8 @@ void IDiskRemote::Metadata::load()
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
assertChar('\n', *buf);
- remote_fs_objects[i] = {remote_fs_object_path, remote_fs_object_size};
+ remote_fs_objects[i].relative_path = remote_fs_object_path;
+ remote_fs_objects[i].bytes_size = remote_fs_object_size;
}
readIntText(ref_count, *buf);
@ -638,7 +639,7 @@ String IDiskRemote::getUniqueId(const String & path) const
auto metadata = readMetadata(path);
String id;
if (!metadata.remote_fs_objects.empty())
- id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first;
+ id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].relative_path;
return id;
}

src/Disks/IDiskRemote.h

@ -13,7 +13,6 @@
#include <Common/ThreadPool.h>
#include <filesystem>
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
@ -22,6 +21,24 @@ namespace CurrentMetrics
namespace DB
{
+ /// Path to blob with its size
+ struct BlobPathWithSize
+ {
+ std::string relative_path;
+ uint64_t bytes_size;
+ BlobPathWithSize() = default;
+ BlobPathWithSize(const BlobPathWithSize & other) = default;
+ BlobPathWithSize(const std::string & relative_path_, uint64_t bytes_size_)
+ : relative_path(relative_path_)
+ , bytes_size(bytes_size_)
+ {}
+ };
+ /// List of blobs with their sizes
+ using BlobsPathToSize = std::vector<BlobPathWithSize>;
/// Helper class to collect paths into chunks of maximum size.
/// For s3 it is Aws::vector<ObjectIdentifier>, for hdfs it is std::vector<std::string>.
class RemoteFSPathKeeper
@ -191,10 +208,8 @@ using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
/// Minimum info, required to be passed to ReadIndirectBufferFromRemoteFS<T>
struct RemoteMetadata
{
- using PathAndSize = std::pair<String, size_t>;
/// Remote FS objects paths and their sizes.
- std::vector<PathAndSize> remote_fs_objects;
+ std::vector<BlobPathWithSize> remote_fs_objects;
/// URI
const String & remote_fs_root_path;
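
Side note on why the call sites below change so little: C++17 structured bindings decompose an aggregate like BlobPathWithSize exactly as they decompose std::pair, so loops of the form const auto & [file_path, size] = ... keep compiling after the switch. A self-contained sketch (the blob names are made up for illustration):

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    struct BlobPathWithSize
    {
        std::string relative_path;
        uint64_t bytes_size;
    };

    int main()
    {
        std::vector<BlobPathWithSize> blobs{{"part_0", 100}, {"part_1", 42}};

        // Binds the struct's public members in declaration order,
        // just like .first / .second on the old std::pair entries.
        for (const auto & [path, size] : blobs)
            std::cout << path << " -> " << size << '\n';
    }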

src/Disks/IO/ReadBufferFromRemoteFSGather.cpp

@ -43,7 +43,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
auto remote_file_reader_creator = [=, this]()
{
return std::make_unique<ReadBufferFromS3>(
- client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries,
+ client_ptr, bucket, fs::path(common_path_prefix) / path, max_single_read_retries,
settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
};
@ -83,9 +83,13 @@ SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const
#endif
- ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const ReadSettings & settings_)
+ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
+ const std::string & common_path_prefix_,
+ const BlobsPathToSize & blobs_to_read_,
+ const ReadSettings & settings_)
: ReadBuffer(nullptr, 0)
- , metadata(metadata_)
+ , common_path_prefix(common_path_prefix_)
+ , blobs_to_read(blobs_to_read_)
, settings(settings_)
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
{
@ -118,9 +122,9 @@ void ReadBufferFromRemoteFSGather::initialize()
{
/// One clickhouse file can be split into multiple files in remote fs.
auto current_buf_offset = file_offset_of_buffer_end;
- for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i)
+ for (size_t i = 0; i < blobs_to_read.size(); ++i)
{
- const auto & [file_path, size] = metadata.remote_fs_objects[i];
+ const auto & [file_path, size] = blobs_to_read[i];
if (size > current_buf_offset)
{
@ -137,7 +141,7 @@ void ReadBufferFromRemoteFSGather::initialize()
current_buf_offset -= size;
}
- current_buf_idx = metadata.remote_fs_objects.size();
+ current_buf_idx = blobs_to_read.size();
current_buf = nullptr;
}
@ -167,12 +171,12 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
{
/// If there is no available buffers - nothing to read.
- if (current_buf_idx + 1 >= metadata.remote_fs_objects.size())
+ if (current_buf_idx + 1 >= blobs_to_read.size())
return false;
++current_buf_idx;
- const auto & [path, size] = metadata.remote_fs_objects[current_buf_idx];
+ const auto & [path, size] = blobs_to_read[current_buf_idx];
current_buf = createImplementationBuffer(path, size);
return true;
@ -201,7 +205,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
if (!result)
result = current_buf->next();
- if (metadata.remote_fs_objects.size() == 1)
+ if (blobs_to_read.size() == 1)
{
file_offset_of_buffer_end = current_buf->getFileOffsetOfBufferEnd();
}
@ -254,8 +258,8 @@ String ReadBufferFromRemoteFSGather::getFileName() const
size_t ReadBufferFromRemoteFSGather::getFileSize() const
{
size_t size = 0;
- for (const auto & object : metadata.remote_fs_objects)
- size += object.second;
+ for (const auto & object : blobs_to_read)
+ size += object.bytes_size;
return size;
}
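
The gather buffer presents the blob list as one logical file: initialize() walks the sizes to find the blob that covers the current offset, and getFileSize() sums them. A standalone sketch of that walk; findBlob is an illustrative name, not part of the commit:

    #include <cstddef>
    #include <cstdint>
    #include <optional>
    #include <string>
    #include <utility>
    #include <vector>

    struct BlobPathWithSize { std::string relative_path; uint64_t bytes_size; };
    using BlobsPathToSize = std::vector<BlobPathWithSize>;

    // Return the index of the blob covering file_offset and the offset
    // within that blob, mirroring ReadBufferFromRemoteFSGather::initialize().
    std::optional<std::pair<size_t, uint64_t>>
    findBlob(const BlobsPathToSize & blobs, uint64_t file_offset)
    {
        for (size_t i = 0; i < blobs.size(); ++i)
        {
            if (blobs[i].bytes_size > file_offset)
                return std::make_pair(i, file_offset);
            file_offset -= blobs[i].bytes_size;
        }
        return std::nullopt; // offset lies past the last blob
    }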

src/Disks/IO/ReadBufferFromRemoteFSGather.h

@ -16,6 +16,8 @@ namespace Poco { class Logger; }
namespace DB
{
/**
* Remote disk might need to split one clickhouse file into multiple files in remote fs.
* This class works like a proxy to allow transition from one file into multiple.
@ -26,7 +28,8 @@ friend class ReadIndirectBufferFromRemoteFS;
public:
ReadBufferFromRemoteFSGather(
- const RemoteMetadata & metadata_,
+ const std::string & common_path_prefix_,
+ const BlobsPathToSize & blobs_to_read_,
const ReadSettings & settings_);
String getFileName() const;
@ -56,7 +59,9 @@ public:
protected:
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) = 0;
- RemoteMetadata metadata;
+ std::string common_path_prefix;
+ BlobsPathToSize blobs_to_read;
ReadSettings settings;
@ -100,10 +105,11 @@ public:
ReadBufferFromS3Gather(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
- IDiskRemote::Metadata metadata_,
+ const std::string & common_path_prefix_,
+ const BlobsPathToSize & blobs_to_read_,
size_t max_single_read_retries_,
const ReadSettings & settings_)
- : ReadBufferFromRemoteFSGather(metadata_, settings_)
+ : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, max_single_read_retries(max_single_read_retries_)
@ -127,11 +133,12 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS
public:
ReadBufferFromAzureBlobStorageGather(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
- IDiskRemote::Metadata metadata_,
+ const std::string & common_path_prefix_,
+ const BlobsPathToSize & blobs_to_read_,
size_t max_single_read_retries_,
size_t max_single_download_retries_,
const ReadSettings & settings_)
- : ReadBufferFromRemoteFSGather(metadata_, settings_)
+ : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
, blob_container_client(blob_container_client_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
@ -153,10 +160,11 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
public:
ReadBufferFromWebServerGather(
const String & uri_,
- RemoteMetadata metadata_,
+ const std::string & common_path_prefix_,
+ const BlobsPathToSize & blobs_to_read_,
ContextPtr context_,
const ReadSettings & settings_)
- : ReadBufferFromRemoteFSGather(metadata_, settings_)
+ : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
, uri(uri_)
, context(context_)
{
@ -178,9 +186,10 @@ public:
ReadBufferFromHDFSGather(
const Poco::Util::AbstractConfiguration & config_,
const String & hdfs_uri_,
- IDiskRemote::Metadata metadata_,
+ const std::string & common_path_prefix_,
+ const BlobsPathToSize & blobs_to_read_,
const ReadSettings & settings_)
- : ReadBufferFromRemoteFSGather(metadata_, settings_)
+ : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
, config(config_)
{
const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2);
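
Each backend-specific gather class now forwards the same two arguments to the base instead of a metadata object. A toy sketch of the pattern with simplified names (GatherBase and S3LikeGather are not the real classes):

    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <utility>
    #include <vector>

    struct BlobPathWithSize { std::string relative_path; uint64_t bytes_size; };
    using BlobsPathToSize = std::vector<BlobPathWithSize>;

    // The base stores only what every backend needs to stitch blobs together.
    class GatherBase
    {
    public:
        GatherBase(std::string common_path_prefix_, BlobsPathToSize blobs_to_read_)
            : common_path_prefix(std::move(common_path_prefix_))
            , blobs_to_read(std::move(blobs_to_read_))
        {}

    protected:
        std::string common_path_prefix;
        BlobsPathToSize blobs_to_read;
    };

    // A backend adds its own knobs and forwards the shared ones.
    class S3LikeGather : public GatherBase
    {
    public:
        S3LikeGather(std::string prefix, BlobsPathToSize blobs, size_t max_retries_)
            : GatherBase(std::move(prefix), std::move(blobs))
            , max_retries(max_retries_)
        {}

    private:
        [[maybe_unused]] size_t max_retries;
    };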

src/Disks/S3/DiskS3.cpp

@ -237,7 +237,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
}
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
- settings->client, bucket, metadata,
+ settings->client, bucket, metadata.remote_fs_root_path, metadata.remote_fs_objects,
settings->s3_max_single_read_retries, disk_read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)