mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
get rid of path separation
This commit is contained in:
parent
90307177b4
commit
725d80d470
@ -75,7 +75,7 @@ public:
|
|||||||
void startup(ContextPtr context) override;
|
void startup(ContextPtr context) override;
|
||||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
|
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
|
||||||
String getCacheBasePath() const override { return delegate->getCacheBasePath(); }
|
String getCacheBasePath() const override { return delegate->getCacheBasePath(); }
|
||||||
std::vector<String> getRemotePaths(const String & path) const override { return delegate->getRemotePaths(path); }
|
PathsWithSize getObjectStoragePaths(const String & path) const override { return delegate->getObjectStoragePaths(path); }
|
||||||
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
|
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
|
||||||
|
|
||||||
MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); }
|
MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); }
|
||||||
|
@ -318,10 +318,10 @@ String DiskRestartProxy::getCacheBasePath() const
|
|||||||
return DiskDecorator::getCacheBasePath();
|
return DiskDecorator::getCacheBasePath();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<String> DiskRestartProxy::getRemotePaths(const String & path) const
|
PathsWithSize DiskRestartProxy::getObjectStoragePaths(const String & path) const
|
||||||
{
|
{
|
||||||
ReadLock lock (mutex);
|
ReadLock lock (mutex);
|
||||||
return DiskDecorator::getRemotePaths(path);
|
return DiskDecorator::getObjectStoragePaths(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskRestartProxy::getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map)
|
void DiskRestartProxy::getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map)
|
||||||
|
@ -65,7 +65,7 @@ public:
|
|||||||
String getUniqueId(const String & path) const override;
|
String getUniqueId(const String & path) const override;
|
||||||
bool checkUniqueId(const String & id) const override;
|
bool checkUniqueId(const String & id) const override;
|
||||||
String getCacheBasePath() const override;
|
String getCacheBasePath() const override;
|
||||||
std::vector<String> getRemotePaths(const String & path) const override;
|
PathsWithSize getObjectStoragePaths(const String & path) const override;
|
||||||
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
|
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
|
||||||
|
|
||||||
void restart(ContextPtr context);
|
void restart(ContextPtr context);
|
||||||
|
@ -170,10 +170,10 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
|
|||||||
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
|
||||||
remote_path = remote_path.string().substr(url.size());
|
remote_path = remote_path.string().substr(url.size());
|
||||||
|
|
||||||
std::vector<BlobPathWithSize> blobs_to_read;
|
PathsWithSize blobs_to_read;
|
||||||
blobs_to_read.emplace_back(remote_path, iter->second.size);
|
blobs_to_read.emplace_back(remote_path, iter->second.size);
|
||||||
|
|
||||||
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, path, blobs_to_read, getContext(), read_settings);
|
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, blobs_to_read, getContext(), read_settings);
|
||||||
|
|
||||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||||
{
|
{
|
||||||
|
@ -169,7 +169,7 @@ public:
|
|||||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<String> getRemotePaths(const String &) const override { return {}; }
|
PathsWithSize getObjectStoragePaths(const String &) const override { return {}; }
|
||||||
|
|
||||||
void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &) override {}
|
void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &) override {}
|
||||||
|
|
||||||
|
@ -219,13 +219,13 @@ public:
|
|||||||
|
|
||||||
/// Returns a list of paths because for Log family engines there might be
|
/// Returns a list of paths because for Log family engines there might be
|
||||||
/// multiple files in remote fs for single clickhouse file.
|
/// multiple files in remote fs for single clickhouse file.
|
||||||
virtual std::vector<String> getRemotePaths(const String &) const
|
virtual PathsWithSize getObjectStoragePaths(const String &) const
|
||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePaths() not implemented for disk: {}`", getType());
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getObjectStoragePaths() not implemented for disk: {}`", getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// For one local path there might be multiple remote paths in case of Log family engines.
|
/// For one local path there might be multiple remote paths in case of Log family engines.
|
||||||
using LocalPathWithRemotePaths = std::pair<String, std::vector<String>>;
|
using LocalPathWithRemotePaths = std::pair<String, PathsWithSize>;
|
||||||
|
|
||||||
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &)
|
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &)
|
||||||
{
|
{
|
||||||
|
@ -40,7 +40,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
|
|||||||
appendFilesystemCacheLog();
|
appendFilesystemCacheLog();
|
||||||
}
|
}
|
||||||
|
|
||||||
current_file_path = fs::path(common_path_prefix) / path;
|
current_file_path = path;
|
||||||
current_file_size = file_size;
|
current_file_size = file_size;
|
||||||
total_bytes_read_from_current_file = 0;
|
total_bytes_read_from_current_file = 0;
|
||||||
|
|
||||||
@ -50,18 +50,30 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
|
|||||||
#if USE_AWS_S3
|
#if USE_AWS_S3
|
||||||
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBufferImpl(const String & path, size_t file_size)
|
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBufferImpl(const String & path, size_t file_size)
|
||||||
{
|
{
|
||||||
auto remote_path = fs::path(common_path_prefix) / path;
|
|
||||||
auto remote_file_reader_creator = [=, this]()
|
auto remote_file_reader_creator = [=, this]()
|
||||||
{
|
{
|
||||||
return std::make_unique<ReadBufferFromS3>(
|
return std::make_unique<ReadBufferFromS3>(
|
||||||
client_ptr, bucket, remote_path, version_id, max_single_read_retries,
|
client_ptr,
|
||||||
settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
|
bucket,
|
||||||
|
path,
|
||||||
|
version_id,
|
||||||
|
max_single_read_retries,
|
||||||
|
settings,
|
||||||
|
/* use_external_buffer */true,
|
||||||
|
/* offset */0,
|
||||||
|
read_until_position,
|
||||||
|
/* restricted_seek */true);
|
||||||
};
|
};
|
||||||
|
|
||||||
if (with_cache)
|
if (with_cache)
|
||||||
{
|
{
|
||||||
return std::make_shared<CachedReadBufferFromRemoteFS>(
|
return std::make_shared<CachedReadBufferFromRemoteFS>(
|
||||||
remote_path, settings.remote_fs_cache, remote_file_reader_creator, settings, query_id, read_until_position ? read_until_position : file_size);
|
path,
|
||||||
|
settings.remote_fs_cache,
|
||||||
|
remote_file_reader_creator,
|
||||||
|
settings,
|
||||||
|
query_id,
|
||||||
|
read_until_position ? read_until_position : file_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
return remote_file_reader_creator();
|
return remote_file_reader_creator();
|
||||||
@ -82,24 +94,27 @@ SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementation
|
|||||||
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
|
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
|
||||||
{
|
{
|
||||||
current_file_path = path;
|
current_file_path = path;
|
||||||
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, /* use_external_buffer */true, read_until_position);
|
return std::make_unique<ReadBufferFromWebServer>(
|
||||||
|
path,
|
||||||
|
context,
|
||||||
|
settings,
|
||||||
|
/* use_external_buffer */true,
|
||||||
|
read_until_position);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#if USE_HDFS
|
#if USE_HDFS
|
||||||
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
|
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
|
||||||
{
|
{
|
||||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, settings.remote_fs_buffer_size);
|
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, path, config);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
|
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
|
||||||
const std::string & common_path_prefix_,
|
const PathsWithSize & blobs_to_read_,
|
||||||
const BlobsPathToSize & blobs_to_read_,
|
|
||||||
const ReadSettings & settings_)
|
const ReadSettings & settings_)
|
||||||
: ReadBuffer(nullptr, 0)
|
: ReadBuffer(nullptr, 0)
|
||||||
, common_path_prefix(common_path_prefix_)
|
|
||||||
, blobs_to_read(blobs_to_read_)
|
, blobs_to_read(blobs_to_read_)
|
||||||
, settings(settings_)
|
, settings(settings_)
|
||||||
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
|
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
|
||||||
@ -295,7 +310,7 @@ size_t ReadBufferFromRemoteFSGather::getFileSize() const
|
|||||||
{
|
{
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
for (const auto & object : blobs_to_read)
|
for (const auto & object : blobs_to_read)
|
||||||
size += object.bytes_size;
|
size += object.size;
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,8 +27,7 @@ friend class ReadIndirectBufferFromRemoteFS;
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
ReadBufferFromRemoteFSGather(
|
ReadBufferFromRemoteFSGather(
|
||||||
const std::string & common_path_prefix_,
|
const PathsWithSize & blobs_to_read_,
|
||||||
const BlobsPathToSize & blobs_to_read_,
|
|
||||||
const ReadSettings & settings_);
|
const ReadSettings & settings_);
|
||||||
|
|
||||||
~ReadBufferFromRemoteFSGather() override;
|
~ReadBufferFromRemoteFSGather() override;
|
||||||
@ -54,9 +53,7 @@ public:
|
|||||||
protected:
|
protected:
|
||||||
virtual SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) = 0;
|
virtual SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) = 0;
|
||||||
|
|
||||||
std::string common_path_prefix;
|
PathsWithSize blobs_to_read;
|
||||||
|
|
||||||
BlobsPathToSize blobs_to_read;
|
|
||||||
|
|
||||||
ReadSettings settings;
|
ReadSettings settings;
|
||||||
|
|
||||||
@ -112,11 +109,10 @@ public:
|
|||||||
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
|
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
|
||||||
const String & bucket_,
|
const String & bucket_,
|
||||||
const String & version_id_,
|
const String & version_id_,
|
||||||
const std::string & common_path_prefix_,
|
const PathsWithSize & blobs_to_read_,
|
||||||
const BlobsPathToSize & blobs_to_read_,
|
|
||||||
size_t max_single_read_retries_,
|
size_t max_single_read_retries_,
|
||||||
const ReadSettings & settings_)
|
const ReadSettings & settings_)
|
||||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||||
, client_ptr(std::move(client_ptr_))
|
, client_ptr(std::move(client_ptr_))
|
||||||
, bucket(bucket_)
|
, bucket(bucket_)
|
||||||
, version_id(version_id_)
|
, version_id(version_id_)
|
||||||
@ -142,12 +138,11 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS
|
|||||||
public:
|
public:
|
||||||
ReadBufferFromAzureBlobStorageGather(
|
ReadBufferFromAzureBlobStorageGather(
|
||||||
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||||
const std::string & common_path_prefix_,
|
const PathsWithSize & blobs_to_read_,
|
||||||
const BlobsPathToSize & blobs_to_read_,
|
|
||||||
size_t max_single_read_retries_,
|
size_t max_single_read_retries_,
|
||||||
size_t max_single_download_retries_,
|
size_t max_single_download_retries_,
|
||||||
const ReadSettings & settings_)
|
const ReadSettings & settings_)
|
||||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||||
, blob_container_client(blob_container_client_)
|
, blob_container_client(blob_container_client_)
|
||||||
, max_single_read_retries(max_single_read_retries_)
|
, max_single_read_retries(max_single_read_retries_)
|
||||||
, max_single_download_retries(max_single_download_retries_)
|
, max_single_download_retries(max_single_download_retries_)
|
||||||
@ -169,11 +164,10 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
|
|||||||
public:
|
public:
|
||||||
ReadBufferFromWebServerGather(
|
ReadBufferFromWebServerGather(
|
||||||
const String & uri_,
|
const String & uri_,
|
||||||
const std::string & common_path_prefix_,
|
const PathsWithSize & blobs_to_read_,
|
||||||
const BlobsPathToSize & blobs_to_read_,
|
|
||||||
ContextPtr context_,
|
ContextPtr context_,
|
||||||
const ReadSettings & settings_)
|
const ReadSettings & settings_)
|
||||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||||
, uri(uri_)
|
, uri(uri_)
|
||||||
, context(context_)
|
, context(context_)
|
||||||
{
|
{
|
||||||
@ -195,15 +189,12 @@ public:
|
|||||||
ReadBufferFromHDFSGather(
|
ReadBufferFromHDFSGather(
|
||||||
const Poco::Util::AbstractConfiguration & config_,
|
const Poco::Util::AbstractConfiguration & config_,
|
||||||
const String & hdfs_uri_,
|
const String & hdfs_uri_,
|
||||||
const std::string & common_path_prefix_,
|
const PathsWithSize & blobs_to_read_,
|
||||||
const BlobsPathToSize & blobs_to_read_,
|
|
||||||
const ReadSettings & settings_)
|
const ReadSettings & settings_)
|
||||||
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
|
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
|
||||||
, config(config_)
|
, config(config_)
|
||||||
|
, hdfs_uri(hdfs_uri_)
|
||||||
{
|
{
|
||||||
const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2);
|
|
||||||
hdfs_directory = hdfs_uri_.substr(begin_of_path);
|
|
||||||
hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
|
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
|
||||||
@ -211,7 +202,6 @@ public:
|
|||||||
private:
|
private:
|
||||||
const Poco::Util::AbstractConfiguration & config;
|
const Poco::Util::AbstractConfiguration & config;
|
||||||
String hdfs_uri;
|
String hdfs_uri;
|
||||||
String hdfs_directory;
|
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -19,7 +19,8 @@ class ReadBufferFromWebServer : public SeekableReadBuffer
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit ReadBufferFromWebServer(
|
explicit ReadBufferFromWebServer(
|
||||||
const String & url_, ContextPtr context_,
|
const String & url_,
|
||||||
|
ContextPtr context_,
|
||||||
const ReadSettings & settings_ = {},
|
const ReadSettings & settings_ = {},
|
||||||
bool use_external_buffer_ = false,
|
bool use_external_buffer_ = false,
|
||||||
size_t read_until_position = 0);
|
size_t read_until_position = 0);
|
||||||
|
@ -67,16 +67,18 @@ std::unique_ptr<SeekableReadBuffer> AzureObjectStorage::readObject( /// NOLINT
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
|
||||||
const std::string & common_path_prefix,
|
const PathsWithSize & paths_to_read,
|
||||||
const BlobsPathToSize & blobs_to_read,
|
|
||||||
const ReadSettings & read_settings,
|
const ReadSettings & read_settings,
|
||||||
std::optional<size_t>,
|
std::optional<size_t>,
|
||||||
std::optional<size_t>) const
|
std::optional<size_t>) const
|
||||||
{
|
{
|
||||||
auto settings_ptr = settings.get();
|
auto settings_ptr = settings.get();
|
||||||
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
|
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
|
||||||
client.get(), common_path_prefix, blobs_to_read,
|
client.get(),
|
||||||
settings_ptr->max_single_read_retries, settings_ptr->max_single_download_retries, read_settings);
|
paths_to_read,
|
||||||
|
settings_ptr->max_single_read_retries,
|
||||||
|
settings_ptr->max_single_download_retries,
|
||||||
|
read_settings);
|
||||||
|
|
||||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||||
{
|
{
|
||||||
@ -111,7 +113,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
|
|||||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), path);
|
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AzureObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children) const
|
void AzureObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
|
||||||
{
|
{
|
||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
|
|
||||||
@ -134,10 +136,10 @@ void AzureObjectStorage::removeObject(const std::string & path)
|
|||||||
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
|
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AzureObjectStorage::removeObjects(const std::vector<std::string> & paths)
|
void AzureObjectStorage::removeObjects(const PathsWithSize & paths)
|
||||||
{
|
{
|
||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
for (const auto & path : paths)
|
for (const auto & [path, _] : paths)
|
||||||
{
|
{
|
||||||
auto delete_info = client_ptr->DeleteBlob(path);
|
auto delete_info = client_ptr->DeleteBlob(path);
|
||||||
if (!delete_info.Value.Deleted)
|
if (!delete_info.Value.Deleted)
|
||||||
@ -151,10 +153,10 @@ void AzureObjectStorage::removeObjectIfExists(const std::string & path)
|
|||||||
auto delete_info = client_ptr->DeleteBlob(path);
|
auto delete_info = client_ptr->DeleteBlob(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AzureObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
|
void AzureObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
|
||||||
{
|
{
|
||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
for (const auto & path : paths)
|
for (const auto & [path, _] : paths)
|
||||||
auto delete_info = client_ptr->DeleteBlob(path);
|
auto delete_info = client_ptr->DeleteBlob(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,8 +59,7 @@ public:
|
|||||||
std::optional<size_t> file_size = {}) const override;
|
std::optional<size_t> file_size = {}) const override;
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||||
const std::string & common_path_prefix,
|
const PathsWithSize & blobs_to_read,
|
||||||
const BlobsPathToSize & blobs_to_read,
|
|
||||||
const ReadSettings & read_settings = ReadSettings{},
|
const ReadSettings & read_settings = ReadSettings{},
|
||||||
std::optional<size_t> read_hint = {},
|
std::optional<size_t> read_hint = {},
|
||||||
std::optional<size_t> file_size = {}) const override;
|
std::optional<size_t> file_size = {}) const override;
|
||||||
@ -74,15 +73,16 @@ public:
|
|||||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||||
const WriteSettings & write_settings = {}) override;
|
const WriteSettings & write_settings = {}) override;
|
||||||
|
|
||||||
void listPrefix(const std::string & path, BlobsPathToSize & children) const override;
|
void listPrefix(const std::string & path, PathsWithSize & children) const override;
|
||||||
|
|
||||||
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
||||||
void removeObject(const std::string & path) override;
|
void removeObject(const std::string & path) override;
|
||||||
|
|
||||||
void removeObjects(const std::vector<std::string> & paths) override;
|
void removeObjects(const PathsWithSize & paths) override;
|
||||||
|
|
||||||
void removeObjectIfExists(const std::string & path) override;
|
void removeObjectIfExists(const std::string & path) override;
|
||||||
|
|
||||||
void removeObjectsIfExist(const std::vector<std::string> & paths) override;
|
void removeObjectsIfExist(const PathsWithSize & paths) override;
|
||||||
|
|
||||||
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
||||||
|
|
||||||
@ -95,11 +95,19 @@ public:
|
|||||||
|
|
||||||
void startup() override {}
|
void startup() override {}
|
||||||
|
|
||||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
void applyNewSettings(
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix,
|
||||||
|
ContextPtr context) override;
|
||||||
|
|
||||||
String getObjectsNamespace() const override { return ""; }
|
String getObjectsNamespace() const override { return ""; }
|
||||||
|
|
||||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||||
|
const std::string & new_namespace,
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix,
|
||||||
|
ContextPtr context) override;
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const String name;
|
const String name;
|
||||||
|
@ -108,9 +108,9 @@ DiskObjectStorage::DiskObjectStorage(
|
|||||||
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
|
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
std::vector<String> DiskObjectStorage::getRemotePaths(const String & local_path) const
|
PathsWithSize DiskObjectStorage::getObjectStoragePaths(const String & local_path) const
|
||||||
{
|
{
|
||||||
return metadata_storage->getRemotePaths(local_path);
|
return metadata_storage->getObjectStoragePaths(local_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
|
void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
|
||||||
@ -120,7 +120,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
paths_map.emplace_back(local_path, getRemotePaths(local_path));
|
paths_map.emplace_back(local_path, getObjectStoragePaths(local_path));
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
@ -244,9 +244,9 @@ String DiskObjectStorage::getUniqueId(const String & path) const
|
|||||||
{
|
{
|
||||||
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
|
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
|
||||||
String id;
|
String id;
|
||||||
auto blobs_paths = metadata_storage->getRemotePaths(path);
|
auto blobs_paths = metadata_storage->getObjectStoragePaths(path);
|
||||||
if (!blobs_paths.empty())
|
if (!blobs_paths.empty())
|
||||||
id = blobs_paths[0];
|
id = blobs_paths[0].path;
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -438,7 +438,11 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
|
|||||||
std::optional<size_t> read_hint,
|
std::optional<size_t> read_hint,
|
||||||
std::optional<size_t> file_size) const
|
std::optional<size_t> file_size) const
|
||||||
{
|
{
|
||||||
return object_storage->readObjects(remote_fs_root_path, metadata_storage->getBlobs(path), settings, read_hint, file_size);
|
return object_storage->readObjects(
|
||||||
|
metadata_storage->getObjectStoragePaths(path),
|
||||||
|
settings,
|
||||||
|
read_hint,
|
||||||
|
file_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
|
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
|
||||||
|
@ -49,7 +49,7 @@ public:
|
|||||||
|
|
||||||
const String & getPath() const override { return metadata_storage->getPath(); }
|
const String & getPath() const override { return metadata_storage->getPath(); }
|
||||||
|
|
||||||
std::vector<String> getRemotePaths(const String & local_path) const override;
|
PathsWithSize getObjectStoragePaths(const String & local_path) const override;
|
||||||
|
|
||||||
void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
|
void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
|
||||||
|
|
||||||
|
@ -26,14 +26,14 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
|||||||
|
|
||||||
assertChar('\n', buf);
|
assertChar('\n', buf);
|
||||||
|
|
||||||
UInt32 remote_fs_objects_count;
|
UInt32 storage_objects_count;
|
||||||
readIntText(remote_fs_objects_count, buf);
|
readIntText(storage_objects_count, buf);
|
||||||
assertChar('\t', buf);
|
assertChar('\t', buf);
|
||||||
readIntText(total_size, buf);
|
readIntText(total_size, buf);
|
||||||
assertChar('\n', buf);
|
assertChar('\n', buf);
|
||||||
remote_fs_objects.resize(remote_fs_objects_count);
|
storage_objects.resize(storage_objects_count);
|
||||||
|
|
||||||
for (size_t i = 0; i < remote_fs_objects_count; ++i)
|
for (size_t i = 0; i < storage_objects_count; ++i)
|
||||||
{
|
{
|
||||||
String remote_fs_object_path;
|
String remote_fs_object_path;
|
||||||
size_t remote_fs_object_size;
|
size_t remote_fs_object_size;
|
||||||
@ -50,8 +50,8 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
|
|||||||
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
|
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
|
||||||
}
|
}
|
||||||
assertChar('\n', buf);
|
assertChar('\n', buf);
|
||||||
remote_fs_objects[i].relative_path = remote_fs_object_path;
|
storage_objects[i].path = remote_fs_object_path;
|
||||||
remote_fs_objects[i].bytes_size = remote_fs_object_size;
|
storage_objects[i].size = remote_fs_object_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
readIntText(ref_count, buf);
|
readIntText(ref_count, buf);
|
||||||
@ -75,12 +75,12 @@ void DiskObjectStorageMetadata::serialize(WriteBuffer & buf, bool sync) const
|
|||||||
writeIntText(VERSION_READ_ONLY_FLAG, buf);
|
writeIntText(VERSION_READ_ONLY_FLAG, buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
|
|
||||||
writeIntText(remote_fs_objects.size(), buf);
|
writeIntText(storage_objects.size(), buf);
|
||||||
writeChar('\t', buf);
|
writeChar('\t', buf);
|
||||||
writeIntText(total_size, buf);
|
writeIntText(total_size, buf);
|
||||||
writeChar('\n', buf);
|
writeChar('\n', buf);
|
||||||
|
|
||||||
for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects)
|
for (const auto & [remote_fs_object_path, remote_fs_object_size] : storage_objects)
|
||||||
{
|
{
|
||||||
writeIntText(remote_fs_object_size, buf);
|
writeIntText(remote_fs_object_size, buf);
|
||||||
writeChar('\t', buf);
|
writeChar('\t', buf);
|
||||||
@ -120,7 +120,7 @@ DiskObjectStorageMetadata::DiskObjectStorageMetadata(
|
|||||||
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
|
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
|
||||||
{
|
{
|
||||||
total_size += size;
|
total_size += size;
|
||||||
remote_fs_objects.emplace_back(path, size);
|
storage_objects.emplace_back(path, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,8 +19,8 @@ private:
|
|||||||
|
|
||||||
const std::string & common_metadata_path;
|
const std::string & common_metadata_path;
|
||||||
|
|
||||||
/// Remote FS objects paths and their sizes.
|
/// Relative paths of blobs.
|
||||||
std::vector<BlobPathWithSize> remote_fs_objects;
|
std::vector<PathWithSize> storage_objects;
|
||||||
|
|
||||||
/// URI
|
/// URI
|
||||||
const std::string & remote_fs_root_path;
|
const std::string & remote_fs_root_path;
|
||||||
@ -60,9 +60,9 @@ public:
|
|||||||
return remote_fs_root_path;
|
return remote_fs_root_path;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<BlobPathWithSize> getBlobs() const
|
std::vector<PathWithSize> getBlobs() const
|
||||||
{
|
{
|
||||||
return remote_fs_objects;
|
return storage_objects;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isReadOnly() const
|
bool isReadOnly() const
|
||||||
|
@ -84,13 +84,13 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema
|
|||||||
{
|
{
|
||||||
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_storage->getPath() + path);
|
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_storage->getPath() + path);
|
||||||
|
|
||||||
auto blobs = disk->metadata_storage->getBlobs(path);
|
auto objects = disk->metadata_storage->getObjectStoragePaths(path);
|
||||||
for (const auto & [key, _] : blobs)
|
for (const auto & [object_path, size] : objects)
|
||||||
{
|
{
|
||||||
ObjectAttributes metadata {
|
ObjectAttributes metadata {
|
||||||
{"path", path}
|
{"path", path}
|
||||||
};
|
};
|
||||||
updateObjectMetadata(disk->remote_fs_root_path + key, metadata);
|
updateObjectMetadata(object_path, metadata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
|
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
|
||||||
@ -346,7 +346,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
|
|||||||
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
|
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
|
||||||
|
|
||||||
std::vector<std::future<void>> results;
|
std::vector<std::future<void>> results;
|
||||||
auto restore_files = [this, &source_object_storage, &restore_information, &results](const BlobsPathToSize & keys)
|
auto restore_files = [this, &source_object_storage, &restore_information, &results](const PathsWithSize & keys)
|
||||||
{
|
{
|
||||||
std::vector<String> keys_names;
|
std::vector<String> keys_names;
|
||||||
for (const auto & [key, size] : keys)
|
for (const auto & [key, size] : keys)
|
||||||
@ -379,7 +379,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
|
|||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
BlobsPathToSize children;
|
PathsWithSize children;
|
||||||
source_object_storage->listPrefix(restore_information.source_path, children);
|
source_object_storage->listPrefix(restore_information.source_path, children);
|
||||||
|
|
||||||
restore_files(children);
|
restore_files(children);
|
||||||
@ -456,7 +456,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|
|||||||
bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != restore_information.source_path;
|
bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != restore_information.source_path;
|
||||||
|
|
||||||
std::set<String> renames;
|
std::set<String> renames;
|
||||||
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const BlobsPathToSize & keys)
|
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const PathsWithSize & keys)
|
||||||
{
|
{
|
||||||
const String rename = "rename";
|
const String rename = "rename";
|
||||||
const String hardlink = "hardlink";
|
const String hardlink = "hardlink";
|
||||||
@ -523,7 +523,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|
|||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
BlobsPathToSize children;
|
PathsWithSize children;
|
||||||
source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
|
source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
|
||||||
restore_file_operations(children);
|
restore_file_operations(children);
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
|||||||
std::string path;
|
std::string path;
|
||||||
bool delete_metadata_only;
|
bool delete_metadata_only;
|
||||||
bool remove_from_cache{false};
|
bool remove_from_cache{false};
|
||||||
std::vector<std::string> paths_to_remove;
|
PathsWithSize paths_to_remove;
|
||||||
bool if_exists;
|
bool if_exists;
|
||||||
|
|
||||||
RemoveObjectStorageOperation(
|
RemoveObjectStorageOperation(
|
||||||
@ -96,13 +96,13 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path);
|
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path);
|
||||||
auto remote_objects = metadata_storage.getRemotePaths(path);
|
auto objects = metadata_storage.getObjectStoragePaths(path);
|
||||||
|
|
||||||
tx->unlinkMetadata(path);
|
tx->unlinkMetadata(path);
|
||||||
|
|
||||||
if (hardlink_count == 0)
|
if (hardlink_count == 0)
|
||||||
{
|
{
|
||||||
paths_to_remove = remote_objects;
|
paths_to_remove = objects;
|
||||||
remove_from_cache = true;
|
remove_from_cache = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -134,7 +134,7 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
|||||||
if (remove_from_cache)
|
if (remove_from_cache)
|
||||||
{
|
{
|
||||||
for (const auto & path_to_remove : paths_to_remove)
|
for (const auto & path_to_remove : paths_to_remove)
|
||||||
object_storage.removeFromCache(path_to_remove);
|
object_storage.removeFromCache(path_to_remove.path);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -143,10 +143,10 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
|
|||||||
struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
|
struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||||
{
|
{
|
||||||
std::string path;
|
std::string path;
|
||||||
std::unordered_map<std::string, std::vector<std::string>> paths_to_remove;
|
std::unordered_map<std::string, PathsWithSize> paths_to_remove;
|
||||||
bool keep_all_batch_data;
|
bool keep_all_batch_data;
|
||||||
NameSet file_names_remove_metadata_only;
|
NameSet file_names_remove_metadata_only;
|
||||||
std::vector<std::string> path_to_remove_from_cache;
|
PathsWithSize path_to_remove_from_cache;
|
||||||
|
|
||||||
RemoveRecursiveObjectStorageOperation(
|
RemoveRecursiveObjectStorageOperation(
|
||||||
IObjectStorage & object_storage_,
|
IObjectStorage & object_storage_,
|
||||||
@ -169,14 +169,14 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path_to_remove);
|
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path_to_remove);
|
||||||
auto remote_objects = metadata_storage.getRemotePaths(path_to_remove);
|
auto objects_paths = metadata_storage.getObjectStoragePaths(path_to_remove);
|
||||||
|
|
||||||
tx->unlinkMetadata(path_to_remove);
|
tx->unlinkMetadata(path_to_remove);
|
||||||
|
|
||||||
if (hardlink_count == 0)
|
if (hardlink_count == 0)
|
||||||
{
|
{
|
||||||
paths_to_remove[path_to_remove] = remote_objects;
|
paths_to_remove[path_to_remove] = objects_paths;
|
||||||
path_to_remove_from_cache.insert(path_to_remove_from_cache.end(), remote_objects.begin(), remote_objects.end());
|
path_to_remove_from_cache.insert(path_to_remove_from_cache.end(), objects_paths.begin(), objects_paths.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -217,7 +217,7 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
|
|||||||
{
|
{
|
||||||
if (!keep_all_batch_data)
|
if (!keep_all_batch_data)
|
||||||
{
|
{
|
||||||
std::vector<std::string> remove_from_remote;
|
PathsWithSize remove_from_remote;
|
||||||
for (auto && [local_path, remote_paths] : paths_to_remove)
|
for (auto && [local_path, remote_paths] : paths_to_remove)
|
||||||
{
|
{
|
||||||
if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
|
if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
|
||||||
@ -228,7 +228,7 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
|
|||||||
object_storage.removeObjects(remove_from_remote);
|
object_storage.removeObjects(remove_from_remote);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto & path_to_remove : path_to_remove_from_cache)
|
for (const auto & [path_to_remove, _] : path_to_remove_from_cache)
|
||||||
object_storage.removeFromCache(path_to_remove);
|
object_storage.removeFromCache(path_to_remove);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -238,7 +238,7 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
|
|||||||
{
|
{
|
||||||
std::string path_from;
|
std::string path_from;
|
||||||
std::string path_to;
|
std::string path_to;
|
||||||
std::vector<std::string> blobs_to_remove;
|
PathsWithSize blobs_to_remove;
|
||||||
|
|
||||||
ReplaceFileObjectStorageOperation(
|
ReplaceFileObjectStorageOperation(
|
||||||
IObjectStorage & object_storage_,
|
IObjectStorage & object_storage_,
|
||||||
@ -254,7 +254,7 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
|
|||||||
{
|
{
|
||||||
if (metadata_storage.exists(path_to))
|
if (metadata_storage.exists(path_to))
|
||||||
{
|
{
|
||||||
blobs_to_remove = metadata_storage.getRemotePaths(path_to);
|
blobs_to_remove = metadata_storage.getObjectStoragePaths(path_to);
|
||||||
tx->replaceFile(path_from, path_to);
|
tx->replaceFile(path_from, path_to);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -328,14 +328,15 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
|||||||
void execute(MetadataTransactionPtr tx) override
|
void execute(MetadataTransactionPtr tx) override
|
||||||
{
|
{
|
||||||
tx->createEmptyMetadataFile(to_path);
|
tx->createEmptyMetadataFile(to_path);
|
||||||
auto source_blobs = metadata_storage.getBlobs(from_path);
|
auto source_blobs = metadata_storage.getObjectStoragePaths(from_path); /// Full paths
|
||||||
|
|
||||||
for (const auto & [blob_from, size] : source_blobs)
|
for (const auto & [blob_from, size] : source_blobs)
|
||||||
{
|
{
|
||||||
auto blob_name = getRandomASCIIString();
|
auto blob_name = getRandomASCIIString();
|
||||||
|
|
||||||
auto blob_to = fs::path(remote_fs_root_path) / blob_name;
|
auto blob_to = fs::path(remote_fs_root_path) / blob_name;
|
||||||
|
|
||||||
object_storage.copyObject(fs::path(remote_fs_root_path) / blob_from, blob_to);
|
object_storage.copyObject(blob_from, blob_to);
|
||||||
|
|
||||||
tx->addBlobToMetadata(to_path, blob_name, size);
|
tx->addBlobToMetadata(to_path, blob_name, size);
|
||||||
|
|
||||||
|
@ -49,13 +49,12 @@ std::unique_ptr<SeekableReadBuffer> HDFSObjectStorage::readObject( /// NOLINT
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
|
||||||
const std::string & common_path_prefix,
|
const PathsWithSize & paths_to_read,
|
||||||
const BlobsPathToSize & blobs_to_read,
|
|
||||||
const ReadSettings & read_settings,
|
const ReadSettings & read_settings,
|
||||||
std::optional<size_t>,
|
std::optional<size_t>,
|
||||||
std::optional<size_t>) const
|
std::optional<size_t>) const
|
||||||
{
|
{
|
||||||
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, common_path_prefix, common_path_prefix, blobs_to_read, read_settings);
|
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, hdfs_root_path, paths_to_read, read_settings);
|
||||||
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
|
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
|
||||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
|
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
|
||||||
}
|
}
|
||||||
@ -69,7 +68,9 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
|
|||||||
const WriteSettings &)
|
const WriteSettings &)
|
||||||
{
|
{
|
||||||
if (attributes.has_value())
|
if (attributes.has_value())
|
||||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects");
|
throw Exception(
|
||||||
|
ErrorCodes::UNSUPPORTED_METHOD,
|
||||||
|
"HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||||
|
|
||||||
/// Single O_WRONLY in libhdfs adds O_TRUNC
|
/// Single O_WRONLY in libhdfs adds O_TRUNC
|
||||||
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
|
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
|
||||||
@ -80,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void HDFSObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children) const
|
void HDFSObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
|
||||||
{
|
{
|
||||||
const size_t begin_of_path = path.find('/', path.find("//") + 2);
|
const size_t begin_of_path = path.find('/', path.find("//") + 2);
|
||||||
int32_t num_entries;
|
int32_t num_entries;
|
||||||
@ -104,10 +105,10 @@ void HDFSObjectStorage::removeObject(const std::string & path)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void HDFSObjectStorage::removeObjects(const std::vector<std::string> & paths)
|
void HDFSObjectStorage::removeObjects(const PathsWithSize & paths)
|
||||||
{
|
{
|
||||||
for (const auto & hdfs_path : paths)
|
for (const auto & [path, _] : paths)
|
||||||
removeObject(hdfs_path);
|
removeObject(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HDFSObjectStorage::removeObjectIfExists(const std::string & path)
|
void HDFSObjectStorage::removeObjectIfExists(const std::string & path)
|
||||||
@ -116,15 +117,17 @@ void HDFSObjectStorage::removeObjectIfExists(const std::string & path)
|
|||||||
removeObject(path);
|
removeObject(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HDFSObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
|
void HDFSObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
|
||||||
{
|
{
|
||||||
for (const auto & hdfs_path : paths)
|
for (const auto & [path, _] : paths)
|
||||||
removeObjectIfExists(hdfs_path);
|
removeObjectIfExists(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string &) const
|
ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string &) const
|
||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects");
|
throw Exception(
|
||||||
|
ErrorCodes::UNSUPPORTED_METHOD,
|
||||||
|
"HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||||
}
|
}
|
||||||
|
|
||||||
void HDFSObjectStorage::copyObject( /// NOLINT
|
void HDFSObjectStorage::copyObject( /// NOLINT
|
||||||
@ -133,7 +136,9 @@ void HDFSObjectStorage::copyObject( /// NOLINT
|
|||||||
std::optional<ObjectAttributes> object_to_attributes)
|
std::optional<ObjectAttributes> object_to_attributes)
|
||||||
{
|
{
|
||||||
if (object_to_attributes.has_value())
|
if (object_to_attributes.has_value())
|
||||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects");
|
throw Exception(
|
||||||
|
ErrorCodes::UNSUPPORTED_METHOD,
|
||||||
|
"HDFS API doesn't support custom attributes/metadata for stored objects");
|
||||||
|
|
||||||
auto in = readObject(object_from);
|
auto in = readObject(object_from);
|
||||||
auto out = writeObject(object_to, WriteMode::Rewrite);
|
auto out = writeObject(object_to, WriteMode::Rewrite);
|
||||||
|
@ -50,6 +50,7 @@ public:
|
|||||||
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
|
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
|
||||||
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
|
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
|
||||||
, settings(std::move(settings_))
|
, settings(std::move(settings_))
|
||||||
|
, hdfs_root_path(hdfs_root_path_)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
bool exists(const std::string & hdfs_uri) const override;
|
bool exists(const std::string & hdfs_uri) const override;
|
||||||
@ -61,8 +62,7 @@ public:
|
|||||||
std::optional<size_t> file_size = {}) const override;
|
std::optional<size_t> file_size = {}) const override;
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||||
const std::string & common_path_prefix,
|
const PathsWithSize & paths_to_read,
|
||||||
const BlobsPathToSize & blobs_to_read,
|
|
||||||
const ReadSettings & read_settings = ReadSettings{},
|
const ReadSettings & read_settings = ReadSettings{},
|
||||||
std::optional<size_t> read_hint = {},
|
std::optional<size_t> read_hint = {},
|
||||||
std::optional<size_t> file_size = {}) const override;
|
std::optional<size_t> file_size = {}) const override;
|
||||||
@ -76,15 +76,16 @@ public:
|
|||||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||||
const WriteSettings & write_settings = {}) override;
|
const WriteSettings & write_settings = {}) override;
|
||||||
|
|
||||||
void listPrefix(const std::string & path, BlobsPathToSize & children) const override;
|
void listPrefix(const std::string & path, PathsWithSize & children) const override;
|
||||||
|
|
||||||
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
||||||
void removeObject(const std::string & path) override;
|
void removeObject(const std::string & path) override;
|
||||||
|
|
||||||
void removeObjects(const std::vector<std::string> & paths) override;
|
void removeObjects(const PathsWithSize & paths) override;
|
||||||
|
|
||||||
void removeObjectIfExists(const std::string & path) override;
|
void removeObjectIfExists(const std::string & path) override;
|
||||||
|
|
||||||
void removeObjectsIfExist(const std::vector<std::string> & paths) override;
|
void removeObjectsIfExist(const PathsWithSize & paths) override;
|
||||||
|
|
||||||
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
||||||
|
|
||||||
@ -97,11 +98,18 @@ public:
|
|||||||
|
|
||||||
void startup() override;
|
void startup() override;
|
||||||
|
|
||||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
void applyNewSettings(
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix,
|
||||||
|
ContextPtr context) override;
|
||||||
|
|
||||||
String getObjectsNamespace() const override { return ""; }
|
String getObjectsNamespace() const override { return ""; }
|
||||||
|
|
||||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||||
|
const std::string & new_namespace,
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix,
|
||||||
|
ContextPtr context) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const Poco::Util::AbstractConfiguration & config;
|
const Poco::Util::AbstractConfiguration & config;
|
||||||
@ -110,8 +118,7 @@ private:
|
|||||||
HDFSFSPtr hdfs_fs;
|
HDFSFSPtr hdfs_fs;
|
||||||
|
|
||||||
SettingsPtr settings;
|
SettingsPtr settings;
|
||||||
|
String hdfs_root_path;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -119,11 +119,9 @@ public:
|
|||||||
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
|
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
|
||||||
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
|
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
|
||||||
|
|
||||||
/// Return list of paths corresponding to metadata stored in local path
|
/// Return [(object_storage_path, size_in_bytes), ...] for metadata path
|
||||||
virtual std::vector<std::string> getRemotePaths(const std::string & path) const = 0;
|
/// object_storage_path is a full path to the blob.
|
||||||
|
virtual PathsWithSize getObjectStoragePaths(const std::string & path) const = 0;
|
||||||
/// Return [(remote_path, size_in_bytes), ...] for metadata path
|
|
||||||
virtual BlobsPathToSize getBlobs(const std::string & path) const = 0;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
|
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
|
||||||
|
@ -25,23 +25,23 @@ class WriteBufferFromFileBase;
|
|||||||
|
|
||||||
using ObjectAttributes = std::map<std::string, std::string>;
|
using ObjectAttributes = std::map<std::string, std::string>;
|
||||||
|
|
||||||
/// Path to blob with it's size
|
/// Path to a file and its size.
|
||||||
struct BlobPathWithSize
|
/// Path can be either relative or absolute - according to the context of use.
|
||||||
|
struct PathWithSize
|
||||||
{
|
{
|
||||||
std::string relative_path;
|
std::string path;
|
||||||
uint64_t bytes_size;
|
uint64_t size; /// Size in bytes
|
||||||
|
|
||||||
BlobPathWithSize() = default;
|
PathWithSize() = default;
|
||||||
BlobPathWithSize(const BlobPathWithSize & other) = default;
|
|
||||||
|
|
||||||
BlobPathWithSize(const std::string & relative_path_, uint64_t bytes_size_)
|
PathWithSize(const std::string & path_, uint64_t size_)
|
||||||
: relative_path(relative_path_)
|
: path(path_)
|
||||||
, bytes_size(bytes_size_)
|
, size(size_)
|
||||||
{}
|
{}
|
||||||
};
|
};
|
||||||
|
|
||||||
/// List of blobs with their sizes
|
/// List of paths with their sizes
|
||||||
using BlobsPathToSize = std::vector<BlobPathWithSize>;
|
using PathsWithSize = std::vector<PathWithSize>;
|
||||||
|
|
||||||
struct ObjectMetadata
|
struct ObjectMetadata
|
||||||
{
|
{
|
||||||
@ -65,8 +65,8 @@ public:
|
|||||||
/// Path exists or not
|
/// Path exists or not
|
||||||
virtual bool exists(const std::string & path) const = 0;
|
virtual bool exists(const std::string & path) const = 0;
|
||||||
|
|
||||||
/// List on prefix, return children with their sizes.
|
/// List on prefix, return children (relative paths) with their sizes.
|
||||||
virtual void listPrefix(const std::string & path, BlobsPathToSize & children) const = 0;
|
virtual void listPrefix(const std::string & path, PathsWithSize & children) const = 0;
|
||||||
|
|
||||||
/// Get object metadata if supported. It should be possible to receive
|
/// Get object metadata if supported. It should be possible to receive
|
||||||
/// at least size of object
|
/// at least size of object
|
||||||
@ -81,8 +81,7 @@ public:
|
|||||||
|
|
||||||
/// Read multiple objects with common prefix
|
/// Read multiple objects with common prefix
|
||||||
virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||||
const std::string & common_path_prefix,
|
const PathsWithSize & paths_to_read,
|
||||||
const BlobsPathToSize & blobs_to_read,
|
|
||||||
const ReadSettings & read_settings = ReadSettings{},
|
const ReadSettings & read_settings = ReadSettings{},
|
||||||
std::optional<size_t> read_hint = {},
|
std::optional<size_t> read_hint = {},
|
||||||
std::optional<size_t> file_size = {}) const = 0;
|
std::optional<size_t> file_size = {}) const = 0;
|
||||||
@ -101,13 +100,13 @@ public:
|
|||||||
|
|
||||||
/// Remove multiple objects. Some object storages can do batch remove in a more
|
/// Remove multiple objects. Some object storages can do batch remove in a more
|
||||||
/// optimal way.
|
/// optimal way.
|
||||||
virtual void removeObjects(const std::vector<std::string> & paths) = 0;
|
virtual void removeObjects(const PathsWithSize & paths) = 0;
|
||||||
|
|
||||||
/// Remove object on path if exists
|
/// Remove object on path if exists
|
||||||
virtual void removeObjectIfExists(const std::string & path) = 0;
|
virtual void removeObjectIfExists(const std::string & path) = 0;
|
||||||
|
|
||||||
/// Remove objects on path if exists
|
/// Remove objects on path if exists
|
||||||
virtual void removeObjectsIfExist(const std::vector<std::string> & paths) = 0;
|
virtual void removeObjectsIfExist(const PathsWithSize & paths) = 0;
|
||||||
|
|
||||||
/// Copy object with different attributes if required
|
/// Copy object with different attributes if required
|
||||||
virtual void copyObject( /// NOLINT
|
virtual void copyObject( /// NOLINT
|
||||||
@ -140,7 +139,10 @@ public:
|
|||||||
void removeFromCache(const std::string & path);
|
void removeFromCache(const std::string & path);
|
||||||
|
|
||||||
/// Apply new settings, in most cases reiniatilize client and some other staff
|
/// Apply new settings, in most cases reiniatilize client and some other staff
|
||||||
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0;
|
virtual void applyNewSettings(
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix,
|
||||||
|
ContextPtr context) = 0;
|
||||||
|
|
||||||
/// Sometimes object storages have something similar to chroot or namespace, for example
|
/// Sometimes object storages have something similar to chroot or namespace, for example
|
||||||
/// buckets in S3. If object storage doesn't have any namepaces return empty string.
|
/// buckets in S3. If object storage doesn't have any namepaces return empty string.
|
||||||
@ -148,7 +150,10 @@ public:
|
|||||||
|
|
||||||
/// FIXME: confusing function required for a very specific case. Create new instance of object storage
|
/// FIXME: confusing function required for a very specific case. Create new instance of object storage
|
||||||
/// in different namespace.
|
/// in different namespace.
|
||||||
virtual std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0;
|
virtual std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||||
|
const std::string & new_namespace,
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix, ContextPtr context) = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
FileCachePtr cache;
|
FileCachePtr cache;
|
||||||
|
@ -300,18 +300,18 @@ MetadataTransactionPtr MetadataStorageFromDisk::createTransaction() const
|
|||||||
return std::make_shared<MetadataStorageFromDiskTransaction>(*this);
|
return std::make_shared<MetadataStorageFromDiskTransaction>(*this);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<std::string> MetadataStorageFromDisk::getRemotePaths(const std::string & path) const
|
PathsWithSize MetadataStorageFromDisk::getObjectStoragePaths(const std::string & path) const
|
||||||
{
|
{
|
||||||
auto metadata = readMetadata(path);
|
auto metadata = readMetadata(path);
|
||||||
|
|
||||||
std::vector<std::string> remote_paths;
|
PathsWithSize object_storage_paths = metadata->getBlobs(); /// Relative paths.
|
||||||
auto blobs = metadata->getBlobs();
|
|
||||||
auto root_path = metadata->getBlobsCommonPrefix();
|
auto root_path = metadata->getBlobsCommonPrefix();
|
||||||
remote_paths.reserve(blobs.size());
|
|
||||||
for (const auto & [remote_path, _] : blobs)
|
|
||||||
remote_paths.push_back(fs::path(root_path) / remote_path);
|
|
||||||
|
|
||||||
return remote_paths;
|
/// Relative paths -> absolute.
|
||||||
|
for (auto & [object_path, _] : object_storage_paths)
|
||||||
|
object_path = fs::path(root_path) / object_path;
|
||||||
|
|
||||||
|
return object_storage_paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) const
|
uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) const
|
||||||
@ -320,12 +320,6 @@ uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) con
|
|||||||
return metadata->getRefCount();
|
return metadata->getRefCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
BlobsPathToSize MetadataStorageFromDisk::getBlobs(const std::string & path) const
|
|
||||||
{
|
|
||||||
auto metadata = readMetadata(path);
|
|
||||||
return metadata->getBlobs();
|
|
||||||
}
|
|
||||||
|
|
||||||
void MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
|
void MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
|
||||||
{
|
{
|
||||||
auto metadata = metadata_storage.readMetadata(path);
|
auto metadata = metadata_storage.readMetadata(path);
|
||||||
|
@ -59,9 +59,7 @@ public:
|
|||||||
|
|
||||||
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
|
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
|
||||||
|
|
||||||
BlobsPathToSize getBlobs(const std::string & path) const override;
|
PathsWithSize getObjectStoragePaths(const std::string & path) const override;
|
||||||
|
|
||||||
std::vector<std::string> getRemotePaths(const std::string & path) const override;
|
|
||||||
|
|
||||||
uint32_t getHardlinkCount(const std::string & path) const override;
|
uint32_t getHardlinkCount(const std::string & path) const override;
|
||||||
|
|
||||||
|
@ -109,8 +109,7 @@ bool S3ObjectStorage::exists(const std::string & path) const
|
|||||||
|
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||||
const std::string & common_path_prefix,
|
const PathsWithSize & paths_to_read,
|
||||||
const BlobsPathToSize & blobs_to_read,
|
|
||||||
const ReadSettings & read_settings,
|
const ReadSettings & read_settings,
|
||||||
std::optional<size_t>,
|
std::optional<size_t>,
|
||||||
std::optional<size_t>) const
|
std::optional<size_t>) const
|
||||||
@ -128,8 +127,12 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
|||||||
auto settings_ptr = s3_settings.get();
|
auto settings_ptr = s3_settings.get();
|
||||||
|
|
||||||
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
|
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
|
||||||
client.get(), bucket, version_id, common_path_prefix, blobs_to_read,
|
client.get(),
|
||||||
settings_ptr->s3_settings.max_single_read_retries, disk_read_settings);
|
bucket,
|
||||||
|
version_id,
|
||||||
|
paths_to_read,
|
||||||
|
settings_ptr->s3_settings.max_single_read_retries,
|
||||||
|
read_settings);
|
||||||
|
|
||||||
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
|
||||||
{
|
{
|
||||||
@ -192,7 +195,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
|||||||
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), path);
|
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children) const
|
void S3ObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
|
||||||
{
|
{
|
||||||
auto settings_ptr = s3_settings.get();
|
auto settings_ptr = s3_settings.get();
|
||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
@ -253,14 +256,14 @@ void S3ObjectStorage::removeObjectImpl(const std::string & path, bool if_exists)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::removeObjectsImpl(const std::vector<std::string> & paths, bool if_exists)
|
void S3ObjectStorage::removeObjectsImpl(const PathsWithSize & paths, bool if_exists)
|
||||||
{
|
{
|
||||||
if (paths.empty())
|
if (paths.empty())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (!s3_capabilities.support_batch_delete)
|
if (!s3_capabilities.support_batch_delete)
|
||||||
{
|
{
|
||||||
for (const auto & path : paths)
|
for (const auto & [path, _] : paths)
|
||||||
removeObjectImpl(path, if_exists);
|
removeObjectImpl(path, if_exists);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -278,12 +281,12 @@ void S3ObjectStorage::removeObjectsImpl(const std::vector<std::string> & paths,
|
|||||||
for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
|
for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
|
||||||
{
|
{
|
||||||
Aws::S3::Model::ObjectIdentifier obj;
|
Aws::S3::Model::ObjectIdentifier obj;
|
||||||
obj.SetKey(paths[current_position]);
|
obj.SetKey(paths[current_position].path);
|
||||||
current_chunk.push_back(obj);
|
current_chunk.push_back(obj);
|
||||||
|
|
||||||
if (!keys.empty())
|
if (!keys.empty())
|
||||||
keys += ", ";
|
keys += ", ";
|
||||||
keys += paths[current_position];
|
keys += paths[current_position].path;
|
||||||
}
|
}
|
||||||
|
|
||||||
Aws::S3::Model::Delete delkeys;
|
Aws::S3::Model::Delete delkeys;
|
||||||
@ -308,12 +311,12 @@ void S3ObjectStorage::removeObjectIfExists(const std::string & path)
|
|||||||
removeObjectImpl(path, true);
|
removeObjectImpl(path, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
|
void S3ObjectStorage::removeObjects(const PathsWithSize & paths)
|
||||||
{
|
{
|
||||||
removeObjectsImpl(paths, false);
|
removeObjectsImpl(paths, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
|
void S3ObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
|
||||||
{
|
{
|
||||||
removeObjectsImpl(paths, true);
|
removeObjectsImpl(paths, true);
|
||||||
}
|
}
|
||||||
|
@ -66,8 +66,7 @@ public:
|
|||||||
std::optional<size_t> file_size = {}) const override;
|
std::optional<size_t> file_size = {}) const override;
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
|
||||||
const std::string & common_path_prefix,
|
const PathsWithSize & paths_to_read,
|
||||||
const BlobsPathToSize & blobs_to_read,
|
|
||||||
const ReadSettings & read_settings = ReadSettings{},
|
const ReadSettings & read_settings = ReadSettings{},
|
||||||
std::optional<size_t> read_hint = {},
|
std::optional<size_t> read_hint = {},
|
||||||
std::optional<size_t> file_size = {}) const override;
|
std::optional<size_t> file_size = {}) const override;
|
||||||
@ -81,15 +80,16 @@ public:
|
|||||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||||
const WriteSettings & write_settings = {}) override;
|
const WriteSettings & write_settings = {}) override;
|
||||||
|
|
||||||
void listPrefix(const std::string & path, BlobsPathToSize & children) const override;
|
void listPrefix(const std::string & path, PathsWithSize & children) const override;
|
||||||
|
|
||||||
/// Remove file. Throws exception if file doesn't exist or it's a directory.
|
/// Remove file. Throws exception if file doesn't exist or it's a directory.
|
||||||
void removeObject(const std::string & path) override;
|
void removeObject(const std::string & path) override;
|
||||||
|
|
||||||
void removeObjects(const std::vector<std::string> & paths) override;
|
void removeObjects(const PathsWithSize & paths) override;
|
||||||
|
|
||||||
void removeObjectIfExists(const std::string & path) override;
|
void removeObjectIfExists(const std::string & path) override;
|
||||||
|
|
||||||
void removeObjectsIfExist(const std::vector<std::string> & paths) override;
|
void removeObjectsIfExist(const PathsWithSize & paths) override;
|
||||||
|
|
||||||
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
ObjectMetadata getObjectMetadata(const std::string & path) const override;
|
||||||
|
|
||||||
@ -108,26 +108,42 @@ public:
|
|||||||
|
|
||||||
void startup() override;
|
void startup() override;
|
||||||
|
|
||||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
void applyNewSettings(
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix,
|
||||||
|
ContextPtr context) override;
|
||||||
|
|
||||||
String getObjectsNamespace() const override { return bucket; }
|
String getObjectsNamespace() const override { return bucket; }
|
||||||
|
|
||||||
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
|
std::unique_ptr<IObjectStorage> cloneObjectStorage(
|
||||||
|
const std::string & new_namespace,
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
|
const std::string & config_prefix,
|
||||||
|
ContextPtr context) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
||||||
|
|
||||||
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);
|
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);
|
||||||
|
|
||||||
void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
void copyObjectImpl(
|
||||||
|
const String & src_bucket,
|
||||||
|
const String & src_key,
|
||||||
|
const String & dst_bucket,
|
||||||
|
const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||||
|
|
||||||
void copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
void copyObjectMultipartImpl(
|
||||||
|
const String & src_bucket,
|
||||||
|
const String & src_key,
|
||||||
|
const String & dst_bucket,
|
||||||
|
const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||||
|
|
||||||
void removeObjectImpl(const std::string & path, bool if_exists);
|
void removeObjectImpl(const std::string & path, bool if_exists);
|
||||||
void removeObjectsImpl(const std::vector<std::string> & paths, bool if_exists);
|
void removeObjectsImpl(const PathsWithSize & paths, bool if_exists);
|
||||||
|
|
||||||
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
|
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
|
||||||
|
|
||||||
|
@ -68,11 +68,11 @@ Pipe StorageSystemRemoteDataPaths::read(
|
|||||||
col_base_path->insert(disk->getPath());
|
col_base_path->insert(disk->getPath());
|
||||||
col_cache_base_path->insert(cache_base_path);
|
col_cache_base_path->insert(cache_base_path);
|
||||||
col_local_path->insert(local_path);
|
col_local_path->insert(local_path);
|
||||||
col_remote_path->insert(remote_path);
|
col_remote_path->insert(remote_path.path);
|
||||||
|
|
||||||
if (cache)
|
if (cache)
|
||||||
{
|
{
|
||||||
auto cache_paths = cache->tryGetCachePaths(cache->hash(remote_path));
|
auto cache_paths = cache->tryGetCachePaths(cache->hash(remote_path.path));
|
||||||
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
|
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
Loading…
Reference in New Issue
Block a user