mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #38436 from kssenii/remote-path-separation-in-object-storage
more consistent work with paths in object storages
This commit is contained in:
commit
50eb364a56
@ -75,7 +75,7 @@ public:
|
||||
void startup(ContextPtr context) override;
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
|
||||
String getCacheBasePath() const override { return delegate->getCacheBasePath(); }
|
||||
std::vector<String> getRemotePaths(const String & path) const override { return delegate->getRemotePaths(path); }
|
||||
PathsWithSize getObjectStoragePaths(const String & path) const override { return delegate->getObjectStoragePaths(path); }
|
||||
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
|
||||
|
||||
MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); }
|
||||
|
@ -318,10 +318,10 @@ String DiskRestartProxy::getCacheBasePath() const
|
||||
return DiskDecorator::getCacheBasePath();
|
||||
}
|
||||
|
||||
std::vector<String> DiskRestartProxy::getRemotePaths(const String & path) const
|
||||
PathsWithSize DiskRestartProxy::getObjectStoragePaths(const String & path) const
|
||||
{
|
||||
ReadLock lock (mutex);
|
||||
return DiskDecorator::getRemotePaths(path);
|
||||
return DiskDecorator::getObjectStoragePaths(path);
|
||||
}
|
||||
|
||||
void DiskRestartProxy::getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map)
|
||||
|
@ -65,7 +65,7 @@ public:
|
||||
String getUniqueId(const String & path) const override;
|
||||
bool checkUniqueId(const String & id) const override;
|
||||
String getCacheBasePath() const override;
|
||||
std::vector<String> getRemotePaths(const String & path) const override;
|
||||
PathsWithSize getObjectStoragePaths(const String & path) const override;
|
||||
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
|
||||
|
||||
void restart(ContextPtr context);
|
||||
|
@ -170,10 +170,10 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
|
||||
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
||||
remote_path = remote_path.string().substr(url.size());
|
||||
|
||||
std::vector<BlobPathWithSize> blobs_to_read;
|
||||
PathsWithSize blobs_to_read;
|
||||
blobs_to_read.emplace_back(remote_path, iter->second.size);
|
||||
|
||||
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, path, blobs_to_read, getContext(), read_settings);
|
||||
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, blobs_to_read, getContext(), read_settings);
|
||||
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
|
@ -169,7 +169,7 @@ public:
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
|
||||
}
|
||||
|
||||
std::vector<String> getRemotePaths(const String &) const override { return {}; }
|
||||
PathsWithSize getObjectStoragePaths(const String &) const override { return {}; }
|
||||
|
||||
void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &) override {}
|
||||
|
||||
|
@ -219,13 +219,13 @@ public:
|
||||
|
||||
/// Returns a list of paths because for Log family engines there might be
|
||||
/// multiple files in remote fs for single clickhouse file.
|
||||
virtual std::vector<String> getRemotePaths(const String &) const
|
||||
virtual PathsWithSize getObjectStoragePaths(const String &) const
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePaths() not implemented for disk: {}`", getType());
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getObjectStoragePaths() not implemented for disk: {}`", getType());
|
||||
}
|
||||
|
||||
/// For one local path there might be multiple remote paths in case of Log family engines.
|
||||
using LocalPathWithRemotePaths = std::pair<String, std::vector<String>>;
|
||||
using LocalPathWithRemotePaths = std::pair<String, PathsWithSize>;
|
||||
|
||||
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &)
|
||||
{
|
||||
|
@ -40,7 +40,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
|
||||
appendFilesystemCacheLog();
|
||||
}
|
||||
|
||||
current_file_path = fs::path(common_path_prefix) / path;
|
||||
current_file_path = path;
|
||||
current_file_size = file_size;
|
||||
total_bytes_read_from_current_file = 0;
|
||||
|
||||
@ -50,18 +50,30 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
|
||||
#if USE_AWS_S3
|
||||
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBufferImpl(const String & path, size_t file_size)
|
||||
{
|
||||
auto remote_path = fs::path(common_path_prefix) / path;
|
||||
auto remote_file_reader_creator = [=, this]()
|
||||
{
|
||||
return std::make_unique<ReadBufferFromS3>(
|
||||
client_ptr, bucket, remote_path, version_id, max_single_read_retries,
|
||||
settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
|
||||
client_ptr,
|
||||
bucket,
|
||||
path,
|
||||
version_id,
|
||||
max_single_read_retries,
|
||||
settings,
|
||||
/* use_external_buffer */true,
|
||||
/* offset */0,
|
||||
read_until_position,
|
||||
/* restricted_seek */true);
|
||||
};
|
||||
|
||||
if (with_cache)
|
||||
{
|
||||
return std::make_shared<CachedReadBufferFromRemoteFS>(
|
||||
remote_path, settings.remote_fs_cache, remote_file_reader_creator, settings, query_id, read_until_position ? read_until_position : file_size);
|
||||
path,
|
||||
settings.remote_fs_cache,
|
||||
remote_file_reader_creator,
|
||||
settings,
|
||||
query_id,
|
||||
read_until_position ? read_until_position : file_size);
|
||||
}
|
||||
|
||||
return remote_file_reader_creator();
|
||||
@ -72,34 +84,46 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBufferImpl(con
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
|
||||
{
|
||||
current_file_path = path;
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(blob_container_client, path, max_single_read_retries,
|
||||
max_single_download_retries, settings.remote_fs_buffer_size, /* use_external_buffer */true, read_until_position);
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(
|
||||
blob_container_client,
|
||||
path,
|
||||
max_single_read_retries,
|
||||
max_single_download_retries,
|
||||
settings.remote_fs_buffer_size,
|
||||
/* use_external_buffer */true,
|
||||
read_until_position);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
|
||||
{
|
||||
current_file_path = path;
|
||||
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, /* use_external_buffer */true, read_until_position);
|
||||
return std::make_unique<ReadBufferFromWebServer>(
|
||||
fs::path(uri) / path,
|
||||
context,
|
||||
settings,
|
||||
/* use_external_buffer */true,
|
||||
read_until_position);
|
||||
}
|
||||
|
||||
|
||||
#if USE_HDFS
|
||||
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
|
||||
{
|
||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, settings.remote_fs_buffer_size);
|
||||
size_t begin_of_path = path.find('/', path.find("//") + 2);
|
||||
auto hdfs_path = path.substr(begin_of_path);
|
||||
auto hdfs_uri = path.substr(0, begin_of_path);
|
||||
LOG_TEST(log, "HDFS uri: {}, path: {}", hdfs_path, hdfs_uri);
|
||||
|
||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_path, config);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
|
||||
const std::string & common_path_prefix_,
|
||||
const BlobsPathToSize & blobs_to_read_,
|
||||
const PathsWithSize & blobs_to_read_,
|
||||
const ReadSettings & settings_)
|
||||
: ReadBuffer(nullptr, 0)
|
||||
, common_path_prefix(common_path_prefix_)
|
||||
, blobs_to_read(blobs_to_read_)
|
||||
, settings(settings_)
|
||||
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
|
||||
|
@ -27,8 +27,7 @@ friend class ReadIndirectBufferFromRemoteFS;
|
||||
|
||||
public:
|
||||
ReadBufferFromRemoteFSGather(
|
||||
const std::string & common_path_prefix_,
|
||||
const BlobsPathToSize & blobs_to_read_,
|
||||
const PathsWithSize & blobs_to_read_,
|
||||
const ReadSettings & settings_);
|
||||
|
||||
~ReadBufferFromRemoteFSGather() override;
|
||||
@ -54,9 +53,7 @@ public:
|
||||
protected:
|
||||
virtual SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) = 0;
|
||||
|
||||
std::string common_path_prefix;
|
||||
|
||||
BlobsPathToSize blobs_to_read;
|
||||
PathsWithSize blobs_to_read;
|
||||
|
||||
ReadSettings settings;
|
||||
|
||||
@ -69,6 +66,8 @@ protected:
|
||||
|
||||
String query_id;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
private:
|
||||
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size);
|
||||
|
||||
@ -95,8 +94,6 @@ private:
|
||||
*/
|
||||
size_t bytes_to_ignore = 0;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
size_t total_bytes_read_from_current_file = 0;
|
||||
|
||||
bool enable_cache_log = false;
|
||||
@ -112,11 +109,10 @@ public:
|
||||
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
|
||||
const String & bucket_,
|
||||
const String & version_id_,
|
||||
const std::string & common_path_prefix_,
|
||||
const BlobsPathToSize & blobs_to_read_,
|
||||
const PathsWithSize & blobs_to_read_,
|
||||
size_t max_single_read_retries_,
|
||||
const ReadSettings & settings_)
|
||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
||||
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, bucket(bucket_)
|
||||
, version_id(version_id_)
|
||||
@ -142,12 +138,11 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS
|
||||
public:
|
||||
ReadBufferFromAzureBlobStorageGather(
|
||||
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
const std::string & common_path_prefix_,
|
||||
const BlobsPathToSize & blobs_to_read_,
|
||||
const PathsWithSize & blobs_to_read_,
|
||||
size_t max_single_read_retries_,
|
||||
size_t max_single_download_retries_,
|
||||
const ReadSettings & settings_)
|
||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
||||
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||
, blob_container_client(blob_container_client_)
|
||||
, max_single_read_retries(max_single_read_retries_)
|
||||
, max_single_download_retries(max_single_download_retries_)
|
||||
@ -169,11 +164,10 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
|
||||
public:
|
||||
ReadBufferFromWebServerGather(
|
||||
const String & uri_,
|
||||
const std::string & common_path_prefix_,
|
||||
const BlobsPathToSize & blobs_to_read_,
|
||||
const PathsWithSize & blobs_to_read_,
|
||||
ContextPtr context_,
|
||||
const ReadSettings & settings_)
|
||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
||||
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||
, uri(uri_)
|
||||
, context(context_)
|
||||
{
|
||||
@ -194,25 +188,19 @@ class ReadBufferFromHDFSGather final : public ReadBufferFromRemoteFSGather
|
||||
public:
|
||||
ReadBufferFromHDFSGather(
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
const String & hdfs_uri_,
|
||||
const std::string & common_path_prefix_,
|
||||
const BlobsPathToSize & blobs_to_read_,
|
||||
const PathsWithSize & blobs_to_read_,
|
||||
const ReadSettings & settings_)
|
||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
||||
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||
, config(config_)
|
||||
{
|
||||
const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2);
|
||||
hdfs_directory = hdfs_uri_.substr(begin_of_path);
|
||||
hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
|
||||
}
|
||||
|
||||
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
|
||||
|
||||
private:
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
String hdfs_uri;
|
||||
String hdfs_directory;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
}
|
||||
|
@ -19,7 +19,8 @@ class ReadBufferFromWebServer : public SeekableReadBuffer
|
||||
{
|
||||
public:
|
||||
explicit ReadBufferFromWebServer(
|
||||
const String & url_, ContextPtr context_,
|
||||
const String & url_,
|
||||
ContextPtr context_,
|
||||
const ReadSettings & settings_ = {},
|
||||
bool use_external_buffer_ = false,
|
||||
size_t read_until_position = 0);
|
||||
|
@ -67,16 +67,18 @@ std::unique_ptr<SeekableReadBuffer> AzureObjectStorage::readObject( /// NOLINT
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
|
||||
const std::string & common_path_prefix,
|
||||
const BlobsPathToSize & blobs_to_read,
|
||||
const PathsWithSize & paths_to_read,
|
||||
const ReadSettings & read_settings,
|
||||
std::optional<size_t>,
|
||||
std::optional<size_t>) const
|
||||
{
|
||||
auto settings_ptr = settings.get();
|
||||
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
|
||||
client.get(), common_path_prefix, blobs_to_read,
|
||||
settings_ptr->max_single_read_retries, settings_ptr->max_single_download_retries, read_settings);
|
||||
client.get(),
|
||||
paths_to_read,
|
||||
settings_ptr->max_single_read_retries,
|
||||
settings_ptr->max_single_download_retries,
|
||||
read_settings);
|
||||
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
@ -111,7 +113,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), path);
|
||||
}
|
||||
|
||||
void AzureObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children) const
|
||||
void AzureObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
|
||||
@ -134,10 +136,10 @@ void AzureObjectStorage::removeObject(const std::string & path)
|
||||
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
|
||||
}
|
||||
|
||||
void AzureObjectStorage::removeObjects(const std::vector<std::string> & paths)
|
||||
void AzureObjectStorage::removeObjects(const PathsWithSize & paths)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
for (const auto & path : paths)
|
||||
for (const auto & [path, _] : paths)
|
||||
{
|
||||
auto delete_info = client_ptr->DeleteBlob(path);
|
||||
if (!delete_info.Value.Deleted)
|
||||
@ -151,10 +153,10 @@ void AzureObjectStorage::removeObjectIfExists(const std::string & path)
|
||||
auto delete_info = client_ptr->DeleteBlob(path);
|
||||
}
|
||||
|
||||
void AzureObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
|
||||
void AzureObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
for (const auto & path : paths)
|
||||
for (const auto & [path, _] : paths)
|
||||
auto delete_info = client_ptr->DeleteBlob(path);
|
||||
}
|
||||
|
||||
|
@ -59,8 +59,7 @@ public:
|
||||
std::optional<size_t> file_size = {}) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||
const std::string & common_path_prefix,
|
||||
const BlobsPathToSize & blobs_to_read,
|
||||
const PathsWithSize & blobs_to_read,
|
||||
const ReadSettings & read_settings = ReadSettings{},
|
||||
std::optional<size_t> read_hint = {},
|
||||
std::optional<size_t> file_size = {}) const override;
|
||||
@ -74,15 +73,16 @@ public:
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
const WriteSettings & write_settings = {}) override;
|
||||
|
||||
void listPrefix(const std::string & path, BlobsPathToSize & children) const override;
|
||||
void listPrefix(const std::string & path, PathsWithSize & children) const override;
|
||||
|
||||
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
||||
void removeObject(const std::string & path) override;
|
||||
|
||||
void removeObjects(const std::vector<std::string> & paths) override;
|
||||
void removeObjects(const PathsWithSize & paths) override;
|
||||
|
||||
void removeObjectIfExists(const std::string & path) override;
|
||||
|
||||
void removeObjectsIfExist(const std::vector<std::string> & paths) override;
|
||||
void removeObjectsIfExist(const PathsWithSize & paths) override;
|
||||
|
||||
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
||||
|
||||
@ -95,11 +95,19 @@ public:
|
||||
|
||||
void startup() override {}
|
||||
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||
void applyNewSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) override;
|
||||
|
||||
String getObjectsNamespace() const override { return ""; }
|
||||
|
||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||
const std::string & new_namespace,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) override;
|
||||
|
||||
|
||||
private:
|
||||
const String name;
|
||||
|
@ -108,9 +108,9 @@ DiskObjectStorage::DiskObjectStorage(
|
||||
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
|
||||
{}
|
||||
|
||||
std::vector<String> DiskObjectStorage::getRemotePaths(const String & local_path) const
|
||||
PathsWithSize DiskObjectStorage::getObjectStoragePaths(const String & local_path) const
|
||||
{
|
||||
return metadata_storage->getRemotePaths(local_path);
|
||||
return metadata_storage->getObjectStoragePaths(local_path);
|
||||
}
|
||||
|
||||
void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
|
||||
@ -120,7 +120,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
|
||||
{
|
||||
try
|
||||
{
|
||||
paths_map.emplace_back(local_path, getRemotePaths(local_path));
|
||||
paths_map.emplace_back(local_path, getObjectStoragePaths(local_path));
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
@ -244,9 +244,9 @@ String DiskObjectStorage::getUniqueId(const String & path) const
|
||||
{
|
||||
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
|
||||
String id;
|
||||
auto blobs_paths = metadata_storage->getRemotePaths(path);
|
||||
auto blobs_paths = metadata_storage->getObjectStoragePaths(path);
|
||||
if (!blobs_paths.empty())
|
||||
id = blobs_paths[0];
|
||||
id = blobs_paths[0].path;
|
||||
return id;
|
||||
}
|
||||
|
||||
@ -438,7 +438,11 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
|
||||
std::optional<size_t> read_hint,
|
||||
std::optional<size_t> file_size) const
|
||||
{
|
||||
return object_storage->readObjects(remote_fs_root_path, metadata_storage->getBlobs(path), settings, read_hint, file_size);
|
||||
return object_storage->readObjects(
|
||||
metadata_storage->getObjectStoragePaths(path),
|
||||
settings,
|
||||
read_hint,
|
||||
file_size);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
|
||||
|
@ -49,7 +49,7 @@ public:
|
||||
|
||||
const String & getPath() const override { return metadata_storage->getPath(); }
|
||||
|
||||
std::vector<String> getRemotePaths(const String & local_path) const override;
|
||||
PathsWithSize getObjectStoragePaths(const String & local_path) const override;
|
||||
|
||||
void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
|
||||
|
||||
|
@ -11,6 +11,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_FORMAT;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
||||
@ -26,14 +27,14 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
||||
|
||||
assertChar('\n', buf);
|
||||
|
||||
UInt32 remote_fs_objects_count;
|
||||
readIntText(remote_fs_objects_count, buf);
|
||||
UInt32 storage_objects_count;
|
||||
readIntText(storage_objects_count, buf);
|
||||
assertChar('\t', buf);
|
||||
readIntText(total_size, buf);
|
||||
assertChar('\n', buf);
|
||||
remote_fs_objects.resize(remote_fs_objects_count);
|
||||
storage_objects.resize(storage_objects_count);
|
||||
|
||||
for (size_t i = 0; i < remote_fs_objects_count; ++i)
|
||||
for (size_t i = 0; i < storage_objects_count; ++i)
|
||||
{
|
||||
String remote_fs_object_path;
|
||||
size_t remote_fs_object_size;
|
||||
@ -50,8 +51,8 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
||||
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
|
||||
}
|
||||
assertChar('\n', buf);
|
||||
remote_fs_objects[i].relative_path = remote_fs_object_path;
|
||||
remote_fs_objects[i].bytes_size = remote_fs_object_size;
|
||||
storage_objects[i].relative_path = remote_fs_object_path;
|
||||
storage_objects[i].bytes_size = remote_fs_object_size;
|
||||
}
|
||||
|
||||
readIntText(ref_count, buf);
|
||||
@ -75,12 +76,12 @@ void DiskObjectStorageMetadata::serialize(WriteBuffer & buf, bool sync) const
|
||||
writeIntText(VERSION_READ_ONLY_FLAG, buf);
|
||||
writeChar('\n', buf);
|
||||
|
||||
writeIntText(remote_fs_objects.size(), buf);
|
||||
writeIntText(storage_objects.size(), buf);
|
||||
writeChar('\t', buf);
|
||||
writeIntText(total_size, buf);
|
||||
writeChar('\n', buf);
|
||||
|
||||
for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects)
|
||||
for (const auto & [remote_fs_object_path, remote_fs_object_size] : storage_objects)
|
||||
{
|
||||
writeIntText(remote_fs_object_size, buf);
|
||||
writeChar('\t', buf);
|
||||
@ -119,8 +120,11 @@ DiskObjectStorageMetadata::DiskObjectStorageMetadata(
|
||||
|
||||
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
|
||||
{
|
||||
if (!remote_fs_root_path.empty() && path.starts_with(remote_fs_root_path))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected relative path");
|
||||
|
||||
total_size += size;
|
||||
remote_fs_objects.emplace_back(path, size);
|
||||
storage_objects.emplace_back(path, size);
|
||||
}
|
||||
|
||||
|
||||
|
@ -12,6 +12,17 @@ namespace DB
|
||||
struct DiskObjectStorageMetadata
|
||||
{
|
||||
private:
|
||||
struct RelativePathWithSize
|
||||
{
|
||||
String relative_path;
|
||||
size_t bytes_size;
|
||||
|
||||
RelativePathWithSize() = default;
|
||||
|
||||
RelativePathWithSize(const String & relative_path_, size_t bytes_size_)
|
||||
: relative_path(relative_path_), bytes_size(bytes_size_) {}
|
||||
};
|
||||
|
||||
/// Metadata file version.
|
||||
static constexpr uint32_t VERSION_ABSOLUTE_PATHS = 1;
|
||||
static constexpr uint32_t VERSION_RELATIVE_PATHS = 2;
|
||||
@ -19,8 +30,8 @@ private:
|
||||
|
||||
const std::string & common_metadata_path;
|
||||
|
||||
/// Remote FS objects paths and their sizes.
|
||||
std::vector<BlobPathWithSize> remote_fs_objects;
|
||||
/// Relative paths of blobs.
|
||||
std::vector<RelativePathWithSize> storage_objects;
|
||||
|
||||
/// URI
|
||||
const std::string & remote_fs_root_path;
|
||||
@ -60,9 +71,9 @@ public:
|
||||
return remote_fs_root_path;
|
||||
}
|
||||
|
||||
std::vector<BlobPathWithSize> getBlobs() const
|
||||
std::vector<RelativePathWithSize> getBlobsRelativePaths() const
|
||||
{
|
||||
return remote_fs_objects;
|
||||
return storage_objects;
|
||||
}
|
||||
|
||||
bool isReadOnly() const
|
||||
|
@ -84,13 +84,13 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema
|
||||
{
|
||||
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_storage->getPath() + path);
|
||||
|
||||
auto blobs = disk->metadata_storage->getBlobs(path);
|
||||
for (const auto & [key, _] : blobs)
|
||||
auto objects = disk->metadata_storage->getObjectStoragePaths(path);
|
||||
for (const auto & [object_path, _] : objects)
|
||||
{
|
||||
ObjectAttributes metadata {
|
||||
{"path", path}
|
||||
};
|
||||
updateObjectMetadata(disk->remote_fs_root_path + key, metadata);
|
||||
updateObjectMetadata(object_path, metadata);
|
||||
}
|
||||
}
|
||||
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
|
||||
@ -346,7 +346,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
|
||||
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
|
||||
|
||||
std::vector<std::future<void>> results;
|
||||
auto restore_files = [this, &source_object_storage, &restore_information, &results](const BlobsPathToSize & keys)
|
||||
auto restore_files = [this, &source_object_storage, &restore_information, &results](const PathsWithSize & keys)
|
||||
{
|
||||
std::vector<String> keys_names;
|
||||
for (const auto & [key, size] : keys)
|
||||
@ -379,7 +379,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
|
||||
return true;
|
||||
};
|
||||
|
||||
BlobsPathToSize children;
|
||||
PathsWithSize children;
|
||||
source_object_storage->listPrefix(restore_information.source_path, children);
|
||||
|
||||
restore_files(children);
|
||||
@ -456,7 +456,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|
||||
bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != restore_information.source_path;
|
||||
|
||||
std::set<String> renames;
|
||||
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const BlobsPathToSize & keys)
|
||||
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const PathsWithSize & keys)
|
||||
{
|
||||
const String rename = "rename";
|
||||
const String hardlink = "hardlink";
|
||||
@ -523,7 +523,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|
||||
return true;
|
||||
};
|
||||
|
||||
BlobsPathToSize children;
|
||||
PathsWithSize children;
|
||||
source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
|
||||
restore_file_operations(children);
|
||||
|
||||
|
@ -65,7 +65,7 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
std::string path;
|
||||
bool delete_metadata_only;
|
||||
bool remove_from_cache{false};
|
||||
std::vector<std::string> paths_to_remove;
|
||||
PathsWithSize paths_to_remove;
|
||||
bool if_exists;
|
||||
|
||||
RemoveObjectStorageOperation(
|
||||
@ -96,13 +96,13 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
try
|
||||
{
|
||||
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path);
|
||||
auto remote_objects = metadata_storage.getRemotePaths(path);
|
||||
auto objects = metadata_storage.getObjectStoragePaths(path);
|
||||
|
||||
tx->unlinkMetadata(path);
|
||||
|
||||
if (hardlink_count == 0)
|
||||
{
|
||||
paths_to_remove = remote_objects;
|
||||
paths_to_remove = objects;
|
||||
remove_from_cache = true;
|
||||
}
|
||||
}
|
||||
@ -134,7 +134,7 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
if (remove_from_cache)
|
||||
{
|
||||
for (const auto & path_to_remove : paths_to_remove)
|
||||
object_storage.removeFromCache(path_to_remove);
|
||||
object_storage.removeFromCache(path_to_remove.path);
|
||||
}
|
||||
|
||||
}
|
||||
@ -143,10 +143,10 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
{
|
||||
std::string path;
|
||||
std::unordered_map<std::string, std::vector<std::string>> paths_to_remove;
|
||||
std::unordered_map<std::string, PathsWithSize> paths_to_remove;
|
||||
bool keep_all_batch_data;
|
||||
NameSet file_names_remove_metadata_only;
|
||||
std::vector<std::string> path_to_remove_from_cache;
|
||||
PathsWithSize path_to_remove_from_cache;
|
||||
|
||||
RemoveRecursiveObjectStorageOperation(
|
||||
IObjectStorage & object_storage_,
|
||||
@ -169,14 +169,14 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
|
||||
try
|
||||
{
|
||||
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path_to_remove);
|
||||
auto remote_objects = metadata_storage.getRemotePaths(path_to_remove);
|
||||
auto objects_paths = metadata_storage.getObjectStoragePaths(path_to_remove);
|
||||
|
||||
tx->unlinkMetadata(path_to_remove);
|
||||
|
||||
if (hardlink_count == 0)
|
||||
{
|
||||
paths_to_remove[path_to_remove] = remote_objects;
|
||||
path_to_remove_from_cache.insert(path_to_remove_from_cache.end(), remote_objects.begin(), remote_objects.end());
|
||||
paths_to_remove[path_to_remove] = objects_paths;
|
||||
path_to_remove_from_cache.insert(path_to_remove_from_cache.end(), objects_paths.begin(), objects_paths.end());
|
||||
}
|
||||
|
||||
}
|
||||
@ -217,7 +217,7 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
|
||||
{
|
||||
if (!keep_all_batch_data)
|
||||
{
|
||||
std::vector<std::string> remove_from_remote;
|
||||
PathsWithSize remove_from_remote;
|
||||
for (auto && [local_path, remote_paths] : paths_to_remove)
|
||||
{
|
||||
if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
|
||||
@ -228,7 +228,7 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
|
||||
object_storage.removeObjects(remove_from_remote);
|
||||
}
|
||||
|
||||
for (const auto & path_to_remove : path_to_remove_from_cache)
|
||||
for (const auto & [path_to_remove, _] : path_to_remove_from_cache)
|
||||
object_storage.removeFromCache(path_to_remove);
|
||||
}
|
||||
};
|
||||
@ -238,7 +238,7 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
|
||||
{
|
||||
std::string path_from;
|
||||
std::string path_to;
|
||||
std::vector<std::string> blobs_to_remove;
|
||||
PathsWithSize blobs_to_remove;
|
||||
|
||||
ReplaceFileObjectStorageOperation(
|
||||
IObjectStorage & object_storage_,
|
||||
@ -254,7 +254,7 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
|
||||
{
|
||||
if (metadata_storage.exists(path_to))
|
||||
{
|
||||
blobs_to_remove = metadata_storage.getRemotePaths(path_to);
|
||||
blobs_to_remove = metadata_storage.getObjectStoragePaths(path_to);
|
||||
tx->replaceFile(path_from, path_to);
|
||||
}
|
||||
else
|
||||
@ -328,14 +328,15 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
void execute(MetadataTransactionPtr tx) override
|
||||
{
|
||||
tx->createEmptyMetadataFile(to_path);
|
||||
auto source_blobs = metadata_storage.getBlobs(from_path);
|
||||
auto source_blobs = metadata_storage.getObjectStoragePaths(from_path); /// Full paths
|
||||
|
||||
for (const auto & [blob_from, size] : source_blobs)
|
||||
{
|
||||
auto blob_name = getRandomASCIIString();
|
||||
|
||||
auto blob_to = fs::path(remote_fs_root_path) / blob_name;
|
||||
|
||||
object_storage.copyObject(fs::path(remote_fs_root_path) / blob_from, blob_to);
|
||||
object_storage.copyObject(blob_from, blob_to);
|
||||
|
||||
tx->addBlobToMetadata(to_path, blob_name, size);
|
||||
|
||||
|
@ -49,13 +49,12 @@ std::unique_ptr<SeekableReadBuffer> HDFSObjectStorage::readObject( /// NOLINT
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
|
||||
const std::string & common_path_prefix,
|
||||
const BlobsPathToSize & blobs_to_read,
|
||||
const PathsWithSize & paths_to_read,
|
||||
const ReadSettings & read_settings,
|
||||
std::optional<size_t>,
|
||||
std::optional<size_t>) const
|
||||
{
|
||||
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, common_path_prefix, common_path_prefix, blobs_to_read, read_settings);
|
||||
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, paths_to_read, read_settings);
|
||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
|
||||
}
|
||||
@ -69,7 +68,9 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
|
||||
const WriteSettings &)
|
||||
{
|
||||
if (attributes.has_value())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||
throw Exception(
|
||||
ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||
|
||||
/// Single O_WRONLY in libhdfs adds O_TRUNC
|
||||
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
|
||||
@ -80,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
|
||||
}
|
||||
|
||||
|
||||
void HDFSObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children) const
|
||||
void HDFSObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
|
||||
{
|
||||
const size_t begin_of_path = path.find('/', path.find("//") + 2);
|
||||
int32_t num_entries;
|
||||
@ -104,10 +105,10 @@ void HDFSObjectStorage::removeObject(const std::string & path)
|
||||
|
||||
}
|
||||
|
||||
void HDFSObjectStorage::removeObjects(const std::vector<std::string> & paths)
|
||||
void HDFSObjectStorage::removeObjects(const PathsWithSize & paths)
|
||||
{
|
||||
for (const auto & hdfs_path : paths)
|
||||
removeObject(hdfs_path);
|
||||
for (const auto & [path, _] : paths)
|
||||
removeObject(path);
|
||||
}
|
||||
|
||||
void HDFSObjectStorage::removeObjectIfExists(const std::string & path)
|
||||
@ -116,15 +117,17 @@ void HDFSObjectStorage::removeObjectIfExists(const std::string & path)
|
||||
removeObject(path);
|
||||
}
|
||||
|
||||
void HDFSObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
|
||||
void HDFSObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
|
||||
{
|
||||
for (const auto & hdfs_path : paths)
|
||||
removeObjectIfExists(hdfs_path);
|
||||
for (const auto & [path, _] : paths)
|
||||
removeObjectIfExists(path);
|
||||
}
|
||||
|
||||
ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string &) const
|
||||
{
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||
throw Exception(
|
||||
ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||
}
|
||||
|
||||
void HDFSObjectStorage::copyObject( /// NOLINT
|
||||
@ -133,7 +136,9 @@ void HDFSObjectStorage::copyObject( /// NOLINT
|
||||
std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
if (object_to_attributes.has_value())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||
throw Exception(
|
||||
ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||
|
||||
auto in = readObject(object_from);
|
||||
auto out = writeObject(object_to, WriteMode::Rewrite);
|
||||
|
@ -61,8 +61,7 @@ public:
|
||||
std::optional<size_t> file_size = {}) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||
const std::string & common_path_prefix,
|
||||
const BlobsPathToSize & blobs_to_read,
|
||||
const PathsWithSize & paths_to_read,
|
||||
const ReadSettings & read_settings = ReadSettings{},
|
||||
std::optional<size_t> read_hint = {},
|
||||
std::optional<size_t> file_size = {}) const override;
|
||||
@ -76,15 +75,16 @@ public:
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
const WriteSettings & write_settings = {}) override;
|
||||
|
||||
void listPrefix(const std::string & path, BlobsPathToSize & children) const override;
|
||||
void listPrefix(const std::string & path, PathsWithSize & children) const override;
|
||||
|
||||
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
||||
void removeObject(const std::string & path) override;
|
||||
|
||||
void removeObjects(const std::vector<std::string> & paths) override;
|
||||
void removeObjects(const PathsWithSize & paths) override;
|
||||
|
||||
void removeObjectIfExists(const std::string & path) override;
|
||||
|
||||
void removeObjectsIfExist(const std::vector<std::string> & paths) override;
|
||||
void removeObjectsIfExist(const PathsWithSize & paths) override;
|
||||
|
||||
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
||||
|
||||
@ -97,11 +97,18 @@ public:
|
||||
|
||||
void startup() override;
|
||||
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||
void applyNewSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) override;
|
||||
|
||||
String getObjectsNamespace() const override { return ""; }
|
||||
|
||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||
const std::string & new_namespace,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) override;
|
||||
|
||||
private:
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
@ -110,8 +117,6 @@ private:
|
||||
HDFSFSPtr hdfs_fs;
|
||||
|
||||
SettingsPtr settings;
|
||||
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -119,11 +119,9 @@ public:
|
||||
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
|
||||
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
|
||||
|
||||
/// Return list of paths corresponding to metadata stored in local path
|
||||
virtual std::vector<std::string> getRemotePaths(const std::string & path) const = 0;
|
||||
|
||||
/// Return [(remote_path, size_in_bytes), ...] for metadata path
|
||||
virtual BlobsPathToSize getBlobs(const std::string & path) const = 0;
|
||||
/// Return [(object_storage_path, size_in_bytes), ...] for metadata path
|
||||
/// object_storage_path is a full path to the blob.
|
||||
virtual PathsWithSize getObjectStoragePaths(const std::string & path) const = 0;
|
||||
};
|
||||
|
||||
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
|
||||
|
@ -25,23 +25,23 @@ class WriteBufferFromFileBase;
|
||||
|
||||
using ObjectAttributes = std::map<std::string, std::string>;
|
||||
|
||||
/// Path to blob with it's size
|
||||
struct BlobPathWithSize
|
||||
/// Path to a file and its size.
|
||||
/// Path can be either relative or absolute - according to the context of use.
|
||||
struct PathWithSize
|
||||
{
|
||||
std::string relative_path;
|
||||
std::string path;
|
||||
uint64_t bytes_size;
|
||||
|
||||
BlobPathWithSize() = default;
|
||||
BlobPathWithSize(const BlobPathWithSize & other) = default;
|
||||
PathWithSize() = default;
|
||||
|
||||
BlobPathWithSize(const std::string & relative_path_, uint64_t bytes_size_)
|
||||
: relative_path(relative_path_)
|
||||
PathWithSize(const std::string & path_, uint64_t bytes_size_)
|
||||
: path(path_)
|
||||
, bytes_size(bytes_size_)
|
||||
{}
|
||||
};
|
||||
|
||||
/// List of blobs with their sizes
|
||||
using BlobsPathToSize = std::vector<BlobPathWithSize>;
|
||||
/// List of paths with their sizes
|
||||
using PathsWithSize = std::vector<PathWithSize>;
|
||||
|
||||
struct ObjectMetadata
|
||||
{
|
||||
@ -65,8 +65,8 @@ public:
|
||||
/// Path exists or not
|
||||
virtual bool exists(const std::string & path) const = 0;
|
||||
|
||||
/// List on prefix, return children with their sizes.
|
||||
virtual void listPrefix(const std::string & path, BlobsPathToSize & children) const = 0;
|
||||
/// List on prefix, return children (relative paths) with their sizes.
|
||||
virtual void listPrefix(const std::string & path, PathsWithSize & children) const = 0;
|
||||
|
||||
/// Get object metadata if supported. It should be possible to receive
|
||||
/// at least size of object
|
||||
@ -81,8 +81,7 @@ public:
|
||||
|
||||
/// Read multiple objects with common prefix
|
||||
virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||
const std::string & common_path_prefix,
|
||||
const BlobsPathToSize & blobs_to_read,
|
||||
const PathsWithSize & paths_to_read,
|
||||
const ReadSettings & read_settings = ReadSettings{},
|
||||
std::optional<size_t> read_hint = {},
|
||||
std::optional<size_t> file_size = {}) const = 0;
|
||||
@ -101,13 +100,13 @@ public:
|
||||
|
||||
/// Remove multiple objects. Some object storages can do batch remove in a more
|
||||
/// optimal way.
|
||||
virtual void removeObjects(const std::vector<std::string> & paths) = 0;
|
||||
virtual void removeObjects(const PathsWithSize & paths) = 0;
|
||||
|
||||
/// Remove object on path if exists
|
||||
virtual void removeObjectIfExists(const std::string & path) = 0;
|
||||
|
||||
/// Remove objects on path if exists
|
||||
virtual void removeObjectsIfExist(const std::vector<std::string> & paths) = 0;
|
||||
virtual void removeObjectsIfExist(const PathsWithSize & paths) = 0;
|
||||
|
||||
/// Copy object with different attributes if required
|
||||
virtual void copyObject( /// NOLINT
|
||||
@ -140,7 +139,10 @@ public:
|
||||
void removeFromCache(const std::string & path);
|
||||
|
||||
/// Apply new settings, in most cases reiniatilize client and some other staff
|
||||
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0;
|
||||
virtual void applyNewSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) = 0;
|
||||
|
||||
/// Sometimes object storages have something similar to chroot or namespace, for example
|
||||
/// buckets in S3. If object storage doesn't have any namepaces return empty string.
|
||||
@ -148,7 +150,10 @@ public:
|
||||
|
||||
/// FIXME: confusing function required for a very specific case. Create new instance of object storage
|
||||
/// in different namespace.
|
||||
virtual std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0;
|
||||
virtual std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||
const std::string & new_namespace,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix, ContextPtr context) = 0;
|
||||
|
||||
protected:
|
||||
FileCachePtr cache;
|
||||
|
@ -300,18 +300,21 @@ MetadataTransactionPtr MetadataStorageFromDisk::createTransaction() const
|
||||
return std::make_shared<MetadataStorageFromDiskTransaction>(*this);
|
||||
}
|
||||
|
||||
std::vector<std::string> MetadataStorageFromDisk::getRemotePaths(const std::string & path) const
|
||||
PathsWithSize MetadataStorageFromDisk::getObjectStoragePaths(const std::string & path) const
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
|
||||
std::vector<std::string> remote_paths;
|
||||
auto blobs = metadata->getBlobs();
|
||||
auto root_path = metadata->getBlobsCommonPrefix();
|
||||
remote_paths.reserve(blobs.size());
|
||||
for (const auto & [remote_path, _] : blobs)
|
||||
remote_paths.push_back(fs::path(root_path) / remote_path);
|
||||
auto object_storage_relative_paths = metadata->getBlobsRelativePaths(); /// Relative paths.
|
||||
fs::path root_path = metadata->getBlobsCommonPrefix();
|
||||
|
||||
return remote_paths;
|
||||
PathsWithSize object_storage_paths;
|
||||
object_storage_paths.reserve(object_storage_relative_paths.size());
|
||||
|
||||
/// Relative paths -> absolute.
|
||||
for (auto & [object_relative_path, size] : object_storage_relative_paths)
|
||||
object_storage_paths.emplace_back(root_path / object_relative_path, size);
|
||||
|
||||
return object_storage_paths;
|
||||
}
|
||||
|
||||
uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) const
|
||||
@ -320,12 +323,6 @@ uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) con
|
||||
return metadata->getRefCount();
|
||||
}
|
||||
|
||||
BlobsPathToSize MetadataStorageFromDisk::getBlobs(const std::string & path) const
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
return metadata->getBlobs();
|
||||
}
|
||||
|
||||
void MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
|
||||
{
|
||||
auto metadata = metadata_storage.readMetadata(path);
|
||||
|
@ -59,9 +59,7 @@ public:
|
||||
|
||||
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
|
||||
|
||||
BlobsPathToSize getBlobs(const std::string & path) const override;
|
||||
|
||||
std::vector<std::string> getRemotePaths(const std::string & path) const override;
|
||||
PathsWithSize getObjectStoragePaths(const std::string & path) const override;
|
||||
|
||||
uint32_t getHardlinkCount(const std::string & path) const override;
|
||||
|
||||
|
@ -109,8 +109,7 @@ bool S3ObjectStorage::exists(const std::string & path) const
|
||||
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
const std::string & common_path_prefix,
|
||||
const BlobsPathToSize & blobs_to_read,
|
||||
const PathsWithSize & paths_to_read,
|
||||
const ReadSettings & read_settings,
|
||||
std::optional<size_t>,
|
||||
std::optional<size_t>) const
|
||||
@ -128,8 +127,12 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
auto settings_ptr = s3_settings.get();
|
||||
|
||||
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
|
||||
client.get(), bucket, version_id, common_path_prefix, blobs_to_read,
|
||||
settings_ptr->s3_settings.max_single_read_retries, disk_read_settings);
|
||||
client.get(),
|
||||
bucket,
|
||||
version_id,
|
||||
paths_to_read,
|
||||
settings_ptr->s3_settings.max_single_read_retries,
|
||||
disk_read_settings);
|
||||
|
||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||
{
|
||||
@ -192,7 +195,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), path);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children) const
|
||||
void S3ObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto client_ptr = client.get();
|
||||
@ -253,14 +256,14 @@ void S3ObjectStorage::removeObjectImpl(const std::string & path, bool if_exists)
|
||||
}
|
||||
}
|
||||
|
||||
void S3ObjectStorage::removeObjectsImpl(const std::vector<std::string> & paths, bool if_exists)
|
||||
void S3ObjectStorage::removeObjectsImpl(const PathsWithSize & paths, bool if_exists)
|
||||
{
|
||||
if (paths.empty())
|
||||
return;
|
||||
|
||||
if (!s3_capabilities.support_batch_delete)
|
||||
{
|
||||
for (const auto & path : paths)
|
||||
for (const auto & [path, _] : paths)
|
||||
removeObjectImpl(path, if_exists);
|
||||
}
|
||||
else
|
||||
@ -278,12 +281,12 @@ void S3ObjectStorage::removeObjectsImpl(const std::vector<std::string> & paths,
|
||||
for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
|
||||
{
|
||||
Aws::S3::Model::ObjectIdentifier obj;
|
||||
obj.SetKey(paths[current_position]);
|
||||
obj.SetKey(paths[current_position].path);
|
||||
current_chunk.push_back(obj);
|
||||
|
||||
if (!keys.empty())
|
||||
keys += ", ";
|
||||
keys += paths[current_position];
|
||||
keys += paths[current_position].path;
|
||||
}
|
||||
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
@ -308,12 +311,12 @@ void S3ObjectStorage::removeObjectIfExists(const std::string & path)
|
||||
removeObjectImpl(path, true);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
|
||||
void S3ObjectStorage::removeObjects(const PathsWithSize & paths)
|
||||
{
|
||||
removeObjectsImpl(paths, false);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
|
||||
void S3ObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
|
||||
{
|
||||
removeObjectsImpl(paths, true);
|
||||
}
|
||||
|
@ -66,8 +66,7 @@ public:
|
||||
std::optional<size_t> file_size = {}) const override;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||
const std::string & common_path_prefix,
|
||||
const BlobsPathToSize & blobs_to_read,
|
||||
const PathsWithSize & paths_to_read,
|
||||
const ReadSettings & read_settings = ReadSettings{},
|
||||
std::optional<size_t> read_hint = {},
|
||||
std::optional<size_t> file_size = {}) const override;
|
||||
@ -81,15 +80,16 @@ public:
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
const WriteSettings & write_settings = {}) override;
|
||||
|
||||
void listPrefix(const std::string & path, BlobsPathToSize & children) const override;
|
||||
void listPrefix(const std::string & path, PathsWithSize & children) const override;
|
||||
|
||||
/// Remove file. Throws exception if file doesn't exist or it's a directory.
|
||||
void removeObject(const std::string & path) override;
|
||||
|
||||
void removeObjects(const std::vector<std::string> & paths) override;
|
||||
void removeObjects(const PathsWithSize & paths) override;
|
||||
|
||||
void removeObjectIfExists(const std::string & path) override;
|
||||
|
||||
void removeObjectsIfExist(const std::vector<std::string> & paths) override;
|
||||
void removeObjectsIfExist(const PathsWithSize & paths) override;
|
||||
|
||||
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
||||
|
||||
@ -108,26 +108,42 @@ public:
|
||||
|
||||
void startup() override;
|
||||
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||
void applyNewSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) override;
|
||||
|
||||
String getObjectsNamespace() const override { return bucket; }
|
||||
|
||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||
const std::string & new_namespace,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context) override;
|
||||
|
||||
private:
|
||||
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
||||
|
||||
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);
|
||||
|
||||
void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
void copyObjectImpl(
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
const String & dst_bucket,
|
||||
const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||
|
||||
void copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
void copyObjectMultipartImpl(
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
const String & dst_bucket,
|
||||
const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||
|
||||
void removeObjectImpl(const std::string & path, bool if_exists);
|
||||
void removeObjectsImpl(const std::vector<std::string> & paths, bool if_exists);
|
||||
void removeObjectsImpl(const PathsWithSize & paths, bool if_exists);
|
||||
|
||||
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
|
||||
|
||||
|
@ -68,11 +68,11 @@ Pipe StorageSystemRemoteDataPaths::read(
|
||||
col_base_path->insert(disk->getPath());
|
||||
col_cache_base_path->insert(cache_base_path);
|
||||
col_local_path->insert(local_path);
|
||||
col_remote_path->insert(remote_path);
|
||||
col_remote_path->insert(remote_path.path);
|
||||
|
||||
if (cache)
|
||||
{
|
||||
auto cache_paths = cache->tryGetCachePaths(cache->hash(remote_path));
|
||||
auto cache_paths = cache->tryGetCachePaths(cache->hash(remote_path.path));
|
||||
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
|
||||
}
|
||||
else
|
||||
|
Loading…
Reference in New Issue
Block a user