diff --git a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp index 97e98fb3a3e..78b9b9e3446 100644 --- a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp +++ b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp @@ -71,8 +71,8 @@ std::unique_ptr DiskAzureBlobStorage::readFile( LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path)); auto reader_impl = std::make_unique( - path, blob_container_client, metadata, settings->max_single_read_retries, - settings->max_single_download_retries, read_settings); + blob_container_client, metadata.remote_fs_root_path, metadata.remote_fs_objects, + settings->max_single_read_retries, settings->max_single_download_retries, read_settings); if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) { @@ -109,7 +109,7 @@ std::unique_ptr DiskAzureBlobStorage::writeFile( readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_path, count] (Metadata & metadata) { metadata.addObject(blob_path, count); return true; }); }; - return std::make_unique(std::move(buffer), std::move(create_metadata_callback), path); + return std::make_unique(std::move(buffer), std::move(create_metadata_callback), blob_path); } diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp index f3039d9af2e..2f8929982e3 100644 --- a/src/Disks/DiskWebServer.cpp +++ b/src/Disks/DiskWebServer.cpp @@ -166,9 +166,9 @@ std::unique_ptr DiskWebServer::readFile(const String & p remote_path = remote_path.string().substr(url.size()); RemoteMetadata meta(path, remote_path); - meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size)); + meta.remote_fs_objects.emplace_back(remote_path, iter->second.size); - auto web_impl = std::make_unique(path, url, meta, getContext(), read_settings); + auto web_impl = std::make_unique(url, meta.remote_fs_root_path, meta.remote_fs_objects, getContext(), read_settings); if (read_settings.remote_fs_method 
== RemoteFSReadMethod::threadpool) { diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp index f78ecd2669a..a3817a85a36 100644 --- a/src/Disks/HDFS/DiskHDFS.cpp +++ b/src/Disks/HDFS/DiskHDFS.cpp @@ -82,7 +82,7 @@ std::unique_ptr DiskHDFS::readFile(const String & path, "Read from file by path: {}. Existing HDFS objects: {}", backQuote(metadata_disk->getPath() + path), metadata.remote_fs_objects.size()); - auto hdfs_impl = std::make_unique(path, config, remote_fs_root_path, metadata, read_settings); + auto hdfs_impl = std::make_unique(config, remote_fs_root_path, remote_fs_root_path, metadata.remote_fs_objects, read_settings); auto buf = std::make_unique(std::move(hdfs_impl)); return std::make_unique(std::move(buf), settings->min_bytes_for_seek); } @@ -91,8 +91,8 @@ std::unique_ptr DiskHDFS::readFile(const String & path, std::unique_ptr DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings &) { /// Path to store new HDFS object. - auto file_name = getRandomName(); - auto hdfs_path = remote_fs_root_path + file_name; + std::string file_name = getRandomName(); + std::string hdfs_path = fs::path(remote_fs_root_path) / file_name; LOG_TRACE(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? 
"Write" : "Append", backQuote(metadata_disk->getPath() + path), hdfs_path); @@ -106,7 +106,7 @@ std::unique_ptr DiskHDFS::writeFile(const String & path readOrCreateUpdateAndStoreMetadata(path, mode, false, [file_name, count] (Metadata & metadata) { metadata.addObject(file_name, count); return true; }); }; - return std::make_unique(std::move(hdfs_buffer), std::move(create_metadata_callback), path); + return std::make_unique(std::move(hdfs_buffer), std::move(create_metadata_callback), hdfs_path); } diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp index fb1c0ddc378..b475ae1ee94 100644 --- a/src/Disks/IDiskRemote.cpp +++ b/src/Disks/IDiskRemote.cpp @@ -122,7 +122,8 @@ void IDiskRemote::Metadata::load() remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size()); } assertChar('\n', *buf); - remote_fs_objects[i] = {remote_fs_object_path, remote_fs_object_size}; + remote_fs_objects[i].relative_path = remote_fs_object_path; + remote_fs_objects[i].bytes_size = remote_fs_object_size; } readIntText(ref_count, *buf); @@ -638,7 +639,7 @@ String IDiskRemote::getUniqueId(const String & path) const auto metadata = readMetadata(path); String id; if (!metadata.remote_fs_objects.empty()) - id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first; + id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].relative_path; return id; } diff --git a/src/Disks/IDiskRemote.h b/src/Disks/IDiskRemote.h index a8a299391bf..aa78468c7bb 100644 --- a/src/Disks/IDiskRemote.h +++ b/src/Disks/IDiskRemote.h @@ -13,7 +13,6 @@ #include #include - namespace CurrentMetrics { extern const Metric DiskSpaceReservedForMerge; @@ -22,6 +21,24 @@ namespace CurrentMetrics namespace DB { +/// Path to blob with its size +struct BlobPathWithSize +{ + std::string relative_path; + uint64_t bytes_size; + + BlobPathWithSize() = default; + BlobPathWithSize(const BlobPathWithSize & other) = default; + + BlobPathWithSize(const std::string & relative_path_, 
uint64_t bytes_size_) + : relative_path(relative_path_) + , bytes_size(bytes_size_) + {} +}; + +/// List of blobs with their sizes +using BlobsPathToSize = std::vector; + /// Helper class to collect paths into chunks of maximum size. /// For s3 it is Aws::vector, for hdfs it is std::vector. class RemoteFSPathKeeper @@ -191,10 +208,8 @@ using RemoteDiskPtr = std::shared_ptr; /// Minimum info, required to be passed to ReadIndirectBufferFromRemoteFS struct RemoteMetadata { - using PathAndSize = std::pair; - /// Remote FS objects paths and their sizes. - std::vector remote_fs_objects; + std::vector remote_fs_objects; /// URI const String & remote_fs_root_path; diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 7014b21e8b4..16a57b83771 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -43,7 +43,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S auto remote_file_reader_creator = [=, this]() { return std::make_unique( - client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, + client_ptr, bucket, fs::path(common_path_prefix) / path, max_single_read_retries, settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true); }; @@ -83,11 +83,14 @@ SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const #endif -ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const ReadSettings & settings_, const String & path_) +ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather( + const std::string & common_path_prefix_, + const BlobsPathToSize & blobs_to_read_, + const ReadSettings & settings_) : ReadBuffer(nullptr, 0) - , metadata(metadata_) + , common_path_prefix(common_path_prefix_) + , blobs_to_read(blobs_to_read_) , settings(settings_) - , canonical_path(path_) , 
log(&Poco::Logger::get("ReadBufferFromRemoteFSGather")) { } @@ -119,9 +122,9 @@ void ReadBufferFromRemoteFSGather::initialize() { /// One clickhouse file can be split into multiple files in remote fs. auto current_buf_offset = file_offset_of_buffer_end; - for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) + for (size_t i = 0; i < blobs_to_read.size(); ++i) { - const auto & [file_path, size] = metadata.remote_fs_objects[i]; + const auto & [file_path, size] = blobs_to_read[i]; if (size > current_buf_offset) { @@ -138,7 +141,7 @@ void ReadBufferFromRemoteFSGather::initialize() current_buf_offset -= size; } - current_buf_idx = metadata.remote_fs_objects.size(); + current_buf_idx = blobs_to_read.size(); current_buf = nullptr; } @@ -168,12 +171,12 @@ bool ReadBufferFromRemoteFSGather::nextImpl() bool ReadBufferFromRemoteFSGather::moveToNextBuffer() { /// If there is no available buffers - nothing to read. - if (current_buf_idx + 1 >= metadata.remote_fs_objects.size()) + if (current_buf_idx + 1 >= blobs_to_read.size()) return false; ++current_buf_idx; - const auto & [path, size] = metadata.remote_fs_objects[current_buf_idx]; + const auto & [path, size] = blobs_to_read[current_buf_idx]; current_buf = createImplementationBuffer(path, size); return true; @@ -202,7 +205,7 @@ bool ReadBufferFromRemoteFSGather::readImpl() if (!result) result = current_buf->next(); - if (metadata.remote_fs_objects.size() == 1) + if (blobs_to_read.size() == 1) { file_offset_of_buffer_end = current_buf->getFileOffsetOfBufferEnd(); } @@ -255,8 +258,8 @@ String ReadBufferFromRemoteFSGather::getFileName() const size_t ReadBufferFromRemoteFSGather::getFileSize() const { size_t size = 0; - for (const auto & object : metadata.remote_fs_objects) - size += object.second; + for (const auto & object : blobs_to_read) + size += object.bytes_size; return size; } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index 25bfe0b7e16..d12513cba1f 
100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -26,9 +26,9 @@ friend class ReadIndirectBufferFromRemoteFS; public: ReadBufferFromRemoteFSGather( - const RemoteMetadata & metadata_, - const ReadSettings & settings_, - const String & path_); + const std::string & common_path_prefix_, + const BlobsPathToSize & blobs_to_read_, + const ReadSettings & settings_); String getFileName() const; @@ -57,7 +57,9 @@ public: protected: virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) = 0; - RemoteMetadata metadata; + std::string common_path_prefix; + + BlobsPathToSize blobs_to_read; ReadSettings settings; @@ -89,8 +91,6 @@ private: */ size_t bytes_to_ignore = 0; - String canonical_path; - Poco::Logger * log; }; @@ -101,13 +101,13 @@ class ReadBufferFromS3Gather final : public ReadBufferFromRemoteFSGather { public: ReadBufferFromS3Gather( - const String & path_, std::shared_ptr client_ptr_, const String & bucket_, - IDiskRemote::Metadata metadata_, + const std::string & common_path_prefix_, + const BlobsPathToSize & blobs_to_read_, size_t max_single_read_retries_, const ReadSettings & settings_) - : ReadBufferFromRemoteFSGather(metadata_, settings_, path_) + : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_) , client_ptr(std::move(client_ptr_)) , bucket(bucket_) , max_single_read_retries(max_single_read_retries_) @@ -130,13 +130,13 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS { public: ReadBufferFromAzureBlobStorageGather( - const String & path_, std::shared_ptr blob_container_client_, - IDiskRemote::Metadata metadata_, + const std::string & common_path_prefix_, + const BlobsPathToSize & blobs_to_read_, size_t max_single_read_retries_, size_t max_single_download_retries_, const ReadSettings & settings_) - : ReadBufferFromRemoteFSGather(metadata_, settings_, path_) + : 
ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_) , blob_container_client(blob_container_client_) , max_single_read_retries(max_single_read_retries_) , max_single_download_retries(max_single_download_retries_) @@ -157,12 +157,12 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather { public: ReadBufferFromWebServerGather( - const String & path_, const String & uri_, - RemoteMetadata metadata_, + const std::string & common_path_prefix_, + const BlobsPathToSize & blobs_to_read_, ContextPtr context_, const ReadSettings & settings_) - : ReadBufferFromRemoteFSGather(metadata_, settings_, path_) + : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_) , uri(uri_) , context(context_) { @@ -182,12 +182,12 @@ class ReadBufferFromHDFSGather final : public ReadBufferFromRemoteFSGather { public: ReadBufferFromHDFSGather( - const String & path_, const Poco::Util::AbstractConfiguration & config_, const String & hdfs_uri_, - IDiskRemote::Metadata metadata_, + const std::string & common_path_prefix_, + const BlobsPathToSize & blobs_to_read_, const ReadSettings & settings_) - : ReadBufferFromRemoteFSGather(metadata_, settings_, path_) + : ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_) , config(config_) { const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2); diff --git a/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp index 9b604341da9..dca2fb17ba7 100644 --- a/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp @@ -12,10 +12,10 @@ namespace DB WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS( std::unique_ptr impl_, CreateMetadataCallback && create_callback_, - const String & metadata_file_path_) + const String & remote_path_) : WriteBufferFromFileDecorator(std::move(impl_)) , create_metadata_callback(std::move(create_callback_)) - , 
metadata_file_path(metadata_file_path_) + , remote_path(remote_path_) { } diff --git a/src/Disks/IO/WriteIndirectBufferFromRemoteFS.h b/src/Disks/IO/WriteIndirectBufferFromRemoteFS.h index 25a93e2fe07..84bd2b99c7e 100644 --- a/src/Disks/IO/WriteIndirectBufferFromRemoteFS.h +++ b/src/Disks/IO/WriteIndirectBufferFromRemoteFS.h @@ -18,17 +18,17 @@ public: WriteIndirectBufferFromRemoteFS( std::unique_ptr impl_, CreateMetadataCallback && create_callback_, - const String & metadata_file_path_); + const String & remote_path_); ~WriteIndirectBufferFromRemoteFS() override; - String getFileName() const override { return metadata_file_path; } + String getFileName() const override { return remote_path; } private: void finalizeImpl() override; CreateMetadataCallback create_metadata_callback; - String metadata_file_path; + String remote_path; }; } diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index d879953bd9e..b6171a41dfb 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -237,7 +237,7 @@ std::unique_ptr DiskS3::readFile(const String & path, co } auto s3_impl = std::make_unique( - path, settings->client, bucket, metadata, + settings->client, bucket, metadata.remote_fs_root_path, metadata.remote_fs_objects, settings->s3_max_single_read_retries, disk_read_settings); if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) @@ -280,7 +280,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, auto s3_buffer = std::make_unique( settings->client, bucket, - remote_fs_root_path + blob_name, + fs::path(remote_fs_root_path) / blob_name, settings->s3_min_upload_part_size, settings->s3_upload_part_size_multiply_factor, settings->s3_upload_part_size_multiply_parts_count_threshold, @@ -293,7 +293,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_name, count] (Metadata & metadata) { metadata.addObject(blob_name, count); return true; }); }; - return 
std::make_unique(std::move(s3_buffer), std::move(create_metadata_callback), path); + return std::make_unique(std::move(s3_buffer), std::move(create_metadata_callback), fs::path(remote_fs_root_path) / blob_name); } void DiskS3::createHardLink(const String & src_path, const String & dst_path)