From 2f6770c9a014cffa4b67906ab4351280f3401fe8 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 20 May 2021 10:44:02 +0000 Subject: [PATCH] Unite writeBuffers --- src/CMakeLists.txt | 1 + src/Disks/HDFS/DiskHDFS.cpp | 16 ++-- src/Disks/HDFS/WriteIndirectBufferFromHDFS.h | 82 -------------------- src/Disks/S3/DiskS3.cpp | 53 +------------ src/Disks/tests/gtest_disk_hdfs.cpp | 2 +- src/IO/WriteIndirectBufferFromRemoteFS.cpp | 63 +++++++++++++++ src/IO/WriteIndirectBufferFromRemoteFS.h | 32 ++++++++ 7 files changed, 109 insertions(+), 140 deletions(-) delete mode 100644 src/Disks/HDFS/WriteIndirectBufferFromHDFS.h create mode 100644 src/IO/WriteIndirectBufferFromRemoteFS.cpp create mode 100644 src/IO/WriteIndirectBufferFromRemoteFS.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3556e5828b0..5cf6439744b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -426,6 +426,7 @@ endif() if (USE_HDFS) dbms_target_link_libraries(PRIVATE ${HDFS3_LIBRARY}) dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR}) + target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR}) endif() if (USE_AWS_S3) diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp index ef8c8859d46..e7da612c573 100644 --- a/src/Disks/HDFS/DiskHDFS.cpp +++ b/src/Disks/HDFS/DiskHDFS.cpp @@ -2,9 +2,9 @@ #include #include -#include "WriteIndirectBufferFromHDFS.h" #include #include +#include #include #include #include @@ -59,7 +59,7 @@ DiskHDFS::DiskHDFS( SettingsPtr settings_, const String & metadata_path_, const Poco::Util::AbstractConfiguration & config_) - : IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskS3") + : IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS") , config(config_) , hdfs_builder(createHDFSBuilder(hdfs_root_path_, config)) , hdfs_fs(createHDFSFS(hdfs_builder.get())) @@ -92,9 +92,14 @@ std::unique_ptr DiskHDFS::writeFile(const String & path LOG_DEBUG(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), remote_fs_root_path + hdfs_path); - return std::make_unique( - config, hdfs_path, file_name, metadata, buf_size, - mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND); /// Single O_WRONLY in libhdfs adds O_TRUNC + /// Single O_WRONLY in libhdfs adds O_TRUNC + auto hdfs_buffer = std::make_unique(hdfs_path, + config, buf_size, + mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND); + + return std::make_unique>(std::move(hdfs_buffer), + std::move(metadata), + file_name); } @@ -153,5 +158,4 @@ void registerDiskHDFS(DiskFactory & factory) factory.registerDiskType("hdfs", creator); } - } diff --git a/src/Disks/HDFS/WriteIndirectBufferFromHDFS.h b/src/Disks/HDFS/WriteIndirectBufferFromHDFS.h deleted file mode 100644 index 6662a3dc161..00000000000 --- a/src/Disks/HDFS/WriteIndirectBufferFromHDFS.h +++ /dev/null @@ -1,82 +0,0 @@ -#pragma once - -#include -#include - - -namespace DB -{ - -/// Stores data in HDFS and adds the object key (HDFS path) and object size to metadata file on local FS. -/// If the file does not exist, it will be created before writing. -class WriteIndirectBufferFromHDFS final : public WriteBufferFromFileBase -{ -public: - WriteIndirectBufferFromHDFS( - const Poco::Util::AbstractConfiguration & config_, - const String & hdfs_name_, - const String & hdfs_path_, - DiskHDFS::Metadata metadata_, - size_t buf_size_, - int flags_) - : WriteBufferFromFileBase(buf_size_, nullptr, 0) - , impl(WriteBufferFromHDFS(hdfs_name_, config_, buf_size_, flags_)) - , metadata(std::move(metadata_)) - , hdfs_path(hdfs_path_) - { - } - - ~WriteIndirectBufferFromHDFS() override - { - try - { - finalize(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - - void finalize() override - { - if (finalized) - return; - - next(); - impl.finalize(); - - metadata.addObject(hdfs_path, count()); - metadata.save(); - - finalized = true; - } - - void sync() override - { - if (finalized) - metadata.save(true); - } - - std::string getFileName() const override { return metadata.metadata_file_path; } - -private: - void nextImpl() override - { - /// Transfer current working buffer to WriteBufferFromHDFS. - impl.swap(*this); - - /// Write actual data to HDFS. - impl.next(); - - /// Return back working buffer. - impl.swap(*this); - } - - WriteBufferFromHDFS impl; - bool finalized = false; - DiskHDFS::Metadata metadata; - String hdfs_path; -}; - -} diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 01c49e38098..d508bcf38e3 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -9,10 +9,10 @@ #include #include #include +#include #include #include #include -#include #include #include #include @@ -98,55 +98,6 @@ private: size_t buf_size; }; -/// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS. -class WriteIndirectBufferFromS3 final : public WriteBufferFromFileDecorator -{ -public: - WriteIndirectBufferFromS3( - std::unique_ptr impl_, - DiskS3::Metadata metadata_, - String & s3_path_) - : WriteBufferFromFileDecorator(std::move(impl_)) - , metadata(std::move(metadata_)) - , s3_path(s3_path_) { } - - virtual ~WriteIndirectBufferFromS3() override - { - try - { - WriteIndirectBufferFromS3::finalize(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - - void finalize() override - { - if (finalized) - return; - - WriteBufferFromFileDecorator::finalize(); - - metadata.addObject(s3_path, count()); - metadata.save(); - } - - void sync() override - { - if (finalized) - metadata.save(true); - } - - std::string getFileName() const override { return metadata.metadata_file_path; } - -private: - DiskS3::Metadata metadata; - String s3_path; -}; - - /// Runs tasks asynchronously using thread pool. class AsyncExecutor : public Executor { @@ -277,7 +228,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, std::move(object_metadata), buf_size); - return std::make_unique(std::move(s3_buffer), std::move(metadata), s3_path); + return std::make_unique>(std::move(s3_buffer), std::move(metadata), s3_path); } void DiskS3::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper) diff --git a/src/Disks/tests/gtest_disk_hdfs.cpp b/src/Disks/tests/gtest_disk_hdfs.cpp index 27de458c92b..7b04feb1416 100644 --- a/src/Disks/tests/gtest_disk_hdfs.cpp +++ b/src/Disks/tests/gtest_disk_hdfs.cpp @@ -4,7 +4,7 @@ #include "gtest_disk.h" /// To enable tests set to 1. It is set to 0, because there is not HDFS instance in CI. -#define RUN_HDFS_TEST 1 +#define RUN_HDFS_TEST 0 #if RUN_HDFS_TEST diff --git a/src/IO/WriteIndirectBufferFromRemoteFS.cpp b/src/IO/WriteIndirectBufferFromRemoteFS.cpp new file mode 100644 index 00000000000..2b9c4f628b8 --- /dev/null +++ b/src/IO/WriteIndirectBufferFromRemoteFS.cpp @@ -0,0 +1,63 @@ +#include "WriteIndirectBufferFromRemoteFS.h" +#include +#include + + +namespace DB +{ + +/// Stores data in S3/HDFS and adds the object key (S3 path) and object size to metadata file on local FS. +template +WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS( + std::unique_ptr impl_, + IDiskRemote::Metadata metadata_, + const String & remote_fs_path_) + : WriteBufferFromFileDecorator(std::move(impl_)) + , metadata(std::move(metadata_)) + , remote_fs_path(remote_fs_path_) +{ +} + + +template +WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS() +{ + try + { + WriteIndirectBufferFromRemoteFS::finalize(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + + +template +void WriteIndirectBufferFromRemoteFS::finalize() +{ + if (finalized) + return; + + WriteBufferFromFileDecorator::finalize(); + + metadata.addObject(remote_fs_path, count()); + metadata.save(); +} + + +template +void WriteIndirectBufferFromRemoteFS::sync() +{ + if (finalized) + metadata.save(true); +} + + +template +class WriteIndirectBufferFromRemoteFS; + +template +class WriteIndirectBufferFromRemoteFS; + +} diff --git a/src/IO/WriteIndirectBufferFromRemoteFS.h b/src/IO/WriteIndirectBufferFromRemoteFS.h new file mode 100644 index 00000000000..246376c5366 --- /dev/null +++ b/src/IO/WriteIndirectBufferFromRemoteFS.h @@ -0,0 +1,32 @@ +#include +#include +#include + +namespace DB +{ + +/// Stores data in S3/HDFS and adds the object key (S3 path) and object size to metadata file on local FS. +template +class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator +{ +public: + WriteIndirectBufferFromRemoteFS( + std::unique_ptr impl_, + IDiskRemote::Metadata metadata_, + const String & remote_fs_path_); + + virtual ~WriteIndirectBufferFromRemoteFS() override; + + void finalize() override; + + void sync() override; + + String getFileName() const override { return metadata.metadata_file_path; } + +private: + IDiskRemote::Metadata metadata; + + String remote_fs_path; +}; + +}