Merge pull request #50033 from kssenii/disk-object-storage-minor-changes

Get rid of indirect write buffer in object storages
This commit is contained in:
alesapin 2023-05-22 14:08:17 +02:00 committed by GitHub
commit cc3897a84a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 50 additions and 61 deletions

View File

@ -255,6 +255,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
client,
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
std::nullopt,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupWriterS3"),

View File

@ -149,6 +149,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
s3_client->client,
s3_client->uri.bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings_1
};
};

View File

@ -26,7 +26,7 @@ WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
size_t max_single_part_upload_size_,
size_t buf_size_,
const WriteSettings & write_settings_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, log(&Poco::Logger::get("WriteBufferFromAzureBlobStorage"))
, max_single_part_upload_size(max_single_part_upload_size_)
, blob_path(blob_path_)

View File

@ -6,7 +6,7 @@
#include <memory>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <azure/storage/blobs.hpp>
@ -21,7 +21,7 @@ class Logger;
namespace DB
{
class WriteBufferFromAzureBlobStorage : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
{
public:
using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
@ -37,6 +37,9 @@ public:
void nextImpl() override;
std::string getFileName() const override { return blob_path; }
void sync() override { next(); }
private:
void finalizeImpl() override;
void execWithRetry(std::function<void()> func, size_t num_tries, size_t cost = 0);

View File

@ -1,11 +1,11 @@
#include "WriteIndirectBufferFromRemoteFS.h"
#include "WriteBufferWithFinalizeCallback.h"
namespace DB
{
WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS(
WriteBufferWithFinalizeCallback::WriteBufferWithFinalizeCallback(
std::unique_ptr<WriteBuffer> impl_,
CreateMetadataCallback && create_callback_,
FinalizeCallback && create_callback_,
const String & remote_path_)
: WriteBufferFromFileDecorator(std::move(impl_))
, create_metadata_callback(std::move(create_callback_))
@ -14,7 +14,7 @@ WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS(
}
WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS()
WriteBufferWithFinalizeCallback::~WriteBufferWithFinalizeCallback()
{
try
{
@ -26,7 +26,7 @@ WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS()
}
}
void WriteIndirectBufferFromRemoteFS::finalizeImpl()
void WriteBufferWithFinalizeCallback::finalizeImpl()
{
WriteBufferFromFileDecorator::finalizeImpl();
if (create_metadata_callback)

View File

@ -8,25 +8,25 @@
namespace DB
{
using CreateMetadataCallback = std::function<void(size_t bytes_count)>;
using FinalizeCallback = std::function<void(size_t bytes_count)>;
/// Stores data in S3/HDFS and adds the object path and object size to metadata file on local FS.
class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator
class WriteBufferWithFinalizeCallback final : public WriteBufferFromFileDecorator
{
public:
WriteIndirectBufferFromRemoteFS(
WriteBufferWithFinalizeCallback(
std::unique_ptr<WriteBuffer> impl_,
CreateMetadataCallback && create_callback_,
FinalizeCallback && create_callback_,
const String & remote_path_);
~WriteIndirectBufferFromRemoteFS() override;
~WriteBufferWithFinalizeCallback() override;
String getFileName() const override { return remote_path; }
private:
void finalizeImpl() override;
CreateMetadataCallback create_metadata_callback;
FinalizeCallback create_metadata_callback;
String remote_path;
};

View File

@ -129,7 +129,6 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes>,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & write_settings)
{
@ -138,14 +137,12 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
LOG_TEST(log, "Writing file: {}", object.remote_path);
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
object.remote_path,
settings.get()->max_single_part_upload_size,
buf_size,
patchSettings(write_settings));
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), object.remote_path);
}
void AzureObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children, int max_keys) const

View File

@ -7,7 +7,6 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/MultiVersion.h>
@ -83,7 +82,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -97,13 +97,12 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
const StoredObject & object,
WriteMode mode, // Cached doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & write_settings)
{
/// Add cache relating settings to WriteSettings.
auto modified_write_settings = IObjectStorage::patchSettings(write_settings);
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, std::move(finalize_callback), buf_size, modified_write_settings);
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings);
bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations

View File

@ -43,7 +43,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -1,5 +1,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/IO/WriteBufferWithFinalizeCallback.h>
#include <Common/checkStackSize.h>
#include <ranges>
#include <Common/logger_useful.h>
@ -658,14 +659,16 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
operations_to_execute.emplace_back(std::move(write_operation));
/// We always use mode Rewrite because we simulate append using metadata and different files
return object_storage.writeObject(
auto impl = object_storage.writeObject(
object,
/// We always use mode Rewrite because we simulate append using metadata and different files
WriteMode::Rewrite,
object_attributes,
std::move(create_metadata_callback),
buf_size,
settings);
return std::make_unique<WriteBufferWithFinalizeCallback>(
std::move(impl), std::move(create_metadata_callback), object.remote_path);
}

View File

@ -9,7 +9,6 @@
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Common/getRandomASCIIString.h>
@ -83,7 +82,6 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & write_settings)
{
@ -93,11 +91,9 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
"HDFS API doesn't support custom attributes/metadata for stored objects");
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
return std::make_unique<WriteBufferFromHDFS>(
object.remote_path, config, settings->replication, patchSettings(write_settings), buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), object.remote_path);
}

