diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp index 7e2163ce4d3..ef8c8859d46 100644 --- a/src/Disks/HDFS/DiskHDFS.cpp +++ b/src/Disks/HDFS/DiskHDFS.cpp @@ -2,9 +2,9 @@ #include #include -#include "ReadIndirectBufferFromHDFS.h" #include "WriteIndirectBufferFromHDFS.h" #include +#include #include #include #include @@ -22,6 +22,36 @@ namespace ErrorCodes extern const int UNKNOWN_FORMAT; } +/// Reads data from HDFS using stored paths in metadata. +class ReadIndirectBufferFromHDFS final : public ReadIndirectBufferFromRemoteFS +{ +public: + ReadIndirectBufferFromHDFS( + const Poco::Util::AbstractConfiguration & config_, + const String & hdfs_uri_, + DiskHDFS::Metadata metadata_, + size_t buf_size_) + : ReadIndirectBufferFromRemoteFS(metadata_) + , config(config_) + , buf_size(buf_size_) + { + const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2); + hdfs_directory = hdfs_uri_.substr(begin_of_path); + hdfs_uri = hdfs_uri_.substr(0, begin_of_path); + } + + std::unique_ptr createReadBuffer(const String & path) override + { + return std::make_unique(hdfs_uri, hdfs_directory + path, config, buf_size); + } + +private: + const Poco::Util::AbstractConfiguration & config; + String hdfs_uri; + String hdfs_directory; + size_t buf_size; +}; + DiskHDFS::DiskHDFS( const String & disk_name_, diff --git a/src/Disks/HDFS/ReadIndirectBufferFromHDFS.h b/src/Disks/HDFS/ReadIndirectBufferFromHDFS.h deleted file mode 100644 index fd7cb816db1..00000000000 --- a/src/Disks/HDFS/ReadIndirectBufferFromHDFS.h +++ /dev/null @@ -1,142 +0,0 @@ -#pragma once - -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int CANNOT_SEEK_THROUGH_FILE; -} - - -/// Reads data from HDFS using stored paths in metadata. -class ReadIndirectBufferFromHDFS final : public ReadBufferFromFileBase -{ -public: - ReadIndirectBufferFromHDFS( - const Poco::Util::AbstractConfiguration & config_, - const String & hdfs_uri_, - DiskHDFS::Metadata metadata_, - size_t buf_size_) - : config(config_) - , metadata(std::move(metadata_)) - , buf_size(buf_size_) - { - const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2); - hdfs_directory = hdfs_uri_.substr(begin_of_path); - hdfs_uri = hdfs_uri_.substr(0, begin_of_path); - } - - off_t seek(off_t offset_, int whence) override - { - if (whence == SEEK_CUR) - { - /// If position within current working buffer - shift pos. - if (working_buffer.size() && size_t(getPosition() + offset_) < absolute_position) - { - pos += offset_; - return getPosition(); - } - else - { - absolute_position += offset_; - } - } - else if (whence == SEEK_SET) - { - /// If position within current working buffer - shift pos. - if (working_buffer.size() && size_t(offset_) >= absolute_position - working_buffer.size() - && size_t(offset_) < absolute_position) - { - pos = working_buffer.end() - (absolute_position - offset_); - return getPosition(); - } - else - { - absolute_position = offset_; - } - } - else - throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); - - current_buf = initialize(); - pos = working_buffer.end(); - - return absolute_position; - } - - off_t getPosition() override { return absolute_position - available(); } - - std::string getFileName() const override { return metadata.metadata_file_path; } - - -private: - std::unique_ptr initialize() - { - size_t offset = absolute_position; - - for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) - { - current_buf_idx = i; - const auto & [file_name, size] = metadata.remote_fs_objects[i]; - - if (size > offset) - { - auto buf = std::make_unique(hdfs_uri, hdfs_directory + file_name, config, buf_size); - buf->seek(offset, SEEK_SET); - return buf; - } - - offset -= size; - } - - return nullptr; - } - - bool nextImpl() override - { - /// Find first available buffer that fits to given offset. - if (!current_buf) - current_buf = initialize(); - - /// If current buffer has remaining data - use it. - if (current_buf && current_buf->next()) - { - working_buffer = current_buf->buffer(); - absolute_position += working_buffer.size(); - return true; - } - - /// If there is no available buffers - nothing to read. - if (current_buf_idx + 1 >= metadata.remote_fs_objects.size()) - return false; - - ++current_buf_idx; - - const auto & file_name = metadata.remote_fs_objects[current_buf_idx].first; - current_buf = std::make_unique(hdfs_uri, hdfs_directory + file_name, config, buf_size); - - current_buf->next(); - working_buffer = current_buf->buffer(); - absolute_position += working_buffer.size(); - - return true; - } - - const Poco::Util::AbstractConfiguration & config; - String hdfs_uri; - String hdfs_directory; - - DiskHDFS::Metadata metadata; - size_t buf_size; - - size_t absolute_position = 0; - size_t current_buf_idx = 0; - std::unique_ptr current_buf; -}; - -} diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index c1903fb0c1e..01c49e38098 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -7,8 +7,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -69,7 +69,7 @@ void throwIfError(const Aws::Utils::Outcome & response) } /// Reads data from S3 using stored paths in metadata. -class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase +class ReadIndirectBufferFromS3 final : public ReadIndirectBufferFromRemoteFS { public: ReadIndirectBufferFromS3( @@ -78,112 +78,24 @@ public: DiskS3::Metadata metadata_, size_t s3_max_single_read_retries_, size_t buf_size_) - : client_ptr(std::move(client_ptr_)) + : ReadIndirectBufferFromRemoteFS(metadata_) + , client_ptr(std::move(client_ptr_)) , bucket(bucket_) - , metadata(std::move(metadata_)) , s3_max_single_read_retries(s3_max_single_read_retries_) , buf_size(buf_size_) { } - off_t seek(off_t offset_, int whence) override + std::unique_ptr createReadBuffer(const String & path) override { - if (whence == SEEK_CUR) - { - /// If position within current working buffer - shift pos. - if (!working_buffer.empty() && size_t(getPosition() + offset_) < absolute_position) - { - pos += offset_; - return getPosition(); - } - else - { - absolute_position += offset_; - } - } - else if (whence == SEEK_SET) - { - /// If position within current working buffer - shift pos. - if (!working_buffer.empty() && size_t(offset_) >= absolute_position - working_buffer.size() - && size_t(offset_) < absolute_position) - { - pos = working_buffer.end() - (absolute_position - offset_); - return getPosition(); - } - else - { - absolute_position = offset_; - } - } - else - throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); - - current_buf = initialize(); - pos = working_buffer.end(); - - return absolute_position; + return std::make_unique(client_ptr, bucket, metadata.remote_fs_root_path + path, s3_max_single_read_retries, buf_size); } - off_t getPosition() override { return absolute_position - available(); } - - std::string getFileName() const override { return metadata.metadata_file_path; } - private: - std::unique_ptr initialize() - { - size_t offset = absolute_position; - for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) - { - current_buf_idx = i; - const auto & [path, size] = metadata.remote_fs_objects[i]; - if (size > offset) - { - auto buf = std::make_unique(client_ptr, bucket, metadata.remote_fs_root_path + path, s3_max_single_read_retries, buf_size); - buf->seek(offset, SEEK_SET); - return buf; - } - offset -= size; - } - return nullptr; - } - - bool nextImpl() override - { - /// Find first available buffer that fits to given offset. - if (!current_buf) - current_buf = initialize(); - - /// If current buffer has remaining data - use it. - if (current_buf && current_buf->next()) - { - working_buffer = current_buf->buffer(); - absolute_position += working_buffer.size(); - return true; - } - - /// If there is no available buffers - nothing to read. - if (current_buf_idx + 1 >= metadata.remote_fs_objects.size()) - return false; - - ++current_buf_idx; - const auto & path = metadata.remote_fs_objects[current_buf_idx].first; - current_buf = std::make_unique(client_ptr, bucket, metadata.remote_fs_root_path + path, s3_max_single_read_retries, buf_size); - current_buf->next(); - working_buffer = current_buf->buffer(); - absolute_position += working_buffer.size(); - - return true; - } - std::shared_ptr client_ptr; const String & bucket; - DiskS3::Metadata metadata; size_t s3_max_single_read_retries; size_t buf_size; - - size_t absolute_position = 0; - size_t current_buf_idx = 0; - std::unique_ptr current_buf; }; /// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS. diff --git a/src/Disks/tests/gtest_disk_hdfs.cpp b/src/Disks/tests/gtest_disk_hdfs.cpp index 7b04feb1416..27de458c92b 100644 --- a/src/Disks/tests/gtest_disk_hdfs.cpp +++ b/src/Disks/tests/gtest_disk_hdfs.cpp @@ -4,7 +4,7 @@ #include "gtest_disk.h" /// To enable tests set to 1. It is set to 0, because there is not HDFS instance in CI. -#define RUN_HDFS_TEST 0 +#define RUN_HDFS_TEST 1 #if RUN_HDFS_TEST diff --git a/src/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/IO/ReadIndirectBufferFromRemoteFS.cpp new file mode 100644 index 00000000000..f3dd77063fa --- /dev/null +++ b/src/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -0,0 +1,120 @@ +#include "ReadIndirectBufferFromRemoteFS.h" +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_SEEK_THROUGH_FILE; +} + + +template +ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( + IDiskRemote::Metadata metadata_) + : metadata(std::move(metadata_)) +{ +} + +template +off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) +{ + if (whence == SEEK_CUR) + { + /// If position within current working buffer - shift pos. + if (!working_buffer.empty() && size_t(getPosition() + offset_) < absolute_position) + { + pos += offset_; + return getPosition(); + } + else + { + absolute_position += offset_; + } + } + else if (whence == SEEK_SET) + { + /// If position within current working buffer - shift pos. + if (!working_buffer.empty() && size_t(offset_) >= absolute_position - working_buffer.size() + && size_t(offset_) < absolute_position) + { + pos = working_buffer.end() - (absolute_position - offset_); + return getPosition(); + } + else + { + absolute_position = offset_; + } + } + else + throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + + current_buf = initialize(); + pos = working_buffer.end(); + + return absolute_position; +} + + +template +std::unique_ptr ReadIndirectBufferFromRemoteFS::initialize() +{ + size_t offset = absolute_position; + for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) + { + current_buf_idx = i; + const auto & [file_path, size] = metadata.remote_fs_objects[i]; + if (size > offset) + { + auto buf = createReadBuffer(file_path); + buf->seek(offset, SEEK_SET); + return buf; + } + offset -= size; + } + return nullptr; +} + + +template +bool ReadIndirectBufferFromRemoteFS::nextImpl() +{ + /// Find first available buffer that fits to given offset. + if (!current_buf) + current_buf = initialize(); + + /// If current buffer has remaining data - use it. + if (current_buf && current_buf->next()) + { + working_buffer = current_buf->buffer(); + absolute_position += working_buffer.size(); + return true; + } + + /// If there is no available buffers - nothing to read. + if (current_buf_idx + 1 >= metadata.remote_fs_objects.size()) + return false; + + ++current_buf_idx; + const auto & path = metadata.remote_fs_objects[current_buf_idx].first; + + current_buf = createReadBuffer(path); + current_buf->next(); + + working_buffer = current_buf->buffer(); + absolute_position += working_buffer.size(); + + return true; +} + + +template +class ReadIndirectBufferFromRemoteFS; + +template +class ReadIndirectBufferFromRemoteFS; + +} diff --git a/src/IO/ReadIndirectBufferFromRemoteFS.h b/src/IO/ReadIndirectBufferFromRemoteFS.h new file mode 100644 index 00000000000..1c9ee3ef926 --- /dev/null +++ b/src/IO/ReadIndirectBufferFromRemoteFS.h @@ -0,0 +1,39 @@ +#include +#include +#include + + +namespace DB +{ + +/// Reads data from S3/HDFS using stored paths in metadata. +template +class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase +{ +public: + ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_); + + off_t seek(off_t offset_, int whence) override; + + off_t getPosition() override { return absolute_position - available(); } + + String getFileName() const override { return metadata.metadata_file_path; } + + virtual std::unique_ptr createReadBuffer(const String & path) = 0; + +protected: + IDiskRemote::Metadata metadata; + +private: + std::unique_ptr initialize(); + + bool nextImpl() override; + + size_t absolute_position = 0; + + size_t current_buf_idx = 0; + + std::unique_ptr current_buf; +}; + +}