From 42c7383a126bd6293d7b09584d859eae1ea25435 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 11 Nov 2024 16:19:52 +0800 Subject: [PATCH 1/3] support supportsRightBoundedReads for ReadBufferFromHDFS and AsynchronousReadBufferFromHDFS --- .../HDFS/AsynchronousReadBufferFromHDFS.cpp | 15 ++++++++++ .../HDFS/AsynchronousReadBufferFromHDFS.h | 6 ++++ .../ObjectStorage/HDFS/ReadBufferFromHDFS.cpp | 28 +++++++++++++++++++ .../ObjectStorage/HDFS/ReadBufferFromHDFS.h | 6 ++++ 4 files changed, 55 insertions(+) diff --git a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp index 5f57a4714f1..792c9384e10 100644 --- a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp @@ -270,6 +270,21 @@ size_t AsynchronousReadBufferFromHDFS::getFileOffsetOfBufferEnd() const return file_offset_of_buffer_end; } +bool AsynchronousReadBufferFromHDFS::supportsRightBoundedReads() const +{ + return true; +} + +void AsynchronousReadBufferFromHDFS::setReadUntilPosition(size_t position) +{ + read_until_position = position; +} + +void AsynchronousReadBufferFromHDFS::setReadUntilEnd() +{ + read_until_position = impl->getFileSize(); +} + } #endif diff --git a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h index 9846d74453b..c1112157985 100644 --- a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h +++ b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h @@ -31,6 +31,12 @@ public: ~AsynchronousReadBufferFromHDFS() override; + bool supportsRightBoundedReads() const override; + + void setReadUntilPosition(size_t position) override; + + void setReadUntilEnd() override; + off_t seek(off_t offset_, int whence) override; void prefetch(Priority priority) override; diff --git 
a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp index 91cee553d33..e597c84359d 100644 --- a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp @@ -165,6 +165,19 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemorysupportsRightBoundedReads(); +} + +void ReadBufferFromHDFS::setReadUntilPosition(size_t position) +{ + impl->setReadUntilPosition(position); +} + +void ReadBufferFromHDFS::setReadUntilEnd() +{ + impl->setReadUntilEnd(); +} + } #endif diff --git a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h index 5c9b5d73d58..4a1d74cd628 100644 --- a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h +++ b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h @@ -34,6 +34,12 @@ public: ~ReadBufferFromHDFS() override; + bool supportsRightBoundedReads() const override; + + void setReadUntilPosition(size_t position) override; + + void setReadUntilEnd() override; + bool nextImpl() override; off_t seek(off_t offset_, int whence) override; From 7ca3a7b103e64532e6b1626ecf8f60a0143e720f Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 11 Nov 2024 17:28:08 +0800 Subject: [PATCH 2/3] utilize pread in AsynchronousReadBufferxxx --- .../IO/AsynchronousBoundedReadBuffer.cpp | 11 ++- src/Disks/IO/AsynchronousBoundedReadBuffer.h | 5 +- src/Disks/IO/ThreadPoolRemoteFSReader.cpp | 91 ++++++++++++++----- src/IO/AsynchronousReader.h | 1 + src/IO/examples/read_buffer_from_hdfs.cpp | 69 ++++++++++++-- .../HDFS/AsynchronousReadBufferFromHDFS.cpp | 28 +++++- .../HDFS/AsynchronousReadBufferFromHDFS.h | 5 +- 7 files changed, 169 insertions(+), 41 deletions(-) diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp index b24b95af85c..6147325b508 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp 
+++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp @@ -47,7 +47,8 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer( IAsynchronousReader & reader_, const ReadSettings & settings_, AsyncReadCountersPtr async_read_counters_, - FilesystemReadPrefetchesLogPtr prefetches_log_) + FilesystemReadPrefetchesLogPtr prefetches_log_, + bool enable_read_at) : ReadBufferFromFileBase(0, nullptr, 0) , impl(std::move(impl_)) , read_settings(settings_) @@ -57,6 +58,7 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer( , log(getLogger("AsynchronousBoundedReadBuffer")) , async_read_counters(async_read_counters_) , prefetches_log(prefetches_log_) + , supports_read_at(enable_read_at && impl->supportsReadAt()) { ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); } @@ -87,6 +89,7 @@ std::future AsynchronousBoundedReadBuffer::readAsyn request.offset = file_offset_of_buffer_end; request.priority = Priority{read_settings.priority.value + priority.value}; request.ignore = bytes_to_ignore; + request.supports_read_at = supports_read_at; return reader.submit(request); } @@ -98,6 +101,7 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data, request.size = size; request.offset = file_offset_of_buffer_end; request.ignore = bytes_to_ignore; + request.supports_read_at = supports_read_at; return reader.execute(request); } @@ -234,7 +238,10 @@ bool AsynchronousBoundedReadBuffer::nextImpl() pos = working_buffer.begin(); } - file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + if (!impl->supportsReadAt()) + file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + else + file_offset_of_buffer_end += bytes_to_ignore + result.size - result.offset; chassert(file_offset_of_buffer_end <= impl->getFileSize()); diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.h b/src/Disks/IO/AsynchronousBoundedReadBuffer.h index 3dc8fcc39cb..647e0b4bb40 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.h +++ 
b/src/Disks/IO/AsynchronousBoundedReadBuffer.h @@ -28,7 +28,8 @@ public: IAsynchronousReader & reader_, const ReadSettings & settings_, AsyncReadCountersPtr async_read_counters_ = nullptr, - FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr); + FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr, + bool enable_read_at = false); ~AsynchronousBoundedReadBuffer() override; @@ -72,6 +73,8 @@ private: AsyncReadCountersPtr async_read_counters; FilesystemReadPrefetchesLogPtr prefetches_log; + bool supports_read_at; + struct LastPrefetchInfo { std::chrono::system_clock::time_point submit_time; diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 2df087e941f..e2ac4fd1dad 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -38,6 +38,11 @@ namespace CurrentMetrics extern const Metric ThreadPoolRemoteFSReaderThreadsScheduled; } +namespace ErrorCodes +{ + extern const int ATTEMPT_TO_READ_AFTER_EOF; +} + namespace DB { @@ -77,6 +82,7 @@ std::future ThreadPoolRemoteFSReader::submit(Reques auto * fd = assert_cast(request.descriptor.get()); auto & reader = fd->getReader(); + if (!request.supports_read_at) { ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); /// `seek` have to be done before checking `isContentCached`, and `set` have to be done prior to `seek` @@ -125,38 +131,75 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b auto read_counters = fd->getReadCounters(); std::optional increment = read_counters ? 
std::optional(read_counters) : std::nullopt; + auto watch = std::make_unique(CLOCK_REALTIME); + IAsynchronousReader::Result read_result; + if (!request.supports_read_at) { - ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); - if (!seek_performed) { - reader.set(request.buf, request.size); - reader.seek(request.offset, SEEK_SET); + ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); + if (!seek_performed) + { + reader.set(request.buf, request.size); + reader.seek(request.offset, SEEK_SET); + } + + if (request.ignore) + { + ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore); + reader.ignore(request.ignore); + } } - if (request.ignore) + watch->start(); + bool result = reader.available(); + if (!result) + result = reader.next(); + + watch->stop(); + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds()); + + if (result) { - ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore); - reader.ignore(request.ignore); + chassert(reader.buffer().begin() == request.buf); + chassert(reader.buffer().end() <= request.buf + request.size); + read_result.size = reader.buffer().size(); + read_result.offset = reader.offset(); + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); } } - - auto watch = std::make_unique(CLOCK_REALTIME); - - bool result = reader.available(); - if (!result) - result = reader.next(); - - watch->stop(); - ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds()); - - IAsynchronousReader::Result read_result; - if (result) + else { - chassert(reader.buffer().begin() == request.buf); - chassert(reader.buffer().end() <= request.buf + request.size); - read_result.size = reader.buffer().size(); - read_result.offset = reader.offset(); - 
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); + size_t curr_offset = request.offset; + if (request.ignore) + { + ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); + size_t bytes_to_ignore = request.ignore; + while (bytes_to_ignore) + { + size_t size = std::min(bytes_to_ignore, request.size); + size_t n = reader.readBigAt(request.buf, size, curr_offset, nullptr); + if (!n) + break; + + bytes_to_ignore -= n; + curr_offset += n; + } + + if (bytes_to_ignore) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after eof"); + } + + watch->start(); + size_t n = reader.readBigAt(request.buf, request.size, curr_offset, nullptr); + watch->stop(); + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds()); + + if (n) + { + read_result.size = n; + read_result.offset = 0; + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); + } } read_result.execution_watch = std::move(watch); diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index 815a7b2774e..a69342b9d3d 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -50,6 +50,7 @@ public: char * buf = nullptr; Priority priority; size_t ignore = 0; + bool supports_read_at = false; }; struct Result diff --git a/src/IO/examples/read_buffer_from_hdfs.cpp b/src/IO/examples/read_buffer_from_hdfs.cpp index 91139ad94eb..99024e31d35 100644 --- a/src/IO/examples/read_buffer_from_hdfs.cpp +++ b/src/IO/examples/read_buffer_from_hdfs.cpp @@ -1,26 +1,77 @@ #include -#include +#include +#include #include #include +#include #include #include -#include - #include +#include +#include using namespace DB; -int main() +int main(int /*argc*/, char ** argv) { - setenv("LIBHDFS3_CONF", "/path/to/hdfs-site.xml", true); /// NOLINT - String hdfs_uri = "hdfs://cluster_name"; - String hdfs_file_path = "/path/to/hdfs/file"; ConfigurationPtr 
config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); + auto shared_context = SharedContextHolder(Context::createShared()); + auto global_context = Context::createGlobal(shared_context.get()); + global_context->makeGlobalContext(); + global_context->setConfig(config); + + getIOThreadPool().initialize(100, 0, 100); + + + setenv("LIBHDFS3_CONF", "/path/to/config", true); /// NOLINT + String hdfs_uri = "hdfs://clustername"; + String hdfs_file_path = "/path/to/file"; ReadSettings read_settings; - ReadBufferFromHDFS read_buffer(hdfs_uri, hdfs_file_path, *config, read_settings, 2097152UL, false); + + auto get_read_buffer = [&](bool async, bool prefetch, bool read_at) -> ReadBufferPtr + { + auto rb = std::make_shared(hdfs_uri, hdfs_file_path, *config, read_settings, 0, true); + + if (async) + { + std::cout << "use async" << std::endl; + if (prefetch) + std::cout << "use prefetch" << std::endl; + if (read_at) + std::cout << "use read_at" << std::endl; + + read_settings.remote_fs_prefetch = prefetch; + return std::make_shared( + getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, rb, read_at); + } + else + { + return rb; + } + }; + + auto wrap_parallel_if_needed = [&](ReadBufferPtr input, bool parallel) -> ReadBufferPtr + { + if (parallel) + { + std::cout << "use parallel" << std::endl; + return wrapInParallelReadBufferIfSupported( + *input, threadPoolCallbackRunnerUnsafe(getIOThreadPool().get(), "ParallelRead"), 4, 10 * 1024 * 1024, 1529522144); + } + else + return input; + }; + + bool async = (std::atoi(argv[1]) != 0); + bool prefetch = (std::atoi(argv[2]) != 0); + bool read_at = (std::atoi(argv[3]) != 0); + bool parallel = (std::atoi(argv[4]) != 0); + std::cout << "async: " << async << ", prefetch: " << prefetch << ", read_at: " << read_at << ", parallel: " << parallel << std::endl; + auto holder = get_read_buffer(async, prefetch, read_at); + auto rb = wrap_parallel_if_needed(holder, parallel); String download_path = 
"./download"; WriteBufferFromFile write_buffer(download_path); - copyData(read_buffer, write_buffer); + copyData(*rb, write_buffer); return 0; } diff --git a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp index 792c9384e10..7c6ab76e3a4 100644 --- a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp @@ -36,7 +36,7 @@ namespace ErrorCodes } AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS( - IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr impl_) + IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr impl_, bool enable_read_at) : BufferWithOwnMemory(settings_.remote_fs_buffer_size) , reader(reader_) , base_priority(settings_.priority) @@ -44,6 +44,7 @@ AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS( , prefetch_buffer(settings_.remote_fs_buffer_size) , read_until_position(impl->getFileSize()) , use_prefetch(settings_.remote_fs_prefetch) + , supports_read_at(enable_read_at && impl->supportsReadAt()) , log(getLogger("AsynchronousReadBufferFromHDFS")) { ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); @@ -73,9 +74,23 @@ std::future AsynchronousReadBufferFromHDFS::asyncRe request.offset = file_offset_of_buffer_end; request.priority = Priority{base_priority.value + priority.value}; request.ignore = 0; + request.supports_read_at = supports_read_at; return reader.submit(request); } +IAsynchronousReader::Result AsynchronousReadBufferFromHDFS::readInto(char * data, size_t size, Priority priority) +{ + IAsynchronousReader::Request request; + request.descriptor = std::make_shared(*impl, nullptr); + request.buf = data; + request.size = size; + request.offset = file_offset_of_buffer_end; + request.priority = Priority{base_priority.value + priority.value}; + request.ignore = 0; + request.supports_read_at = 
supports_read_at; + return reader.execute(request); +} + void AsynchronousReadBufferFromHDFS::prefetch(Priority priority) { interval_watch.restart(); @@ -115,6 +130,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() size_t size = 0; size_t bytes_read = 0; + IAsynchronousReader::Result result; if (prefetch_future.valid()) { ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads); @@ -123,7 +139,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() { Stopwatch watch; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; - auto result = prefetch_future.get(); + result = prefetch_future.get(); size = result.size; offset = result.offset; LOG_TEST(log, "Current size: {}, offset: {}", size, offset); @@ -147,7 +163,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() { ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads); - auto result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get(); + result = readInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY); size = result.size; auto offset = result.offset; @@ -164,7 +180,11 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() } } - file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + if (!supports_read_at) + file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + else + file_offset_of_buffer_end += result.size; + prefetch_future = {}; if (use_prefetch && bytes_read) diff --git a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h index c1112157985..945ef8a9a77 100644 --- a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h +++ b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h @@ -27,7 +27,8 @@ public: AsynchronousReadBufferFromHDFS( IAsynchronousReader & reader_, const ReadSettings & settings_, - std::shared_ptr impl_); + std::shared_ptr impl_, + bool enable_read_at = false); ~AsynchronousReadBufferFromHDFS() 
override; @@ -57,6 +58,7 @@ private: bool hasPendingDataToRead(); std::future asyncReadInto(char * data, size_t size, Priority priority); + IAsynchronousReader::Result readInto(char * data, size_t size, Priority priority); IAsynchronousReader & reader; Priority base_priority; @@ -67,6 +69,7 @@ private: size_t file_offset_of_buffer_end = 0; std::optional read_until_position; bool use_prefetch; + bool supports_read_at; LoggerPtr log; From ea6a66a2c0d8966f6643cc8582e254a7bf7151f3 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 18 Nov 2024 15:27:22 +0800 Subject: [PATCH 3/3] fix building --- src/IO/examples/read_buffer_from_hdfs.cpp | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/IO/examples/read_buffer_from_hdfs.cpp b/src/IO/examples/read_buffer_from_hdfs.cpp index 99024e31d35..8ca042499b0 100644 --- a/src/IO/examples/read_buffer_from_hdfs.cpp +++ b/src/IO/examples/read_buffer_from_hdfs.cpp @@ -28,10 +28,11 @@ int main(int /*argc*/, char ** argv) String hdfs_file_path = "/path/to/file"; ReadSettings read_settings; - auto get_read_buffer = [&](bool async, bool prefetch, bool read_at) -> ReadBufferPtr + auto get_read_buffer = [&](bool async, bool prefetch, bool read_at) { auto rb = std::make_shared(hdfs_uri, hdfs_file_path, *config, read_settings, 0, true); + ReadBufferPtr res; if (async) { std::cout << "use async" << std::endl; @@ -41,13 +42,13 @@ int main(int /*argc*/, char ** argv) std::cout << "use read_at" << std::endl; read_settings.remote_fs_prefetch = prefetch; - return std::make_shared( + res = std::make_shared( getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, rb, read_at); } else - { - return rb; - } + res = rb; + + return res; }; auto wrap_parallel_if_needed = [&](ReadBufferPtr input, bool parallel) -> ReadBufferPtr @@ -62,10 +63,10 @@ int main(int /*argc*/, char ** argv) return input; }; - bool async = (std::atoi(argv[1]) != 0); - bool prefetch =
(std::atoi(argv[2]) != 0); - bool read_at = (std::atoi(argv[3]) != 0); - bool parallel = (std::atoi(argv[4]) != 0); + bool async = (std::stoi(std::string(argv[1])) != 0); + bool prefetch = (std::stoi(std::string(argv[2])) != 0); + bool read_at = (std::stoi(std::string(argv[3])) != 0); + bool parallel = (std::stoi(std::string(argv[4])) != 0); std::cout << "async: " << async << ", prefetch: " << prefetch << ", read_at: " << read_at << ", parallel: " << parallel << std::endl; auto holder = get_read_buffer(async, prefetch, read_at); auto rb = wrap_parallel_if_needed(holder, parallel);