diff --git a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp
index 7b027837bb0..78b9b9e3446 100644
--- a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp
+++ b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp
@@ -71,8 +71,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
     LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
 
     auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
-        blob_container_client, metadata, settings->max_single_read_retries,
-        settings->max_single_download_retries, read_settings);
+        blob_container_client, metadata.remote_fs_root_path, metadata.remote_fs_objects,
+        settings->max_single_read_retries, settings->max_single_download_retries, read_settings);
 
     if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
     {
diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp
index 61fd6885411..2f8929982e3 100644
--- a/src/Disks/DiskWebServer.cpp
+++ b/src/Disks/DiskWebServer.cpp
@@ -166,9 +166,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
     remote_path = remote_path.string().substr(url.size());
 
     RemoteMetadata meta(path, remote_path);
-    meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
+    meta.remote_fs_objects.emplace_back(remote_path, iter->second.size);
 
-    auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta, getContext(), read_settings);
+    auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta.remote_fs_root_path, meta.remote_fs_objects, getContext(), read_settings);
 
     if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
     {
diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp
index 49854d99460..a3817a85a36 100644
--- a/src/Disks/HDFS/DiskHDFS.cpp
+++ b/src/Disks/HDFS/DiskHDFS.cpp
@@ -82,7 +82,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
         "Read from file by path: {}. Existing HDFS objects: {}",
         backQuote(metadata_disk->getPath() + path), metadata.remote_fs_objects.size());
 
-    auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, remote_fs_root_path, metadata, read_settings);
+    auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, remote_fs_root_path, remote_fs_root_path, metadata.remote_fs_objects, read_settings);
     auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
     return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
 }
@@ -91,8 +91,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
 std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings &)
 {
     /// Path to store new HDFS object.
-    auto file_name = getRandomName();
-    auto hdfs_path = fs::path(remote_fs_root_path) / file_name;
+    std::string file_name = getRandomName();
+    std::string hdfs_path = fs::path(remote_fs_root_path) / file_name;
 
     LOG_TRACE(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
         backQuote(metadata_disk->getPath() + path), hdfs_path);
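The writeFile() change above replaces auto with an explicit std::string for both locals. The likely intent (an inference, not stated in the patch) is that fs::path(remote_fs_root_path) / file_name yields a std::filesystem::path, which auto would keep as the variable's type; spelling std::string forces the conversion to a plain string once, up front, so the logging call and later uses of hdfs_path deal with an ordinary string rather than a path object. A minimal standalone sketch of that conversion follows; the values and the main() wrapper are illustrative assumptions, not ClickHouse code.

#include <filesystem>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

int main()
{
    // Stand-ins for the members used in DiskHDFS::writeFile(); the concrete values are made up.
    std::string remote_fs_root_path = "hdfs://namenode:9000/clickhouse/";
    std::string file_name = "tmpxyz123";   // plays the role of getRandomName()

    // With auto this would be a std::filesystem::path; naming std::string converts it here,
    // so everything downstream sees a plain string (streaming a fs::path would also add
    // quotes around the value, which a plain string does not).
    std::string hdfs_path = fs::path(remote_fs_root_path) / file_name;

    std::cout << hdfs_path << '\n';   // hdfs://namenode:9000/clickhouse/tmpxyz123
}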
"Write" : "Append", backQuote(metadata_disk->getPath() + path), hdfs_path); diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp index fb1c0ddc378..b475ae1ee94 100644 --- a/src/Disks/IDiskRemote.cpp +++ b/src/Disks/IDiskRemote.cpp @@ -122,7 +122,8 @@ void IDiskRemote::Metadata::load() remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size()); } assertChar('\n', *buf); - remote_fs_objects[i] = {remote_fs_object_path, remote_fs_object_size}; + remote_fs_objects[i].relative_path = remote_fs_object_path; + remote_fs_objects[i].bytes_size = remote_fs_object_size; } readIntText(ref_count, *buf); @@ -638,7 +639,7 @@ String IDiskRemote::getUniqueId(const String & path) const auto metadata = readMetadata(path); String id; if (!metadata.remote_fs_objects.empty()) - id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first; + id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].relative_path; return id; } diff --git a/src/Disks/IDiskRemote.h b/src/Disks/IDiskRemote.h index a8a299391bf..aa78468c7bb 100644 --- a/src/Disks/IDiskRemote.h +++ b/src/Disks/IDiskRemote.h @@ -13,7 +13,6 @@ #include #include - namespace CurrentMetrics { extern const Metric DiskSpaceReservedForMerge; @@ -22,6 +21,24 @@ namespace CurrentMetrics namespace DB { +/// Path to blob with it's size +struct BlobPathWithSize +{ + std::string relative_path; + uint64_t bytes_size; + + BlobPathWithSize() = default; + BlobPathWithSize(const BlobPathWithSize & other) = default; + + BlobPathWithSize(const std::string & relative_path_, uint64_t bytes_size_) + : relative_path(relative_path_) + , bytes_size(bytes_size_) + {} +}; + +/// List of blobs with their sizes +using BlobsPathToSize = std::vector; + /// Helper class to collect paths into chunks of maximum size. /// For s3 it is Aws::vector, for hdfs it is std::vector. class RemoteFSPathKeeper @@ -191,10 +208,8 @@ using RemoteDiskPtr = std::shared_ptr; /// Minimum info, required to be passed to ReadIndirectBufferFromRemoteFS struct RemoteMetadata { - using PathAndSize = std::pair; - /// Remote FS objects paths and their sizes. 
diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
index 08b39cade79..16a57b83771 100644
--- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
+++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
@@ -43,7 +43,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
     auto remote_file_reader_creator = [=, this]()
     {
         return std::make_unique<ReadBufferFromS3>(
-            client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries,
+            client_ptr, bucket, fs::path(common_path_prefix) / path, max_single_read_retries,
             settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
     };
 
@@ -83,9 +83,13 @@ SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const
 
 #endif
 
-ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const ReadSettings & settings_)
+ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
+    const std::string & common_path_prefix_,
+    const BlobsPathToSize & blobs_to_read_,
+    const ReadSettings & settings_)
     : ReadBuffer(nullptr, 0)
-    , metadata(metadata_)
+    , common_path_prefix(common_path_prefix_)
+    , blobs_to_read(blobs_to_read_)
     , settings(settings_)
     , log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
 {
@@ -118,9 +122,9 @@ void ReadBufferFromRemoteFSGather::initialize()
 {
     /// One clickhouse file can be split into multiple files in remote fs.
     auto current_buf_offset = file_offset_of_buffer_end;
-    for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i)
+    for (size_t i = 0; i < blobs_to_read.size(); ++i)
     {
-        const auto & [file_path, size] = metadata.remote_fs_objects[i];
+        const auto & [file_path, size] = blobs_to_read[i];
 
         if (size > current_buf_offset)
         {
@@ -137,7 +141,7 @@ void ReadBufferFromRemoteFSGather::initialize()
         current_buf_offset -= size;
     }
 
-    current_buf_idx = metadata.remote_fs_objects.size();
+    current_buf_idx = blobs_to_read.size();
     current_buf = nullptr;
 }
 
@@ -167,12 +171,12 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
 bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
 {
     /// If there is no available buffers - nothing to read.
-    if (current_buf_idx + 1 >= metadata.remote_fs_objects.size())
+    if (current_buf_idx + 1 >= blobs_to_read.size())
         return false;
 
     ++current_buf_idx;
 
-    const auto & [path, size] = metadata.remote_fs_objects[current_buf_idx];
+    const auto & [path, size] = blobs_to_read[current_buf_idx];
     current_buf = createImplementationBuffer(path, size);
 
     return true;
@@ -201,7 +205,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
     if (!result)
         result = current_buf->next();
 
-    if (metadata.remote_fs_objects.size() == 1)
+    if (blobs_to_read.size() == 1)
     {
         file_offset_of_buffer_end = current_buf->getFileOffsetOfBufferEnd();
     }
@@ -254,8 +258,8 @@ String ReadBufferFromRemoteFSGather::getFileName() const
 
 size_t ReadBufferFromRemoteFSGather::getFileSize() const
 {
     size_t size = 0;
-    for (const auto & object : metadata.remote_fs_objects)
-        size += object.second;
+    for (const auto & object : blobs_to_read)
+        size += object.bytes_size;
     return size;
 }
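The initialize() and moveToNextBuffer() hunks above walk blobs_to_read to find which blob a given offset of the logical file falls into, since one ClickHouse file may be stored as several remote blobs laid end to end. The following standalone sketch re-derives that lookup; locate(), the sample blob list and main() are hypothetical names and values used only to illustrate the loop, not code from the patch.

#include <cstdint>
#include <cstdio>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for the type from IDiskRemote.h (an aggregate is enough here).
struct BlobPathWithSize
{
    std::string relative_path;
    uint64_t bytes_size;
};

// Resolve an offset within the logical file to (blob index, offset inside that blob),
// mirroring the loop in ReadBufferFromRemoteFSGather::initialize().
std::optional<std::pair<size_t, uint64_t>> locate(const std::vector<BlobPathWithSize> & blobs, uint64_t file_offset)
{
    for (size_t i = 0; i < blobs.size(); ++i)
    {
        if (blobs[i].bytes_size > file_offset)
            return std::make_pair(i, file_offset);   // the read starts inside blob i
        file_offset -= blobs[i].bytes_size;          // skip this whole blob and keep walking
    }
    return std::nullopt;                             // offset lies past the end of the file
}

int main()
{
    std::vector<BlobPathWithSize> blobs = {{"r/aaa", 100}, {"r/bbb", 50}, {"r/ccc", 200}};
    if (auto pos = locate(blobs, 120))   // 120 = 100 (all of aaa) + 20 into bbb
        std::printf("blob #%zu, offset %llu\n", pos->first, static_cast<unsigned long long>(pos->second));
}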
diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h
index 57b409bc740..f8a6209eb59 100644
--- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h
+++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h
@@ -16,6 +16,8 @@ namespace Poco { class Logger; }
 namespace DB
 {
 
+
+
 /**
 * Remote disk might need to split one clickhouse file into multiple files in remote fs.
 * This class works like a proxy to allow transition from one file into multiple.
@@ -26,7 +28,8 @@ friend class ReadIndirectBufferFromRemoteFS;
 
 public:
     ReadBufferFromRemoteFSGather(
-        const RemoteMetadata & metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         const ReadSettings & settings_);
 
     String getFileName() const;
@@ -56,7 +59,9 @@ public:
 protected:
     virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) = 0;
 
-    RemoteMetadata metadata;
+    std::string common_path_prefix;
+
+    BlobsPathToSize blobs_to_read;
 
     ReadSettings settings;
 
@@ -100,10 +105,11 @@ public:
     ReadBufferFromS3Gather(
         std::shared_ptr client_ptr_,
         const String & bucket_,
-        IDiskRemote::Metadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         size_t max_single_read_retries_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , client_ptr(std::move(client_ptr_))
         , bucket(bucket_)
         , max_single_read_retries(max_single_read_retries_)
@@ -127,11 +133,12 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS
 public:
     ReadBufferFromAzureBlobStorageGather(
         std::shared_ptr blob_container_client_,
-        IDiskRemote::Metadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         size_t max_single_read_retries_,
         size_t max_single_download_retries_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , blob_container_client(blob_container_client_)
         , max_single_read_retries(max_single_read_retries_)
         , max_single_download_retries(max_single_download_retries_)
@@ -153,10 +160,11 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
 public:
     ReadBufferFromWebServerGather(
         const String & uri_,
-        RemoteMetadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         ContextPtr context_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , uri(uri_)
         , context(context_)
     {
@@ -178,9 +186,10 @@ public:
     ReadBufferFromHDFSGather(
         const Poco::Util::AbstractConfiguration & config_,
         const String & hdfs_uri_,
-        IDiskRemote::Metadata metadata_,
+        const std::string & common_path_prefix_,
+        const BlobsPathToSize & blobs_to_read_,
         const ReadSettings & settings_)
-        : ReadBufferFromRemoteFSGather(metadata_, settings_)
+        : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
         , config(config_)
     {
         const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2);
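The header changes above all follow one pattern: the gather base class stores a common path prefix plus a list of blobs instead of a whole RemoteMetadata, and each backend constructor simply forwards those two values to the base. The sketch below compresses that shape into a compilable toy; ReadBufferGatherBase, S3GatherLike and the sample values are invented for illustration and do not exist in ClickHouse.

#include <cstdint>
#include <string>
#include <vector>

struct BlobPathWithSize
{
    std::string relative_path;
    uint64_t bytes_size;
};
using BlobsPathToSize = std::vector<BlobPathWithSize>;

// Toy counterpart of ReadBufferFromRemoteFSGather after the patch: the base class no longer
// knows anything about IDiskRemote::Metadata, only the two fields the readers actually need.
class ReadBufferGatherBase
{
public:
    ReadBufferGatherBase(const std::string & common_path_prefix_, const BlobsPathToSize & blobs_to_read_)
        : common_path_prefix(common_path_prefix_)
        , blobs_to_read(blobs_to_read_)
    {}

    virtual ~ReadBufferGatherBase() = default;

protected:
    std::string common_path_prefix;   // replaces metadata.remote_fs_root_path
    BlobsPathToSize blobs_to_read;    // replaces metadata.remote_fs_objects
};

// Toy counterpart of ReadBufferFromS3Gather: backend-specific arguments stay in the derived
// class, while the shared pair is forwarded to the base, as in the constructors above.
class S3GatherLike final : public ReadBufferGatherBase
{
public:
    S3GatherLike(
        const std::string & bucket_,
        const std::string & common_path_prefix_,
        const BlobsPathToSize & blobs_to_read_,
        size_t max_single_read_retries_)
        : ReadBufferGatherBase(common_path_prefix_, blobs_to_read_)
        , bucket(bucket_)
        , max_single_read_retries(max_single_read_retries_)
    {}

private:
    std::string bucket;
    size_t max_single_read_retries;
};

int main()
{
    BlobsPathToSize blobs = {{"data/part-0", 4096}};
    S3GatherLike reader("my-bucket", "store/abc/", blobs, 4);
    (void)reader;   // the toy class only demonstrates construction
}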
diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp
index 90cec1d5dc9..b6171a41dfb 100644
--- a/src/Disks/S3/DiskS3.cpp
+++ b/src/Disks/S3/DiskS3.cpp
@@ -237,7 +237,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
     }
 
     auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
-        settings->client, bucket, metadata,
+        settings->client, bucket, metadata.remote_fs_root_path, metadata.remote_fs_objects,
         settings->s3_max_single_read_retries, disk_read_settings);
 
     if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
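Taken together, the call sites above (DiskS3, DiskHDFS, DiskAzureBlobStorage, DiskWebServer) now hand their gather buffers exactly two things taken from the metadata they read: remote_fs_root_path and remote_fs_objects. The closing sketch shows what downstream code computes from that pair, namely the unique id built in IDiskRemote::getUniqueId and the logical file size summed in ReadBufferFromRemoteFSGather::getFileSize; the concrete values and main() are made up for the example.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct BlobPathWithSize
{
    std::string relative_path;
    uint64_t bytes_size;
};
using BlobsPathToSize = std::vector<BlobPathWithSize>;

int main()
{
    // The two values every disk now passes to its gather buffer (sample data).
    std::string remote_fs_root_path = "store/cluster1/";
    BlobsPathToSize remote_fs_objects = {{"abc", 100}, {"def", 50}};

    // IDiskRemote::getUniqueId: root path plus the first blob's relative path.
    std::string id;
    if (!remote_fs_objects.empty())
        id = remote_fs_root_path + remote_fs_objects[0].relative_path;

    // ReadBufferFromRemoteFSGather::getFileSize: the logical file size is the sum of blob sizes.
    uint64_t total_size = 0;
    for (const auto & object : remote_fs_objects)
        total_size += object.bytes_size;

    std::cout << id << " (" << total_size << " bytes)\n";   // store/cluster1/abc (150 bytes)
}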