From 5196075fd9c7acc344f551e90d7808bbad0a92e5 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Wed, 19 May 2021 12:03:00 +0000
Subject: [PATCH] Move duplicated file removal logic from DiskS3 and DiskHDFS into IDiskRemote

---
 src/Disks/HDFS/DiskHDFS.cpp         | 135 ++++++----------------------
 src/Disks/HDFS/DiskHDFS.h           |  32 +++----
 src/Disks/IDiskRemote.cpp           |  95 +++++++++++++++++++-
 src/Disks/IDiskRemote.h             |  52 ++++++++++-
 src/Disks/S3/DiskS3.cpp             | 130 +--------------------------
 src/Disks/S3/DiskS3.h               |  22 ++---
 src/Disks/S3/registerDiskS3.cpp     |   1 -
 src/Disks/tests/gtest_disk_hdfs.cpp |  17 ++--
 8 files changed, 200 insertions(+), 284 deletions(-)

diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp
index e92033b5b4f..defc30ab654 100644
--- a/src/Disks/HDFS/DiskHDFS.cpp
+++ b/src/Disks/HDFS/DiskHDFS.cpp
@@ -26,14 +26,14 @@ namespace ErrorCodes
 DiskHDFS::DiskHDFS(
     const String & disk_name_,
     const String & hdfs_root_path_,
+    SettingsPtr settings_,
     const String & metadata_path_,
-    const Poco::Util::AbstractConfiguration & config_,
-    size_t min_bytes_for_seek_)
-    : IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS")
+    const Poco::Util::AbstractConfiguration & config_)
+    : IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS")
     , config(config_)
-    , min_bytes_for_seek(min_bytes_for_seek_)
     , hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
     , hdfs_fs(createHDFSFS(hdfs_builder.get()))
+    , settings(std::move(settings_))
 {
 }

@@ -47,7 +47,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
         backQuote(metadata_path + path), metadata.remote_fs_objects.size());

     auto reader = std::make_unique<ReadIndirectBufferFromHDFS>(config, remote_fs_root_path, metadata, buf_size);
-    return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
+    return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
 }


@@ -91,120 +91,34 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
 }


-void DiskHDFS::removeFromRemoteFS(const Metadata & metadata)
+void DiskHDFS::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)
 {
-    for (const auto & [hdfs_object_path, _] : metadata.remote_fs_objects)
+    if (!fs_paths_keeper.empty())
     {
-        /// Add path from root to file name
-        const size_t begin_of_path = remote_fs_root_path.find('/', remote_fs_root_path.find("//") + 2);
-        const String hdfs_path = remote_fs_root_path.substr(begin_of_path) + hdfs_object_path;
-
-        int res = hdfsDelete(hdfs_fs.get(), hdfs_path.c_str(), 0);
-        if (res == -1)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
-    }
-}
-
-
-void DiskHDFS::removeSharedFile(const String & path, bool keep_in_remote_fs)
-{
-    removeMeta(path, keep_in_remote_fs);
-}
-
-
-void DiskHDFS::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
-{
-    removeMetaRecursive(path, keep_in_remote_fs);
-}
-
-
-void DiskHDFS::removeFileIfExists(const String & path)
-{
-    if (Poco::File(metadata_path + path).exists())
-        removeMeta(path, /* keep_in_remote_fs */ false);
-}
-
-
-void DiskHDFS::removeRecursive(const String & path)
-{
-    checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
-
-    Poco::File file(metadata_path + path);
-    if (file.isFile())
-    {
-        removeFile(path);
-    }
-    else
-    {
-        for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
-            removeRecursive(it->path());
-        file.remove();
-    }
-}
-
-
-void DiskHDFS::removeMeta(const String & path, bool keep_in_remote_fs)
-{
-    Poco::File file(metadata_path + path);
-
-    if (!file.isFile())
-        throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
-
-    try
-    {
-        auto metadata = readMeta(path);
-
-        /// If there is no references - delete content from remote FS.
-        if (metadata.ref_count == 0)
+        for (const auto & chunk : fs_paths_keeper)
         {
-            file.remove();
+            for (const auto & hdfs_object_path : chunk)
+            {
+                const String hdfs_path = hdfs_object_path.GetKey();
+                const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);

-            if (!keep_in_remote_fs)
-                removeFromRemoteFS(metadata);
-        }
-        else /// In other case decrement number of references, save metadata and delete file.
-        {
-            --metadata.ref_count;
-            metadata.save();
-            file.remove();
+                /// Add path from root to file name
+                int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
+                if (res == -1)
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
+            }
         }
     }
-    catch (const Exception & e)
-    {
-        /// If it's impossible to read meta - just remove it from FS.
-        if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
-        {
-            LOG_WARNING(
-                log,
-                "Metadata file {} can't be read by reason: {}. Removing it forcibly.",
-                backQuote(path),
-                e.nested() ? e.nested()->message() : e.message());
-
-            file.remove();
-        }
-        else
-            throw;
-    }
 }


-void DiskHDFS::removeMetaRecursive(const String & path, bool keep_in_remote_fs)
+namespace
 {
-    checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
-
-    Poco::File file(metadata_path + path);
-    if (file.isFile())
-    {
-        removeMeta(path, keep_in_remote_fs);
-    }
-    else
-    {
-        for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
-            removeMetaRecursive(it->path(), keep_in_remote_fs);
-        file.remove();
-    }
+std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
+{
+    return std::make_unique<DiskHDFSSettings>(config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
+}
 }

-
 void registerDiskHDFS(DiskFactory & factory)
 {
@@ -224,8 +138,9 @@ void registerDiskHDFS(DiskFactory & factory)
         String metadata_path = context_->getPath() + "disks/" + name + "/";

         return std::make_shared<DiskHDFS>(
-            name, uri, metadata_path, config,
-            config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
+            name, uri,
+            getSettings(config, config_prefix),
+            metadata_path, config);
     };

     factory.registerDiskType("hdfs", creator);
diff --git a/src/Disks/HDFS/DiskHDFS.h b/src/Disks/HDFS/DiskHDFS.h
index 28266d4afd0..92e3c922ecd 100644
--- a/src/Disks/HDFS/DiskHDFS.h
+++ b/src/Disks/HDFS/DiskHDFS.h
@@ -9,24 +9,31 @@
 namespace DB
 {

+struct DiskHDFSSettings
+{
+    size_t min_bytes_for_seek;
+    DiskHDFSSettings(size_t min_bytes_for_seek_) : min_bytes_for_seek(min_bytes_for_seek_) {}
+};
+
 /**
  * Storage for persisting data in HDFS and metadata on the local disk.
  * Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
  * that contains HDFS object key with actual data.
 */
-class DiskHDFS : public IDiskRemote
+class DiskHDFS final : public IDiskRemote
 {
 friend class DiskHDFSReservation;

 public:
+    using SettingsPtr = std::unique_ptr<DiskHDFSSettings>;
     DiskHDFS(
         const String & disk_name_,
         const String & hdfs_root_path_,
+        SettingsPtr settings_,
         const String & metadata_path_,
-        const Poco::Util::AbstractConfiguration & config_,
-        size_t min_bytes_for_seek_);
+        const Poco::Util::AbstractConfiguration & config_);

     DiskType::Type getType() const override { return DiskType::Type::HDFS; }

@@ -40,32 +47,17 @@ public:

     std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

-    void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
-
-    void removeSharedRecursive(const String & path, bool) override;
-
-    void removeFileIfExists(const String & path) override;
-
-    void removeRecursive(const String & path) override;
-
-    void removeFile(const String & path) override { removeSharedFile(path, false); }
+    void removeFromRemoteFS(const RemoteFSPathKeeper & keys) override;

 private:
-    void removeFromRemoteFS(const Metadata & metadata);
-
-    void removeMetaRecursive(const String & path, bool keep_in_remote_fs);
-
-    void removeMeta(const String & path, bool keep_in_remote_fs);
-
     String getRandomName() { return toString(UUIDHelpers::generateV4()); }

     const Poco::Util::AbstractConfiguration & config;
-    size_t min_bytes_for_seek;

     HDFSBuilderWrapper hdfs_builder;
     HDFSFSPtr hdfs_fs;
-    std::mutex copying_mutex;
+
+    SettingsPtr settings;
 };

 }
diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp
index 66acf15d4fe..abde1d3a563 100644
--- a/src/Disks/IDiskRemote.cpp
+++ b/src/Disks/IDiskRemote.cpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include <Common/checkStackSize.h>
 #include


@@ -22,6 +23,7 @@ namespace ErrorCodes
     extern const int UNKNOWN_FORMAT;
     extern const int FILE_ALREADY_EXISTS;
     extern const int PATH_ACCESS_DENIED;
+    extern const int CANNOT_DELETE_DIRECTORY;
 }


@@ -173,6 +175,66 @@ IDiskRemote::Metadata IDiskRemote::createMeta(const String & path) const
 }


+void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeper & fs_paths_keeper)
+{
+    LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
+
+    Poco::File file(metadata_path + path);
+
+    if (!file.isFile())
+        throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
+
+    try
+    {
+        auto metadata = readMeta(path);
+
+        /// If there are no references - delete content from remote FS.
+        if (metadata.ref_count == 0)
+        {
+            file.remove();
+            for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
+                fs_paths_keeper.addPath(remote_fs_root_path + remote_fs_object_path);
+        }
+        else /// In other case decrement number of references, save metadata and delete file.
+        {
+            --metadata.ref_count;
+            metadata.save();
+            file.remove();
+        }
+    }
+    catch (const Exception & e)
+    {
+        /// If it's impossible to read meta - just remove it from FS.
+        if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
+        {
+            LOG_WARNING(log,
+                "Metadata file {} can't be read by reason: {}. Removing it forcibly.",
+                backQuote(path), e.nested() ? e.nested()->message() : e.message());
+            file.remove();
+        }
+        else
+            throw;
+    }
+}
+
+
+void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeper & fs_paths_keeper)
+{
+    checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
+
+    Poco::File file(metadata_path + path);
+    if (file.isFile())
+    {
+        removeMeta(path, fs_paths_keeper);
+    }
+    else
+    {
+        for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
+            removeMetaRecursive(it->path(), fs_paths_keeper);
+        file.remove();
+    }
+}
+
 DiskPtr DiskRemoteReservation::getDisk(size_t i) const
 {
     if (i != 0)
@@ -224,10 +286,10 @@ IDiskRemote::IDiskRemote(
     const String & log_name_,
     std::unique_ptr<Executor> executor_)
     : IDisk(std::move(executor_))
+    , log(&Poco::Logger::get(log_name_))
     , name(name_)
     , remote_fs_root_path(remote_fs_root_path_)
     , metadata_path(metadata_path_)
-    , log(&Poco::Logger::get(log_name_))
 {
 }

@@ -282,6 +344,35 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
 }


+void IDiskRemote::removeFileIfExists(const String & path)
+{
+    RemoteFSPathKeeper fs_paths_keeper;
+    if (Poco::File(metadata_path + path).exists())
+    {
+        removeMeta(path, fs_paths_keeper);
+        removeFromRemoteFS(fs_paths_keeper);
+    }
+}
+
+
+void IDiskRemote::removeSharedFile(const String & path, bool keep_remote_fs)
+{
+    RemoteFSPathKeeper fs_paths_keeper;
+    removeMeta(path, fs_paths_keeper);
+    if (!keep_remote_fs)
+        removeFromRemoteFS(fs_paths_keeper);
+}
+
+
+void IDiskRemote::removeSharedRecursive(const String & path, bool keep_remote_fs)
+{
+    RemoteFSPathKeeper fs_paths_keeper;
+    removeMetaRecursive(path, fs_paths_keeper);
+    if (!keep_remote_fs)
+        removeFromRemoteFS(fs_paths_keeper);
+}
+
+
 void IDiskRemote::setReadOnly(const String & path)
 {
     /// We should store read only flag inside metadata file (instead of using FS flag),
@@ -375,7 +466,7 @@ bool IDiskRemote::tryReserve(UInt64 bytes)
     std::lock_guard lock(reservation_mutex);
     if (bytes == 0)
     {
-        LOG_DEBUG(log, "Reserving 0 bytes on s3 disk {}", backQuote(name));
+        LOG_DEBUG(log, "Reserving 0 bytes on remote_fs disk {}", backQuote(name));
         ++reservation_count;
         return true;
     }
diff --git a/src/Disks/IDiskRemote.h b/src/Disks/IDiskRemote.h
index f3b19a76bb3..02e0c7f082e 100644
--- a/src/Disks/IDiskRemote.h
+++ b/src/Disks/IDiskRemote.h
@@ -4,11 +4,40 @@
 #include "Disks/DiskFactory.h"
 #include "Disks/Executor.h"
 #include
+#include <aws/s3/model/ObjectIdentifier.h>
+#include <aws/core/utils/memory/stl/AWSVector.h>
+#include
+#include
+#include
+#include
+#include

 namespace DB
 {

+/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
+class RemoteFSPathKeeper : public std::list<Aws::Vector<Aws::S3::Model::ObjectIdentifier>>
+{
+public:
+    void addPath(const String & path)
+    {
+        if (empty() || back().size() >= chunk_limit)
+        { /// add one more chunk
+            push_back(value_type());
+            back().reserve(chunk_limit);
+        }
+
+        Aws::S3::Model::ObjectIdentifier obj;
+        obj.SetKey(path);
+        back().push_back(obj);
+    }
+
+private:
+    /// limit for one DeleteObject request
+    /// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
+    const static size_t chunk_limit = 1000;
+};

 /// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS)
 class IDiskRemote : public IDisk
@@ -26,9 +55,9 @@ public:

     struct Metadata;

-    const String & getName() const override { return name; }
+    const String & getName() const final override { return name; }

-    const String & getPath() const override { return metadata_path; }
+    const String & getPath() const final override { return metadata_path; }

     Metadata readMeta(const String & path) const;

@@ -56,6 +85,16 @@ public:

     void replaceFile(const String & from_path, const String & to_path) override;

+    void removeFile(const String & path) override { removeSharedFile(path, false); }
+
+    void removeFileIfExists(const String & path) override;
+
+    void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
+
+    void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
+
+    void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;
+
     void listFiles(const String & path, std::vector<String> & file_names) override;

     void setReadOnly(const String & path) override;

@@ -82,13 +121,20 @@ public:

     ReservationPtr reserve(UInt64 bytes) override;

+    virtual void removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper) = 0;
+
 protected:
+    Poco::Logger * log;
     const String name;
     const String remote_fs_root_path;
+
     const String metadata_path;

-    Poco::Logger * log;
-
 private:
+    void removeMeta(const String & path, RemoteFSPathKeeper & keys);
+
+    void removeMetaRecursive(const String & path, RemoteFSPathKeeper & keys);
+
     bool tryReserve(UInt64 bytes);

     UInt64 reserved_bytes = 0;
diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp
index f74da9bee42..c1903fb0c1e 100644
--- a/src/Disks/S3/DiskS3.cpp
+++ b/src/Disks/S3/DiskS3.cpp
@@ -16,18 +16,12 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
+#include
 #include

-#include
-#include
-#include
-#include
-#include
-
 #include


@@ -41,36 +35,10 @@ namespace ErrorCodes
     extern const int CANNOT_SEEK_THROUGH_FILE;
     extern const int UNKNOWN_FORMAT;
     extern const int BAD_ARGUMENTS;
-    extern const int CANNOT_DELETE_DIRECTORY;
     extern const int LOGICAL_ERROR;
 }


-/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
-class DiskS3::AwsS3KeyKeeper : public std::list<Aws::Vector<Aws::S3::Model::ObjectIdentifier>>
-{
-public:
-    void addKey(const String & key);
-
-private:
-    /// limit for one DeleteObject request
-    /// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
-    const static size_t chunk_limit = 1000;
-};
-
-void DiskS3::AwsS3KeyKeeper::addKey(const String & key)
-{
-    if (empty() || back().size() >= chunk_limit)
-    { /// add one more chunk
-        push_back(value_type());
-        back().reserve(chunk_limit);
-    }
-
-    Aws::S3::Model::ObjectIdentifier obj;
-    obj.SetKey(key);
-    back().push_back(obj);
-}
-
 String getRandomName()
 {
     std::uniform_int_distribution<int> distribution('a', 'z');
@@ -400,76 +368,13 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
     return std::make_unique<WriteIndirectBufferFromS3>(std::move(s3_buffer), std::move(metadata), s3_path);
 }

-void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
+void DiskS3::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)
 {
-    LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
-
-    Poco::File file(metadata_path + path);
-
-    if (!file.isFile())
-        throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
-
-    try
-    {
-        auto metadata = readMeta(path);
-
-        /// If there is no references - delete content from S3.
-        if (metadata.ref_count == 0)
-        {
-            file.remove();
-
-            for (const auto & [s3_object_path, _] : metadata.remote_fs_objects)
-                keys.addKey(remote_fs_root_path + s3_object_path);
-        }
-        else /// In other case decrement number of references, save metadata and delete file.
-        {
-            --metadata.ref_count;
-            metadata.save();
-            file.remove();
-        }
-    }
-    catch (const Exception & e)
-    {
-        /// If it's impossible to read meta - just remove it from FS.
-        if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
-        {
-            LOG_WARNING(
-                log,
-                "Metadata file {} can't be read by reason: {}. Removing it forcibly.",
-                backQuote(path),
-                e.nested() ? e.nested()->message() : e.message());
-
-            file.remove();
-        }
-        else
-            throw;
-    }
-}
-
-void DiskS3::removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys)
-{
-    checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
-
-    Poco::File file(metadata_path + path);
-    if (file.isFile())
-    {
-        removeMeta(path, keys);
-    }
-    else
-    {
-        for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
-            removeMetaRecursive(it->path(), keys);
-        file.remove();
-    }
-}
-
-void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
-{
-    if (!keys.empty())
+    if (!fs_paths_keeper.empty())
     {
         auto settings = current_settings.get();

-        for (const auto & chunk : keys)
+        for (const auto & chunk : fs_paths_keeper)
         {
             Aws::S3::Model::Delete delkeys;
             delkeys.SetObjects(chunk);
@@ -484,36 +389,9 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
     }
 }

-void DiskS3::removeFileIfExists(const String & path)
-{
-    AwsS3KeyKeeper keys;
-    if (Poco::File(metadata_path + path).exists())
-    {
-        removeMeta(path, keys);
-        removeAws(keys);
-    }
-}
-
-void DiskS3::removeSharedFile(const String & path, bool keep_s3)
-{
-    AwsS3KeyKeeper keys;
-    removeMeta(path, keys);
-    if (!keep_s3)
-        removeAws(keys);
-}
-
-void DiskS3::removeSharedRecursive(const String & path, bool keep_s3)
-{
-    AwsS3KeyKeeper keys;
-    removeMetaRecursive(path, keys);
-    if (!keep_s3)
-        removeAws(keys);
-}
-
 void DiskS3::createHardLink(const String & src_path, const String & dst_path)
 {
     auto settings = current_settings.get();
-
     createHardLink(src_path, dst_path, settings->send_metadata);
 }
diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h
index b8688ae30aa..f55fca010ff 100644
--- a/src/Disks/S3/DiskS3.h
+++ b/src/Disks/S3/DiskS3.h
@@ -2,7 +2,6 @@

 #include
 #include
-#include

 #include "Disks/DiskFactory.h"
 #include "Disks/Executor.h"
@@ -41,22 +40,22 @@ struct DiskS3Settings
     int list_object_keys_size;
 };

+
 /**
  * Storage for persisting data in S3 and metadata on the local disk.
  * Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
  * that contains S3 object key with actual data.
 */
-class DiskS3 : public IDiskRemote
+class DiskS3 final : public IDiskRemote
 {
 public:
     using ObjectMetadata = std::map<String, String>;
     using Futures = std::vector<std::future<void>>;
+    using SettingsPtr = std::unique_ptr<DiskS3Settings>;
     using GetDiskSettings = std::function<std::unique_ptr<DiskS3Settings>(const Poco::Util::AbstractConfiguration &, const String, ContextConstPtr)>;

     friend class DiskS3Reservation;
-
-    class AwsS3KeyKeeper;
     struct RestoreInformation;

     DiskS3(
@@ -80,16 +79,11 @@ public:
         size_t buf_size,
         WriteMode mode) override;

+    void removeFromRemoteFS(const RemoteFSPathKeeper & keeper) final override;
+
     void moveFile(const String & from_path, const String & to_path, bool send_metadata);
     void moveFile(const String & from_path, const String & to_path) override;

-    void removeFile(const String & path) override { removeSharedFile(path, false); }
-    void removeFileIfExists(const String & path) override;
-    void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
-
-    void removeSharedFile(const String & path, bool keep_s3) override;
-    void removeSharedRecursive(const String & path, bool keep_s3) override;
-
     void createHardLink(const String & src_path, const String & dst_path) override;
     void createHardLink(const String & src_path, const String & dst_path, bool send_metadata);

@@ -113,10 +107,6 @@ public:
     void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override;

 private:
-    void removeMeta(const String & path, AwsS3KeyKeeper & keys);
-    void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys);
-    void removeAws(const AwsS3KeyKeeper & keys);
-
     void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
     /// Converts revision to binary string with leading zeroes (64 bit).
     static String revisionToString(UInt64 revision);
@@ -151,8 +141,8 @@ private:
     static String pathToDetached(const String & source_path);

     const String bucket;
-    MultiVersion<DiskS3Settings> current_settings;
+    MultiVersion<DiskS3Settings> current_settings;
     /// Gets disk settings from context.
     GetDiskSettings settings_getter;
diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp
index 767e8890a01..c4632883d37 100644
--- a/src/Disks/S3/registerDiskS3.cpp
+++ b/src/Disks/S3/registerDiskS3.cpp
@@ -225,4 +225,3 @@ void registerDiskS3(DiskFactory & factory)
 void registerDiskS3(DiskFactory &) {}

 #endif
-
diff --git a/src/Disks/tests/gtest_disk_hdfs.cpp b/src/Disks/tests/gtest_disk_hdfs.cpp
index 14397931920..7b04feb1416 100644
--- a/src/Disks/tests/gtest_disk_hdfs.cpp
+++ b/src/Disks/tests/gtest_disk_hdfs.cpp
@@ -11,7 +11,7 @@
 #include
 #include

-const String hdfs_uri = "hdfs://192.168.112.2:9000/disk_test/";
+const String hdfs_uri = "hdfs://172.20.0.2:9000/disk_test/";
 const String metadata_path = "/home/kssenii/metadata/";
 const String config_path = "/home/kssenii/ClickHouse/programs/server/config.xml";
 const String file_name = "test.txt";
@@ -20,7 +20,8 @@ const String file_name = "test.txt";
 TEST(DiskTestHDFS, RemoveFileHDFS)
 {
     Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
-    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
+    auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
+    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);

     DB::HDFSBuilderWrapper builder = DB::createHDFSBuilder(hdfs_uri, *config);
     DB::HDFSFSPtr fs = DB::createHDFSFS(builder.get());
@@ -43,7 +44,8 @@ TEST(DiskTestHDFS, RemoveFileHDFS)
 TEST(DiskTestHDFS, WriteReadHDFS)
 {
     Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
-    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
+    auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
+    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);

     {
         auto out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
@@ -64,7 +66,8 @@ TEST(DiskTestHDFS, WriteReadHDFS)
 TEST(DiskTestHDFS, RewriteFileHDFS)
 {
     Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
-    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
+    auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
+    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);

     for (size_t i = 1; i <= 10; ++i)
     {
@@ -88,7 +91,8 @@ TEST(DiskTestHDFS, RewriteFileHDFS)
 TEST(DiskTestHDFS, AppendFileHDFS)
 {
     Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
-    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
+    auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
+    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);

     {
         std::unique_ptr<DB::WriteBufferFromFileBase> out = disk.writeFile(file_name, 1024, DB::WriteMode::Append);
@@ -117,7 +121,8 @@ TEST(DiskTestHDFS, AppendFileHDFS)
 TEST(DiskTestHDFS, SeekHDFS)
 {
     Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
-    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
+    auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
+    auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);

     {
         std::unique_ptr<DB::WriteBufferFromFileBase> out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
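
The batching contract that RemoteFSPathKeeper gives both disks is easy to check in isolation. Below is a minimal standalone sketch, not part of the patch: std::string stands in for Aws::S3::Model::ObjectIdentifier so it builds without the AWS SDK, and PathKeeperSketch is a hypothetical name for this illustration.

    #include <cassert>
    #include <list>
    #include <string>
    #include <vector>

    /// Mirrors the chunking logic of RemoteFSPathKeeper: paths accumulate into
    /// chunks of at most chunk_limit entries; one chunk becomes one batch request.
    class PathKeeperSketch : public std::list<std::vector<std::string>>
    {
    public:
        void addPath(const std::string & path)
        {
            if (empty() || back().size() >= chunk_limit)
            {
                push_back(value_type());      /// start a new chunk
                back().reserve(chunk_limit);
            }
            back().push_back(path);           /// the real class calls ObjectIdentifier::SetKey here
        }

    private:
        static constexpr size_t chunk_limit = 1000;  /// S3 DeleteObjects accepts at most 1000 keys
    };

    int main()
    {
        PathKeeperSketch keeper;
        for (size_t i = 0; i < 2500; ++i)
            keeper.addPath("disk_prefix/object_" + std::to_string(i));

        assert(keeper.size() == 3);           /// 2500 paths -> chunks of 1000 + 1000 + 500
        assert(keeper.back().size() == 500);
        return 0;
    }

DiskS3::removeFromRemoteFS issues one DeleteObjects request per chunk, while DiskHDFS::removeFromRemoteFS simply iterates over every key in every chunk, which is why a single keeper type can serve both backends.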