Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-12-11 17:02:25 +00:00)
Merge pull request #36070 from ClickHouse/remove_unused_field
Less dependencies from disks in buffers
This commit is contained in commit 2c2b288b44.
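In short, the gather read buffers (and the indirect write buffer) no longer depend on whole disk-side metadata objects: they now receive only a common remote path prefix and a plain list of blob paths with sizes. A minimal, self-contained sketch of that interface shift (the stub types and the GatherBufferSketch name are stand-ins added here for illustration; the real signatures appear in the header hunks below):

#include <cstdint>
#include <string>
#include <vector>

// Stand-in types so the sketch compiles on its own; the real definitions live in ClickHouse.
struct ReadSettings {};
struct RemoteMetadata {};   // disk-level metadata object (the old dependency)
struct BlobPathWithSize { std::string relative_path; uint64_t bytes_size; };
using BlobsPathToSize = std::vector<BlobPathWithSize>;

struct GatherBufferSketch
{
    // Before: built from a whole metadata object plus a disk path.
    GatherBufferSketch(const RemoteMetadata &, const ReadSettings &, const std::string & /*path*/) {}

    // After: built from only the common remote path prefix and the blobs to read.
    GatherBufferSketch(const std::string & common_path_prefix, const BlobsPathToSize & blobs_to_read, const ReadSettings &)
        : prefix(common_path_prefix), blobs(blobs_to_read) {}

    std::string prefix;
    BlobsPathToSize blobs;
};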
@@ -71,8 +71,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
     LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
 
     auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
-        path, blob_container_client, metadata, settings->max_single_read_retries,
-        settings->max_single_download_retries, read_settings);
+        blob_container_client, metadata.remote_fs_root_path, metadata.remote_fs_objects,
+        settings->max_single_read_retries, settings->max_single_download_retries, read_settings);
 
     if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
     {
@@ -109,7 +109,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskAzureBlobStorage::writeFile(
         readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_path, count] (Metadata & metadata) { metadata.addObject(blob_path, count); return true; });
     };
 
-    return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(create_metadata_callback), path);
+    return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(create_metadata_callback), blob_path);
 }
 
 
@@ -166,9 +166,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
     remote_path = remote_path.string().substr(url.size());
 
     RemoteMetadata meta(path, remote_path);
-    meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
+    meta.remote_fs_objects.emplace_back(remote_path, iter->second.size);
 
-    auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(path, url, meta, getContext(), read_settings);
+    auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta.remote_fs_root_path, meta.remote_fs_objects, getContext(), read_settings);
 
     if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
     {
@@ -82,7 +82,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
         "Read from file by path: {}. Existing HDFS objects: {}",
         backQuote(metadata_disk->getPath() + path), metadata.remote_fs_objects.size());
 
-    auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(path, config, remote_fs_root_path, metadata, read_settings);
+    auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, remote_fs_root_path, remote_fs_root_path, metadata.remote_fs_objects, read_settings);
     auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
     return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
 }
@@ -91,8 +91,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
 std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings &)
 {
     /// Path to store new HDFS object.
-    auto file_name = getRandomName();
-    auto hdfs_path = remote_fs_root_path + file_name;
+    std::string file_name = getRandomName();
+    std::string hdfs_path = fs::path(remote_fs_root_path) / file_name;
 
     LOG_TRACE(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
         backQuote(metadata_disk->getPath() + path), hdfs_path);
@@ -106,7 +106,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
         readOrCreateUpdateAndStoreMetadata(path, mode, false, [file_name, count] (Metadata & metadata) { metadata.addObject(file_name, count); return true; });
     };
 
-    return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(create_metadata_callback), path);
+    return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(create_metadata_callback), hdfs_path);
 }
 
 
@@ -122,7 +122,8 @@ void IDiskRemote::Metadata::load()
             remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
         }
         assertChar('\n', *buf);
-        remote_fs_objects[i] = {remote_fs_object_path, remote_fs_object_size};
+        remote_fs_objects[i].relative_path = remote_fs_object_path;
+        remote_fs_objects[i].bytes_size = remote_fs_object_size;
     }
 
     readIntText(ref_count, *buf);
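The hunk above replaces a single pair assignment with explicit field assignments into the new BlobPathWithSize entries. A stand-alone illustration of what load() now does per object (the function and variable names are made up for this sketch; the actual parsing of the metadata file is not shown):

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct BlobPathWithSize { std::string relative_path; uint64_t bytes_size; };
using BlobsPathToSize = std::vector<BlobPathWithSize>;

// Strip the disk's remote root from the stored object path, then fill the two named fields
// (previously the whole entry was assigned at once as a std::pair).
BlobPathWithSize makeEntry(std::string object_path, const std::string & root_path, uint64_t size)
{
    if (object_path.compare(0, root_path.size(), root_path) == 0)
        object_path = object_path.substr(root_path.size());

    BlobPathWithSize entry;
    entry.relative_path = object_path;
    entry.bytes_size = size;
    return entry;
}

int main()
{
    auto entry = makeEntry("cluster/data/abc", "cluster/data/", 4096);
    std::cout << entry.relative_path << ' ' << entry.bytes_size << '\n';   // prints: abc 4096
}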
@@ -638,7 +639,7 @@ String IDiskRemote::getUniqueId(const String & path) const
     auto metadata = readMetadata(path);
     String id;
     if (!metadata.remote_fs_objects.empty())
-        id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first;
+        id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].relative_path;
     return id;
 }
 
@@ -13,7 +13,6 @@
 #include <Common/ThreadPool.h>
 #include <filesystem>
 
-
 namespace CurrentMetrics
 {
     extern const Metric DiskSpaceReservedForMerge;
@@ -22,6 +21,24 @@ namespace CurrentMetrics
 namespace DB
 {
 
+/// Path to blob with it's size
+struct BlobPathWithSize
+{
+    std::string relative_path;
+    uint64_t bytes_size;
+
+    BlobPathWithSize() = default;
+    BlobPathWithSize(const BlobPathWithSize & other) = default;
+
+    BlobPathWithSize(const std::string & relative_path_, uint64_t bytes_size_)
+        : relative_path(relative_path_)
+        , bytes_size(bytes_size_)
+    {}
+};
+
+/// List of blobs with their sizes
+using BlobsPathToSize = std::vector<BlobPathWithSize>;
+
 /// Helper class to collect paths into chunks of maximum size.
 /// For s3 it is Aws::vector<ObjectIdentifier>, for hdfs it is std::vector<std::string>.
 class RemoteFSPathKeeper
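A small usage sketch of the new types added above: a disk now hands the buffer layer only a common root plus this blob list, from which the total logical file size and the full remote paths can be derived (the concrete prefix and blob names below are invented for the example):

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Same shape as the struct introduced above, reduced to what the example needs.
struct BlobPathWithSize
{
    std::string relative_path;
    uint64_t bytes_size;
    BlobPathWithSize(const std::string & relative_path_, uint64_t bytes_size_)
        : relative_path(relative_path_), bytes_size(bytes_size_) {}
};
using BlobsPathToSize = std::vector<BlobPathWithSize>;

int main()
{
    // In ClickHouse these would come from IDiskRemote::Metadata::remote_fs_root_path / remote_fs_objects.
    std::string common_path_prefix = "cluster/data/";
    BlobsPathToSize blobs_to_read = {{"blob_a", 1048576}, {"blob_b", 4096}};

    uint64_t total = 0;
    for (const auto & blob : blobs_to_read)
    {
        total += blob.bytes_size;
        std::cout << common_path_prefix + blob.relative_path << " (" << blob.bytes_size << " bytes)\n";
    }
    std::cout << "total: " << total << " bytes\n";   // what getFileSize() now sums
}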
@@ -191,10 +208,8 @@ using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
 /// Minimum info, required to be passed to ReadIndirectBufferFromRemoteFS<T>
 struct RemoteMetadata
 {
-    using PathAndSize = std::pair<String, size_t>;
-
     /// Remote FS objects paths and their sizes.
-    std::vector<PathAndSize> remote_fs_objects;
+    std::vector<BlobPathWithSize> remote_fs_objects;
 
     /// URI
     const String & remote_fs_root_path;
@@ -43,7 +43,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
     auto remote_file_reader_creator = [=, this]()
     {
         return std::make_unique<ReadBufferFromS3>(
-            client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries,
+            client_ptr, bucket, fs::path(common_path_prefix) / path, max_single_read_retries,
             settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
     };
 
@@ -83,11 +83,14 @@ SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const
 #endif
 
 
-ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const ReadSettings & settings_, const String & path_)
+ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
+    const std::string & common_path_prefix_,
+    const BlobsPathToSize & blobs_to_read_,
+    const ReadSettings & settings_)
     : ReadBuffer(nullptr, 0)
-    , metadata(metadata_)
+    , common_path_prefix(common_path_prefix_)
+    , blobs_to_read(blobs_to_read_)
     , settings(settings_)
-    , canonical_path(path_)
     , log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
 {
 }
@@ -119,9 +122,9 @@ void ReadBufferFromRemoteFSGather::initialize()
 {
     /// One clickhouse file can be split into multiple files in remote fs.
     auto current_buf_offset = file_offset_of_buffer_end;
-    for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i)
+    for (size_t i = 0; i < blobs_to_read.size(); ++i)
     {
-        const auto & [file_path, size] = metadata.remote_fs_objects[i];
+        const auto & [file_path, size] = blobs_to_read[i];
 
         if (size > current_buf_offset)
         {
@@ -138,7 +141,7 @@ void ReadBufferFromRemoteFSGather::initialize()
 
         current_buf_offset -= size;
     }
-    current_buf_idx = metadata.remote_fs_objects.size();
+    current_buf_idx = blobs_to_read.size();
     current_buf = nullptr;
 }
 
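The two hunks above keep the same offset logic, only iterating over blobs_to_read instead of metadata.remote_fs_objects: one logical file may be stored as several blobs, so a file offset has to be mapped to a blob index plus an offset inside that blob. An illustrative stand-alone version of that mapping (the helper name and sample data are invented for this sketch):

#include <cstdint>
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

struct BlobPathWithSize { std::string relative_path; uint64_t bytes_size; };
using BlobsPathToSize = std::vector<BlobPathWithSize>;

// Walk the blob list, subtracting sizes until the offset falls inside a blob,
// mirroring the loop in ReadBufferFromRemoteFSGather::initialize().
std::pair<size_t, uint64_t> locate(const BlobsPathToSize & blobs, uint64_t file_offset)
{
    uint64_t offset = file_offset;
    for (size_t i = 0; i < blobs.size(); ++i)
    {
        if (blobs[i].bytes_size > offset)
            return {i, offset};              // the read starts inside blob i at this local offset
        offset -= blobs[i].bytes_size;
    }
    return {blobs.size(), 0};                // offset is past the end: no buffer to create
}

int main()
{
    BlobsPathToSize blobs = {{"part_0", 100}, {"part_1", 50}, {"part_2", 25}};
    auto [idx, local] = locate(blobs, 120);
    std::printf("blob #%zu, local offset %llu\n", idx, static_cast<unsigned long long>(local));   // blob #1, local offset 20
}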
@@ -168,12 +171,12 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
 bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
 {
     /// If there is no available buffers - nothing to read.
-    if (current_buf_idx + 1 >= metadata.remote_fs_objects.size())
+    if (current_buf_idx + 1 >= blobs_to_read.size())
         return false;
 
     ++current_buf_idx;
 
-    const auto & [path, size] = metadata.remote_fs_objects[current_buf_idx];
+    const auto & [path, size] = blobs_to_read[current_buf_idx];
     current_buf = createImplementationBuffer(path, size);
 
     return true;
@@ -202,7 +205,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
     if (!result)
         result = current_buf->next();
 
-    if (metadata.remote_fs_objects.size() == 1)
+    if (blobs_to_read.size() == 1)
     {
         file_offset_of_buffer_end = current_buf->getFileOffsetOfBufferEnd();
     }
@@ -255,8 +258,8 @@ String ReadBufferFromRemoteFSGather::getFileName() const
 size_t ReadBufferFromRemoteFSGather::getFileSize() const
 {
     size_t size = 0;
-    for (const auto & object : metadata.remote_fs_objects)
-        size += object.second;
+    for (const auto & object : blobs_to_read)
+        size += object.bytes_size;
     return size;
 }
 
@@ -26,9 +26,9 @@ friend class ReadIndirectBufferFromRemoteFS;
 
 public:
     ReadBufferFromRemoteFSGather(
-        const RemoteMetadata & metadata_,
-        const ReadSettings & settings_,
-        const String & path_);
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
+        const ReadSettings & settings_);
 
     String getFileName() const;
 
@@ -57,7 +57,9 @@ public:
 protected:
     virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) = 0;
 
-    RemoteMetadata metadata;
+    std::string common_path_prefix;
+
+    BlobsPathToSize blobs_to_read;
 
     ReadSettings settings;
 
@@ -89,8 +91,6 @@ private:
     */
     size_t bytes_to_ignore = 0;
 
-    String canonical_path;
-
     Poco::Logger * log;
 };
 
@@ -101,13 +101,13 @@ class ReadBufferFromS3Gather final : public ReadBufferFromRemoteFSGather
 {
 public:
     ReadBufferFromS3Gather(
-        const String & path_,
         std::shared_ptr<Aws::S3::S3Client> client_ptr_,
         const String & bucket_,
-        IDiskRemote::Metadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         size_t max_single_read_retries_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_, path_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , client_ptr(std::move(client_ptr_))
         , bucket(bucket_)
         , max_single_read_retries(max_single_read_retries_)
@@ -130,13 +130,13 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS
 {
 public:
     ReadBufferFromAzureBlobStorageGather(
-        const String & path_,
         std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
-        IDiskRemote::Metadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         size_t max_single_read_retries_,
         size_t max_single_download_retries_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_, path_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , blob_container_client(blob_container_client_)
         , max_single_read_retries(max_single_read_retries_)
         , max_single_download_retries(max_single_download_retries_)
@@ -157,12 +157,12 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
 {
 public:
     ReadBufferFromWebServerGather(
-        const String & path_,
         const String & uri_,
-        RemoteMetadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         ContextPtr context_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_, path_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , uri(uri_)
         , context(context_)
     {
@@ -182,12 +182,12 @@ class ReadBufferFromHDFSGather final : public ReadBufferFromRemoteFSGather
 {
 public:
     ReadBufferFromHDFSGather(
-        const String & path_,
         const Poco::Util::AbstractConfiguration & config_,
         const String & hdfs_uri_,
-        IDiskRemote::Metadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_, path_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , config(config_)
     {
         const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2);
@@ -12,10 +12,10 @@ namespace DB
 WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS(
     std::unique_ptr<WriteBuffer> impl_,
     CreateMetadataCallback && create_callback_,
-    const String & metadata_file_path_)
+    const String & remote_path_)
     : WriteBufferFromFileDecorator(std::move(impl_))
     , create_metadata_callback(std::move(create_callback_))
-    , metadata_file_path(metadata_file_path_)
+    , remote_path(remote_path_)
 {
 }
 
@@ -18,17 +18,17 @@ public:
     WriteIndirectBufferFromRemoteFS(
         std::unique_ptr<WriteBuffer> impl_,
         CreateMetadataCallback && create_callback_,
-        const String & metadata_file_path_);
+        const String & remote_path_);
 
     ~WriteIndirectBufferFromRemoteFS() override;
 
-    String getFileName() const override { return metadata_file_path; }
+    String getFileName() const override { return remote_path; }
 
 private:
     void finalizeImpl() override;
 
     CreateMetadataCallback create_metadata_callback;
-    String metadata_file_path;
+    String remote_path;
 };
 
 }
@@ -237,7 +237,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
     }
 
     auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
-        path, settings->client, bucket, metadata,
+        settings->client, bucket, metadata.remote_fs_root_path, metadata.remote_fs_objects,
         settings->s3_max_single_read_retries, disk_read_settings);
 
     if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
@@ -280,7 +280,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
     auto s3_buffer = std::make_unique<WriteBufferFromS3>(
         settings->client,
         bucket,
-        remote_fs_root_path + blob_name,
+        fs::path(remote_fs_root_path) / blob_name,
         settings->s3_min_upload_part_size,
         settings->s3_upload_part_size_multiply_factor,
         settings->s3_upload_part_size_multiply_parts_count_threshold,
@@ -293,7 +293,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
         readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_name, count] (Metadata & metadata) { metadata.addObject(blob_name, count); return true; });
     };
 
-    return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(create_metadata_callback), path);
+    return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(create_metadata_callback), fs::path(remote_fs_root_path) / blob_name);
 }
 
 void DiskS3::createHardLink(const String & src_path, const String & dst_path)
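Across the Azure, HDFS and S3 write paths above the same change repeats: the indirect write buffer is now given the full remote blob path (root prefix joined with the random blob name) instead of the local metadata path, so getFileName() reports the remote object being written. A toy sketch of that wiring (WriteIndirectSketch is a simplified stand-in invented here; the real class wraps a WriteBuffer and runs the metadata callback on finalize):

#include <filesystem>
#include <functional>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

// Simplified stand-in: only keeps the pieces needed to show what getFileName() returns now.
struct WriteIndirectSketch
{
    std::function<void(size_t)> create_metadata_callback;
    std::string remote_path;                       // previously this member held the local metadata file path

    std::string getFileName() const { return remote_path; }
};

int main()
{
    std::string remote_fs_root_path = "cluster/data/";
    std::string blob_name = "random_blob_name";    // in ClickHouse this comes from getRandomName()

    // As in the writeFile() hunks: join the root and the blob name, and pass the result on.
    std::string remote_path = (fs::path(remote_fs_root_path) / blob_name).string();

    WriteIndirectSketch buf{[](size_t /*count*/) { /* update the local metadata here */ }, remote_path};
    std::cout << buf.getFileName() << '\n';        // prints: cluster/data/random_blob_name
}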