diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp index 01271d5342b..7bed3d246ed 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp +++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp @@ -48,7 +48,8 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer( const ReadSettings & settings_, size_t buffer_size_, AsyncReadCountersPtr async_read_counters_, - FilesystemReadPrefetchesLogPtr prefetches_log_) + FilesystemReadPrefetchesLogPtr prefetches_log_, + bool enable_read_at) : ReadBufferFromFileBase(0, nullptr, 0) , impl(std::move(impl_)) , read_settings(settings_) @@ -59,6 +60,7 @@ AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer( , log(getLogger("AsynchronousBoundedReadBuffer")) , async_read_counters(async_read_counters_) , prefetches_log(prefetches_log_) + , supports_read_at(enable_read_at && impl->supportsReadAt()) { ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); } @@ -89,6 +91,7 @@ std::future AsynchronousBoundedReadBuffer::readAsyn request.offset = file_offset_of_buffer_end; request.priority = Priority{read_settings.priority.value + priority.value}; request.ignore = bytes_to_ignore; + request.supports_read_at = supports_read_at; return reader.submit(request); } @@ -100,6 +103,7 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data, request.size = size; request.offset = file_offset_of_buffer_end; request.ignore = bytes_to_ignore; + request.supports_read_at = supports_read_at; return reader.execute(request); } @@ -236,7 +240,10 @@ bool AsynchronousBoundedReadBuffer::nextImpl() pos = working_buffer.begin(); } - file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + if (!impl->supportsReadAt()) + file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + else + file_offset_of_buffer_end += bytes_to_ignore + result.size - result.offset; chassert(file_offset_of_buffer_end <= impl->getFileSize()); diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.h b/src/Disks/IO/AsynchronousBoundedReadBuffer.h index 7664cc4d386..626c75d003a 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.h +++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.h @@ -29,7 +29,8 @@ public: const ReadSettings & settings_, size_t buffer_size_, AsyncReadCountersPtr async_read_counters_ = nullptr, - FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr); + FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr, + bool enable_read_at = false); ~AsynchronousBoundedReadBuffer() override; @@ -74,6 +75,8 @@ private: AsyncReadCountersPtr async_read_counters; FilesystemReadPrefetchesLogPtr prefetches_log; + bool supports_read_at; + struct LastPrefetchInfo { std::chrono::system_clock::time_point submit_time; diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 2df087e941f..e2ac4fd1dad 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -38,6 +38,11 @@ namespace CurrentMetrics extern const Metric ThreadPoolRemoteFSReaderThreadsScheduled; } +namespace ErrorCodes +{ + extern const int ATTEMPT_TO_READ_AFTER_EOF; +} + namespace DB { @@ -77,6 +82,7 @@ std::future ThreadPoolRemoteFSReader::submit(Reques auto * fd = assert_cast(request.descriptor.get()); auto & reader = fd->getReader(); + if (!request.supports_read_at) { ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); /// `seek` have to be done before checking `isContentCached`, and `set` have to be done prior to `seek` @@ -125,38 +131,75 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b auto read_counters = fd->getReadCounters(); std::optional increment = read_counters ? std::optional(read_counters) : std::nullopt; + auto watch = std::make_unique(CLOCK_REALTIME); + IAsynchronousReader::Result read_result; + if (!request.supports_read_at) { - ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); - if (!seek_performed) { - reader.set(request.buf, request.size); - reader.seek(request.offset, SEEK_SET); + ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); + if (!seek_performed) + { + reader.set(request.buf, request.size); + reader.seek(request.offset, SEEK_SET); + } + + if (request.ignore) + { + ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore); + reader.ignore(request.ignore); + } } - if (request.ignore) + watch->start(); + bool result = reader.available(); + if (!result) + result = reader.next(); + + watch->stop(); + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds()); + + if (result) { - ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore); - reader.ignore(request.ignore); + chassert(reader.buffer().begin() == request.buf); + chassert(reader.buffer().end() <= request.buf + request.size); + read_result.size = reader.buffer().size(); + read_result.offset = reader.offset(); + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); } } - - auto watch = std::make_unique(CLOCK_REALTIME); - - bool result = reader.available(); - if (!result) - result = reader.next(); - - watch->stop(); - ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds()); - - IAsynchronousReader::Result read_result; - if (result) + else { - chassert(reader.buffer().begin() == request.buf); - chassert(reader.buffer().end() <= request.buf + request.size); - read_result.size = reader.buffer().size(); - read_result.offset = reader.offset(); - ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); + size_t curr_offset = request.offset; + if (request.ignore) + { + ProfileEventTimeIncrement elapsed(ProfileEvents::ThreadpoolReaderPrepareMicroseconds); + size_t bytes_to_ignore = request.ignore; + while (bytes_to_ignore) + { + size_t size = std::min(bytes_to_ignore, request.size); + size_t n = reader.readBigAt(request.buf, size, curr_offset, nullptr); + if (!n) + break; + + bytes_to_ignore -= n; + curr_offset += n; + } + + if (bytes_to_ignore) + throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after eof"); + } + + watch->start(); + size_t n = reader.readBigAt(request.buf, request.size, curr_offset, nullptr); + watch->stop(); + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds()); + + if (n) + { + read_result.size = n; + read_result.offset = 0; + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); + } } read_result.execution_watch = std::move(watch); diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index 815a7b2774e..a69342b9d3d 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -50,6 +50,7 @@ public: char * buf = nullptr; Priority priority; size_t ignore = 0; + bool supports_read_at = false; }; struct Result diff --git a/src/IO/examples/read_buffer_from_hdfs.cpp b/src/IO/examples/read_buffer_from_hdfs.cpp index 91139ad94eb..8ca042499b0 100644 --- a/src/IO/examples/read_buffer_from_hdfs.cpp +++ b/src/IO/examples/read_buffer_from_hdfs.cpp @@ -1,26 +1,78 @@ #include -#include +#include +#include #include #include +#include #include #include -#include - #include +#include +#include using namespace DB; -int main() +int main(int /*argc*/, char ** argv) { - setenv("LIBHDFS3_CONF", "/path/to/hdfs-site.xml", true); /// NOLINT - String hdfs_uri = "hdfs://cluster_name"; - String hdfs_file_path = "/path/to/hdfs/file"; ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); + auto shared_context = SharedContextHolder(Context::createShared()); + auto global_context = Context::createGlobal(shared_context.get()); + global_context->makeGlobalContext(); + global_context->setConfig(config); + + getIOThreadPool().initialize(100, 0, 100); + + + setenv("LIBHDFS3_CONF", "/path/to/config", true); /// NOLINT + String hdfs_uri = "hdfs://clustername"; + String hdfs_file_path = "/path/to/file"; ReadSettings read_settings; - ReadBufferFromHDFS read_buffer(hdfs_uri, hdfs_file_path, *config, read_settings, 2097152UL, false); + + auto get_read_buffer = [&](bool async, bool prefetch, bool read_at) + { + auto rb = std::make_shared(hdfs_uri, hdfs_file_path, *config, read_settings, 0, true); + + ReadBufferPtr res; + if (async) + { + std::cout << "use async" << std::endl; + if (prefetch) + std::cout << "use prefetch" << std::endl; + if (read_at) + std::cout << "use read_at" << std::endl; + + read_settings.remote_fs_prefetch = prefetch; + res = std::make_shared( + getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, rb, read_at); + } + else + res = rb; + + return res; + }; + + auto wrap_parallel_if_needed = [&](ReadBufferPtr input, bool parallel) -> ReadBufferPtr + { + if (parallel) + { + std::cout << "use parallel" << std::endl; + return wrapInParallelReadBufferIfSupported( + *input, threadPoolCallbackRunnerUnsafe(getIOThreadPool().get(), "ParallelRead"), 4, 10 * 1024 * 1024, 1529522144); + } + else + return input; + }; + + bool async = (std::stoi(std::string(argv[1])) != 0); + bool prefetch = (std::stoi(std::string(argv[2])) != 0); + bool read_at = (std::stoi(std::string(argv[3])) != 0); + bool parallel = (std::stoi(std::string(argv[4])) != 0); + std::cout << "async: " << async << ", prefetch: " << prefetch << ", read_at: " << read_at << ", parallel: " << parallel << std::endl; + auto holder = get_read_buffer(async, prefetch, read_at); + auto rb = wrap_parallel_if_needed(holder, parallel); String download_path = "./download"; WriteBufferFromFile write_buffer(download_path); - copyData(read_buffer, write_buffer); + copyData(*rb, write_buffer); return 0; } diff --git a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp index 5f57a4714f1..7c6ab76e3a4 100644 --- a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.cpp @@ -36,7 +36,7 @@ namespace ErrorCodes } AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS( - IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr impl_) + IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr impl_, bool enable_read_at) : BufferWithOwnMemory(settings_.remote_fs_buffer_size) , reader(reader_) , base_priority(settings_.priority) @@ -44,6 +44,7 @@ AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS( , prefetch_buffer(settings_.remote_fs_buffer_size) , read_until_position(impl->getFileSize()) , use_prefetch(settings_.remote_fs_prefetch) + , supports_read_at(enable_read_at && impl->supportsReadAt()) , log(getLogger("AsynchronousReadBufferFromHDFS")) { ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); @@ -73,9 +74,23 @@ std::future AsynchronousReadBufferFromHDFS::asyncRe request.offset = file_offset_of_buffer_end; request.priority = Priority{base_priority.value + priority.value}; request.ignore = 0; + request.supports_read_at = supports_read_at; return reader.submit(request); } +IAsynchronousReader::Result AsynchronousReadBufferFromHDFS::readInto(char * data, size_t size, Priority priority) +{ + IAsynchronousReader::Request request; + request.descriptor = std::make_shared(*impl, nullptr); + request.buf = data; + request.size = size; + request.offset = file_offset_of_buffer_end; + request.priority = Priority{base_priority.value + priority.value}; + request.ignore = 0; + request.supports_read_at = supports_read_at; + return reader.execute(request); +} + void AsynchronousReadBufferFromHDFS::prefetch(Priority priority) { interval_watch.restart(); @@ -115,6 +130,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() size_t size = 0; size_t bytes_read = 0; + IAsynchronousReader::Result result; if (prefetch_future.valid()) { ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads); @@ -123,7 +139,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() { Stopwatch watch; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; - auto result = prefetch_future.get(); + result = prefetch_future.get(); size = result.size; offset = result.offset; LOG_TEST(log, "Current size: {}, offset: {}", size, offset); @@ -147,7 +163,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() { ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads); - auto result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get(); + result = readInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY); size = result.size; auto offset = result.offset; @@ -164,7 +180,11 @@ bool AsynchronousReadBufferFromHDFS::nextImpl() } } - file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + if (!supports_read_at) + file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); + else + file_offset_of_buffer_end += result.size; + prefetch_future = {}; if (use_prefetch && bytes_read) @@ -270,6 +290,21 @@ size_t AsynchronousReadBufferFromHDFS::getFileOffsetOfBufferEnd() const return file_offset_of_buffer_end; } +bool AsynchronousReadBufferFromHDFS::supportsRightBoundedReads() const +{ + return true; +} + +void AsynchronousReadBufferFromHDFS::setReadUntilPosition(size_t position) +{ + read_until_position = position; +} + +void AsynchronousReadBufferFromHDFS::setReadUntilEnd() +{ + read_until_position = impl->getFileSize(); +} + } #endif diff --git a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h index 9846d74453b..945ef8a9a77 100644 --- a/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h +++ b/src/Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h @@ -27,10 +27,17 @@ public: AsynchronousReadBufferFromHDFS( IAsynchronousReader & reader_, const ReadSettings & settings_, - std::shared_ptr impl_); + std::shared_ptr impl_, + bool enable_read_at = false); ~AsynchronousReadBufferFromHDFS() override; + bool supportsRightBoundedReads() const override; + + void setReadUntilPosition(size_t position) override; + + void setReadUntilEnd() override; + off_t seek(off_t offset_, int whence) override; void prefetch(Priority priority) override; @@ -51,6 +58,7 @@ private: bool hasPendingDataToRead(); std::future asyncReadInto(char * data, size_t size, Priority priority); + IAsynchronousReader::Result readInto(char * data, size_t size, Priority priority); IAsynchronousReader & reader; Priority base_priority; @@ -61,6 +69,7 @@ private: size_t file_offset_of_buffer_end = 0; std::optional read_until_position; bool use_prefetch; + bool supports_read_at; LoggerPtr log; diff --git a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp index 91cee553d33..e597c84359d 100644 --- a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.cpp @@ -165,6 +165,19 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemorysupportsRightBoundedReads(); +} + +void ReadBufferFromHDFS::setReadUntilPosition(size_t position) +{ + impl->setReadUntilPosition(position); +} + +void ReadBufferFromHDFS::setReadUntilEnd() +{ + impl->setReadUntilEnd(); +} + } #endif diff --git a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h index 5c9b5d73d58..4a1d74cd628 100644 --- a/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h +++ b/src/Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h @@ -34,6 +34,12 @@ public: ~ReadBufferFromHDFS() override; + bool supportsRightBoundedReads() const override; + + void setReadUntilPosition(size_t position) override; + + void setReadUntilEnd() override; + bool nextImpl() override; off_t seek(off_t offset_, int whence) override;