View File

@ -81,7 +81,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -48,8 +48,6 @@ struct ObjectMetadata
std::optional<ObjectAttributes> attributes;
};
using FinalizeCallback = std::function<void(size_t bytes_count)>;
/// Base class for all object storages which implement some subset of ordinary filesystem operations.
///
/// Examples of object storages are S3, Azure Blob Storage, HDFS.
@ -119,7 +117,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) = 0;

View File

@ -7,7 +7,6 @@
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
@ -124,7 +123,6 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> /* attributes */,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & /* write_settings */)
{
@ -132,9 +130,7 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
throw Exception(ErrorCodes::BAD_ARGUMENTS, "LocalObjectStorage doesn't support append to files");
LOG_TEST(log, "Write object: {}", object.remote_path);
auto impl = std::make_unique<WriteBufferFromFile>(object.remote_path, buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(
std::move(impl), std::move(finalize_callback), object.remote_path);
return std::make_unique<WriteBufferFromFile>(object.remote_path, buf_size);
}
void LocalObjectStorage::removeObject(const StoredObject & object)

View File

@ -41,7 +41,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -8,7 +8,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/ReadBufferFromS3.h>
@ -160,8 +159,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
const StoredObject & object,
WriteMode mode, // S3 doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size [[maybe_unused]],
size_t buf_size,
const WriteSettings & write_settings)
{
WriteSettings disk_write_settings = IObjectStorage::patchSettings(write_settings);
@ -174,17 +172,15 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
return std::make_unique<WriteBufferFromS3>(
client.get(),
bucket,
object.remote_path,
buf_size,
settings_ptr->request_settings,
attributes,
std::move(scheduler),
disk_write_settings);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(
std::move(s3_buffer), std::move(finalize_callback), object.remote_path);
}
void S3ObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children, int max_keys) const

View File

@ -97,7 +97,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -9,7 +9,6 @@
#include <IO/WriteHelpers.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
@ -211,7 +210,6 @@ std::unique_ptr<WriteBufferFromFileBase> WebObjectStorage::writeObject( /// NOLI
const StoredObject & /* object */,
WriteMode /* mode */,
std::optional<ObjectAttributes> /* attributes */,
FinalizeCallback && /* finalize_callback */,
size_t /* buf_size */,
const WriteSettings & /* write_settings */)
{

View File

@ -51,7 +51,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -92,6 +92,7 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
client,
uri.bucket,
uri.key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings
);

View File

@ -79,11 +79,13 @@ WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
const WriteSettings & write_settings_)
: bucket(bucket_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, bucket(bucket_)
, key(key_)
, request_settings(request_settings_)
, upload_settings(request_settings.getUploadSettings())

View File

@ -5,7 +5,7 @@
#if USE_AWS_S3
#include <base/types.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <Storages/StorageS3Settings.h>
@ -24,13 +24,14 @@ namespace DB
* Data is divided on chunks with size greater than 'minimum_upload_part_size'. Last chunk can be less than this threshold.
* Each chunk is written as a part to S3.
*/
class WriteBufferFromS3 final : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromS3 final : public WriteBufferFromFileBase
{
public:
WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
@ -39,8 +40,9 @@ public:
~WriteBufferFromS3() override;
void nextImpl() override;
void preFinalize() override;
std::string getFileName() const override { return key; }
void sync() override { next(); }
public:
class IBufferAllocationPolicy
{
public:

View File

@ -529,6 +529,7 @@ public:
client,
bucket,
file_name,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
std::nullopt,
getAsyncPolicy().getScheduler());

View File

@ -107,8 +107,9 @@ WriteBufferFromHDFS::WriteBufferFromHDFS(
const WriteSettings & write_settings_,
size_t buf_size_,
int flags_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_, replication_, write_settings_, flags_))
, filename(hdfs_name_)
{
}

View File

@ -5,7 +5,7 @@
#if USE_HDFS
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <fcntl.h>
#include <string>
@ -17,7 +17,7 @@ namespace DB
/** Accepts HDFS path to file and opens it.
* Closes file by himself (thus "owns" a file descriptor).
*/
class WriteBufferFromHDFS final : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromHDFS final : public WriteBufferFromFileBase
{
public:
@ -29,19 +29,20 @@ public:
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
int flags = O_WRONLY);
WriteBufferFromHDFS(WriteBufferFromHDFS &&) = default;
~WriteBufferFromHDFS() override;
void nextImpl() override;
void sync() override;
std::string getFileName() const override { return filename; }
private:
void finalizeImpl() override;
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
const std::string filename;
};
}

View File

@ -775,6 +775,7 @@ public:
configuration_.client,
bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
configuration_.request_settings,
std::nullopt,
threadPoolCallbackRunner<void>(IOThreadPool::get(), "S3ParallelWrite"),