mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
Better
This commit is contained in:
parent
cef509e5b5
commit
75a6b21f19
@ -17,6 +17,36 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
class HDFSPathKeeper : public RemoteFSPathKeeper
|
||||
{
|
||||
public:
|
||||
using Chunk = std::vector<std::string>;
|
||||
using Chunks = std::list<Chunk>;
|
||||
|
||||
HDFSPathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
|
||||
|
||||
void addPath(const String & path) override
|
||||
{
|
||||
if (chunks.empty() || chunks.back().size() >= chunk_limit)
|
||||
{
|
||||
chunks.push_back(Chunks::value_type());
|
||||
chunks.back().reserve(chunk_limit);
|
||||
}
|
||||
chunks.back().push_back(path.data());
|
||||
}
|
||||
|
||||
void removePaths(const std::function<void(Chunk &&)> & remove_chunk_func)
|
||||
{
|
||||
for (auto & chunk : chunks)
|
||||
remove_chunk_func(std::move(chunk));
|
||||
}
|
||||
|
||||
private:
|
||||
Chunks chunks;
|
||||
};
|
||||
|
||||
|
||||
/// Reads data from HDFS using stored paths in metadata.
|
||||
class ReadIndirectBufferFromHDFS final : public ReadIndirectBufferFromRemoteFS<ReadBufferFromHDFS>
|
||||
{
|
||||
@ -54,7 +84,7 @@ DiskHDFS::DiskHDFS(
|
||||
SettingsPtr settings_,
|
||||
const String & metadata_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_)
|
||||
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS")
|
||||
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS", settings_->thread_pool_size)
|
||||
, config(config_)
|
||||
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
|
||||
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
|
||||
@ -98,13 +128,20 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)
|
||||
RemoteFSPathKeeperPtr DiskHDFS::createFSPathKeeper() const
|
||||
{
|
||||
for (const auto & chunk : fs_paths_keeper)
|
||||
return std::make_shared<HDFSPathKeeper>(settings->objects_chunk_size_to_delete);
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
{
|
||||
auto * hdfs_paths_keeper = dynamic_cast<HDFSPathKeeper *>(fs_paths_keeper.get());
|
||||
hdfs_paths_keeper->removePaths([&](std::vector<std::string> && chunk)
|
||||
{
|
||||
for (const auto & hdfs_object_path : chunk)
|
||||
{
|
||||
const String & hdfs_path = hdfs_object_path.GetKey();
|
||||
const String & hdfs_path = hdfs_object_path;
|
||||
const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
|
||||
|
||||
/// Add path from root to file name
|
||||
@ -112,7 +149,7 @@ void DiskHDFS::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)
|
||||
if (res == -1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -120,7 +157,10 @@ namespace
|
||||
{
|
||||
std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
||||
{
|
||||
return std::make_unique<DiskHDFSSettings>(config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
|
||||
return std::make_unique<DiskHDFSSettings>(
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
|
||||
config.getInt(config_prefix + ".thread_pool_size", 16),
|
||||
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,9 +12,19 @@ namespace DB
|
||||
struct DiskHDFSSettings
|
||||
{
|
||||
size_t min_bytes_for_seek;
|
||||
DiskHDFSSettings(int min_bytes_for_seek_) : min_bytes_for_seek(min_bytes_for_seek_) {}
|
||||
int thread_pool_size;
|
||||
int objects_chunk_size_to_delete;
|
||||
|
||||
DiskHDFSSettings(
|
||||
int min_bytes_for_seek_,
|
||||
int thread_pool_size_,
|
||||
int objects_chunk_size_to_delete_)
|
||||
: min_bytes_for_seek(min_bytes_for_seek_)
|
||||
, thread_pool_size(thread_pool_size_)
|
||||
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_) {}
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Storage for persisting data in HDFS and metadata on the local disk.
|
||||
* Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
|
||||
@ -22,9 +32,6 @@ struct DiskHDFSSettings
|
||||
*/
|
||||
class DiskHDFS final : public IDiskRemote
|
||||
{
|
||||
|
||||
friend class DiskHDFSReservation;
|
||||
|
||||
public:
|
||||
using SettingsPtr = std::unique_ptr<DiskHDFSSettings>;
|
||||
|
||||
@ -47,7 +54,9 @@ public:
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
|
||||
void removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper) override;
|
||||
void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override;
|
||||
|
||||
RemoteFSPathKeeperPtr createFSPathKeeper() const override;
|
||||
|
||||
private:
|
||||
String getRandomName() { return toString(UUIDHelpers::generateV4()); }
|
||||
|
@ -1,7 +1,5 @@
|
||||
#include <Disks/IDiskRemote.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include "Disks/DiskFactory.h"
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
@ -177,7 +175,7 @@ IDiskRemote::Metadata IDiskRemote::createMeta(const String & path) const
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeper & fs_paths_keeper)
|
||||
void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
{
|
||||
LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
|
||||
|
||||
@ -195,7 +193,7 @@ void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeper & fs_paths_
|
||||
{
|
||||
file.remove();
|
||||
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
|
||||
fs_paths_keeper.addPath(remote_fs_root_path + remote_fs_object_path);
|
||||
fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
|
||||
}
|
||||
else /// In other case decrement number of references, save metadata and delete file.
|
||||
{
|
||||
@ -220,7 +218,7 @@ void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeper & fs_paths_
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeper & fs_paths_keeper)
|
||||
void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
{
|
||||
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
|
||||
|
||||
@ -286,8 +284,8 @@ IDiskRemote::IDiskRemote(
|
||||
const String & remote_fs_root_path_,
|
||||
const String & metadata_path_,
|
||||
const String & log_name_,
|
||||
std::unique_ptr<Executor> executor_)
|
||||
: IDisk(std::move(executor_))
|
||||
size_t thread_pool_size)
|
||||
: IDisk(std::make_unique<AsyncExecutor>(log_name_, thread_pool_size))
|
||||
, log(&Poco::Logger::get(log_name_))
|
||||
, name(name_)
|
||||
, remote_fs_root_path(remote_fs_root_path_)
|
||||
@ -348,7 +346,7 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
|
||||
|
||||
void IDiskRemote::removeFileIfExists(const String & path)
|
||||
{
|
||||
RemoteFSPathKeeper fs_paths_keeper;
|
||||
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
|
||||
if (Poco::File(metadata_path + path).exists())
|
||||
{
|
||||
removeMeta(path, fs_paths_keeper);
|
||||
@ -359,7 +357,7 @@ void IDiskRemote::removeFileIfExists(const String & path)
|
||||
|
||||
void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
|
||||
{
|
||||
RemoteFSPathKeeper fs_paths_keeper;
|
||||
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
|
||||
removeMeta(path, fs_paths_keeper);
|
||||
if (!keep_in_remote_fs)
|
||||
removeFromRemoteFS(fs_paths_keeper);
|
||||
@ -368,7 +366,7 @@ void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
|
||||
|
||||
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
|
||||
{
|
||||
RemoteFSPathKeeper fs_paths_keeper;
|
||||
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
|
||||
removeMetaRecursive(path, fs_paths_keeper);
|
||||
if (!keep_in_remote_fs)
|
||||
removeFromRemoteFS(fs_paths_keeper);
|
||||
@ -487,5 +485,3 @@ bool IDiskRemote::tryReserve(UInt64 bytes)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -1,48 +1,36 @@
|
||||
#pragma once
|
||||
#include <Common/config.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <atomic>
|
||||
#include "Disks/DiskFactory.h"
|
||||
#include "Disks/Executor.h"
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <utility>
|
||||
#include <aws/s3/model/CopyObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectsRequest.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/ListObjectsV2Request.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API).
|
||||
/// Used for both S3 and HDFS.
|
||||
class RemoteFSPathKeeper : public std::list<Aws::Vector<Aws::S3::Model::ObjectIdentifier>>
|
||||
/// Helper class to collect paths into chunks of maximum size.
|
||||
/// For s3 it is Aws::vector<ObjectIdentifier>, for hdfs it is std::vector<std::string>.
|
||||
class RemoteFSPathKeeper
|
||||
{
|
||||
public:
|
||||
void addPath(const String & path)
|
||||
{
|
||||
if (empty() || back().size() >= chunk_limit)
|
||||
{ /// add one more chunk
|
||||
push_back(value_type());
|
||||
back().reserve(chunk_limit);
|
||||
}
|
||||
RemoteFSPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {}
|
||||
|
||||
Aws::S3::Model::ObjectIdentifier obj;
|
||||
obj.SetKey(path);
|
||||
back().push_back(obj);
|
||||
}
|
||||
virtual ~RemoteFSPathKeeper() = default;
|
||||
|
||||
private:
|
||||
/// limit for one DeleteObject request
|
||||
/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||||
const static size_t chunk_limit = 1000;
|
||||
virtual void addPath(const String & path) = 0;
|
||||
|
||||
protected:
|
||||
size_t chunk_limit;
|
||||
};
|
||||
|
||||
using RemoteFSPathKeeperPtr = std::shared_ptr<RemoteFSPathKeeper>;
|
||||
|
||||
|
||||
/// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS)
|
||||
class IDiskRemote : public IDisk
|
||||
{
|
||||
@ -55,7 +43,7 @@ public:
|
||||
const String & remote_fs_root_path_,
|
||||
const String & metadata_path_,
|
||||
const String & log_name_,
|
||||
std::unique_ptr<Executor> executor_ = std::make_unique<SyncExecutor>());
|
||||
size_t thread_pool_size);
|
||||
|
||||
struct Metadata;
|
||||
|
||||
@ -125,7 +113,9 @@ public:
|
||||
|
||||
ReservationPtr reserve(UInt64 bytes) override;
|
||||
|
||||
virtual void removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper) = 0;
|
||||
virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) = 0;
|
||||
|
||||
virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;
|
||||
|
||||
protected:
|
||||
Poco::Logger * log;
|
||||
@ -135,9 +125,9 @@ protected:
|
||||
const String metadata_path;
|
||||
|
||||
private:
|
||||
void removeMeta(const String & path, RemoteFSPathKeeper & fs_paths_keeper);
|
||||
void removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
|
||||
|
||||
void removeMetaRecursive(const String & path, RemoteFSPathKeeper & fs_paths_keeper);
|
||||
void removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
|
||||
|
||||
bool tryReserve(UInt64 bytes);
|
||||
|
||||
@ -245,6 +235,49 @@ private:
|
||||
CurrentMetrics::Increment metric_increment;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
/// Runs tasks asynchronously using thread pool.
|
||||
class AsyncExecutor : public Executor
|
||||
{
|
||||
public:
|
||||
explicit AsyncExecutor(const String & name_, int thread_pool_size)
|
||||
: name(name_)
|
||||
, pool(ThreadPool(thread_pool_size)) {}
|
||||
|
||||
std::future<void> execute(std::function<void()> task) override
|
||||
{
|
||||
auto promise = std::make_shared<std::promise<void>>();
|
||||
pool.scheduleOrThrowOnError(
|
||||
[promise, task]()
|
||||
{
|
||||
try
|
||||
{
|
||||
task();
|
||||
promise->set_value();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("Failed to run async task");
|
||||
|
||||
try
|
||||
{
|
||||
promise->set_exception(std::current_exception());
|
||||
}
|
||||
catch (...) {}
|
||||
}
|
||||
});
|
||||
|
||||
return promise->get_future();
|
||||
}
|
||||
|
||||
void setMaxThreads(size_t threads)
|
||||
{
|
||||
pool.setMaxThreads(threads);
|
||||
}
|
||||
|
||||
private:
|
||||
String name;
|
||||
ThreadPool pool;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -19,8 +19,12 @@
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <aws/s3/model/CopyObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectsRequest.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/ListObjectsV2Request.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -35,6 +39,38 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
/// Remove s3 objects in chunks. Default chunk limit for one DeleteObject request is 1000:
|
||||
/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||||
class S3PathKeeper : public RemoteFSPathKeeper
|
||||
{
|
||||
public:
|
||||
using Chunk = Aws::Vector<Aws::S3::Model::ObjectIdentifier>;
|
||||
using Chunks = std::list<Chunk>;
|
||||
|
||||
S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
|
||||
|
||||
void addPath(const String & path) override
|
||||
{
|
||||
if (chunks.empty() || chunks.back().size() >= chunk_limit)
|
||||
{
|
||||
/// add one more chunk
|
||||
chunks.push_back(Chunks::value_type());
|
||||
chunks.back().reserve(chunk_limit);
|
||||
}
|
||||
Aws::S3::Model::ObjectIdentifier obj;
|
||||
obj.SetKey(path);
|
||||
chunks.back().push_back(obj);
|
||||
}
|
||||
|
||||
void removePaths(const std::function<void(Chunk &&)> & remove_chunk_func)
|
||||
{
|
||||
for (auto & chunk : chunks)
|
||||
remove_chunk_func(std::move(chunk));
|
||||
}
|
||||
|
||||
private:
|
||||
Chunks chunks;
|
||||
};
|
||||
|
||||
String getRandomName()
|
||||
{
|
||||
@ -95,46 +131,6 @@ private:
|
||||
size_t buf_size;
|
||||
};
|
||||
|
||||
/// Runs tasks asynchronously using thread pool.
|
||||
class AsyncExecutor : public Executor
|
||||
{
|
||||
public:
|
||||
explicit AsyncExecutor(int thread_pool_size) : pool(ThreadPool(thread_pool_size)) { }
|
||||
|
||||
std::future<void> execute(std::function<void()> task) override
|
||||
{
|
||||
auto promise = std::make_shared<std::promise<void>>();
|
||||
pool.scheduleOrThrowOnError(
|
||||
[promise, task]()
|
||||
{
|
||||
try
|
||||
{
|
||||
task();
|
||||
promise->set_value();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("DiskS3", "Failed to run async task");
|
||||
|
||||
try
|
||||
{
|
||||
promise->set_exception(std::current_exception());
|
||||
} catch (...) { }
|
||||
}
|
||||
});
|
||||
|
||||
return promise->get_future();
|
||||
}
|
||||
|
||||
void setMaxThreads(size_t threads)
|
||||
{
|
||||
pool.setMaxThreads(threads);
|
||||
}
|
||||
private:
|
||||
ThreadPool pool;
|
||||
};
|
||||
|
||||
|
||||
DiskS3::DiskS3(
|
||||
String name_,
|
||||
String bucket_,
|
||||
@ -142,7 +138,7 @@ DiskS3::DiskS3(
|
||||
String metadata_path_,
|
||||
SettingsPtr settings_,
|
||||
GetDiskSettings settings_getter_)
|
||||
: IDiskRemote(name_, s3_root_path_, metadata_path_, "DiskS3", std::make_unique<AsyncExecutor>(settings_->thread_pool_size))
|
||||
: IDiskRemote(name_, s3_root_path_, metadata_path_, "DiskS3", settings_->thread_pool_size)
|
||||
, bucket(std::move(bucket_))
|
||||
, current_settings(std::move(settings_))
|
||||
, settings_getter(settings_getter_)
|
||||
@ -158,6 +154,30 @@ String DiskS3::getUniqueId(const String & path) const
|
||||
return id;
|
||||
}
|
||||
|
||||
RemoteFSPathKeeperPtr DiskS3::createFSPathKeeper() const
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
return std::make_shared<S3PathKeeper>(settings->objects_chunk_size_to_delete);
|
||||
}
|
||||
|
||||
void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
auto * s3_paths_keeper = dynamic_cast<S3PathKeeper *>(fs_paths_keeper.get());
|
||||
|
||||
s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
|
||||
{
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
delkeys.SetObjects(chunk);
|
||||
/// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
|
||||
Aws::S3::Model::DeleteObjectsRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetDelete(delkeys);
|
||||
auto outcome = settings->client->DeleteObjects(request);
|
||||
throwIfError(outcome);
|
||||
});
|
||||
}
|
||||
|
||||
void DiskS3::moveFile(const String & from_path, const String & to_path)
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
@ -228,26 +248,6 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path);
|
||||
}
|
||||
|
||||
void DiskS3::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)
|
||||
{
|
||||
if (fs_paths_keeper.empty())
|
||||
return;
|
||||
|
||||
auto settings = current_settings.get();
|
||||
for (const auto & chunk : fs_paths_keeper)
|
||||
{
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
delkeys.SetObjects(chunk);
|
||||
|
||||
/// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
|
||||
Aws::S3::Model::DeleteObjectsRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetDelete(delkeys);
|
||||
auto outcome = settings->client->DeleteObjects(request);
|
||||
throwIfError(outcome);
|
||||
}
|
||||
}
|
||||
|
||||
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
@ -919,7 +919,8 @@ DiskS3Settings::DiskS3Settings(
|
||||
size_t min_bytes_for_seek_,
|
||||
bool send_metadata_,
|
||||
int thread_pool_size_,
|
||||
int list_object_keys_size_)
|
||||
int list_object_keys_size_,
|
||||
int objects_chunk_size_to_delete_)
|
||||
: client(client_)
|
||||
, s3_max_single_read_retries(s3_max_single_read_retries_)
|
||||
, s3_min_upload_part_size(s3_min_upload_part_size_)
|
||||
@ -928,6 +929,7 @@ DiskS3Settings::DiskS3Settings(
|
||||
, send_metadata(send_metadata_)
|
||||
, thread_pool_size(thread_pool_size_)
|
||||
, list_object_keys_size(list_object_keys_size_)
|
||||
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -28,7 +28,8 @@ struct DiskS3Settings
|
||||
size_t min_bytes_for_seek_,
|
||||
bool send_metadata_,
|
||||
int thread_pool_size_,
|
||||
int list_object_keys_size_);
|
||||
int list_object_keys_size_,
|
||||
int objects_chunk_size_to_delete_);
|
||||
|
||||
std::shared_ptr<Aws::S3::S3Client> client;
|
||||
size_t s3_max_single_read_retries;
|
||||
@ -38,6 +39,7 @@ struct DiskS3Settings
|
||||
bool send_metadata;
|
||||
int thread_pool_size;
|
||||
int list_object_keys_size;
|
||||
int objects_chunk_size_to_delete;
|
||||
};
|
||||
|
||||
|
||||
@ -78,7 +80,9 @@ public:
|
||||
size_t buf_size,
|
||||
WriteMode mode) override;
|
||||
|
||||
void removeFromRemoteFS(const RemoteFSPathKeeper & keeper) final override;
|
||||
void removeFromRemoteFS(RemoteFSPathKeeperPtr keeper) override;
|
||||
|
||||
RemoteFSPathKeeperPtr createFSPathKeeper() const override;
|
||||
|
||||
void moveFile(const String & from_path, const String & to_path, bool send_metadata);
|
||||
void moveFile(const String & from_path, const String & to_path) override;
|
||||
|
@ -156,7 +156,8 @@ std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfigurat
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
|
||||
config.getBool(config_prefix + ".send_metadata", false),
|
||||
config.getInt(config_prefix + ".thread_pool_size", 16),
|
||||
config.getInt(config_prefix + ".list_object_keys_size", 1000));
|
||||
config.getInt(config_prefix + ".list_object_keys_size", 1000),
|
||||
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user